""" export_io_from_CAx : Script que sirve para exraer los IOs de un proyecto de TIA Portal y generar un archivo Markdown con la información. """ import os import sys import tkinter as tk from tkinter import filedialog import traceback from lxml import etree as ET import json from pathlib import Path import re import math # Needed for ceil script_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(__file__))) ) sys.path.append(script_root) from backend.script_utils import load_configuration # --- extract_aml_data function (Unchanged from v15) --- def extract_aml_data(root): """(v15 logic - Unchanged) Extracts data, correcting PLC network association lookup.""" data = { "plcs": {}, "networks": {}, "devices": {}, "links_by_source": {}, "links_by_target": {}, "connections": [], } device_id_to_parent_device = {} all_elements = root.xpath(".//*[local-name()='InternalElement']") print( f"Pass 1: Found {len(all_elements)} InternalElement(s). Populating device dictionary..." ) # (Pass 1 logic remains unchanged) for elem in all_elements: elem_id = elem.get("ID", None) if not elem_id: continue device_info = { "name": elem.get("Name", "N/A"), "id": elem_id, "class": "N/A", "type_identifier": "N/A", "order_number": "N/A", "type_name": "N/A", "firmware_version": "N/A", "position": elem.get("PositionNumber", "N/A"), "attributes": {}, "interfaces": [], "network_nodes": [], "io_addresses": [], "children_ids": [ c.get("ID") for c in elem.xpath("./*[local-name()='InternalElement']") if c.get("ID") ], "parent_id": ( elem.xpath("parent::*[local-name()='InternalElement']/@ID")[0] if elem.xpath("parent::*[local-name()='InternalElement']") else None ), } class_tag = elem.xpath("./*[local-name()='SystemUnitClass']") device_info["class"] = ( class_tag[0].get("Path", elem.get("RefBaseSystemUnitPath", "N/A")) if class_tag else elem.get("RefBaseSystemUnitPath", "N/A") ) attributes = elem.xpath("./*[local-name()='Attribute']") for attr in attributes: attr_name = 
attr.get("Name", "") value_elem = attr.xpath("./*[local-name()='Value']/text()") attr_value = value_elem[0] if value_elem else "" device_info["attributes"][attr_name] = attr_value if attr_name == "TypeIdentifier": device_info["type_identifier"] = attr_value if "OrderNumber:" in attr_value: device_info["order_number"] = attr_value.split("OrderNumber:")[ -1 ].strip() elif attr_name == "TypeName": device_info["type_name"] = attr_value elif attr_name == "FirmwareVersion": device_info["firmware_version"] = attr_value elif attr_name == "Address": address_parts = attr.xpath("./*[local-name()='Attribute']") for part in address_parts: addr_details = { "area": part.get("Name", "?"), "start": "N/A", "length": "N/A", "type": "N/A", } start_val = part.xpath( "./*[local-name()='Attribute'][@Name='StartAddress']/*[local-name()='Value']/text()" ) len_val = part.xpath( "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()" ) type_val = part.xpath( "./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()" ) if start_val: addr_details["start"] = start_val[0] if len_val: addr_details["length"] = len_val[0] if type_val: addr_details["type"] = type_val[0] if addr_details["start"] != "N/A": device_info["io_addresses"].append(addr_details) interfaces = elem.xpath("./*[local-name()='ExternalInterface']") for interface in interfaces: device_info["interfaces"].append( { "name": interface.get("Name", "N/A"), "id": interface.get("ID", "N/A"), "ref_base_class": interface.get("RefBaseClassPath", "N/A"), } ) network_node_elements = elem.xpath( "./*[local-name()='InternalElement'][*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]]" ) if not network_node_elements and elem.xpath( "./*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]" ): network_node_elements = [elem] for node_elem in network_node_elements: node_id = node_elem.get("ID") if not node_id: continue node_info = { "id": node_id, "name": 
node_elem.get("Name", device_info["name"]), "type": "N/A", "address": "N/A", } type_attr = node_elem.xpath( "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()" ) addr_attr = node_elem.xpath( "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()" ) if type_attr: node_info["type"] = type_attr[0] if addr_attr: node_info["address"] = addr_attr[0] if node_info["address"] == "N/A": parent_addr_attr = elem.xpath( "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()" ) if parent_addr_attr: node_info["address"] = parent_addr_attr[0] if node_info["type"] == "N/A": parent_type_attr = elem.xpath( "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()" ) if parent_type_attr: node_info["type"] = parent_type_attr[0] if node_info["address"] != "N/A": len_attr = node_elem.xpath( "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()" ) node_info["length"] = len_attr[0] if len_attr else "N/A" device_info["network_nodes"].append(node_info) device_id_to_parent_device[node_id] = elem_id data["devices"][elem_id] = device_info print("Pass 2: Identifying PLCs and Networks (Refined v2)...") plc_ids_found = set() elem_map = {elem.get("ID"): elem for elem in all_elements if elem.get("ID")} for dev_id, device in data["devices"].items(): is_plc = False plc_order_prefixes = [ "6ES7 516-3FP03", "6ES7 151", "6ES7 31", "6ES7 41", "6ES7 51", ] if any( device.get("order_number", "N/A").startswith(prefix) for prefix in plc_order_prefixes ): is_plc = True elif ( "CPU" in device.get("type_name", "").upper() or "PLC" in device.get("type_name", "").upper() ): is_plc = True if is_plc: parent_id = device.get("parent_id") is_child_of_plc = False current_parent = parent_id while current_parent: if current_parent in plc_ids_found: is_child_of_plc = True break current_parent = ( data["devices"].get(current_parent, {}).get("parent_id") ) if not is_child_of_plc: if dev_id not in 
plc_ids_found: print( f" Identified PLC: {device['name']} ({dev_id}) - Type: {device.get('type_name', 'N/A')} OrderNo: {device.get('order_number', 'N/A')}" ) device["connected_networks"] = {} data["plcs"][dev_id] = device plc_ids_found.add(dev_id) is_network = False net_type = "Unknown" elem = elem_map.get(dev_id) if elem is not None: role_classes = elem.xpath( "./*[local-name()='SupportedRoleClass']/@RefRoleClassPath" ) is_subnet_by_role = any("SUBNET" in rc.upper() for rc in role_classes) if is_subnet_by_role: is_network = True for rc in role_classes: rc_upper = rc.upper() if "PROFINET" in rc_upper or "ETHERNET" in rc_upper: net_type = "Ethernet/Profinet" break elif "PROFIBUS" in rc_upper: net_type = "Profibus" break if net_type == "Unknown": if "PROFIBUS" in device["name"].upper(): net_type = "Profibus" elif ( "ETHERNET" in device["name"].upper() or "PROFINET" in device["name"].upper() ): net_type = "Ethernet/Profinet" if is_network: if dev_id not in data["networks"]: print( f" Identified Network: {device['name']} ({dev_id}) Type: {net_type}" ) data["networks"][dev_id] = { "name": device["name"], "type": net_type, "devices_on_net": {}, } print("Pass 3: Processing InternalLinks (Robust Network Mapping & IO)...") internal_links = root.xpath(".//*[local-name()='InternalLink']") print(f"Found {len(internal_links)} InternalLink(s).") conn_id_counter = 0 for link in internal_links: conn_id_counter += 1 link_name = link.get("Name", f"Link_{conn_id_counter}") side_a_ref = link.get("RefPartnerSideA", "") side_b_ref = link.get("RefPartnerSideB", "") side_a_match = re.match(r"([^:]+):?(.*)", side_a_ref) side_b_match = re.match(r"([^:]+):?(.*)", side_b_ref) side_a_id = side_a_match.group(1) if side_a_match else "N/A" side_a_suffix = ( side_a_match.group(2) if side_a_match and side_a_match.group(2) else side_a_id ) side_b_id = side_b_match.group(1) if side_b_match else "N/A" side_b_suffix = ( side_b_match.group(2) if side_b_match and side_b_match.group(2) else side_b_id ) 
network_id, device_node_id = None, None side_a_is_network = side_a_id in data["networks"] side_b_is_network = side_b_id in data["networks"] if side_a_is_network and not side_b_is_network: network_id, device_node_id = side_a_id, side_b_id elif side_b_is_network and not side_a_is_network: network_id, device_node_id = side_b_id, side_a_id elif side_a_is_network and side_b_is_network: continue elif not side_a_is_network and not side_b_is_network: pass if network_id and device_node_id: linked_device_id = device_id_to_parent_device.get(device_node_id) if not linked_device_id and device_node_id in data["devices"]: linked_device_id = device_node_id if linked_device_id and linked_device_id in data["devices"]: device_info = data["devices"].get(linked_device_id) if not device_info: continue address = "N/A" node_info_for_addr = data["devices"].get(device_node_id, {}) for node in node_info_for_addr.get("network_nodes", []): if node.get("id") == device_node_id: address = node.get("address", "N/A") break if address == "N/A": address = node_info_for_addr.get("attributes", {}).get( "NetworkAddress", "N/A" ) if address == "N/A": address = device_info.get("attributes", {}).get( "NetworkAddress", "N/A" ) node_name_for_log = node_info_for_addr.get("name", device_node_id) print( f" Mapping Device/Node '{node_name_for_log}' (NodeID:{device_node_id}, Addr:{address}) to Network '{data['networks'][network_id]['name']}'" ) data["networks"][network_id]["devices_on_net"][device_node_id] = address potential_plc_id = None interface_id = None interface_info = None node_check_info = data["devices"].get(device_node_id) if node_check_info: if device_node_id in data["plcs"]: potential_plc_id = device_node_id else: interface_id = node_check_info.get("parent_id") if interface_id and interface_id in data["devices"]: interface_info = data["devices"].get(interface_id) if interface_info: if interface_id in data["plcs"]: potential_plc_id = interface_id elif ( interface_info.get("parent_id") and 
interface_info["parent_id"] in data["plcs"] ): potential_plc_id = interface_info["parent_id"] if potential_plc_id: plc_object = data["plcs"][potential_plc_id] if "connected_networks" not in plc_object: plc_object["connected_networks"] = {} if network_id not in plc_object.get("connected_networks", {}): print( f" --> Associating Network '{data['networks'][network_id]['name']}' with PLC '{plc_object.get('name', 'Unknown PLC')}' (via Node '{node_name_for_log}' Addr: {address})" ) data["plcs"][potential_plc_id]["connected_networks"][ network_id ] = address elif ( plc_object["connected_networks"][network_id] == "N/A" and address != "N/A" ): print( f" --> Updating address for Network '{data['networks'][network_id]['name']}' on PLC '{plc_object.get('name', 'Unknown PLC')}' to: {address}" ) data["plcs"][potential_plc_id]["connected_networks"][ network_id ] = address else: print( f" Warning: Could not map linked device ID {linked_device_id} (from NodeID {device_node_id}) to any known device." ) continue if not network_id: # Generic links source_id, source_suffix, target_id, target_suffix = ( side_a_id, side_a_suffix, side_b_id, side_b_suffix, ) if ("Channel" in side_b_suffix or "Parameter" in side_b_suffix) and ( "Channel" not in side_a_suffix and "Parameter" not in side_a_suffix ): source_id, source_suffix, target_id, target_suffix = ( side_b_id, side_b_suffix, side_a_id, side_a_suffix, ) if source_id != "N/A" and target_id != "N/A": if source_id not in data["links_by_source"]: data["links_by_source"][source_id] = [] if target_id not in data["links_by_target"]: data["links_by_target"][target_id] = [] link_detail = { "name": link_name, "source_id": source_id, "source_suffix": source_suffix, "target_id": target_id, "target_suffix": target_suffix, "source_device_name": data["devices"] .get(source_id, {}) .get("name", source_id), "target_device_name": data["devices"] .get(target_id, {}) .get("name", target_id), } data["links_by_source"][source_id].append(link_detail) 
data["links_by_target"][target_id].append(link_detail) data["connections"].append(link_detail) print("Data extraction and structuring complete.") return data # --- Helper Function for Recursive IO Search (Unchanged from v20) --- def find_io_recursively(device_id, project_data, module_context): """ Recursively finds all IO addresses under a given device ID, using module_context for details of the main hardware module. module_context = {"id": ..., "name": ..., "order_number": ..., "type_name": ...} """ io_list = [] device_info = project_data.get("devices", {}).get(device_id) if not device_info: return io_list if device_info.get("io_addresses"): # Slot position is from the current device_info (which holds the IO) # It's often found in attributes.PositionNumber for sub-elements. slot_pos = device_info.get("attributes", {}).get("PositionNumber", device_info.get("position", "N/A")) for addr in device_info["io_addresses"]: io_list.append( { "module_id": module_context["id"], "module_name": module_context["name"], "module_pos": slot_pos, # Slot of the IO sub-element "module_order_number": module_context["order_number"], "module_type_name": module_context["type_name"], **addr, } ) children_ids = device_info.get("children_ids", []) for child_id in children_ids: if child_id != device_id: # Basic loop prevention # The module_context remains the same as we recurse *within* a main module's structure io_list.extend(find_io_recursively(child_id, project_data, module_context)) return io_list # --- generate_io_summary_file function (Updated) --- def generate_io_summary_file(all_plc_io_for_table, md_file_path, plc_name, project_data, output_root_path): """ Generates a Hardware.md file with the IO summary table. If there's only one PLC, creates it in the root directory, otherwise creates PLC-specific named files. 
""" # Determine if this is the only PLC in the project plcs_count = len(project_data.get("plcs", {})) is_single_plc = plcs_count == 1 if is_single_plc: # For single PLC: create Hardware.md in the root directory hardware_file_path = os.path.join(output_root_path, "Hardware.md") file_title = f"# IO Summary Table for PLC: {plc_name}" else: # For multiple PLCs: create [PLC_Name]_Hardware.md in PLC's directory hardware_file_path = os.path.join(os.path.dirname(md_file_path), f"{sanitize_filename(plc_name)}_Hardware.md") file_title = f"# IO Summary Table for PLC: {plc_name}" markdown_lines = [file_title, ""] if all_plc_io_for_table: # Define table headers headers = [ "Network", "Type", "Address", "Device Name", "Sub-Device", "OrderNo", "Type", "IO Type", "IO Address", "Number of Bits" ] markdown_lines.append("| " + " | ".join(headers) + " |") markdown_lines.append("|-" + "-|-".join(["---"] * len(headers)) + "-|") # Sort the collected data sorted_table_data = sorted(all_plc_io_for_table, key=lambda x: x["SortKey"]) # Add rows to the table for row_data in sorted_table_data: row = [ row_data.get("Network", "N/A"), row_data.get("Network Type", "N/A"), row_data.get("Device Address", "N/A"), row_data.get("Device Name", "N/A"), row_data.get("Sub-Device", "N/A"), row_data.get("Sub-Device OrderNo", "N/A"), row_data.get("Sub-Device Type", "N/A"), row_data.get("IO Type", "N/A"), f"`{row_data.get('IO Address', 'N/A')}`", # Format IO Address as code row_data.get("Number of Bits", "N/A"), ] # Escape pipe characters within cell content if necessary row = [str(cell).replace('|', '\\|') for cell in row] markdown_lines.append("| " + " | ".join(row) + " |") else: markdown_lines.append("*No IO data found for this PLC.*") try: with open(hardware_file_path, "w", encoding="utf-8") as f: f.write("\n".join(markdown_lines)) print(f"IO summary table written to: {hardware_file_path}") except Exception as e: print(f"ERROR writing Hardware.md file {hardware_file_path}: {e}") traceback.print_exc() 
    return hardware_file_path


# --- generate_markdown_tree function ---
def generate_markdown_tree(project_data, md_file_path, target_plc_id, output_root_path):
    """(Modified) Generates hierarchical Markdown for a specific PLC.

    Writes the per-PLC hardware tree to md_file_path and, when IO data was
    collected, delegates the summary table to generate_io_summary_file().
    """
    plc_info = project_data.get("plcs", {}).get(target_plc_id)
    plc_name_for_title = "Unknown PLC"
    if plc_info:
        plc_name_for_title = plc_info.get('name', target_plc_id)
    # v31: Initialize list to store all IO data for the summary table for this PLC
    all_plc_io_for_table = []
    markdown_lines = [f"# Hardware & IO Summary for PLC: {plc_name_for_title}", ""]
    if not plc_info:
        # Unknown PLC id: still emit a stub file so the caller gets output.
        markdown_lines.append(f"*Details for PLC ID '{target_plc_id}' not found in the project data.*")
        try:
            with open(md_file_path, "w", encoding="utf-8") as f:
                f.write("\n".join(markdown_lines))
            print(f"Markdown summary (PLC not found) written to: {md_file_path}")
        except Exception as e:
            print(f"ERROR writing Markdown file {md_file_path}: {e}")
        return
    # Content previously in the loop now directly uses plc_info and target_plc_id
    markdown_lines.append(f"\n## PLC: {plc_info.get('name', target_plc_id)}")
    type_name = plc_info.get("type_name", "N/A")
    order_num = plc_info.get("order_number", "N/A")
    firmware = plc_info.get("firmware_version", "N/A")
    if type_name and type_name != "N/A":
        markdown_lines.append(f"- **Type Name:** `{type_name}`")
    if order_num and order_num != "N/A":
        markdown_lines.append(f"- **Order Number:** `{order_num}`")
    if firmware and firmware != "N/A":
        markdown_lines.append(f"- **Firmware:** `{firmware}`")
    plc_networks = plc_info.get("connected_networks", {})
    markdown_lines.append("\n- **Networks:**")
    if not plc_networks:
        markdown_lines.append(
            " - *No network connections found associated with this PLC object.*"
        )
    else:
        # Networks sorted by display name for stable output.
        sorted_network_items = sorted(
            plc_networks.items(),
            key=lambda item: project_data.get("networks", {})
            .get(item[0], {})
            .get("name", item[0]),
        )
        for net_id, plc_addr_on_net in sorted_network_items:
            net_info = project_data.get("networks", {}).get(net_id)
            if not net_info:
                markdown_lines.append(
                    f" - !!! Error: Network info missing for ID {net_id} !!!"
                )
                continue
            markdown_lines.append(
                f" - ### {net_info.get('name', net_id)} ({net_info.get('type', 'Unknown')})"
            )
            markdown_lines.append(
                f" - PLC Address on this Net: `{plc_addr_on_net}`"
            )
            markdown_lines.append(f" - **Devices on Network:**")
            devices_on_this_net = net_info.get("devices_on_net", {})

            # Numeric sort key derived from the digits of a node address
            # (e.g. "10.1.0.5" sorts before "10.1.0.12"); unparsable → last.
            def sort_key(item):
                node_id, node_addr = item
                try:
                    parts = [int(p) for p in re.findall(r"\d+", node_addr)]
                    return parts
                except:
                    return [float("inf")]

            # Collect the PLC's own node/interface ids so they can be
            # excluded from the "other devices" listing below.
            plc_interface_and_node_ids = set()
            for node in plc_info.get("network_nodes", []):
                plc_interface_and_node_ids.add(node["id"])
                interface_id_lookup = (
                    project_data["devices"].get(node["id"], {}).get("parent_id")
                )
                if interface_id_lookup:
                    plc_interface_and_node_ids.add(interface_id_lookup)
            plc_interface_and_node_ids.add(target_plc_id)  # Use target_plc_id here
            other_device_items = sorted(
                [
                    (node_id, node_addr)
                    for node_id, node_addr in devices_on_this_net.items()
                    if node_id not in plc_interface_and_node_ids
                ],
                key=sort_key,
            )
            if not other_device_items:
                markdown_lines.append(" - *None (besides PLC interfaces)*")
            else:
                # --- Display Logic with Sibling IO Aggregation & Aesthetics ---
                for node_id, node_addr in other_device_items:
                    # v31: Initialize list for table data for the current device being processed
                    current_device_io_for_table = []
                    node_info = project_data.get("devices", {}).get(node_id)
                    if not node_info:
                        markdown_lines.append(
                            f" - !!! Error: Node info missing for ID {node_id} Addr: {node_addr} !!!"
                        )
                        continue
                    # Walk up: node → interface → actual device → (maybe) rack.
                    interface_id = node_info.get("parent_id")
                    interface_info_dev = None  # Renamed to avoid conflict
                    actual_device_id = None
                    actual_device_info = None
                    rack_id = None
                    # rack_info = None # rack_info was not used
                    if interface_id:
                        interface_info_dev = project_data.get("devices", {}).get(
                            interface_id
                        )
                        if interface_info_dev:
                            actual_device_id = interface_info_dev.get("parent_id")
                            if actual_device_id:
                                actual_device_info = project_data.get(
                                    "devices", {}
                                ).get(actual_device_id)
                                if actual_device_info:
                                    potential_rack_id = actual_device_info.get(
                                        "parent_id"
                                    )
                                    if potential_rack_id:
                                        potential_rack_info = project_data.get(
                                            "devices", {}
                                        ).get(potential_rack_id)
                                        if potential_rack_info and (
                                            "Rack" in potential_rack_info.get("name", "")
                                            or potential_rack_info.get("position") is None
                                        ):
                                            rack_id = potential_rack_id
                                            # rack_info = potential_rack_info # Not used
                    # Prefer the deepest resolved ancestor for display/IO search.
                    display_info_title = (
                        actual_device_info
                        if actual_device_info
                        else (interface_info_dev if interface_info_dev else node_info)
                    )
                    display_id_title = (
                        actual_device_id
                        if actual_device_info
                        else (interface_id if interface_info_dev else node_id)
                    )
                    io_search_root_id = (
                        actual_device_id
                        if actual_device_info
                        else (interface_id if interface_info_dev else node_id)
                    )
                    io_search_root_info = project_data.get("devices", {}).get(
                        io_search_root_id
                    )
                    # Construct Title
                    display_name = display_info_title.get("name", display_id_title)
                    via_node_name = node_info.get("name", node_id)
                    title_str = f"#### {display_name}"
                    if display_id_title != node_id:
                        title_str += f" (via {via_node_name} @ `{node_addr}`)"
                    else:
                        title_str += f" (@ `{node_addr}`)"
                    markdown_lines.append(f" - {title_str}")
                    # Display Basic Details
                    markdown_lines.append(
                        f" - Address (on net): `{node_addr}`"
                    )
                    type_name_disp = display_info_title.get("type_name", "N/A")
                    order_num_disp = display_info_title.get("order_number", "N/A")
                    pos_disp = display_info_title.get("position", "N/A")
                    if type_name_disp and type_name_disp != "N/A":
                        markdown_lines.append(
                            f" - Type Name: `{type_name_disp}`"
                        )
                    if order_num_disp and order_num_disp != "N/A":
                        markdown_lines.append(
                            f" - Order No: `{order_num_disp}`"
                        )
                    if pos_disp and pos_disp != "N/A":
                        markdown_lines.append(
                            f" - Pos (in parent): `{pos_disp}`"
                        )
                    ultimate_parent_id = rack_id
                    if not ultimate_parent_id and actual_device_info:
                        ultimate_parent_id = actual_device_info.get("parent_id")
                    if (
                        ultimate_parent_id
                        and ultimate_parent_id != display_id_title
                    ):
                        ultimate_parent_info = project_data.get("devices", {}).get(
                            ultimate_parent_id
                        )
                        ultimate_parent_name = (
                            ultimate_parent_info.get("name", "?")
                            if ultimate_parent_info
                            else "?"
                        )
                        markdown_lines.append(
                            f" - Parent Structure: `{ultimate_parent_name}`"
                        )
                    # --- IO Aggregation Logic (from v24) ---
                    # Scan all sibling modules under the same parent structure so
                    # a head module's IO cards are all reported together.
                    aggregated_io_addresses = []
                    parent_structure_id = (
                        io_search_root_info.get("parent_id")
                        if io_search_root_info
                        else None
                    )
                    io_search_root_name_disp = (
                        io_search_root_info.get("name", "?")
                        if io_search_root_info
                        else "?"
                    )
                    if parent_structure_id:
                        parent_structure_info = project_data.get("devices", {}).get(
                            parent_structure_id
                        )
                        parent_structure_name = (
                            parent_structure_info.get("name", "?")
                            if parent_structure_info
                            else "?"
                        )
                        search_title = f"parent '{parent_structure_name}'"
                        sibling_found_io = False
                        for dev_scan_id, dev_scan_info in project_data.get(
                            "devices", {}
                        ).items():
                            if (
                                dev_scan_info.get("parent_id") == parent_structure_id
                            ):  # This dev_scan_info is the module
                                module_context_for_sibling = {
                                    "id": dev_scan_id,
                                    "name": dev_scan_info.get("name", dev_scan_id),
                                    "order_number": dev_scan_info.get("order_number", "N/A"),
                                    "type_name": dev_scan_info.get("type_name", "N/A")
                                }
                                io_from_sibling = find_io_recursively(
                                    dev_scan_id, project_data, module_context_for_sibling
                                )
                                if io_from_sibling:
                                    aggregated_io_addresses.extend(io_from_sibling)
                                    sibling_found_io = True
                        if (
                            not sibling_found_io and not aggregated_io_addresses
                        ):  # Only show message if list still empty
                            markdown_lines.append(
                                f" - *No IO Addresses found in modules under {search_title} (ID: {parent_structure_id}).*"
                            )
                    elif io_search_root_id:
                        search_title = f"'{io_search_root_name_disp}'"
                        module_context_for_root = {
                            "id": io_search_root_id,
                            "name": io_search_root_info.get("name", io_search_root_id),
                            "order_number": io_search_root_info.get("order_number", "N/A"),
                            "type_name": io_search_root_info.get("type_name", "N/A")
                        }
                        aggregated_io_addresses = find_io_recursively(
                            io_search_root_id, project_data, module_context_for_root
                        )
                        if not aggregated_io_addresses:
                            markdown_lines.append(
                                f" - *No IO Addresses found in modules under {search_title} (ID: {io_search_root_id}).*"
                            )
                    else:
                        markdown_lines.append(
                            f" - *Could not determine structure to search for IO addresses.*"
                        )
                    # --- End IO Aggregation ---
                    # Display aggregated IO Addresses with Siemens format (Cleaned)
                    if aggregated_io_addresses:
                        markdown_lines.append(
                            f" - **IO Addresses (Aggregated from Structure):**"
                        )
                        sorted_agg_io = sorted(
                            aggregated_io_addresses,
                            key=lambda x: (
                                (
                                    int(x.get("module_pos", "9999"))
                                    if str(x.get("module_pos", "9999")).isdigit()  # Ensure it's a string before isdigit
                                    else 9999
                                ),
                                x.get("module_name", ""),
                                x.get("type", ""),
                                (
                                    int(x.get("start", "0"))
                                    if str(x.get("start", "0")).isdigit()  # Ensure it's a string
                                    else float("inf")
                                ),
                            ),
                        )
                        last_module_id_processed = None  # Use the actual module ID for grouping
                        for addr_info in sorted_agg_io:
                            # Emit the module header once per module group.
                            current_module_id_for_grouping = addr_info.get("module_id")
                            if current_module_id_for_grouping != last_module_id_processed:
                                module_name_disp = addr_info.get('module_name','?')
                                module_type_name_disp = addr_info.get('module_type_name', 'N/A')
                                module_order_num_disp = addr_info.get('module_order_number', 'N/A')
                                module_line_parts = [f"**{module_name_disp}**"]
                                if module_type_name_disp and module_type_name_disp != 'N/A':
                                    module_line_parts.append(f"Type: `{module_type_name_disp}`")
                                if module_order_num_disp and module_order_num_disp != 'N/A':
                                    module_line_parts.append(f"OrderNo: `{module_order_num_disp}`")
                                # Removed (Pos: ...) from this line as requested
                                markdown_lines.append(f" - {', '.join(module_line_parts)}")
                                last_module_id_processed = current_module_id_for_grouping
                            # --- Siemens IO Formatting (from v25.1 - keep fixes) ---
                            io_type = addr_info.get("type", "?")
                            start_str = addr_info.get("start", "?")
                            length_str = addr_info.get("length", "?")
                            # area_str = addr_info.get("area", "?") # Not used in final output string
                            siemens_addr = f"FMT_ERROR"  # Default error
                            length_bits = 0
                            try:
                                start_byte = int(start_str)
                                length_bits = int(length_str)
                                length_bytes = math.ceil(
                                    length_bits / 8.0
                                )  # Use float division
                                if length_bits > 0 and length_bytes == 0:
                                    length_bytes = 1  # Handle len < 8 bits
                                end_byte = start_byte + length_bytes - 1
                                # EW = German input word, AW = output word prefix.
                                prefix = "P?"
                                if io_type.lower() == "input":
                                    prefix = "EW"
                                elif io_type.lower() == "output":
                                    prefix = "AW"
                                siemens_addr = f"{prefix} {start_byte}..{end_byte}"
                            except Exception:
                                siemens_addr = (
                                    f"FMT_ERROR({start_str},{length_str})"
                                )
                            # v31: Collect data for the summary table (Corrected Indentation)
                            current_device_io_for_table.append({
                                "Network": net_info.get('name', net_id),
                                "Network Type": net_info.get('type', 'Unknown'),
                                "Device Address": node_addr,
                                "Device Name": display_name,  # Main device name
                                "Sub-Device": addr_info.get('module_name','?'),  # Module name
                                "Sub-Device OrderNo": addr_info.get('module_order_number', 'N/A'),
                                "Sub-Device Type": addr_info.get('module_type_name', 'N/A'),
                                "IO Type": io_type,
                                "IO Address": siemens_addr,
                                "Number of Bits": length_bits,
                                "SortKey": (  # Add a sort key for the table
                                    net_info.get('name', net_id),
                                    sort_key((node_id, node_addr)),  # Reuse the device sort key
                                    (
                                        int(addr_info.get("module_pos", "9999"))
                                        if str(addr_info.get("module_pos", "9999")).isdigit()
                                        else 9999
                                    ),
                                    addr_info.get("module_name", ""),
                                    io_type,
                                    (
                                        int(addr_info.get("start", "0"))
                                        if str(addr_info.get("start", "0")).isdigit()
                                        else float("inf")
                                    ),
                                )
                            })
                            markdown_lines.append(
                                f" - `{siemens_addr}` (Len={length_bits} bits)"
                            )
                            # --- End Siemens IO Formatting ---
                    # IO Connections logic
                    links_from = project_data.get("links_by_source", {}).get(
                        display_id_title, []
                    )
                    links_to = project_data.get("links_by_target", {}).get(
                        display_id_title, []
                    )
                    io_conns = []
                    for link in links_from:
                        if "channel" in link["source_suffix"].lower():
                            target_str = f"{link.get('target_device_name', link['target_id'])}:{link['target_suffix']}"
                            if link["target_id"] == display_id_title:
                                target_str = link["target_suffix"]
                            io_conns.append(
                                f"`{link['source_suffix']}` → `{target_str}`"
                            )
                    for link in links_to:
                        if "channel" in link["target_suffix"].lower():
                            source_str = f"{link.get('source_device_name', link['source_id'])}:{link['source_suffix']}"
                            if link["source_id"] == display_id_title:
                                source_str = \
link["source_suffix"] io_conns.append( f"`{source_str}` → `{link['target_suffix']}`" ) if io_conns: markdown_lines.append( f" - **IO Connections (Channels):**" ) for conn in sorted(list(set(io_conns))): markdown_lines.append(f" - {conn}") # v31: Add collected IO for this device to the main list all_plc_io_for_table.extend(current_device_io_for_table) markdown_lines.append("") # Spacing # --- *** END Display Logic *** --- try: # Re-open the file in write mode to include the tree structure (without the table) with open(md_file_path, "w", encoding="utf-8") as f: f.write("\n".join(markdown_lines)) print(f"Markdown tree summary written to: {md_file_path}") # Generate the separate Hardware.md with the IO summary table if all_plc_io_for_table: hardware_file_path = generate_io_summary_file( all_plc_io_for_table, md_file_path, plc_name_for_title, project_data, output_root_path ) print(f"IO summary table generated in separate file: {hardware_file_path}") except Exception as e: print(f"ERROR writing Markdown file {md_file_path}: {e}") traceback.print_exc() # --- generate_io_upward_tree function (Unchanged from v23) --- def generate_io_upward_tree(project_data, md_file_path): """(v23) Generates a debug tree starting from IO addresses upwards.""" markdown_lines = ["# IO Address Upward Connection Trace (Debug v23)", ""] if not project_data or not project_data.get("devices"): markdown_lines.append("*No device data found.*") try: with open(md_file_path, "w", encoding="utf-8") as f: f.write("\n".join(markdown_lines)) print(f"\nIO upward debug tree written to: {md_file_path}") except Exception as e: print(f"ERROR writing IO upward debug tree file {md_file_path}: {e}") return node_to_network_map = {} for net_id, net_info in project_data.get("networks", {}).items(): net_name = net_info.get("name", "?") for node_id, node_addr in net_info.get("devices_on_net", {}).items(): node_to_network_map[node_id] = (net_id, net_name, node_addr) devices_with_io = [] for dev_id, dev_info in 
project_data.get("devices", {}).items(): if dev_info.get("io_addresses"): devices_with_io.append((dev_id, dev_info)) if not devices_with_io: markdown_lines.append("*No devices with defined IO Addresses found.*") else: markdown_lines.append( f"Found {len(devices_with_io)} device(s)/module(s) with IO addresses. Tracing connections upwards:\n" ) devices_with_io.sort( key=lambda item: ( ( int(item[1].get("position", "9999")) if item[1].get("position", "9999").isdigit() else 9999 ), item[1].get("name", ""), ) ) for dev_id, dev_info in devices_with_io: markdown_lines.append( f"## IO Module: {dev_info.get('name', dev_id)} (ID: {dev_id})" ) markdown_lines.append(f"- Position: {dev_info.get('position', 'N/A')}") markdown_lines.append("- IO Addresses:") for addr in sorted( dev_info["io_addresses"], key=lambda x: ( x.get("type", ""), ( int(x.get("start", "0")) if x.get("start", "0").isdigit() else float("inf") ), ), ): markdown_lines.append( f" - `{addr.get('type','?').ljust(6)} Start={addr.get('start','?').ljust(4)} Len={addr.get('length','?').ljust(3)}` (Area: {addr.get('area','?')})" ) markdown_lines.append("- Upward Path:") current_id = dev_id current_info = dev_info indent = " " path_found = False ancestor_limit = 15 count = 0 while current_id and count < ancestor_limit: ancestor_name = current_info.get("name", "?") if current_info else "?" 
ancestor_pos = ( current_info.get("position", "N/A") if current_info else "N/A" ) markdown_lines.append( f"{indent}└─ {ancestor_name} (ID: {current_id}, Pos: {ancestor_pos})" ) if current_id in node_to_network_map: net_id, net_name, node_addr = node_to_network_map[current_id] markdown_lines.append(f"{indent} └─ **Network Connection Point**") markdown_lines.append( f"{indent} - Node: {ancestor_name} (ID: {current_id})" ) markdown_lines.append( f"{indent} - Network: {net_name} (ID: {net_id})" ) markdown_lines.append(f"{indent} - Address: `{node_addr}`") plc_connection_found = False for plc_id_check, plc_info_check in project_data.get( "plcs", {} ).items(): if net_id in plc_info_check.get("connected_networks", {}): markdown_lines.append( f"{indent} - **Network associated with PLC:** {plc_info_check.get('name','?')} (ID: {plc_id_check})" ) plc_connection_found = True break if not plc_connection_found: markdown_lines.append( f"{indent} - *Network not directly associated with a known PLC in data.*" ) path_found = True break if current_id in project_data.get("plcs", {}): markdown_lines.append(f"{indent} └─ **Is PLC:** {ancestor_name}") path_found = True break parent_id = current_info.get("parent_id") if current_info else None if parent_id: current_info = project_data.get("devices", {}).get(parent_id) if not current_info: markdown_lines.append( f"{indent} └─ Parent ID {parent_id} not found. Stopping trace." ) break current_id = parent_id indent += " " else: markdown_lines.append( f"{indent} └─ Reached top level (no parent)." ) break count += 1 if count >= ancestor_limit: markdown_lines.append( f"{indent} └─ Reached ancestor limit. Stopping trace." 
) if not path_found: markdown_lines.append( f"{indent}└─ *Could not trace path to a known Network Node or PLC.*" ) markdown_lines.append("") try: with open(md_file_path, "w", encoding="utf-8") as f: f.write("\n".join(markdown_lines)) print(f"\nIO upward debug tree written to: {md_file_path}") except Exception as e: print(f"ERROR writing IO upward debug tree file {md_file_path}: {e}") # --- extract_and_save_global_outputs function (Refactored from process_aml_file) --- def extract_and_save_global_outputs(aml_file_path, json_output_path, md_upward_output_path): """Extracts data from AML, saves global JSON and IO upward tree, returns project_data.""" # (Unchanged) print(f"Processing AML file: {aml_file_path}") if not os.path.exists(aml_file_path): print(f"ERROR: Input AML file not found at {aml_file_path}") return try: parser = ET.XMLParser(remove_blank_text=True, huge_tree=True) tree = ET.parse(aml_file_path, parser) root = tree.getroot() project_data = extract_aml_data(root) # v15 extraction print(f"Generating JSON output: {json_output_path}") try: with open(json_output_path, "w", encoding="utf-8") as f: json.dump(project_data, f, indent=4, default=str) print(f"JSON data written successfully.") except Exception as e: print(f"ERROR writing JSON file {json_output_path}: {e}") traceback.print_exc() # Generate and save the IO upward tree (global) generate_io_upward_tree( project_data, md_upward_output_path ) return project_data except ET.LxmlError as xml_err: print(f"ERROR parsing XML file {aml_file_path} with lxml: {xml_err}") traceback.print_exc() return None except Exception as e: print(f"ERROR processing AML file {aml_file_path}: {e}") traceback.print_exc() return None def select_cax_file(initial_dir=None): # Add initial_dir parameter """Opens a dialog to select a CAx (XML) export file, starting in the specified directory.""" root = tk.Tk() root.withdraw() file_path = filedialog.askopenfilename( title="Select CAx Export File (AML)", filetypes=[ ("AML Files", 
"*.aml"), ("All Files", "*.*")], # Added AML initialdir=initial_dir # Set the initial directory ) root.destroy() if not file_path: print("No CAx file selected. Exiting.") sys.exit(0) return file_path def select_output_directory(): """Opens a dialog to select the output directory.""" root = tk.Tk() root.withdraw() dir_path = filedialog.askdirectory( title="Select Output Directory for JSON and MD files" # Updated title slightly ) root.destroy() if not dir_path: print("No output directory selected. Exiting.") sys.exit(0) return dir_path def sanitize_filename(name): """Sanitizes a string to be used as a valid filename or directory name.""" name = str(name) # Ensure it's a string name = re.sub(r'[<>:"/\\|?*]', '_', name) # Replace forbidden characters name = re.sub(r'\s+', '_', name) # Replace multiple whitespace with single underscore name = name.strip('._') # Remove leading/trailing dots or underscores return name if name else "Unnamed_Device" # --- Main Execution --- if __name__ == "__main__": configs = load_configuration() working_directory = configs.get("working_directory") script_version = "v31.1 - Corrected IO Summary Table Initialization" # Updated version print( f"--- AML (CAx Export) to Hierarchical JSON and Obsidian MD Converter ({script_version}) ---" ) # Validate working directory if not working_directory or not os.path.isdir(working_directory): print("ERROR: Working directory not set or invalid in configuration.") print("Attempting to use script's directory as fallback.") # Fallback to script's directory or current directory if needed working_directory = os.path.dirname(os.path.abspath(__file__)) if not os.path.isdir(working_directory): working_directory = os.getcwd() print(f"Using fallback directory: {working_directory}") # Optionally, prompt user to select a working directory here if critical # output_dir = select_output_directory() # Keep this if you want user selection on failure # Use working_directory as the output directory output_dir = 
working_directory print(f"Using Working Directory for Output: {output_dir}") # 1. Select Input CAx File, starting in the working directory # Pass working_directory to the selection function cax_file_path = select_cax_file(initial_dir=working_directory) # Convert paths to Path objects input_path = Path(cax_file_path) output_path = Path(output_dir) # Output path is the working directory # Check if input file exists if not input_path.is_file(): print(f"ERROR: Input file '{input_path}' not found or is not a file.") sys.exit(1) # Ensure output directory exists (redundant if working_directory is valid, but safe) output_path.mkdir(parents=True, exist_ok=True) # Construct output file paths within the selected output directory (working_directory) output_json_file = output_path / input_path.with_suffix(".hierarchical.json").name # Hardware tree MD name is now PLC-specific and handled below output_md_upward_file = output_path / input_path.with_name(f"{input_path.stem}_IO_Upward_Debug.md") print(f"Input AML: {input_path.resolve()}") print(f"Output Directory: {output_path.resolve()}") print(f"Output JSON: {output_json_file.resolve()}") print(f"Output IO Debug Tree MD: {output_md_upward_file.resolve()}") # Process the AML file to get project_data and save global files project_data = extract_and_save_global_outputs( str(input_path), str(output_json_file), str(output_md_upward_file), ) if project_data: # Now, generate the hardware tree per PLC if not project_data.get("plcs"): print("\nNo PLCs found in the project data. Cannot generate PLC-specific hardware trees.") else: print(f"\nFound {len(project_data['plcs'])} PLC(s). 
Generating individual hardware trees...") for plc_id, plc_data_for_plc in project_data.get("plcs", {}).items(): plc_name_original = plc_data_for_plc.get('name', plc_id) plc_name_sanitized = sanitize_filename(plc_name_original) plc_doc_dir = output_path / plc_name_sanitized / "Documentation" plc_doc_dir.mkdir(parents=True, exist_ok=True) hardware_tree_md_filename = f"{input_path.stem}_Hardware_Tree.md" output_plc_md_file = plc_doc_dir / hardware_tree_md_filename print(f" Generating Hardware Tree for PLC '{plc_name_original}' (ID: {plc_id}) at: {output_plc_md_file.resolve()}") # Pass output_path as the root directory for Hardware.md placement generate_markdown_tree(project_data, str(output_plc_md_file), plc_id, str(output_path)) else: print("\nFailed to process AML data. Halting before generating PLC-specific trees.") print("\nScript finished.")