"""AML (CAx Export) to JSON and Obsidian Markdown converter (v7 - Corrected).

Parses an AutomationML CAx export with lxml, extracts device, network-node,
IO-address and link information, and writes both a detailed JSON dump and an
Obsidian-friendly Markdown summary.
"""

import json
import os
import re
import sys
import traceback
from pathlib import Path

# Import lxml (third-party): real XPath support is needed for local-name().
from lxml import etree as ET

# --- Configuration ---
# (No changes needed here)

# 'RefPartnerSideA/B' values look like '<DeviceID>:<suffix>'; group(1) is the
# device ID, group(2) the optional suffix. Compiled once, reused per link.
_LINK_SIDE_RE = re.compile(r"([^:]+):?(.*)")


def _extract_io_addresses(attr):
    """Return IO address dicts parsed from one 'Address' Attribute element.

    Each sub-Attribute describes an address area; entries without a
    StartAddress are dropped.
    """
    io_addresses = []
    for part in attr.xpath("./*[local-name()='Attribute']"):
        addr_details = {
            "area": part.get("Name", "?"),
            "start": "N/A",
            "length": "N/A",
            "type": "N/A",
        }
        start_val = part.xpath(
            "./*[local-name()='Attribute'][@Name='StartAddress']/*[local-name()='Value']/text()"
        )
        len_val = part.xpath(
            "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()"
        )
        type_val = part.xpath(
            "./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()"
        )
        if start_val:
            addr_details["start"] = start_val[0]
        if len_val:
            addr_details["length"] = len_val[0]
        if type_val:
            addr_details["type"] = type_val[0]
        # Only keep entries that actually carry a start address.
        if addr_details["start"] != "N/A":
            io_addresses.append(addr_details)
    return io_addresses


def _extract_device_info(elem):
    """Build the device dict for one InternalElement (caller ensures it has an ID)."""
    device_info = {
        "name": elem.get("Name", "N/A"),
        "id": elem.get("ID"),
        "class": "N/A",
        "type_identifier": "N/A",
        "order_number": "N/A",
        "type_name": "N/A",
        "firmware_version": "N/A",  # Added firmware field
        "position": elem.get("PositionNumber", "N/A"),
        "attributes": {},
        "interfaces": [],
        "network_nodes": [],
        "io_addresses": [],
    }

    # Device class: prefer an explicit SystemUnitClass child; fall back to
    # the RefBaseSystemUnitPath attribute in either case.
    class_tag = elem.xpath("./*[local-name()='SystemUnitClass']")
    if class_tag:
        device_info["class"] = class_tag[0].get(
            "Path", elem.get("RefBaseSystemUnitPath", "N/A")
        )
    else:
        device_info["class"] = elem.get("RefBaseSystemUnitPath", "N/A")

    # Extract Attributes: direct children first, nested ones as a fallback.
    attributes = elem.xpath("./*[local-name()='Attribute']")
    if not attributes:
        attributes = elem.xpath(".//*[local-name()='Attribute']")
    for attr in attributes:
        attr_name = attr.get("Name", "")
        # Text value of the direct child 'Value' tag, empty when absent.
        value_elem = attr.xpath("./*[local-name()='Value']/text()")
        attr_value = value_elem[0] if value_elem else ""

        # Promote common identifying attributes to top-level fields.
        if attr_name == "TypeIdentifier":
            device_info["type_identifier"] = attr_value
            if "OrderNumber:" in attr_value:
                device_info["order_number"] = attr_value.split("OrderNumber:")[-1]
        elif attr_name == "TypeName":
            device_info["type_name"] = attr_value
        elif attr_name == "FirmwareVersion":
            device_info["firmware_version"] = attr_value

        # Store all attributes for reference.
        device_info["attributes"][attr_name] = attr_value

        # Extract detailed IO addresses from the 'Address' attribute tree.
        if attr_name == "Address":
            device_info["io_addresses"].extend(_extract_io_addresses(attr))

    # Extract External Interfaces.
    for interface in elem.xpath("./*[local-name()='ExternalInterface']"):
        device_info["interfaces"].append(
            {
                "name": interface.get("Name", "N/A"),
                "id": interface.get("ID", "N/A"),
                "ref_base_class": interface.get("RefBaseClassPath", "N/A"),
            }
        )

    # Extract network nodes: nested InternalElements whose SupportedRoleClass
    # path contains 'Node'. Nodes without a NetworkAddress are skipped.
    network_nodes = elem.xpath(
        ".//*[local-name()='InternalElement']/*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]"
    )
    for node_role in network_nodes:
        node_elem = node_role.getparent()
        node_info = {
            "name": node_elem.get("Name", "N/A"),
            "type": "N/A",
            "address": "N/A",
        }
        type_attr = node_elem.xpath(
            "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()"
        )
        addr_attr = node_elem.xpath(
            "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()"
        )
        if type_attr:
            node_info["type"] = type_attr[0]
        if addr_attr:
            node_info["address"] = addr_attr[0]
        if node_info["address"] != "N/A":
            device_info["network_nodes"].append(node_info)

    return device_info


def _extract_links(root, devices):
    """Parse all InternalLink elements into connection dicts.

    The suffix of each side falls back to the bare ID when absent so the
    Markdown output always has a usable label; device names are resolved
    from the already-collected *devices* mapping when possible.
    """
    connections = []
    internal_links = root.xpath(".//*[local-name()='InternalLink']")
    print(f"Found {len(internal_links)} InternalLink(s) globally.")
    for link in internal_links:
        side_a_match = _LINK_SIDE_RE.match(link.get("RefPartnerSideA", ""))
        side_b_match = _LINK_SIDE_RE.match(link.get("RefPartnerSideB", ""))
        side_a_id = side_a_match.group(1) if side_a_match else "N/A"
        side_a_suffix = (
            side_a_match.group(2)
            if side_a_match and side_a_match.group(2)
            else side_a_id
        )
        side_b_id = side_b_match.group(1) if side_b_match else "N/A"
        side_b_suffix = (
            side_b_match.group(2)
            if side_b_match and side_b_match.group(2)
            else side_b_id
        )
        connections.append(
            {
                "name": link.get("Name", "N/A"),
                "side_a_id": side_a_id,
                "side_a_ref_suffix": side_a_suffix,
                "side_b_id": side_b_id,
                "side_b_ref_suffix": side_b_suffix,
                "side_a_device_name": devices.get(side_a_id, {}).get(
                    "name", side_a_id
                ),
                "side_b_device_name": devices.get(side_b_id, {}).get(
                    "name", side_b_id
                ),
            }
        )
    return connections


# CORRECTED function from v6
def extract_aml_data_v7(root):
    """Extracts device information using lxml and local-name() XPath. (Corrected)

    Returns a dict with:
        devices     -- {ID: device_info} for every InternalElement with an ID
        connections -- list of InternalLink dicts (see _extract_links)
    """
    project_data = {"devices": {}, "connections": []}  # Use dict for devices

    instance_hierarchies = root.xpath(".//*[local-name()='InstanceHierarchy']")
    if not instance_hierarchies:
        print("ERROR: Could not find 'InstanceHierarchy'.")
        return project_data
    ih = instance_hierarchies[0]
    print(f"Processing InstanceHierarchy: {ih.get('Name', 'N/A')}")

    internal_elements = ih.xpath(".//*[local-name()='InternalElement']")
    print(f"Found {len(internal_elements)} InternalElement(s). Analyzing...")

    # --- Device Loop ---
    for elem in internal_elements:
        elem_id = elem.get("ID", None)
        if not elem_id:
            # Only elements with an ID can be referenced by links.
            continue
        project_data["devices"][elem_id] = _extract_device_info(elem)

    # Find and process InternalLinks (searched globally, not per hierarchy).
    project_data["connections"] = _extract_links(root, project_data["devices"])
    return project_data


# --- generate_markdown_obsidian function remains the same as in v6 ---
def generate_markdown_obsidian(project_data, md_file_path):
    """Generates structured Markdown output for Obsidian."""

    def generate_table(headers, rows):
        # Render a simple Obsidian/GitHub-style pipe table.
        if not rows:
            return ["No data available."]
        lines = ["| " + " | ".join(headers) + " |", "|" + "---|" * len(headers)]
        for row in rows:
            lines.append("| " + " | ".join(map(str, row)) + " |")
        return lines

    markdown_lines = ["# Project Hardware & IO Summary (from CAx Export)", ""]

    # --- Network configuration table ---
    network_rows = []
    for device_id, device in project_data["devices"].items():
        for node in device.get("network_nodes", []):
            network_rows.append(
                [
                    device.get("name", "N/A"),
                    node.get("name", "N/A"),
                    node.get("type", "N/A"),
                    node.get("address", "N/A"),
                ]
            )
    markdown_lines.append("## Network Configuration")
    markdown_lines.extend(
        generate_table(
            ["Parent Device", "Interface/Node Name", "Type", "Address (IP/DP)"],
            network_rows,
        )
    )
    markdown_lines.append("")

    # --- IO modules table ---
    io_module_rows = []
    for device_id, device in project_data["devices"].items():
        if device.get("io_addresses"):
            address_strs = [
                f"{addr['type']} Start:{addr['start']} Len:{addr['length']} (Area:{addr['area']})"
                for addr in device["io_addresses"]
            ]
            io_module_rows.append(
                [
                    device.get("name", "N/A"),
                    device.get("type_name", "N/A"),
                    device.get("order_number", "N/A"),
                    device.get("position", "N/A"),
                    # "<br>" keeps multiple addresses inside one Markdown
                    # table cell; a raw newline would break the table row.
                    "<br>".join(address_strs),
                ]
            )
    markdown_lines.append("## I/O Modules & Addresses")
    markdown_lines.extend(
        generate_table(
            [
                "Module Name",
                "Type Name",
                "Order Number",
                "Slot/Pos",
                "Logical Addresses",
            ],
            io_module_rows,
        )
    )
    markdown_lines.append("")

    # --- Connections table ---
    connection_rows = []
    for i, conn in enumerate(project_data.get("connections", [])):
        if i >= 50:  # Limit links shown in MD
            connection_rows.append(["...", "...", "..."])
            break
        source = f"{conn.get('side_a_device_name', 'UNKNOWN')}::{conn.get('side_a_ref_suffix', 'N/A')}"
        target = f"{conn.get('side_b_device_name', 'UNKNOWN')}::{conn.get('side_b_ref_suffix', 'N/A')}"
        connection_rows.append([conn.get("name", "N/A"), f"`{source}`", f"`{target}`"])
    markdown_lines.append("## Connections / IO Tag Links")
    markdown_lines.extend(
        generate_table(
            [
                "Link Name",
                "Source (Device::Channel/Interface)",
                "Target (Device::Tag/Interface)",
            ],
            connection_rows,
        )
    )
    markdown_lines.append("")

    try:
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write("\n".join(markdown_lines))
        print(f"Markdown summary written to: {md_file_path}")
    except Exception as e:
        # Best-effort output: report and continue rather than crash.
        print(f"ERROR writing Markdown file {md_file_path}: {e}")


# --- process_aml_file_v7 function calls the corrected extraction ---
def process_aml_file_v7(aml_file_path, json_output_path, md_output_path):
    """Main function using lxml with local-name() and corrected extraction."""
    print(f"Processing AML file: {aml_file_path}")
    if not os.path.exists(aml_file_path):
        print(f"ERROR: Input AML file not found at {aml_file_path}")
        return
    try:
        parser = ET.XMLParser(remove_blank_text=True)
        tree = ET.parse(aml_file_path, parser)
        root = tree.getroot()
        project_data = extract_aml_data_v7(root)  # Call corrected function

        print(f"Generating JSON output: {json_output_path}")
        try:
            with open(json_output_path, "w", encoding="utf-8") as f:
                # default=str deliberately stringifies anything that is not
                # natively JSON-serializable (e.g. lxml objects).
                json.dump(project_data, f, indent=4, default=str)
            print("JSON data written successfully.")
        except Exception as e:
            print(f"ERROR writing JSON file {json_output_path}: {e}")

        generate_markdown_obsidian(project_data, md_output_path)  # Use the same MD generator
    except ET.LxmlError as xml_err:
        print(f"ERROR parsing XML file {aml_file_path} with lxml: {xml_err}")
    except Exception as e:
        print(f"ERROR processing AML file {aml_file_path}: {e}")
        traceback.print_exc()


# --- Main Execution ---
if __name__ == "__main__":
    print("--- AML (CAx Export) to JSON and Obsidian MD Converter (v7 - Corrected) ---")
    input_aml_file = "SAE196_c0.2.XML_CAx_Export.xml"
    input_path = Path(input_aml_file)
    if not input_path.is_file():
        print(f"ERROR: Input file '{input_aml_file}' not found.")
        sys.exit(1)
    output_json_file = input_path.with_suffix(".detailed.json")
    output_md_file = input_path.with_name(f"{input_path.stem}_Obsidian_Summary.md")
    print(f"Input AML: {input_path}")
    print(f"Output JSON: {output_json_file}")
    print(f"Output Markdown: {output_md_file}")
    process_aml_file_v7(
        str(input_path), str(output_json_file), str(output_md_file)
    )  # Call v7
    print("\nScript finished.")