# File: ParamManagerScripts/backend/script_groups/ObtainIOFromProjectTia/x3.py

"""
export_io_from_CAx :
Script que sirve para exraer los IOs de un proyecto de TIA Portal y
generar un archivo Markdown con la información.
"""
import os
import sys
from tkinter import filedialog
import traceback
from lxml import etree as ET
import json
from pathlib import Path
import re
import math # Needed for ceil
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- extract_aml_data function (Unchanged from v15) ---
def extract_aml_data(root):
    """(v15 logic - Unchanged) Extract structured project data from an AML tree.

    Walks the CAx/AML export in three passes:
      1. Every ``InternalElement`` becomes a device record (attributes,
         IO address blocks, interfaces, network nodes, parent/child links).
      2. Devices are classified as PLCs (by order-number prefix / type name)
         and as networks (by ``SupportedRoleClass`` containing ``SUBNET``).
      3. ``InternalLink`` elements map device nodes onto networks, associate
         networks with PLCs, and record the remaining generic links.

    Args:
        root: lxml root element of the parsed AML document.

    Returns:
        dict with keys ``plcs``, ``networks``, ``devices``,
        ``links_by_source``, ``links_by_target``, ``connections``.
    """
    data = {
        "plcs": {},
        "networks": {},
        "devices": {},
        "links_by_source": {},
        "links_by_target": {},
        "connections": [],
    }
    # Maps a network-node element ID to the device element that owns it
    # (filled in Pass 1, consumed in Pass 3 when resolving link endpoints).
    device_id_to_parent_device = {}
    all_elements = root.xpath(".//*[local-name()='InternalElement']")
    print(
        f"Pass 1: Found {len(all_elements)} InternalElement(s). Populating device dictionary..."
    )
    # --- Pass 1: one device record per InternalElement ---
    for elem in all_elements:
        elem_id = elem.get("ID", None)
        if not elem_id:
            continue  # elements without an ID cannot be referenced by links
        device_info = {
            "name": elem.get("Name", "N/A"),
            "id": elem_id,
            "class": "N/A",
            "type_identifier": "N/A",
            "order_number": "N/A",
            "type_name": "N/A",
            "firmware_version": "N/A",
            "position": elem.get("PositionNumber", "N/A"),
            "attributes": {},
            "interfaces": [],
            "network_nodes": [],
            "io_addresses": [],
            "children_ids": [
                c.get("ID")
                for c in elem.xpath("./*[local-name()='InternalElement']")
                if c.get("ID")
            ],
            "parent_id": (
                elem.xpath("parent::*[local-name()='InternalElement']/@ID")[0]
                if elem.xpath("parent::*[local-name()='InternalElement']")
                else None
            ),
        }
        # Prefer an explicit SystemUnitClass child; fall back to the
        # RefBaseSystemUnitPath attribute on the element itself.
        class_tag = elem.xpath("./*[local-name()='SystemUnitClass']")
        device_info["class"] = (
            class_tag[0].get("Path", elem.get("RefBaseSystemUnitPath", "N/A"))
            if class_tag
            else elem.get("RefBaseSystemUnitPath", "N/A")
        )
        attributes = elem.xpath("./*[local-name()='Attribute']")
        for attr in attributes:
            attr_name = attr.get("Name", "")
            value_elem = attr.xpath("./*[local-name()='Value']/text()")
            attr_value = value_elem[0] if value_elem else ""
            device_info["attributes"][attr_name] = attr_value
            if attr_name == "TypeIdentifier":
                device_info["type_identifier"] = attr_value
                # TypeIdentifier often embeds "OrderNumber:<MLFB>" — split it out.
                if "OrderNumber:" in attr_value:
                    device_info["order_number"] = attr_value.split("OrderNumber:")[
                        -1
                    ].strip()
            elif attr_name == "TypeName":
                device_info["type_name"] = attr_value
            elif attr_name == "FirmwareVersion":
                device_info["firmware_version"] = attr_value
            elif attr_name == "Address":
                # Each nested Attribute describes one IO area (start/length/type).
                address_parts = attr.xpath("./*[local-name()='Attribute']")
                for part in address_parts:
                    addr_details = {
                        "area": part.get("Name", "?"),
                        "start": "N/A",
                        "length": "N/A",
                        "type": "N/A",
                    }
                    start_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='StartAddress']/*[local-name()='Value']/text()"
                    )
                    len_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()"
                    )
                    type_val = part.xpath(
                        "./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()"
                    )
                    if start_val:
                        addr_details["start"] = start_val[0]
                    if len_val:
                        addr_details["length"] = len_val[0]
                    if type_val:
                        addr_details["type"] = type_val[0]
                    # Only keep entries that actually carry a start address.
                    if addr_details["start"] != "N/A":
                        device_info["io_addresses"].append(addr_details)
        interfaces = elem.xpath("./*[local-name()='ExternalInterface']")
        for interface in interfaces:
            device_info["interfaces"].append(
                {
                    "name": interface.get("Name", "N/A"),
                    "id": interface.get("ID", "N/A"),
                    "ref_base_class": interface.get("RefBaseClassPath", "N/A"),
                }
            )
        # Network nodes: child InternalElements with a 'Node' role class,
        # or the element itself if it carries the role directly.
        network_node_elements = elem.xpath(
            "./*[local-name()='InternalElement'][*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]]"
        )
        if not network_node_elements and elem.xpath(
            "./*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]"
        ):
            network_node_elements = [elem]
        for node_elem in network_node_elements:
            node_id = node_elem.get("ID")
            if not node_id:
                continue
            node_info = {
                "id": node_id,
                "name": node_elem.get("Name", device_info["name"]),
                "type": "N/A",
                "address": "N/A",
            }
            type_attr = node_elem.xpath(
                "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()"
            )
            addr_attr = node_elem.xpath(
                "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()"
            )
            if type_attr:
                node_info["type"] = type_attr[0]
            if addr_attr:
                node_info["address"] = addr_attr[0]
            # Fall back to the owning element's attributes when the node
            # itself does not declare an address/type.
            if node_info["address"] == "N/A":
                parent_addr_attr = elem.xpath(
                    "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()"
                )
                if parent_addr_attr:
                    node_info["address"] = parent_addr_attr[0]
            if node_info["type"] == "N/A":
                parent_type_attr = elem.xpath(
                    "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()"
                )
                if parent_type_attr:
                    node_info["type"] = parent_type_attr[0]
            if node_info["address"] != "N/A":
                len_attr = node_elem.xpath(
                    "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()"
                )
                node_info["length"] = len_attr[0] if len_attr else "N/A"
            device_info["network_nodes"].append(node_info)
            device_id_to_parent_device[node_id] = elem_id
        data["devices"][elem_id] = device_info
    print("Pass 2: Identifying PLCs and Networks (Refined v2)...")
    plc_ids_found = set()
    elem_map = {elem.get("ID"): elem for elem in all_elements if elem.get("ID")}
    for dev_id, device in data["devices"].items():
        is_plc = False
        # Known Siemens CPU order-number prefixes (S7-300/400/1500, ET200).
        plc_order_prefixes = [
            "6ES7 516-3FP03",
            "6ES7 151",
            "6ES7 31",
            "6ES7 41",
            "6ES7 51",
        ]
        if any(
            device.get("order_number", "N/A").startswith(prefix)
            for prefix in plc_order_prefixes
        ):
            is_plc = True
        elif (
            "CPU" in device.get("type_name", "").upper()
            or "PLC" in device.get("type_name", "").upper()
        ):
            is_plc = True
        if is_plc:
            # Skip sub-elements of an already identified PLC (e.g. its
            # interfaces), so only the top-most PLC element is registered.
            parent_id = device.get("parent_id")
            is_child_of_plc = False
            current_parent = parent_id
            while current_parent:
                if current_parent in plc_ids_found:
                    is_child_of_plc = True
                    break
                current_parent = (
                    data["devices"].get(current_parent, {}).get("parent_id")
                )
            if not is_child_of_plc:
                if dev_id not in plc_ids_found:
                    print(
                        f"  Identified PLC: {device['name']} ({dev_id}) - Type: {device.get('type_name', 'N/A')} OrderNo: {device.get('order_number', 'N/A')}"
                    )
                    device["connected_networks"] = {}
                    data["plcs"][dev_id] = device
                    plc_ids_found.add(dev_id)
        # Network detection: role class containing 'SUBNET'; the concrete
        # type is derived from the role class or, failing that, the name.
        is_network = False
        net_type = "Unknown"
        elem = elem_map.get(dev_id)
        if elem is not None:
            role_classes = elem.xpath(
                "./*[local-name()='SupportedRoleClass']/@RefRoleClassPath"
            )
            is_subnet_by_role = any("SUBNET" in rc.upper() for rc in role_classes)
            if is_subnet_by_role:
                is_network = True
                for rc in role_classes:
                    rc_upper = rc.upper()
                    if "PROFINET" in rc_upper or "ETHERNET" in rc_upper:
                        net_type = "Ethernet/Profinet"
                        break
                    elif "PROFIBUS" in rc_upper:
                        net_type = "Profibus"
                        break
                if net_type == "Unknown":
                    if "PROFIBUS" in device["name"].upper():
                        net_type = "Profibus"
                    elif (
                        "ETHERNET" in device["name"].upper()
                        or "PROFINET" in device["name"].upper()
                    ):
                        net_type = "Ethernet/Profinet"
        if is_network:
            if dev_id not in data["networks"]:
                print(
                    f"  Identified Network: {device['name']} ({dev_id}) Type: {net_type}"
                )
                data["networks"][dev_id] = {
                    "name": device["name"],
                    "type": net_type,
                    "devices_on_net": {},
                }
    print("Pass 3: Processing InternalLinks (Robust Network Mapping & IO)...")
    internal_links = root.xpath(".//*[local-name()='InternalLink']")
    print(f"Found {len(internal_links)} InternalLink(s).")
    conn_id_counter = 0
    for link in internal_links:
        conn_id_counter += 1
        link_name = link.get("Name", f"Link_{conn_id_counter}")
        side_a_ref = link.get("RefPartnerSideA", "")
        side_b_ref = link.get("RefPartnerSideB", "")
        # Link refs look like "<ElementID>:<suffix>"; split ID from suffix.
        side_a_match = re.match(r"([^:]+):?(.*)", side_a_ref)
        side_b_match = re.match(r"([^:]+):?(.*)", side_b_ref)
        side_a_id = side_a_match.group(1) if side_a_match else "N/A"
        side_a_suffix = (
            side_a_match.group(2)
            if side_a_match and side_a_match.group(2)
            else side_a_id
        )
        side_b_id = side_b_match.group(1) if side_b_match else "N/A"
        side_b_suffix = (
            side_b_match.group(2)
            if side_b_match and side_b_match.group(2)
            else side_b_id
        )
        # A network link has exactly one side that is a known network.
        network_id, device_node_id = None, None
        side_a_is_network = side_a_id in data["networks"]
        side_b_is_network = side_b_id in data["networks"]
        if side_a_is_network and not side_b_is_network:
            network_id, device_node_id = side_a_id, side_b_id
        elif side_b_is_network and not side_a_is_network:
            network_id, device_node_id = side_b_id, side_a_id
        elif side_a_is_network and side_b_is_network:
            continue  # network-to-network links carry no device info
        elif not side_a_is_network and not side_b_is_network:
            pass  # generic (non-network) link; handled below
        if network_id and device_node_id:
            # Resolve the node ID back to its owning device (Pass 1 map).
            linked_device_id = device_id_to_parent_device.get(device_node_id)
            if not linked_device_id and device_node_id in data["devices"]:
                linked_device_id = device_node_id
            if linked_device_id and linked_device_id in data["devices"]:
                device_info = data["devices"].get(linked_device_id)
                if not device_info:
                    continue
                # Find the node's network address, trying progressively
                # broader fallbacks (node record -> node attrs -> device attrs).
                address = "N/A"
                node_info_for_addr = data["devices"].get(device_node_id, {})
                for node in node_info_for_addr.get("network_nodes", []):
                    if node.get("id") == device_node_id:
                        address = node.get("address", "N/A")
                        break
                if address == "N/A":
                    address = node_info_for_addr.get("attributes", {}).get(
                        "NetworkAddress", "N/A"
                    )
                if address == "N/A":
                    address = device_info.get("attributes", {}).get(
                        "NetworkAddress", "N/A"
                    )
                node_name_for_log = node_info_for_addr.get("name", device_node_id)
                print(
                    f"  Mapping Device/Node '{node_name_for_log}' (NodeID:{device_node_id}, Addr:{address}) to Network '{data['networks'][network_id]['name']}'"
                )
                data["networks"][network_id]["devices_on_net"][device_node_id] = address
                # Walk up (node -> interface -> device) looking for a PLC
                # so the network can be associated with it.
                potential_plc_id = None
                interface_id = None
                interface_info = None
                node_check_info = data["devices"].get(device_node_id)
                if node_check_info:
                    if device_node_id in data["plcs"]:
                        potential_plc_id = device_node_id
                    else:
                        interface_id = node_check_info.get("parent_id")
                        if interface_id and interface_id in data["devices"]:
                            interface_info = data["devices"].get(interface_id)
                            if interface_info:
                                if interface_id in data["plcs"]:
                                    potential_plc_id = interface_id
                                elif (
                                    interface_info.get("parent_id")
                                    and interface_info["parent_id"] in data["plcs"]
                                ):
                                    potential_plc_id = interface_info["parent_id"]
                if potential_plc_id:
                    plc_object = data["plcs"][potential_plc_id]
                    if "connected_networks" not in plc_object:
                        plc_object["connected_networks"] = {}
                    if network_id not in plc_object.get("connected_networks", {}):
                        print(
                            f"    --> Associating Network '{data['networks'][network_id]['name']}' with PLC '{plc_object.get('name', 'Unknown PLC')}' (via Node '{node_name_for_log}' Addr: {address})"
                        )
                        data["plcs"][potential_plc_id]["connected_networks"][
                            network_id
                        ] = address
                    elif (
                        plc_object["connected_networks"][network_id] == "N/A"
                        and address != "N/A"
                    ):
                        # Upgrade a previously unknown address once a real one appears.
                        print(
                            f"    --> Updating address for Network '{data['networks'][network_id]['name']}' on PLC '{plc_object.get('name', 'Unknown PLC')}' to: {address}"
                        )
                        data["plcs"][potential_plc_id]["connected_networks"][
                            network_id
                        ] = address
            else:
                print(
                    f"  Warning: Could not map linked device ID {linked_device_id} (from NodeID {device_node_id}) to any known device."
                )
            continue
        if not network_id:  # Generic links
            source_id, source_suffix, target_id, target_suffix = (
                side_a_id,
                side_a_suffix,
                side_b_id,
                side_b_suffix,
            )
            # Normalize direction: the side naming a Channel/Parameter is
            # treated as the source.
            if ("Channel" in side_b_suffix or "Parameter" in side_b_suffix) and (
                "Channel" not in side_a_suffix and "Parameter" not in side_a_suffix
            ):
                source_id, source_suffix, target_id, target_suffix = (
                    side_b_id,
                    side_b_suffix,
                    side_a_id,
                    side_a_suffix,
                )
            if source_id != "N/A" and target_id != "N/A":
                if source_id not in data["links_by_source"]:
                    data["links_by_source"][source_id] = []
                if target_id not in data["links_by_target"]:
                    data["links_by_target"][target_id] = []
                link_detail = {
                    "name": link_name,
                    "source_id": source_id,
                    "source_suffix": source_suffix,
                    "target_id": target_id,
                    "target_suffix": target_suffix,
                    "source_device_name": data["devices"]
                    .get(source_id, {})
                    .get("name", source_id),
                    "target_device_name": data["devices"]
                    .get(target_id, {})
                    .get("name", target_id),
                }
                data["links_by_source"][source_id].append(link_detail)
                data["links_by_target"][target_id].append(link_detail)
                data["connections"].append(link_detail)
    print("Data extraction and structuring complete.")
    return data
# --- Helper Function for Recursive IO Search (Unchanged from v20) ---
def find_io_recursively(device_id, project_data, _visited=None):
    """Recursively find all IO addresses under a given device ID.

    Each IO entry is returned as a dict combining the owning module's name
    and position with the address fields (``area``/``start``/``length``/``type``).

    Args:
        device_id: ID of the device subtree root to scan.
        project_data: structured dict produced by ``extract_aml_data``.
        _visited: internal set of already-visited device IDs (cycle guard);
            callers should leave it as None.

    Returns:
        list of IO address dicts (empty if the device is unknown).
    """
    # Cycle guard: the original only skipped direct self-references
    # (child_id != device_id), so any longer parent/child cycle in the
    # exported data caused infinite recursion. Track every visited ID.
    if _visited is None:
        _visited = set()
    if device_id in _visited:
        return []
    _visited.add(device_id)
    io_list = []
    device_info = project_data.get("devices", {}).get(device_id)
    if not device_info:
        return io_list
    for addr in device_info.get("io_addresses", []):
        io_list.append(
            {
                "module_name": device_info.get("name", device_id),
                "module_pos": device_info.get("position", "N/A"),
                **addr,
            }
        )
    for child_id in device_info.get("children_ids", []):
        io_list.extend(find_io_recursively(child_id, project_data, _visited))
    return io_list
# --- generate_markdown_tree function (v26 - Final Cleaned Version) ---
def generate_markdown_tree(project_data, md_file_path):
    """(v26) Generate the final hierarchical Markdown summary.

    Layout per PLC: basic info, then each connected network, then each
    non-PLC device on that network with its details, aggregated IO
    addresses (Siemens PE/PA byte-range format) and channel connections.

    Args:
        project_data: structured dict produced by ``extract_aml_data``.
        md_file_path: destination path for the Markdown file.
    """
    markdown_lines = ["# Project Hardware & IO Summary (Tree View v26)", ""]
    # Early exit: no PLCs — still write a stub file so the caller gets output.
    if not project_data or not project_data.get("plcs"):
        markdown_lines.append("*No PLC identified in the project data.*")
        try:
            with open(md_file_path, "w", encoding="utf-8") as f:
                f.write("\n".join(markdown_lines))
            print(f"\nMarkdown summary written to: {md_file_path}")
        except Exception as e:
            print(f"ERROR writing Markdown file {md_file_path}: {e}")
        return
    markdown_lines.append(f"Identified {len(project_data['plcs'])} PLC(s).")
    for plc_id, plc_info in project_data.get("plcs", {}).items():
        markdown_lines.append(f"\n## PLC: {plc_info.get('name', plc_id)}")
        type_name = plc_info.get("type_name", "N/A")
        order_num = plc_info.get("order_number", "N/A")
        firmware = plc_info.get("firmware_version", "N/A")
        if type_name and type_name != "N/A":
            markdown_lines.append(f"- **Type Name:** `{type_name}`")
        if order_num and order_num != "N/A":
            markdown_lines.append(f"- **Order Number:** `{order_num}`")
        if firmware and firmware != "N/A":
            markdown_lines.append(f"- **Firmware:** `{firmware}`")
        # ID removed
        plc_networks = plc_info.get("connected_networks", {})
        markdown_lines.append("\n- **Networks:**")
        if not plc_networks:
            markdown_lines.append(
                "  - *No network connections found associated with this PLC object.*"
            )
        else:
            # Sort networks by display name for stable output.
            sorted_network_items = sorted(
                plc_networks.items(),
                key=lambda item: project_data.get("networks", {})
                .get(item[0], {})
                .get("name", item[0]),
            )
            for net_id, plc_addr_on_net in sorted_network_items:
                net_info = project_data.get("networks", {}).get(net_id)
                if not net_info:
                    markdown_lines.append(
                        f"  - !!! Error: Network info missing for ID {net_id} !!!"
                    )
                    continue
                markdown_lines.append(
                    f"  - ### {net_info.get('name', net_id)} ({net_info.get('type', 'Unknown')})"
                )
                markdown_lines.append(
                    f"    - PLC Address on this Net: `{plc_addr_on_net}`"
                )
                markdown_lines.append(f"    - **Devices on Network:**")
                devices_on_this_net = net_info.get("devices_on_net", {})

                def sort_key(item):
                    # Sort by the numeric components of the address
                    # (e.g. IP octets / DP station numbers).
                    node_id, node_addr = item
                    try:
                        parts = [int(p) for p in re.findall(r"\d+", node_addr)]
                        return parts
                    except:  # NOTE(review): bare except — also swallows SystemExit/KeyboardInterrupt; consider `except Exception`
                        return [float("inf")]

                # Collect the PLC's own node/interface IDs so they are not
                # listed again as "devices on the network".
                plc_interface_and_node_ids = set()
                for node in plc_info.get("network_nodes", []):
                    plc_interface_and_node_ids.add(node["id"])
                    interface_id = (
                        project_data["devices"].get(node["id"], {}).get("parent_id")
                    )
                    if interface_id:
                        plc_interface_and_node_ids.add(interface_id)
                plc_interface_and_node_ids.add(plc_id)
                other_device_items = sorted(
                    [
                        (node_id, node_addr)
                        for node_id, node_addr in devices_on_this_net.items()
                        if node_id not in plc_interface_and_node_ids
                    ],
                    key=sort_key,
                )
                if not other_device_items:
                    markdown_lines.append("      - *None (besides PLC interfaces)*")
                else:
                    # --- Display Logic with Sibling IO Aggregation & Aesthetics ---
                    for node_id, node_addr in other_device_items:
                        node_info = project_data.get("devices", {}).get(node_id)
                        if not node_info:
                            markdown_lines.append(
                                f"      - !!! Error: Node info missing for ID {node_id} Addr: {node_addr} !!!"
                            )
                            continue
                        # Climb node -> interface -> device -> (maybe) rack to
                        # decide which record best represents the hardware.
                        interface_id = node_info.get("parent_id")
                        interface_info = None
                        actual_device_id = None
                        actual_device_info = None
                        rack_id = None
                        rack_info = None
                        if interface_id:
                            interface_info = project_data.get("devices", {}).get(
                                interface_id
                            )
                            if interface_info:
                                actual_device_id = interface_info.get("parent_id")
                                if actual_device_id:
                                    actual_device_info = project_data.get(
                                        "devices", {}
                                    ).get(actual_device_id)
                                    if actual_device_info:
                                        potential_rack_id = actual_device_info.get(
                                            "parent_id"
                                        )
                                        if potential_rack_id:
                                            potential_rack_info = project_data.get(
                                                "devices", {}
                                            ).get(potential_rack_id)
                                            # Heuristic: a rack is named "Rack" or has no position number.
                                            if potential_rack_info and (
                                                "Rack"
                                                in potential_rack_info.get("name", "")
                                                or potential_rack_info.get("position")
                                                is None
                                            ):
                                                rack_id = potential_rack_id
                                                rack_info = potential_rack_info
                        # Pick the deepest resolved ancestor as the title record.
                        display_info_title = (
                            actual_device_info
                            if actual_device_info
                            else (interface_info if interface_info else node_info)
                        )
                        display_id_title = (
                            actual_device_id
                            if actual_device_info
                            else (interface_id if interface_info else node_id)
                        )
                        io_search_root_id = (
                            actual_device_id
                            if actual_device_info
                            else (interface_id if interface_info else node_id)
                        )
                        io_search_root_info = project_data.get("devices", {}).get(
                            io_search_root_id
                        )
                        # Construct Title
                        display_name = display_info_title.get("name", display_id_title)
                        via_node_name = node_info.get("name", node_id)
                        title_str = f"#### {display_name}"
                        if display_id_title != node_id:
                            title_str += f" (via {via_node_name} @ `{node_addr}`)"
                        else:
                            title_str += f" (@ `{node_addr}`)"
                        markdown_lines.append(f"      - {title_str}")
                        # Display Basic Details
                        markdown_lines.append(
                            f"        - Address (on net): `{node_addr}`"
                        )
                        type_name_disp = display_info_title.get("type_name", "N/A")
                        order_num_disp = display_info_title.get("order_number", "N/A")
                        pos_disp = display_info_title.get("position", "N/A")
                        if type_name_disp and type_name_disp != "N/A":
                            markdown_lines.append(
                                f"        - Type Name: `{type_name_disp}`"
                            )
                        if order_num_disp and order_num_disp != "N/A":
                            markdown_lines.append(
                                f"        - Order No: `{order_num_disp}`"
                            )
                        if pos_disp and pos_disp != "N/A":
                            markdown_lines.append(
                                f"        - Pos (in parent): `{pos_disp}`"
                            )
                        # Show the enclosing structure (rack or device parent).
                        ultimate_parent_id = rack_id
                        if not ultimate_parent_id and actual_device_info:
                            ultimate_parent_id = actual_device_info.get("parent_id")
                        if (
                            ultimate_parent_id
                            and ultimate_parent_id != display_id_title
                        ):
                            ultimate_parent_info = project_data.get("devices", {}).get(
                                ultimate_parent_id
                            )
                            ultimate_parent_name = (
                                ultimate_parent_info.get("name", "?")
                                if ultimate_parent_info
                                else "?"
                            )
                            markdown_lines.append(
                                f"        - Parent Structure: `{ultimate_parent_name}`"
                            )  # Removed ID here
                        # --- IO Aggregation Logic (from v24) ---
                        # Aggregate IO from all siblings under the same parent
                        # structure, so modules spread over a rack are combined.
                        aggregated_io_addresses = []
                        parent_structure_id = (
                            io_search_root_info.get("parent_id")
                            if io_search_root_info
                            else None
                        )
                        io_search_root_name_disp = (
                            io_search_root_info.get("name", "?")
                            if io_search_root_info
                            else "?"
                        )
                        if parent_structure_id:
                            parent_structure_info = project_data.get("devices", {}).get(
                                parent_structure_id
                            )
                            parent_structure_name = (
                                parent_structure_info.get("name", "?")
                                if parent_structure_info
                                else "?"
                            )
                            search_title = f"parent '{parent_structure_name}'"
                            sibling_found_io = False
                            for dev_scan_id, dev_scan_info in project_data.get(
                                "devices", {}
                            ).items():
                                if (
                                    dev_scan_info.get("parent_id")
                                    == parent_structure_id
                                ):
                                    io_from_sibling = find_io_recursively(
                                        dev_scan_id, project_data
                                    )
                                    if io_from_sibling:
                                        aggregated_io_addresses.extend(io_from_sibling)
                                        sibling_found_io = True
                            if (
                                not sibling_found_io and not aggregated_io_addresses
                            ):  # Only show message if list still empty
                                markdown_lines.append(
                                    f"        - *No IO Addresses found in modules under {search_title} (ID: {parent_structure_id}).*"
                                )
                        elif io_search_root_id:
                            search_title = f"'{io_search_root_name_disp}'"
                            aggregated_io_addresses = find_io_recursively(
                                io_search_root_id, project_data
                            )
                            if not aggregated_io_addresses:
                                markdown_lines.append(
                                    f"        - *No IO Addresses found in modules under {search_title} (ID: {io_search_root_id}).*"
                                )
                        else:
                            markdown_lines.append(
                                f"        - *Could not determine structure to search for IO addresses.*"
                            )
                        # --- End IO Aggregation ---
                        # Display aggregated IO Addresses with Siemens format (Cleaned)
                        if aggregated_io_addresses:
                            markdown_lines.append(
                                f"        - **IO Addresses (Aggregated from Structure):**"
                            )  # Removed redundant search root name
                            # Sort by module position, then name, type, start.
                            sorted_agg_io = sorted(
                                aggregated_io_addresses,
                                key=lambda x: (
                                    (
                                        int(x.get("module_pos", "9999"))
                                        if x.get("module_pos", "9999").isdigit()
                                        else 9999
                                    ),
                                    x.get("module_name", ""),
                                    x.get("type", ""),
                                    (
                                        int(x.get("start", "0"))
                                        if x.get("start", "0").isdigit()
                                        else float("inf")
                                    ),
                                ),
                            )
                            last_module_id_key = None
                            for addr_info in sorted_agg_io:
                                # Print a module header only when it changes.
                                current_module_id_key = (
                                    addr_info.get("module_name", "?"),
                                    addr_info.get("module_pos", "?"),
                                )
                                if current_module_id_key != last_module_id_key:
                                    markdown_lines.append(
                                        f"          - **From Module:** {addr_info.get('module_name','?')} (Pos: {addr_info.get('module_pos','?')})"
                                    )
                                    last_module_id_key = current_module_id_key
                                # --- Siemens IO Formatting (from v25.1 - keep fixes) ---
                                io_type = addr_info.get("type", "?")
                                start_str = addr_info.get("start", "?")
                                length_str = addr_info.get("length", "?")
                                area_str = addr_info.get("area", "?")
                                siemens_addr = f"FMT_ERROR"  # Default error
                                length_bits = 0
                                try:
                                    # Length is given in bits; convert to a byte range.
                                    start_byte = int(start_str)
                                    length_bits = int(length_str)
                                    length_bytes = math.ceil(
                                        length_bits / 8.0
                                    )  # Use float division
                                    if length_bits > 0 and length_bytes == 0:
                                        length_bytes = 1  # Handle len < 8 bits
                                    end_byte = start_byte + length_bytes - 1
                                    prefix = "P?"
                                    if io_type.lower() == "input":
                                        prefix = "PE"
                                    elif io_type.lower() == "output":
                                        prefix = "PA"
                                    siemens_addr = f"{prefix} {start_byte}..{end_byte}"
                                except Exception:  # Catch any error during calc/format
                                    siemens_addr = (
                                        f"FMT_ERROR({start_str},{length_str})"
                                    )
                                markdown_lines.append(
                                    f"            - `{siemens_addr}` (Len={length_bits} bits)"  # Simplified output
                                )
                                # --- End Siemens IO Formatting ---
                        # IO Connections logic remains the same...
                        links_from = project_data.get("links_by_source", {}).get(
                            display_id_title, []
                        )
                        links_to = project_data.get("links_by_target", {}).get(
                            display_id_title, []
                        )
                        io_conns = []
                        for link in links_from:
                            if "channel" in link["source_suffix"].lower():
                                target_str = f"{link.get('target_device_name', link['target_id'])}:{link['target_suffix']}"
                                if link["target_id"] == display_id_title:
                                    target_str = link["target_suffix"]
                                io_conns.append(
                                    f"`{link['source_suffix']}` → `{target_str}`"
                                )
                        for link in links_to:
                            if "channel" in link["target_suffix"].lower():
                                source_str = f"{link.get('source_device_name', link['source_id'])}:{link['source_suffix']}"
                                if link["source_id"] == display_id_title:
                                    source_str = link["source_suffix"]
                                io_conns.append(
                                    f"`{source_str}` → `{link['target_suffix']}`"
                                )
                        if io_conns:
                            markdown_lines.append(
                                f"        - **IO Connections (Channels):**"
                            )
                            for conn in sorted(list(set(io_conns))):
                                markdown_lines.append(f"          - {conn}")
                        markdown_lines.append("")  # Spacing
                    # --- *** END Display Logic *** ---
    try:
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write("\n".join(markdown_lines))
        print(f"\nMarkdown summary written to: {md_file_path}")
    except Exception as e:
        print(f"ERROR writing Markdown file {md_file_path}: {e}")
        traceback.print_exc()
# --- generate_io_upward_tree function (Unchanged from v23) ---
def generate_io_upward_tree(project_data, md_file_path):
    """(v23) Generate a debug tree starting from IO addresses upwards.

    For every device/module that declares IO addresses, lists the addresses
    and then traces the parent chain upwards until a network connection
    point or a PLC is reached (bounded by an ancestor limit).

    Args:
        project_data: structured dict produced by ``extract_aml_data``.
        md_file_path: destination path for the debug Markdown file.
    """
    markdown_lines = ["# IO Address Upward Connection Trace (Debug v23)", ""]
    # Early exit: no devices — still write a stub file.
    if not project_data or not project_data.get("devices"):
        markdown_lines.append("*No device data found.*")
        try:
            with open(md_file_path, "w", encoding="utf-8") as f:
                f.write("\n".join(markdown_lines))
            print(f"\nIO upward debug tree written to: {md_file_path}")
        except Exception as e:
            print(f"ERROR writing IO upward debug tree file {md_file_path}: {e}")
        return
    # Reverse index: node ID -> (network ID, network name, address on net).
    node_to_network_map = {}
    for net_id, net_info in project_data.get("networks", {}).items():
        net_name = net_info.get("name", "?")
        for node_id, node_addr in net_info.get("devices_on_net", {}).items():
            node_to_network_map[node_id] = (net_id, net_name, node_addr)
    devices_with_io = []
    for dev_id, dev_info in project_data.get("devices", {}).items():
        if dev_info.get("io_addresses"):
            devices_with_io.append((dev_id, dev_info))
    if not devices_with_io:
        markdown_lines.append("*No devices with defined IO Addresses found.*")
    else:
        markdown_lines.append(
            f"Found {len(devices_with_io)} device(s)/module(s) with IO addresses. Tracing connections upwards:\n"
        )
        # Sort by slot position (non-numeric positions last), then name.
        devices_with_io.sort(
            key=lambda item: (
                (
                    int(item[1].get("position", "9999"))
                    if item[1].get("position", "9999").isdigit()
                    else 9999
                ),
                item[1].get("name", ""),
            )
        )
        for dev_id, dev_info in devices_with_io:
            markdown_lines.append(
                f"## IO Module: {dev_info.get('name', dev_id)} (ID: {dev_id})"
            )
            markdown_lines.append(f"- Position: {dev_info.get('position', 'N/A')}")
            markdown_lines.append("- IO Addresses:")
            for addr in sorted(
                dev_info["io_addresses"],
                key=lambda x: (
                    x.get("type", ""),
                    (
                        int(x.get("start", "0"))
                        if x.get("start", "0").isdigit()
                        else float("inf")
                    ),
                ),
            ):
                markdown_lines.append(
                    f"  - `{addr.get('type','?').ljust(6)} Start={addr.get('start','?').ljust(4)} Len={addr.get('length','?').ljust(3)}` (Area: {addr.get('area','?')})"
                )
            markdown_lines.append("- Upward Path:")
            # Walk the parent chain until a network node or PLC is found.
            current_id = dev_id
            current_info = dev_info
            indent = "  "
            path_found = False
            ancestor_limit = 15  # guard against cyclic/degenerate hierarchies
            count = 0
            while current_id and count < ancestor_limit:
                ancestor_name = current_info.get("name", "?") if current_info else "?"
                ancestor_pos = (
                    current_info.get("position", "N/A") if current_info else "N/A"
                )
                markdown_lines.append(
                    f"{indent}└─ {ancestor_name} (ID: {current_id}, Pos: {ancestor_pos})"
                )
                if current_id in node_to_network_map:
                    # Found the point where this branch attaches to a network.
                    net_id, net_name, node_addr = node_to_network_map[current_id]
                    markdown_lines.append(f"{indent}   └─ **Network Connection Point**")
                    markdown_lines.append(
                        f"{indent}      - Node: {ancestor_name} (ID: {current_id})"
                    )
                    markdown_lines.append(
                        f"{indent}      - Network: {net_name} (ID: {net_id})"
                    )
                    markdown_lines.append(f"{indent}      - Address: `{node_addr}`")
                    plc_connection_found = False
                    for plc_id_check, plc_info_check in project_data.get(
                        "plcs", {}
                    ).items():
                        if net_id in plc_info_check.get("connected_networks", {}):
                            markdown_lines.append(
                                f"{indent}      - **Network associated with PLC:** {plc_info_check.get('name','?')} (ID: {plc_id_check})"
                            )
                            plc_connection_found = True
                            break
                    if not plc_connection_found:
                        markdown_lines.append(
                            f"{indent}      - *Network not directly associated with a known PLC in data.*"
                        )
                    path_found = True
                    break
                if current_id in project_data.get("plcs", {}):
                    markdown_lines.append(f"{indent}   └─ **Is PLC:** {ancestor_name}")
                    path_found = True
                    break
                parent_id = current_info.get("parent_id") if current_info else None
                if parent_id:
                    current_info = project_data.get("devices", {}).get(parent_id)
                    if not current_info:
                        markdown_lines.append(
                            f"{indent}   └─ Parent ID {parent_id} not found. Stopping trace."
                        )
                        break
                    current_id = parent_id
                    indent += "  "
                else:
                    markdown_lines.append(
                        f"{indent}   └─ Reached top level (no parent)."
                    )
                    break
                count += 1
            if count >= ancestor_limit:
                markdown_lines.append(
                    f"{indent}   └─ Reached ancestor limit. Stopping trace."
                )
            if not path_found:
                markdown_lines.append(
                    f"{indent}└─ *Could not trace path to a known Network Node or PLC.*"
                )
            markdown_lines.append("")
    try:
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write("\n".join(markdown_lines))
        print(f"\nIO upward debug tree written to: {md_file_path}")
    except Exception as e:
        print(f"ERROR writing IO upward debug tree file {md_file_path}: {e}")
# --- process_aml_file function (unchanged from v22) ---
def process_aml_file(
    aml_file_path, json_output_path, md_output_path, md_upward_output_path
):
    """Parse one AML file and emit its JSON and Markdown outputs.

    Orchestrates the pipeline: parse XML with lxml, extract structured data,
    dump it as JSON, then render both Markdown reports. Errors are reported
    to stdout; the function never raises.

    Args:
        aml_file_path: path of the AML/CAx export to read.
        json_output_path: destination for the hierarchical JSON dump.
        md_output_path: destination for the main hardware-tree Markdown.
        md_upward_output_path: destination for the IO upward-trace Markdown.
    """
    print(f"Processing AML file: {aml_file_path}")
    if not os.path.exists(aml_file_path):
        print(f"ERROR: Input AML file not found at {aml_file_path}")
        return
    try:
        # huge_tree: TIA exports can exceed lxml's default size limits.
        xml_parser = ET.XMLParser(remove_blank_text=True, huge_tree=True)
        document = ET.parse(aml_file_path, xml_parser)
        project_data = extract_aml_data(document.getroot())  # v15 extraction
        print(f"Generating JSON output: {json_output_path}")
        try:
            # default=str so non-JSON-native values don't abort the dump.
            with open(json_output_path, "w", encoding="utf-8") as json_fh:
                json.dump(project_data, json_fh, indent=4, default=str)
            print(f"JSON data written successfully.")
        except Exception as e:
            print(f"ERROR writing JSON file {json_output_path}: {e}")
            traceback.print_exc()
        # Markdown reports (v26 tree + v23 upward trace).
        generate_markdown_tree(project_data, md_output_path)
        generate_io_upward_tree(project_data, md_upward_output_path)
    except ET.LxmlError as xml_err:
        print(f"ERROR parsing XML file {aml_file_path} with lxml: {xml_err}")
        traceback.print_exc()
    except Exception as e:
        print(f"ERROR processing AML file {aml_file_path}: {e}")
        traceback.print_exc()
def select_cax_file(initial_dir=None):  # Add initial_dir parameter
    """Open a dialog to select a CAx (XML/AML) export file.

    Args:
        initial_dir: directory the dialog starts in (None = OS default).

    Returns:
        Selected file path; exits the process with code 0 if cancelled.
    """
    # Fix: the module only does `from tkinter import filedialog`, so the
    # original `tk.Tk()` raised NameError. Import tkinter locally here.
    import tkinter as tk

    root = tk.Tk()
    root.withdraw()  # hide the empty root window; only show the dialog
    file_path = filedialog.askopenfilename(
        title="Select CAx Export File (XML)",
        filetypes=[("XML Files", "*.xml"), ("AML Files", "*.aml"), ("All Files", "*.*")],  # Added AML
        initialdir=initial_dir,  # Set the initial directory
    )
    root.destroy()
    if not file_path:
        print("No CAx file selected. Exiting.")
        sys.exit(0)
    return file_path
def select_output_directory():
    """Open a dialog to select the output directory.

    Returns:
        Selected directory path; exits the process with code 0 if cancelled.
    """
    # Fix: same NameError as select_cax_file — `tk` was never imported at
    # module level (only `from tkinter import filedialog`). Import locally.
    import tkinter as tk

    root = tk.Tk()
    root.withdraw()  # hide the empty root window; only show the dialog
    dir_path = filedialog.askdirectory(
        title="Select Output Directory for JSON and MD files"  # Updated title slightly
    )
    root.destroy()
    if not dir_path:
        print("No output directory selected. Exiting.")
        sys.exit(0)
    return dir_path
# --- Main Execution ---
# Script entry point: load configuration, resolve the working directory
# (with fallbacks), let the user pick the input CAx/AML file, then run the
# full extraction pipeline writing JSON + two Markdown reports next to it.
if __name__ == "__main__":
    configs = load_configuration()
    working_directory = configs.get("working_directory")
    script_version = "v28 - Working Directory Integration"  # Updated version
    print(
        f"--- AML (CAx Export) to Hierarchical JSON and Obsidian MD Converter ({script_version}) ---"
    )
    # Validate working directory
    if not working_directory or not os.path.isdir(working_directory):
        print("ERROR: Working directory not set or invalid in configuration.")
        print("Attempting to use script's directory as fallback.")
        # Fallback to script's directory or current directory if needed
        working_directory = os.path.dirname(os.path.abspath(__file__))
        if not os.path.isdir(working_directory):
            working_directory = os.getcwd()
        print(f"Using fallback directory: {working_directory}")
        # Optionally, prompt user to select a working directory here if critical
        # output_dir = select_output_directory() # Keep this if you want user selection on failure
    # Use working_directory as the output directory
    output_dir = working_directory
    print(f"Using Working Directory for Output: {output_dir}")
    # 1. Select Input CAx File, starting in the working directory
    # Pass working_directory to the selection function
    cax_file_path = select_cax_file(initial_dir=working_directory)
    # Convert paths to Path objects
    input_path = Path(cax_file_path)
    output_path = Path(output_dir)  # Output path is the working directory
    # Check if input file exists
    if not input_path.is_file():
        print(f"ERROR: Input file '{input_path}' not found or is not a file.")
        sys.exit(1)
    # Ensure output directory exists (redundant if working_directory is valid, but safe)
    output_path.mkdir(parents=True, exist_ok=True)
    # Construct output file paths within the selected output directory (working_directory)
    output_json_file = output_path / input_path.with_suffix(".hierarchical.json").name
    output_md_file = output_path / input_path.with_name(f"{input_path.stem}_Hardware_Tree.md")  # Simplified name
    output_md_upward_file = output_path / input_path.with_name(f"{input_path.stem}_IO_Upward_Debug.md")  # Simplified name
    print(f"Input AML: {input_path.resolve()}")
    print(f"Output Directory: {output_path.resolve()}")
    print(f"Output JSON: {output_json_file.resolve()}")
    print(f"Output Main Tree MD: {output_md_file.resolve()}")
    print(f"Output IO Debug Tree MD: {output_md_upward_file.resolve()}")
    # Process the selected file and save outputs to the selected directory
    process_aml_file(
        str(input_path),
        str(output_json_file),
        str(output_md_file),
        str(output_md_upward_file),
    )
    print("\nScript finished.")