"""
export_io_from_CAx: Script that extracts the IOs from a TIA Portal project
(CAx / AML export) and generates Markdown files with the information.

The script parses the AML file with lxml, builds a dictionary describing
PLCs, networks, devices and links, dumps it as JSON and renders two Markdown
reports: a hardware/IO tree and an "upward" IO debug trace.
"""

import json
import math  # Needed for ceil
import os
import re
import sys
import tkinter as tk
import traceback
from pathlib import Path
from tkinter import filedialog

from lxml import etree as ET

script_root = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration

# Order-number prefixes that mark a device as a PLC (tuple so that
# str.startswith can test all of them in one call).
_PLC_ORDER_PREFIXES = (
    "6ES7 516-3FP03",
    "6ES7 151",
    "6ES7 31",
    "6ES7 41",
    "6ES7 51",
)

# Splits an InternalLink partner reference "<id>:<suffix>" into id and suffix.
_LINK_REF_PATTERN = re.compile(r"([^:]+):?(.*)")


def _attribute_text(elem, attr_name):
    """Return the text node list of the <Value> of the named child Attribute."""
    return elem.xpath(
        f"./*[local-name()='Attribute'][@Name='{attr_name}']"
        "/*[local-name()='Value']/text()"
    )


def _split_link_ref(ref):
    """Split a link partner reference into (id, suffix).

    Returns ("N/A", "N/A") when the reference is empty; when there is no
    suffix the id itself is used as suffix.
    """
    match = _LINK_REF_PATTERN.match(ref)
    if not match:
        return "N/A", "N/A"
    ref_id = match.group(1)
    return ref_id, (match.group(2) or ref_id)


# --- extract_aml_data function ---
def extract_aml_data(root):
    """Extract devices, PLCs, networks and links from a parsed AML tree.

    Args:
        root: lxml root element of the AML document.

    Returns:
        dict with the keys "plcs", "networks", "devices", "links_by_source",
        "links_by_target" and "connections".
    """
    data = {
        "plcs": {},
        "networks": {},
        "devices": {},
        "links_by_source": {},
        "links_by_target": {},
        "connections": [],
    }
    # Maps a network node ID to the ID of the InternalElement that owns it.
    device_id_to_parent_device = {}
    all_elements = root.xpath(".//*[local-name()='InternalElement']")
    print(
        f"Pass 1: Found {len(all_elements)} InternalElement(s). Populating device dictionary..."
    )

    # ---- Pass 1: one device entry per InternalElement with an ID ----
    for elem in all_elements:
        elem_id = elem.get("ID", None)
        if not elem_id:
            continue

        # Look up the parent ID in a single query; a parent without an ID
        # attribute yields None instead of raising IndexError.
        parent_ids = elem.xpath("parent::*[local-name()='InternalElement']/@ID")
        device_info = {
            "name": elem.get("Name", "N/A"),
            "id": elem_id,
            "class": "N/A",
            "type_identifier": "N/A",
            "order_number": "N/A",
            "type_name": "N/A",
            "firmware_version": "N/A",
            "position": elem.get("PositionNumber", "N/A"),
            "attributes": {},
            "interfaces": [],
            "network_nodes": [],
            "io_addresses": [],
            "children_ids": [
                c.get("ID")
                for c in elem.xpath("./*[local-name()='InternalElement']")
                if c.get("ID")
            ],
            "parent_id": parent_ids[0] if parent_ids else None,
        }

        fallback_class = elem.get("RefBaseSystemUnitPath", "N/A")
        class_tag = elem.xpath("./*[local-name()='SystemUnitClass']")
        device_info["class"] = (
            class_tag[0].get("Path", fallback_class) if class_tag else fallback_class
        )

        for attr in elem.xpath("./*[local-name()='Attribute']"):
            attr_name = attr.get("Name", "")
            value_elem = attr.xpath("./*[local-name()='Value']/text()")
            attr_value = value_elem[0] if value_elem else ""
            device_info["attributes"][attr_name] = attr_value

            if attr_name == "TypeIdentifier":
                device_info["type_identifier"] = attr_value
                if "OrderNumber:" in attr_value:
                    device_info["order_number"] = attr_value.split("OrderNumber:")[
                        -1
                    ].strip()
            elif attr_name == "TypeName":
                device_info["type_name"] = attr_value
            elif attr_name == "FirmwareVersion":
                device_info["firmware_version"] = attr_value
            elif attr_name == "Address":
                # Each nested Attribute of "Address" describes one IO area.
                for part in attr.xpath("./*[local-name()='Attribute']"):
                    addr_details = {
                        "area": part.get("Name", "?"),
                        "start": "N/A",
                        "length": "N/A",
                        "type": "N/A",
                    }
                    start_val = _attribute_text(part, "StartAddress")
                    len_val = _attribute_text(part, "Length")
                    type_val = _attribute_text(part, "IoType")
                    if start_val:
                        addr_details["start"] = start_val[0]
                    if len_val:
                        addr_details["length"] = len_val[0]
                    if type_val:
                        addr_details["type"] = type_val[0]
                    # Only areas with a start address are recorded.
                    if addr_details["start"] != "N/A":
                        device_info["io_addresses"].append(addr_details)

        for interface in elem.xpath("./*[local-name()='ExternalInterface']"):
            device_info["interfaces"].append(
                {
                    "name": interface.get("Name", "N/A"),
                    "id": interface.get("ID", "N/A"),
                    "ref_base_class": interface.get("RefBaseClassPath", "N/A"),
                }
            )

        # Network nodes are child elements with a "Node" role; if there are
        # none but the element itself has that role, it is its own node.
        network_node_elements = elem.xpath(
            "./*[local-name()='InternalElement'][*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]]"
        )
        if not network_node_elements and elem.xpath(
            "./*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]"
        ):
            network_node_elements = [elem]

        for node_elem in network_node_elements:
            node_id = node_elem.get("ID")
            if not node_id:
                continue
            node_info = {
                "id": node_id,
                "name": node_elem.get("Name", device_info["name"]),
                "type": "N/A",
                "address": "N/A",
            }
            type_attr = _attribute_text(node_elem, "Type")
            addr_attr = _attribute_text(node_elem, "NetworkAddress")
            if type_attr:
                node_info["type"] = type_attr[0]
            if addr_attr:
                node_info["address"] = addr_attr[0]

            # Fall back to the owning element's attributes.
            if node_info["address"] == "N/A":
                parent_addr_attr = _attribute_text(elem, "NetworkAddress")
                if parent_addr_attr:
                    node_info["address"] = parent_addr_attr[0]
            if node_info["type"] == "N/A":
                parent_type_attr = _attribute_text(elem, "Type")
                if parent_type_attr:
                    node_info["type"] = parent_type_attr[0]

            # Nodes without any address are not registered.
            if node_info["address"] != "N/A":
                len_attr = _attribute_text(node_elem, "Length")
                node_info["length"] = len_attr[0] if len_attr else "N/A"
                device_info["network_nodes"].append(node_info)
                device_id_to_parent_device[node_id] = elem_id

        data["devices"][elem_id] = device_info

    # ---- Pass 2: classify devices as PLCs and/or networks ----
    print("Pass 2: Identifying PLCs and Networks (Refined v2)...")
    plc_ids_found = set()
    elem_map = {elem.get("ID"): elem for elem in all_elements if elem.get("ID")}

    for dev_id, device in data["devices"].items():
        type_name_upper = device.get("type_name", "").upper()
        is_plc = (
            device.get("order_number", "N/A").startswith(_PLC_ORDER_PREFIXES)
            or "CPU" in type_name_upper
            or "PLC" in type_name_upper
        )

        if is_plc:
            # A PLC-like element nested under an already found PLC is a
            # sub-module of that PLC, not a PLC of its own.
            is_child_of_plc = False
            current_parent = device.get("parent_id")
            while current_parent:
                if current_parent in plc_ids_found:
                    is_child_of_plc = True
                    break
                current_parent = (
                    data["devices"].get(current_parent, {}).get("parent_id")
                )
            if not is_child_of_plc and dev_id not in plc_ids_found:
                print(
                    f" Identified PLC: {device['name']} ({dev_id}) - Type: {device.get('type_name', 'N/A')} OrderNo: {device.get('order_number', 'N/A')}"
                )
                device["connected_networks"] = {}
                data["plcs"][dev_id] = device
                plc_ids_found.add(dev_id)

        is_network = False
        net_type = "Unknown"
        elem = elem_map.get(dev_id)
        if elem is not None:
            role_classes = elem.xpath(
                "./*[local-name()='SupportedRoleClass']/@RefRoleClassPath"
            )
            if any("SUBNET" in rc.upper() for rc in role_classes):
                is_network = True
                for rc in role_classes:
                    rc_upper = rc.upper()
                    if "PROFINET" in rc_upper or "ETHERNET" in rc_upper:
                        net_type = "Ethernet/Profinet"
                        break
                    elif "PROFIBUS" in rc_upper:
                        net_type = "Profibus"
                        break
                if net_type == "Unknown":
                    # Role class gave no hint: guess from the element name.
                    name_upper = device["name"].upper()
                    if "PROFIBUS" in name_upper:
                        net_type = "Profibus"
                    elif "ETHERNET" in name_upper or "PROFINET" in name_upper:
                        net_type = "Ethernet/Profinet"

        if is_network and dev_id not in data["networks"]:
            print(
                f" Identified Network: {device['name']} ({dev_id}) Type: {net_type}"
            )
            data["networks"][dev_id] = {
                "name": device["name"],
                "type": net_type,
                "devices_on_net": {},
            }

    # ---- Pass 3: InternalLinks (network membership and generic links) ----
    print("Pass 3: Processing InternalLinks (Robust Network Mapping & IO)...")
    internal_links = root.xpath(".//*[local-name()='InternalLink']")
    print(f"Found {len(internal_links)} InternalLink(s).")

    for conn_id_counter, link in enumerate(internal_links, start=1):
        link_name = link.get("Name", f"Link_{conn_id_counter}")
        side_a_id, side_a_suffix = _split_link_ref(link.get("RefPartnerSideA", ""))
        side_b_id, side_b_suffix = _split_link_ref(link.get("RefPartnerSideB", ""))

        network_id, device_node_id = None, None
        side_a_is_network = side_a_id in data["networks"]
        side_b_is_network = side_b_id in data["networks"]
        if side_a_is_network and side_b_is_network:
            # Network-to-network links carry no device information.
            continue
        if side_a_is_network:
            network_id, device_node_id = side_a_id, side_b_id
        elif side_b_is_network:
            network_id, device_node_id = side_b_id, side_a_id

        if network_id and device_node_id:
            linked_device_id = device_id_to_parent_device.get(device_node_id)
            if not linked_device_id and device_node_id in data["devices"]:
                linked_device_id = device_node_id

            if linked_device_id and linked_device_id in data["devices"]:
                device_info = data["devices"].get(linked_device_id)
                if not device_info:
                    continue

                # Resolve the node address: node entry first, then the node's
                # own attributes, then the owning device's attributes.
                address = "N/A"
                node_info_for_addr = data["devices"].get(device_node_id, {})
                for node in node_info_for_addr.get("network_nodes", []):
                    if node.get("id") == device_node_id:
                        address = node.get("address", "N/A")
                        break
                if address == "N/A":
                    address = node_info_for_addr.get("attributes", {}).get(
                        "NetworkAddress", "N/A"
                    )
                if address == "N/A":
                    address = device_info.get("attributes", {}).get(
                        "NetworkAddress", "N/A"
                    )

                network_name = data["networks"][network_id]["name"]
                node_name_for_log = node_info_for_addr.get("name", device_node_id)
                print(
                    f" Mapping Device/Node '{node_name_for_log}' (NodeID:{device_node_id}, Addr:{address}) to Network '{network_name}'"
                )
                data["networks"][network_id]["devices_on_net"][device_node_id] = address

                # Find the PLC owning this node: the node itself, its parent
                # (interface) or its grandparent.
                potential_plc_id = None
                node_check_info = data["devices"].get(device_node_id)
                if node_check_info:
                    if device_node_id in data["plcs"]:
                        potential_plc_id = device_node_id
                    else:
                        interface_id = node_check_info.get("parent_id")
                        interface_info = (
                            data["devices"].get(interface_id) if interface_id else None
                        )
                        if interface_info:
                            if interface_id in data["plcs"]:
                                potential_plc_id = interface_id
                            elif (
                                interface_info.get("parent_id")
                                and interface_info["parent_id"] in data["plcs"]
                            ):
                                potential_plc_id = interface_info["parent_id"]

                if potential_plc_id:
                    plc_object = data["plcs"][potential_plc_id]
                    connected = plc_object.setdefault("connected_networks", {})
                    plc_name = plc_object.get("name", "Unknown PLC")
                    if network_id not in connected:
                        print(
                            f" --> Associating Network '{network_name}' with PLC '{plc_name}' (via Node '{node_name_for_log}' Addr: {address})"
                        )
                        connected[network_id] = address
                    elif connected[network_id] == "N/A" and address != "N/A":
                        print(
                            f" --> Updating address for Network '{network_name}' on PLC '{plc_name}' to: {address}"
                        )
                        connected[network_id] = address
            else:
                print(
                    f" Warning: Could not map linked device ID {linked_device_id} (from NodeID {device_node_id}) to any known device."
                )
            continue

        if not network_id:  # Generic links
            source_id, source_suffix, target_id, target_suffix = (
                side_a_id,
                side_a_suffix,
                side_b_id,
                side_b_suffix,
            )
            # If only side B looks like a channel/parameter, treat B as source.
            if ("Channel" in side_b_suffix or "Parameter" in side_b_suffix) and (
                "Channel" not in side_a_suffix and "Parameter" not in side_a_suffix
            ):
                source_id, source_suffix, target_id, target_suffix = (
                    side_b_id,
                    side_b_suffix,
                    side_a_id,
                    side_a_suffix,
                )
            if source_id != "N/A" and target_id != "N/A":
                link_detail = {
                    "name": link_name,
                    "source_id": source_id,
                    "source_suffix": source_suffix,
                    "target_id": target_id,
                    "target_suffix": target_suffix,
                    "source_device_name": data["devices"]
                    .get(source_id, {})
                    .get("name", source_id),
                    "target_device_name": data["devices"]
                    .get(target_id, {})
                    .get("name", target_id),
                }
                data["links_by_source"].setdefault(source_id, []).append(link_detail)
                data["links_by_target"].setdefault(target_id, []).append(link_detail)
                data["connections"].append(link_detail)

    print("Data extraction and structuring complete.")
    return data


# --- Helper Function for Recursive IO Search ---
def find_io_recursively(device_id, project_data):
    """Recursively collect all IO addresses under a given device ID.

    Args:
        device_id: ID of the device whose subtree is searched.
        project_data: dict as returned by extract_aml_data.

    Returns:
        list of dicts; each is an IO address entry extended with
        "module_name" and "module_pos" of the module that declares it.
    """
    io_list = []
    device_info = project_data.get("devices", {}).get(device_id)
    if not device_info:
        return io_list

    for addr in device_info.get("io_addresses") or []:
        io_list.append(
            {
                "module_name": device_info.get("name", device_id),
                "module_pos": device_info.get("position", "N/A"),
                **addr,
            }
        )
    for child_id in device_info.get("children_ids", []):
        if child_id != device_id:  # Basic loop prevention
            io_list.extend(find_io_recursively(child_id, project_data))
    return io_list


def _network_address_sort_key(item):
    """Sort key for (node_id, address) pairs: the numeric parts of the address."""
    _node_id, node_addr = item
    try:
        return [int(p) for p in re.findall(r"\d+", node_addr)]
    except (TypeError, ValueError):
        # Non-string address: push the entry to the end.
        return [float("inf")]


def _format_siemens_address(io_type, start_str, length_str):
    """Format an IO area as a byte range with EW/AW prefix.

    Returns:
        (text, length_bits); text is "FMT_ERROR(start,length)" when start or
        length is not an integer, and length_bits is 0 if it was not parsed.
    """
    length_bits = 0
    try:
        start_byte = int(start_str)
        length_bits = int(length_str)
        length_bytes = math.ceil(length_bits / 8.0)  # bits -> whole bytes
        end_byte = start_byte + length_bytes - 1
        prefix = {"input": "EW", "output": "AW"}.get(io_type.lower(), "P?")
        siemens_addr = f"{prefix} {start_byte}..{end_byte}"
    except (TypeError, ValueError, AttributeError):
        siemens_addr = f"FMT_ERROR({start_str},{length_str})"
    return siemens_addr, length_bits


def _render_network_device(project_data, node_id, node_addr):
    """Build the Markdown lines describing one non-PLC node on a network."""
    devices = project_data.get("devices", {})
    node_info = devices.get(node_id)
    if not node_info:
        return [f" - !!! Error: Node info missing for ID {node_id} Addr: {node_addr} !!!"]

    lines = []

    # Walk up: node -> interface -> actual device -> (optional) rack.
    interface_id = node_info.get("parent_id")
    interface_info = devices.get(interface_id) if interface_id else None
    actual_device_id = interface_info.get("parent_id") if interface_info else None
    actual_device_info = devices.get(actual_device_id) if actual_device_id else None
    rack_id = None
    if actual_device_info:
        potential_rack_id = actual_device_info.get("parent_id")
        if potential_rack_id:
            potential_rack_info = devices.get(potential_rack_id)
            if potential_rack_info and (
                "Rack" in potential_rack_info.get("name", "")
                or potential_rack_info.get("position") is None
            ):
                rack_id = potential_rack_id

    # The highest resolved level is used both for the title and as IO root.
    if actual_device_info:
        display_info_title, display_id_title = actual_device_info, actual_device_id
    elif interface_info:
        display_info_title, display_id_title = interface_info, interface_id
    else:
        display_info_title, display_id_title = node_info, node_id
    io_search_root_id = display_id_title
    io_search_root_info = devices.get(io_search_root_id)

    # Construct Title
    display_name = display_info_title.get("name", display_id_title)
    via_node_name = node_info.get("name", node_id)
    title_str = f"#### {display_name}"
    if display_id_title != node_id:
        title_str += f" (via {via_node_name} @ `{node_addr}`)"
    else:
        title_str += f" (@ `{node_addr}`)"
    lines.append(f" - {title_str}")

    # Display Basic Details
    lines.append(f" - Address (on net): `{node_addr}`")
    type_name_disp = display_info_title.get("type_name", "N/A")
    order_num_disp = display_info_title.get("order_number", "N/A")
    pos_disp = display_info_title.get("position", "N/A")
    if type_name_disp and type_name_disp != "N/A":
        lines.append(f" - Type Name: `{type_name_disp}`")
    if order_num_disp and order_num_disp != "N/A":
        lines.append(f" - Order No: `{order_num_disp}`")
    if pos_disp and pos_disp != "N/A":
        lines.append(f" - Pos (in parent): `{pos_disp}`")

    ultimate_parent_id = rack_id
    if not ultimate_parent_id and actual_device_info:
        ultimate_parent_id = actual_device_info.get("parent_id")
    if ultimate_parent_id and ultimate_parent_id != display_id_title:
        ultimate_parent_info = devices.get(ultimate_parent_id)
        ultimate_parent_name = (
            ultimate_parent_info.get("name", "?") if ultimate_parent_info else "?"
        )
        lines.append(f" - Parent Structure: `{ultimate_parent_name}`")

    # --- IO Aggregation: search all siblings under the root's parent ---
    aggregated_io_addresses = []
    parent_structure_id = (
        io_search_root_info.get("parent_id") if io_search_root_info else None
    )
    io_search_root_name_disp = (
        io_search_root_info.get("name", "?") if io_search_root_info else "?"
    )
    if parent_structure_id:
        parent_structure_info = devices.get(parent_structure_id)
        parent_structure_name = (
            parent_structure_info.get("name", "?") if parent_structure_info else "?"
        )
        search_title = f"parent '{parent_structure_name}'"
        for dev_scan_id, dev_scan_info in devices.items():
            if dev_scan_info.get("parent_id") == parent_structure_id:
                aggregated_io_addresses.extend(
                    find_io_recursively(dev_scan_id, project_data)
                )
        if not aggregated_io_addresses:
            lines.append(
                f" - *No IO Addresses found in modules under {search_title} (ID: {parent_structure_id}).*"
            )
    elif io_search_root_id:
        search_title = f"'{io_search_root_name_disp}'"
        aggregated_io_addresses = find_io_recursively(io_search_root_id, project_data)
        if not aggregated_io_addresses:
            lines.append(
                f" - *No IO Addresses found in modules under {search_title} (ID: {io_search_root_id}).*"
            )
    else:
        lines.append(" - *Could not determine structure to search for IO addresses.*")

    # Display aggregated IO Addresses, grouped by module.
    if aggregated_io_addresses:
        lines.append(" - **IO Addresses (Aggregated from Structure):**")
        sorted_agg_io = sorted(
            aggregated_io_addresses,
            key=lambda x: (
                (
                    int(x.get("module_pos", "9999"))
                    if x.get("module_pos", "9999").isdigit()
                    else 9999
                ),
                x.get("module_name", ""),
                x.get("type", ""),
                (
                    int(x.get("start", "0"))
                    if x.get("start", "0").isdigit()
                    else float("inf")
                ),
            ),
        )
        last_module_id_key = None
        for addr_info in sorted_agg_io:
            current_module_id_key = (
                addr_info.get("module_name", "?"),
                addr_info.get("module_pos", "?"),
            )
            if current_module_id_key != last_module_id_key:
                lines.append(
                    f" - **From Module:** {addr_info.get('module_name','?')} (Pos: {addr_info.get('module_pos','?')})"
                )
                last_module_id_key = current_module_id_key
            siemens_addr, length_bits = _format_siemens_address(
                addr_info.get("type", "?"),
                addr_info.get("start", "?"),
                addr_info.get("length", "?"),
            )
            lines.append(f" - `{siemens_addr}` (Len={length_bits} bits)")

    # IO connections: generic links whose suffix on this device is a channel.
    links_from = project_data.get("links_by_source", {}).get(display_id_title, [])
    links_to = project_data.get("links_by_target", {}).get(display_id_title, [])
    io_conns = set()
    for link in links_from:
        if "channel" in link["source_suffix"].lower():
            target_str = f"{link.get('target_device_name', link['target_id'])}:{link['target_suffix']}"
            if link["target_id"] == display_id_title:
                target_str = link["target_suffix"]
            io_conns.add(f"`{link['source_suffix']}` → `{target_str}`")
    for link in links_to:
        if "channel" in link["target_suffix"].lower():
            source_str = f"{link.get('source_device_name', link['source_id'])}:{link['source_suffix']}"
            if link["source_id"] == display_id_title:
                source_str = link["source_suffix"]
            io_conns.add(f"`{source_str}` → `{link['target_suffix']}`")
    if io_conns:
        lines.append(" - **IO Connections (Channels):**")
        lines.extend(f" - {conn}" for conn in sorted(io_conns))

    lines.append("")  # Spacing
    return lines


# --- generate_markdown_tree function ---
def generate_markdown_tree(project_data, md_file_path):
    """Write the hardware / IO summary as Markdown.

    Args:
        project_data: dict as returned by extract_aml_data.
        md_file_path: path of the Markdown file to (over)write.

    Write errors are printed, not raised.
    """
    markdown_lines = ["# Project Hardware & IO Summary (Tree View v26)", ""]

    if not project_data or not project_data.get("plcs"):
        markdown_lines.append("*No PLC identified in the project data.*")
        try:
            with open(md_file_path, "w", encoding="utf-8") as f:
                f.write("\n".join(markdown_lines))
            print(f"\nMarkdown summary written to: {md_file_path}")
        except Exception as e:
            print(f"ERROR writing Markdown file {md_file_path}: {e}")
        return

    networks = project_data.get("networks", {})
    markdown_lines.append(f"Identified {len(project_data['plcs'])} PLC(s).")

    for plc_id, plc_info in project_data.get("plcs", {}).items():
        markdown_lines.append(f"\n## PLC: {plc_info.get('name', plc_id)}")
        type_name = plc_info.get("type_name", "N/A")
        order_num = plc_info.get("order_number", "N/A")
        firmware = plc_info.get("firmware_version", "N/A")
        if type_name and type_name != "N/A":
            markdown_lines.append(f"- **Type Name:** `{type_name}`")
        if order_num and order_num != "N/A":
            markdown_lines.append(f"- **Order Number:** `{order_num}`")
        if firmware and firmware != "N/A":
            markdown_lines.append(f"- **Firmware:** `{firmware}`")

        plc_networks = plc_info.get("connected_networks", {})
        markdown_lines.append("\n- **Networks:**")
        if not plc_networks:
            markdown_lines.append(
                " - *No network connections found associated with this PLC object.*"
            )
            continue

        # IDs belonging to the PLC itself (nodes, their parents, the PLC);
        # these are excluded from the "devices on network" listing.
        plc_interface_and_node_ids = {plc_id}
        for node in plc_info.get("network_nodes", []):
            plc_interface_and_node_ids.add(node["id"])
            interface_id = (
                project_data["devices"].get(node["id"], {}).get("parent_id")
            )
            if interface_id:
                plc_interface_and_node_ids.add(interface_id)

        sorted_network_items = sorted(
            plc_networks.items(),
            key=lambda item: networks.get(item[0], {}).get("name", item[0]),
        )
        for net_id, plc_addr_on_net in sorted_network_items:
            net_info = networks.get(net_id)
            if not net_info:
                markdown_lines.append(
                    f" - !!! Error: Network info missing for ID {net_id} !!!"
                )
                continue
            markdown_lines.append(
                f" - ### {net_info.get('name', net_id)} ({net_info.get('type', 'Unknown')})"
            )
            markdown_lines.append(f" - PLC Address on this Net: `{plc_addr_on_net}`")
            markdown_lines.append(" - **Devices on Network:**")

            other_device_items = sorted(
                (
                    (node_id, node_addr)
                    for node_id, node_addr in net_info.get(
                        "devices_on_net", {}
                    ).items()
                    if node_id not in plc_interface_and_node_ids
                ),
                key=_network_address_sort_key,
            )
            if not other_device_items:
                markdown_lines.append(" - *None (besides PLC interfaces)*")
                continue
            for node_id, node_addr in other_device_items:
                markdown_lines.extend(
                    _render_network_device(project_data, node_id, node_addr)
                )

    try:
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write("\n".join(markdown_lines))
        print(f"\nMarkdown summary written to: {md_file_path}")
    except Exception as e:
        print(f"ERROR writing Markdown file {md_file_path}: {e}")
        traceback.print_exc()


# --- generate_io_upward_tree function ---
def generate_io_upward_tree(project_data, md_file_path):
    """Write a debug Markdown tracing each IO module up to a network node or PLC.

    Args:
        project_data: dict as returned by extract_aml_data.
        md_file_path: path of the Markdown file to (over)write.

    Write errors are printed, not raised.
    """
    markdown_lines = ["# IO Address Upward Connection Trace (Debug v23)", ""]

    if not project_data or not project_data.get("devices"):
        markdown_lines.append("*No device data found.*")
        try:
            with open(md_file_path, "w", encoding="utf-8") as f:
                f.write("\n".join(markdown_lines))
            print(f"\nIO upward debug tree written to: {md_file_path}")
        except Exception as e:
            print(f"ERROR writing IO upward debug tree file {md_file_path}: {e}")
        return

    # node ID -> (network ID, network name, node address)
    node_to_network_map = {}
    for net_id, net_info in project_data.get("networks", {}).items():
        net_name = net_info.get("name", "?")
        for node_id, node_addr in net_info.get("devices_on_net", {}).items():
            node_to_network_map[node_id] = (net_id, net_name, node_addr)

    devices_with_io = [
        (dev_id, dev_info)
        for dev_id, dev_info in project_data.get("devices", {}).items()
        if dev_info.get("io_addresses")
    ]

    if not devices_with_io:
        markdown_lines.append("*No devices with defined IO Addresses found.*")
    else:
        markdown_lines.append(
            f"Found {len(devices_with_io)} device(s)/module(s) with IO addresses. Tracing connections upwards:\n"
        )
        devices_with_io.sort(
            key=lambda item: (
                (
                    int(item[1].get("position", "9999"))
                    if item[1].get("position", "9999").isdigit()
                    else 9999
                ),
                item[1].get("name", ""),
            )
        )

        for dev_id, dev_info in devices_with_io:
            markdown_lines.append(
                f"## IO Module: {dev_info.get('name', dev_id)} (ID: {dev_id})"
            )
            markdown_lines.append(f"- Position: {dev_info.get('position', 'N/A')}")
            markdown_lines.append("- IO Addresses:")
            for addr in sorted(
                dev_info["io_addresses"],
                key=lambda x: (
                    x.get("type", ""),
                    (
                        int(x.get("start", "0"))
                        if x.get("start", "0").isdigit()
                        else float("inf")
                    ),
                ),
            ):
                markdown_lines.append(
                    f" - `{addr.get('type','?').ljust(6)} Start={addr.get('start','?').ljust(4)} Len={addr.get('length','?').ljust(3)}` (Area: {addr.get('area','?')})"
                )

            markdown_lines.append("- Upward Path:")
            current_id = dev_id
            current_info = dev_info
            indent = " "
            path_found = False
            ancestor_limit = 15  # Safety cap on the number of levels walked
            count = 0
            while current_id and count < ancestor_limit:
                ancestor_name = current_info.get("name", "?") if current_info else "?"
                ancestor_pos = (
                    current_info.get("position", "N/A") if current_info else "N/A"
                )
                markdown_lines.append(
                    f"{indent}└─ {ancestor_name} (ID: {current_id}, Pos: {ancestor_pos})"
                )

                if current_id in node_to_network_map:
                    net_id, net_name, node_addr = node_to_network_map[current_id]
                    markdown_lines.append(f"{indent} └─ **Network Connection Point**")
                    markdown_lines.append(
                        f"{indent} - Node: {ancestor_name} (ID: {current_id})"
                    )
                    markdown_lines.append(
                        f"{indent} - Network: {net_name} (ID: {net_id})"
                    )
                    markdown_lines.append(f"{indent} - Address: `{node_addr}`")
                    plc_connection_found = False
                    for plc_id_check, plc_info_check in project_data.get(
                        "plcs", {}
                    ).items():
                        if net_id in plc_info_check.get("connected_networks", {}):
                            markdown_lines.append(
                                f"{indent} - **Network associated with PLC:** {plc_info_check.get('name','?')} (ID: {plc_id_check})"
                            )
                            plc_connection_found = True
                            break
                    if not plc_connection_found:
                        markdown_lines.append(
                            f"{indent} - *Network not directly associated with a known PLC in data.*"
                        )
                    path_found = True
                    break

                if current_id in project_data.get("plcs", {}):
                    markdown_lines.append(f"{indent} └─ **Is PLC:** {ancestor_name}")
                    path_found = True
                    break

                parent_id = current_info.get("parent_id") if current_info else None
                if parent_id:
                    current_info = project_data.get("devices", {}).get(parent_id)
                    if not current_info:
                        markdown_lines.append(
                            f"{indent} └─ Parent ID {parent_id} not found. Stopping trace."
                        )
                        break
                    current_id = parent_id
                    indent += " "
                else:
                    markdown_lines.append(
                        f"{indent} └─ Reached top level (no parent)."
                    )
                    break
                count += 1

            if count >= ancestor_limit:
                markdown_lines.append(
                    f"{indent} └─ Reached ancestor limit. Stopping trace."
                )
            if not path_found:
                markdown_lines.append(
                    f"{indent}└─ *Could not trace path to a known Network Node or PLC.*"
                )
            markdown_lines.append("")

    try:
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write("\n".join(markdown_lines))
        print(f"\nIO upward debug tree written to: {md_file_path}")
    except Exception as e:
        print(f"ERROR writing IO upward debug tree file {md_file_path}: {e}")


# --- process_aml_file function ---
def process_aml_file(
    aml_file_path, json_output_path, md_output_path, md_upward_output_path
):
    """Parse an AML file and write the JSON dump and both Markdown reports.

    Args:
        aml_file_path: path of the input AML (CAx export) file.
        json_output_path: path of the JSON dump of the extracted data.
        md_output_path: path of the hardware tree Markdown file.
        md_upward_output_path: path of the IO upward debug Markdown file.

    Errors are printed with a traceback, not raised.
    """
    print(f"Processing AML file: {aml_file_path}")
    if not os.path.exists(aml_file_path):
        print(f"ERROR: Input AML file not found at {aml_file_path}")
        return
    try:
        parser = ET.XMLParser(remove_blank_text=True, huge_tree=True)
        tree = ET.parse(aml_file_path, parser)
        root = tree.getroot()
        project_data = extract_aml_data(root)

        print(f"Generating JSON output: {json_output_path}")
        try:
            with open(json_output_path, "w", encoding="utf-8") as f:
                json.dump(project_data, f, indent=4, default=str)
            print("JSON data written successfully.")
        except Exception as e:
            # A failed JSON dump does not prevent the Markdown reports.
            print(f"ERROR writing JSON file {json_output_path}: {e}")
            traceback.print_exc()

        generate_markdown_tree(project_data, md_output_path)
        generate_io_upward_tree(project_data, md_upward_output_path)
    except ET.LxmlError as xml_err:
        print(f"ERROR parsing XML file {aml_file_path} with lxml: {xml_err}")
        traceback.print_exc()
    except Exception as e:
        print(f"ERROR processing AML file {aml_file_path}: {e}")
        traceback.print_exc()


def select_cax_file(initial_dir=None):
    """Open a dialog to select a CAx (AML) export file.

    Args:
        initial_dir: directory in which the dialog starts (optional).

    Returns:
        The selected file path. Exits the process with status 0 if the
        dialog is cancelled.
    """
    root = tk.Tk()
    root.withdraw()
    try:
        file_path = filedialog.askopenfilename(
            title="Select CAx Export File (AML)",
            filetypes=[("AML Files", "*.aml"), ("All Files", "*.*")],
            initialdir=initial_dir,
        )
    finally:
        # Always release the hidden Tk root, even if the dialog raises.
        root.destroy()
    if not file_path:
        print("No CAx file selected. Exiting.")
        sys.exit(0)
    return file_path


def select_output_directory():
    """Open a dialog to select the output directory.

    Returns:
        The selected directory path. Exits the process with status 0 if the
        dialog is cancelled.
    """
    root = tk.Tk()
    root.withdraw()
    try:
        dir_path = filedialog.askdirectory(
            title="Select Output Directory for JSON and MD files"
        )
    finally:
        root.destroy()
    if not dir_path:
        print("No output directory selected. Exiting.")
        sys.exit(0)
    return dir_path


def main():
    """Script entry point: pick the AML file and write outputs to the working directory."""
    configs = load_configuration()
    working_directory = configs.get("working_directory")

    script_version = "v28 - Working Directory Integration"
    print(
        f"--- AML (CAx Export) to Hierarchical JSON and Obsidian MD Converter ({script_version}) ---"
    )

    # Validate working directory
    if not working_directory or not os.path.isdir(working_directory):
        print("ERROR: Working directory not set or invalid in configuration.")
        print("Attempting to use script's directory as fallback.")
        # Fallback to script's directory or current directory if needed
        working_directory = os.path.dirname(os.path.abspath(__file__))
        if not os.path.isdir(working_directory):
            working_directory = os.getcwd()
        print(f"Using fallback directory: {working_directory}")

    # Use working_directory as the output directory
    output_dir = working_directory
    print(f"Using Working Directory for Output: {output_dir}")

    # 1. Select Input CAx File, starting in the working directory
    cax_file_path = select_cax_file(initial_dir=working_directory)

    input_path = Path(cax_file_path)
    output_path = Path(output_dir)

    if not input_path.is_file():
        print(f"ERROR: Input file '{input_path}' not found or is not a file.")
        sys.exit(1)

    # Ensure output directory exists (redundant if working_directory is valid, but safe)
    output_path.mkdir(parents=True, exist_ok=True)

    # Build output paths from bare file names only. Joining output_path with
    # input_path.with_name(...) would discard output_path whenever the input
    # path is absolute, writing the files next to the input instead.
    output_json_file = output_path / input_path.with_suffix(".hierarchical.json").name
    output_md_file = output_path / f"{input_path.stem}_Hardware_Tree.md"
    output_md_upward_file = output_path / f"{input_path.stem}_IO_Upward_Debug.md"

    print(f"Input AML: {input_path.resolve()}")
    print(f"Output Directory: {output_path.resolve()}")
    print(f"Output JSON: {output_json_file.resolve()}")
    print(f"Output Main Tree MD: {output_md_file.resolve()}")
    print(f"Output IO Debug Tree MD: {output_md_upward_file.resolve()}")

    process_aml_file(
        str(input_path),
        str(output_json_file),
        str(output_md_file),
        str(output_md_upward_file),
    )

    print("\nScript finished.")


# --- Main Execution ---
if __name__ == "__main__":
    main()