diff --git a/__pycache__/config_manager.cpython-310.pyc b/__pycache__/config_manager.cpython-310.pyc
index fe74599..256b5f7 100644
Binary files a/__pycache__/config_manager.cpython-310.pyc and b/__pycache__/config_manager.cpython-310.pyc differ
diff --git a/backend/__pycache__/script_utils.cpython-310.pyc b/backend/__pycache__/script_utils.cpython-310.pyc
index 951067f..e2c68b8 100644
Binary files a/backend/__pycache__/script_utils.cpython-310.pyc and b/backend/__pycache__/script_utils.cpython-310.pyc differ
diff --git a/backend/script_groups/ObtainIOFromProjectTia/esquema_group.json b/backend/script_groups/ObtainIOFromProjectTia/esquema_group.json
new file mode 100644
index 0000000..1c9e43a
--- /dev/null
+++ b/backend/script_groups/ObtainIOFromProjectTia/esquema_group.json
@@ -0,0 +1,4 @@
+{
+    "type": "object",
+    "properties": {}
+}
\ No newline at end of file
diff --git a/backend/script_groups/ObtainIOFromProjectTia/esquema_work.json b/backend/script_groups/ObtainIOFromProjectTia/esquema_work.json
new file mode 100644
index 0000000..1c9e43a
--- /dev/null
+++ b/backend/script_groups/ObtainIOFromProjectTia/esquema_work.json
@@ -0,0 +1,4 @@
+{
+    "type": "object",
+    "properties": {}
+}
\ No newline at end of file
diff --git a/backend/script_groups/ObtainIOFromProjectTia/x1.py b/backend/script_groups/ObtainIOFromProjectTia/x1.py
new file mode 100644
index 0000000..7a5a762
--- /dev/null
+++ b/backend/script_groups/ObtainIOFromProjectTia/x1.py
@@ -0,0 +1,277 @@
+"""
+export_logic_from_tia:
+Exports the software of a PLC from a TIA Portal project as XML and SCL files.
+"""
+import tkinter as tk
+from tkinter import filedialog
+import os
+import sys
+import traceback
+
+script_root = os.path.dirname(
+    os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+)
+sys.path.append(script_root)
+from backend.script_utils import load_configuration
+
+# --- Configuration ---
+TIA_PORTAL_VERSION = "18.0"  # Target TIA Portal version (e.g., "18.0")
+EXPORT_OPTIONS = None  # Use default export options
+KEEP_FOLDER_STRUCTURE = True  # Replicate TIA project folder structure in export directory
+
+# --- TIA Scripting Import Handling ---
+# Check if the TIA_SCRIPTING environment variable is set
+if os.getenv('TIA_SCRIPTING'):
+    sys.path.append(os.getenv('TIA_SCRIPTING'))
+else:
+    # Optional: Define a fallback path if the environment variable isn't set
+    # fallback_path = "C:\\path\\to\\your\\TIA_Scripting_binaries"
+    # if os.path.exists(fallback_path):
+    #     sys.path.append(fallback_path)
+    pass  # Allow import to fail if not found
+
+try:
+    import siemens_tia_scripting as ts
+    EXPORT_OPTIONS = ts.Enums.ExportOptions.WithDefaults  # Set default options now that 'ts' is imported
+except ImportError:
+    print("ERROR: Failed to import 'siemens_tia_scripting'.")
+    print("Ensure:")
+    print(f"1. TIA Portal Openness for V{TIA_PORTAL_VERSION} is installed.")
+    print("2. The 'siemens_tia_scripting' Python module is installed (pip install ...) or")
+    print("   the path to its binaries is set in the 'TIA_SCRIPTING' environment variable.")
+    print("3. You are using a compatible Python version (e.g., 3.12.X as per documentation).")
+    sys.exit(1)
+except Exception as e:
+    print(f"An unexpected error occurred during import: {e}")
+    traceback.print_exc()
+    sys.exit(1)
+
+# --- Functions ---
+
+def select_project_file():
+    """Opens a dialog to select a TIA Portal project file."""
+    root = tk.Tk()
+    root.withdraw()  # Hide the main tkinter window
+    file_path = filedialog.askopenfilename(
+        title="Select TIA Portal Project File",
+        filetypes=[(f"TIA Portal V{TIA_PORTAL_VERSION} Projects", f"*.ap{TIA_PORTAL_VERSION.split('.')[0]}")]  # e.g. *.ap18
+    )
+    root.destroy()
+    if not file_path:
+        print("No project file selected. Exiting.")
+        sys.exit(0)
+    return file_path
+
+def select_export_directory():
+    """Opens a dialog to select the export directory."""
+    root = tk.Tk()
+    root.withdraw()  # Hide the main tkinter window
+    dir_path = filedialog.askdirectory(
+        title="Select Export Directory"
+    )
+    root.destroy()
+    if not dir_path:
+        print("No export directory selected. Exiting.")
+        sys.exit(0)
+    return dir_path
+
+def export_plc_data(plc, export_base_dir):
+    """Exports Blocks, UDTs, and Tag Tables from a given PLC."""
+    plc_name = plc.get_name()
+    print(f"\n--- Processing PLC: {plc_name} ---")
+
+    # Define base export path for this PLC
+    plc_export_dir = os.path.join(export_base_dir, plc_name)
+    os.makedirs(plc_export_dir, exist_ok=True)
+
+    # --- Export Program Blocks ---
+    blocks_exported = 0
+    blocks_skipped = 0
+    print(f"\n[PLC: {plc_name}] Exporting Program Blocks...")
+    xml_blocks_path = os.path.join(plc_export_dir, "ProgramBlocks_XML")
+    scl_blocks_path = os.path.join(plc_export_dir, "ProgramBlocks_SCL")
+    os.makedirs(xml_blocks_path, exist_ok=True)
+    os.makedirs(scl_blocks_path, exist_ok=True)
+    print(f"  XML Target: {xml_blocks_path}")
+    print(f"  SCL Target: {scl_blocks_path}")
+
+    try:
+        program_blocks = plc.get_program_blocks()
+        print(f"  Found {len(program_blocks)} program blocks.")
+        for block in program_blocks:
+            block_name = block.get_name()  # Assuming get_name() exists
+            print(f"  Processing block: {block_name}...")
+            try:
+                if not block.is_consistent():
+                    print(f"    Compiling block {block_name}...")
+                    block.compile()
+                    if not block.is_consistent():
+                        print(f"    WARNING: Block {block_name} inconsistent after compile. Skipping.")
+                        blocks_skipped += 1
+                        continue
+
+                print(f"    Exporting {block_name} as XML...")
+                block.export(target_directory_path=xml_blocks_path,
+                             export_options=EXPORT_OPTIONS,
+                             export_format=ts.Enums.ExportFormats.SimaticML,
+                             keep_folder_structure=KEEP_FOLDER_STRUCTURE)
+
+                try:
+                    prog_language = block.get_property(name="ProgrammingLanguage")
+                    if prog_language == "SCL":
+                        print(f"    Exporting {block_name} as SCL...")
+                        block.export(target_directory_path=scl_blocks_path,
+                                     export_options=EXPORT_OPTIONS,
+                                     export_format=ts.Enums.ExportFormats.ExternalSource,
+                                     keep_folder_structure=KEEP_FOLDER_STRUCTURE)
+                except Exception as prop_ex:
+                    print(f"    Could not get ProgrammingLanguage for {block_name}. Skipping SCL. Error: {prop_ex}")
+
+                blocks_exported += 1
+            except Exception as block_ex:
+                print(f"  ERROR exporting block {block_name}: {block_ex}")
+                blocks_skipped += 1
+        print(f"  Program Blocks Export Summary: Exported={blocks_exported}, Skipped/Errors={blocks_skipped}")
+    except Exception as e:
+        print(f"  ERROR processing Program Blocks: {e}")
+        traceback.print_exc()
+
+    # --- Export PLC Data Types (UDTs) ---
+    udts_exported = 0
+    udts_skipped = 0
+    print(f"\n[PLC: {plc_name}] Exporting PLC Data Types (UDTs)...")
+    udt_export_path = os.path.join(plc_export_dir, "PlcDataTypes")
+    os.makedirs(udt_export_path, exist_ok=True)
+    print(f"  Target: {udt_export_path}")
+
+    try:
+        udts = plc.get_user_data_types()
+        print(f"  Found {len(udts)} UDTs.")
+        for udt in udts:
+            udt_name = udt.get_name()
+            print(f"  Processing UDT: {udt_name}...")
+            try:
+                if not udt.is_consistent():
+                    print(f"    Compiling UDT {udt_name}...")
+                    udt.compile()
+                    if not udt.is_consistent():
+                        print(f"    WARNING: UDT {udt_name} inconsistent after compile. Skipping.")
+                        udts_skipped += 1
+                        continue
+
+                print(f"    Exporting {udt_name}...")
+                udt.export(target_directory_path=udt_export_path,
+                           export_options=EXPORT_OPTIONS,
+                           # export_format defaults to SimaticML for UDTs
+                           keep_folder_structure=KEEP_FOLDER_STRUCTURE)
+                udts_exported += 1
+            except Exception as udt_ex:
+                print(f"  ERROR exporting UDT {udt_name}: {udt_ex}")
+                udts_skipped += 1
+        print(f"  UDT Export Summary: Exported={udts_exported}, Skipped/Errors={udts_skipped}")
+    except Exception as e:
+        print(f"  ERROR processing UDTs: {e}")
+        traceback.print_exc()
+
+    # --- Export PLC Tag Tables ---
+    tags_exported = 0
+    tags_skipped = 0
+    print(f"\n[PLC: {plc_name}] Exporting PLC Tag Tables...")
+    tags_export_path = os.path.join(plc_export_dir, "PlcTags")
+    os.makedirs(tags_export_path, exist_ok=True)
+    print(f"  Target: {tags_export_path}")
+
+    try:
+        tag_tables = plc.get_plc_tag_tables()
+        print(f"  Found {len(tag_tables)} Tag Tables.")
+        for table in tag_tables:
+            table_name = table.get_name()
+            print(f"  Processing Tag Table: {table_name}...")
+            try:
+                # Note: Consistency check might not be available/needed for tag tables like blocks/UDTs
+                print(f"    Exporting {table_name}...")
+                table.export(target_directory_path=tags_export_path,
+                             export_options=EXPORT_OPTIONS,
+                             # export_format defaults to SimaticML for Tag Tables
+                             keep_folder_structure=KEEP_FOLDER_STRUCTURE)
+                tags_exported += 1
+            except Exception as table_ex:
+                print(f"  ERROR exporting Tag Table {table_name}: {table_ex}")
+                tags_skipped += 1
+        print(f"  Tag Table Export Summary: Exported={tags_exported}, Skipped/Errors={tags_skipped}")
+    except Exception as e:
+        print(f"  ERROR processing Tag Tables: {e}")
+        traceback.print_exc()
+
+    print(f"\n--- Finished processing PLC: {plc_name} ---")
+
+
+# --- Main Script ---
+
+if __name__ == "__main__":
+    configs = load_configuration()
+
+    print("--- TIA Portal Data Exporter (Blocks, UDTs, Tags) ---")
+
+    # 1. Select Files/Folders
+    project_file = select_project_file()
+    export_dir = select_export_directory()
+
+    print(f"\nSelected Project: {project_file}")
+    print(f"Selected Export Directory: {export_dir}")
+
+    portal_instance = None
+    project_object = None
+
+    try:
+        # 2. Connect to TIA Portal
+        print(f"\nConnecting to TIA Portal V{TIA_PORTAL_VERSION}...")
+        portal_instance = ts.open_portal(
+            version=TIA_PORTAL_VERSION,
+            portal_mode=ts.Enums.PortalMode.WithGraphicalUserInterface
+        )
+        print("Connected to TIA Portal.")
+        print(f"Portal Process ID: {portal_instance.get_process_id()}")
+
+        # 3. Open Project
+        print(f"Opening project: {os.path.basename(project_file)}...")
+        project_object = portal_instance.open_project(project_file_path=project_file)
+        if project_object is None:
+            print("Project might already be open, attempting to get handle...")
+            project_object = portal_instance.get_project()
+            if project_object is None:
+                raise Exception("Failed to open or get the specified project.")
+        print("Project opened successfully.")
+
+        # 4. Get PLCs
+        plcs = project_object.get_plcs()
+        if not plcs:
+            print("No PLC devices found in the project.")
+        else:
+            print(f"Found {len(plcs)} PLC(s). Starting export process...")
+
+            # 5. Iterate and Export Data for each PLC
+            for plc_device in plcs:
+                export_plc_data(plc=plc_device, export_base_dir=export_dir)
+
+        print("\nExport process completed.")
+
+    except ts.TiaException as tia_ex:
+        print(f"\nTIA Portal Openness Error: {tia_ex}")
+        traceback.print_exc()
+    except FileNotFoundError:
+        print(f"\nERROR: Project file not found at {project_file}")
+    except Exception as e:
+        print(f"\nAn unexpected error occurred: {e}")
+        traceback.print_exc()
+    finally:
+        # 6. Cleanup
+        if portal_instance:
+            try:
+                print("\nClosing TIA Portal...")
+                portal_instance.close_portal()
+                print("TIA Portal closed.")
+            except Exception as close_ex:
+                print(f"Error during TIA Portal cleanup: {close_ex}")
+
+    print("\nScript finished.")
\ No newline at end of file
diff --git a/backend/script_groups/ObtainIOFromProjectTia/x2.py b/backend/script_groups/ObtainIOFromProjectTia/x2.py
new file mode 100644
index 0000000..100062b
--- /dev/null
+++ b/backend/script_groups/ObtainIOFromProjectTia/x2.py
@@ -0,0 +1,269 @@
+"""
+export_CAx_from_tia:
+Exports the CAx data of a TIA Portal project and generates a Markdown summary.
+"""
+import tkinter as tk
+from tkinter import filedialog
+import os
+import sys
+import traceback
+import xml.etree.ElementTree as ET  # Library to parse XML (AML)
+
+script_root = os.path.dirname(
+    os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+)
+sys.path.append(script_root)
+from backend.script_utils import load_configuration
+
+# --- Configuration ---
+TIA_PORTAL_VERSION = "18.0"  # Target TIA Portal version
+
+# --- TIA Scripting Import Handling ---
+# (Same import handling as the previous script)
+if os.getenv('TIA_SCRIPTING'):
+    sys.path.append(os.getenv('TIA_SCRIPTING'))
+else:
+    pass
+
+try:
+    import siemens_tia_scripting as ts
+except ImportError:
+    print("ERROR: Failed to import 'siemens_tia_scripting'.")
+    print("Ensure TIA Openness, the module, and Python 3.12.X are set up.")
+    sys.exit(1)
+except Exception as e:
+    print(f"An unexpected error occurred during import: {e}")
+    traceback.print_exc()
+    sys.exit(1)
+
+# --- Functions ---
+
+def select_project_file():
+    """Opens a dialog to select a TIA Portal project file."""
+    root = tk.Tk()
+    root.withdraw()
+    file_path = filedialog.askopenfilename(
+        title="Select TIA Portal Project File",
+        filetypes=[(f"TIA Portal V{TIA_PORTAL_VERSION} Projects", f"*.ap{TIA_PORTAL_VERSION.split('.')[0]}")]
+    )
+    root.destroy()
+    if not file_path:
+        print("No project file selected. Exiting.")
Exiting.") + sys.exit(0) + return file_path + +def select_output_directory(): + """Opens a dialog to select the output directory.""" + root = tk.Tk() + root.withdraw() + dir_path = filedialog.askdirectory( + title="Select Output Directory for AML and MD files" + ) + root.destroy() + if not dir_path: + print("No output directory selected. Exiting.") + sys.exit(0) + return dir_path + +def find_elements(element, path): + """Helper to find elements using namespaces commonly found in AML.""" + # AutomationML namespaces often vary slightly or might be default + # This basic approach tries common prefixes or no prefix + namespaces = { + '': element.tag.split('}')[0][1:] if '}' in element.tag else '', # Default namespace if present + 'caex': 'http://www.dke.de/CAEX', # Common CAEX namespace + # Add other potential namespaces if needed based on file inspection + } + # Try finding with common prefixes or the default namespace + for prefix, uri in namespaces.items(): + # Construct path with namespace URI if prefix is defined + namespaced_path = path + if prefix: + parts = path.split('/') + namespaced_parts = [f"{{{uri}}}{part}" if part != '.' else part for part in parts] + namespaced_path = '/'.join(namespaced_parts) + + # Try findall with the constructed path + found = element.findall(namespaced_path) + if found: + return found # Return first successful find + + # Fallback: try finding without explicit namespace (might work if default ns is used throughout) + # This might require adjusting the path string itself depending on the XML structure + try: + # Simple attempt without namespace handling if the above fails + return element.findall(path) + except SyntaxError: # Handle potential errors if path isn't valid without namespaces + return [] + + +def parse_aml_to_markdown(aml_file_path, md_file_path): + """Parses the AML file and generates a Markdown summary.""" + print(f"Parsing AML file: {aml_file_path}") + try: + tree = ET.parse(aml_file_path) + root = tree.getroot() + + markdown_lines = ["# Project CAx Data Summary (AutomationML)", ""] + + # Find InstanceHierarchy - usually contains the project structure + # Note: Namespace handling in ElementTree can be tricky. Adjust '{...}' part if needed. 
+
+
+def parse_aml_to_markdown(aml_file_path, md_file_path):
+    """Parses the AML file and generates a Markdown summary."""
+    print(f"Parsing AML file: {aml_file_path}")
+    try:
+        tree = ET.parse(aml_file_path)
+        root = tree.getroot()
+
+        markdown_lines = ["# Project CAx Data Summary (AutomationML)", ""]
+
+        # Find InstanceHierarchy - usually contains the project structure.
+        # Note: Namespace handling in ElementTree can be tricky. Adjust the '{...}' part if needed.
+        # We use the helper function 'find_elements' to try common patterns.
+        instance_hierarchies = find_elements(root, './/InstanceHierarchy')  # Common CAEX tag
+
+        if not instance_hierarchies:
+            markdown_lines.append("Could not find InstanceHierarchy in the AML file.")
+            print("Warning: Could not find InstanceHierarchy element.")
+        else:
+            # Assuming the first InstanceHierarchy is the main one
+            ih = instance_hierarchies[0]
+            markdown_lines.append(f"## Instance Hierarchy: {ih.get('Name', 'N/A')}")
+            markdown_lines.append("")
+
+            # Look for InternalElements which represent devices/components
+            internal_elements = find_elements(ih, './/InternalElement')  # Common CAEX tag
+
+            if not internal_elements:
+                markdown_lines.append("No devices (InternalElement) found in InstanceHierarchy.")
+                print("Info: No InternalElement tags found under InstanceHierarchy.")
+            else:
+                markdown_lines.append(f"Found {len(internal_elements)} device(s)/component(s):")
+                markdown_lines.append("")
+                markdown_lines.append("| Name | SystemUnitClass | RefBaseSystemUnitPath | Attributes |")
+                markdown_lines.append("|---|---|---|---|")
+
+                for elem in internal_elements:
+                    name = elem.get('Name', 'N/A')
+                    ref_path = elem.get('RefBaseSystemUnitPath', 'N/A')  # Path to class definition
+
+                    # Try to get the class name from the RefBaseSystemUnitPath or SystemUnitClassLib
+                    su_class_path = find_elements(elem, './/SystemUnitClass')  # Check direct child first
+                    su_class = su_class_path[0].get('Path', 'N/A') if su_class_path else ref_path.split('/')[-1]  # Fallback to last part of path
+
+                    attributes_md = ""
+                    attributes = find_elements(elem, './/Attribute')  # Find attributes
+                    attr_list = []
+                    for attr in attributes:
+                        attr_name = attr.get('Name', '')
+                        attr_value_elem = find_elements(attr, './/Value')  # Get Value element
+                        attr_value = attr_value_elem[0].text if attr_value_elem and attr_value_elem[0].text else 'N/A'
+
+                        # Look for potential IP addresses (common attribute names)
+                        if "Address" in attr_name or "IP" in attr_name:
+                            attr_list.append(f"**{attr_name}**: {attr_value}")
+                        else:
+                            attr_list.append(f"{attr_name}: {attr_value}")
+
".join(attr_list) if attr_list else "None" + + + markdown_lines.append(f"| {name} | {su_class} | `{ref_path}` | {attributes_md} |") + + # Write to Markdown file + with open(md_file_path, 'w', encoding='utf-8') as f: + f.write("\n".join(markdown_lines)) + print(f"Markdown summary written to: {md_file_path}") + + except ET.ParseError as xml_err: + print(f"ERROR parsing XML file {aml_file_path}: {xml_err}") + with open(md_file_path, 'w', encoding='utf-8') as f: + f.write(f"# Error\n\nFailed to parse AML file: {os.path.basename(aml_file_path)}\n\nError: {xml_err}") + except Exception as e: + print(f"ERROR processing AML file {aml_file_path}: {e}") + traceback.print_exc() + with open(md_file_path, 'w', encoding='utf-8') as f: + f.write(f"# Error\n\nAn unexpected error occurred while processing AML file: {os.path.basename(aml_file_path)}\n\nError: {e}") + + +# --- Main Script --- + +if __name__ == "__main__": + configs = load_configuration() + print("--- TIA Portal Project CAx Exporter and Analyzer ---") + + # 1. Select Files/Folders + project_file = select_project_file() + output_dir = select_output_directory() + + print(f"\nSelected Project: {project_file}") + print(f"Selected Output Directory: {output_dir}") + + # Define output file names + project_base_name = os.path.splitext(os.path.basename(project_file))[0] + aml_file = os.path.join(output_dir, f"{project_base_name}_CAx_Export.aml") + md_file = os.path.join(output_dir, f"{project_base_name}_CAx_Summary.md") + log_file = os.path.join(output_dir, f"{project_base_name}_CAx_Export.log") # Log file for the export process + + print(f"Will export CAx data to: {aml_file}") + print(f"Will generate summary to: {md_file}") + print(f"Export log file: {log_file}") + + + portal_instance = None + project_object = None + cax_export_successful = False + + try: + # 2. Connect to TIA Portal + print(f"\nConnecting to TIA Portal V{TIA_PORTAL_VERSION}...") + portal_instance = ts.open_portal( + version=TIA_PORTAL_VERSION, + portal_mode=ts.Enums.PortalMode.WithGraphicalUserInterface + ) + print("Connected.") + + # 3. Open Project + print(f"Opening project: {os.path.basename(project_file)}...") + project_object = portal_instance.open_project(project_file_path=project_file) + if project_object is None: + project_object = portal_instance.get_project() + if project_object is None: + raise Exception("Failed to open or get the specified project.") + print("Project opened.") + + # 4. Export CAx Data (Project Level) + print(f"Exporting CAx data for the project to {aml_file}...") + # Ensure output directory exists for the log file as well + os.makedirs(os.path.dirname(log_file), exist_ok=True) + + export_result = project_object.export_cax_data(export_file_path=aml_file, log_file_path=log_file) # [cite: 361] + + if export_result: + print("CAx data exported successfully.") + cax_export_successful = True + else: + print("CAx data export failed. Check the log file for details:") + print(f" Log file: {log_file}") + # Write basic error message to MD file if export fails + with open(md_file, 'w', encoding='utf-8') as f: + f.write(f"# Error\n\nCAx data export failed. 
+
+    except ts.TiaException as tia_ex:
+        print(f"\nTIA Portal Openness Error: {tia_ex}")
+        traceback.print_exc()
+    except FileNotFoundError:
+        print(f"\nERROR: Project file not found at {project_file}")
+    except Exception as e:
+        print(f"\nAn unexpected error occurred during TIA interaction: {e}")
+        traceback.print_exc()
+    finally:
+        # Close TIA Portal before processing the file (or detach)
+        if portal_instance:
+            try:
+                print("\nClosing TIA Portal...")
+                portal_instance.close_portal()
+                print("TIA Portal closed.")
+            except Exception as close_ex:
+                print(f"Error during TIA Portal cleanup: {close_ex}")
+
+    # 5. Parse AML and Generate Markdown (only if export was successful)
+    if cax_export_successful:
+        if os.path.exists(aml_file):
+            parse_aml_to_markdown(aml_file, md_file)
+        else:
+            print(f"ERROR: Export was reported successful, but AML file not found at {aml_file}")
+            with open(md_file, 'w', encoding='utf-8') as f:
+                f.write(f"# Error\n\nExport was reported successful, but AML file not found:\n{aml_file}")
+
+    print("\nScript finished.")
\ No newline at end of file
diff --git a/backend/script_groups/ObtainIOFromProjectTia/x3.py b/backend/script_groups/ObtainIOFromProjectTia/x3.py
new file mode 100644
index 0000000..eb67bed
--- /dev/null
+++ b/backend/script_groups/ObtainIOFromProjectTia/x3.py
@@ -0,0 +1,1030 @@
+"""
+export_io_from_CAx:
+Extracts the IOs of a TIA Portal project from a CAx export and
+generates a Markdown file with the information.
+"""
+import os
+import sys
+import tkinter as tk
+from tkinter import filedialog
+import traceback
+from lxml import etree as ET
+import json
+from pathlib import Path
+import re
+import math  # Needed for ceil
+
+script_root = os.path.dirname(
+    os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+)
+sys.path.append(script_root)
+from backend.script_utils import load_configuration
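
[Editor's note] Unlike x2.py, this script parses the AML with lxml and sidesteps namespace noise entirely by matching on local-name() in XPath, a facility lxml supports but xml.etree does not. A minimal sketch of the pattern (the XML snippet is illustrative):

from lxml import etree

doc = etree.fromstring(
    b'<CAEXFile xmlns="http://www.dke.de/CAEX">'
    b'<InstanceHierarchy><InternalElement ID="1"/></InstanceHierarchy>'
    b'</CAEXFile>'
)
# Matches every InternalElement regardless of which namespace it lives in
elements = doc.xpath(".//*[local-name()='InternalElement']")
print([e.get("ID") for e in elements])  # -> ['1']
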
+ ) + # (Pass 1 logic remains unchanged) + for elem in all_elements: + elem_id = elem.get("ID", None) + if not elem_id: + continue + device_info = { + "name": elem.get("Name", "N/A"), + "id": elem_id, + "class": "N/A", + "type_identifier": "N/A", + "order_number": "N/A", + "type_name": "N/A", + "firmware_version": "N/A", + "position": elem.get("PositionNumber", "N/A"), + "attributes": {}, + "interfaces": [], + "network_nodes": [], + "io_addresses": [], + "children_ids": [ + c.get("ID") + for c in elem.xpath("./*[local-name()='InternalElement']") + if c.get("ID") + ], + "parent_id": ( + elem.xpath("parent::*[local-name()='InternalElement']/@ID")[0] + if elem.xpath("parent::*[local-name()='InternalElement']") + else None + ), + } + class_tag = elem.xpath("./*[local-name()='SystemUnitClass']") + device_info["class"] = ( + class_tag[0].get("Path", elem.get("RefBaseSystemUnitPath", "N/A")) + if class_tag + else elem.get("RefBaseSystemUnitPath", "N/A") + ) + attributes = elem.xpath("./*[local-name()='Attribute']") + for attr in attributes: + attr_name = attr.get("Name", "") + value_elem = attr.xpath("./*[local-name()='Value']/text()") + attr_value = value_elem[0] if value_elem else "" + device_info["attributes"][attr_name] = attr_value + if attr_name == "TypeIdentifier": + device_info["type_identifier"] = attr_value + if "OrderNumber:" in attr_value: + device_info["order_number"] = attr_value.split("OrderNumber:")[ + -1 + ].strip() + elif attr_name == "TypeName": + device_info["type_name"] = attr_value + elif attr_name == "FirmwareVersion": + device_info["firmware_version"] = attr_value + elif attr_name == "Address": + address_parts = attr.xpath("./*[local-name()='Attribute']") + for part in address_parts: + addr_details = { + "area": part.get("Name", "?"), + "start": "N/A", + "length": "N/A", + "type": "N/A", + } + start_val = part.xpath( + "./*[local-name()='Attribute'][@Name='StartAddress']/*[local-name()='Value']/text()" + ) + len_val = part.xpath( + "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()" + ) + type_val = part.xpath( + "./*[local-name()='Attribute'][@Name='IoType']/*[local-name()='Value']/text()" + ) + if start_val: + addr_details["start"] = start_val[0] + if len_val: + addr_details["length"] = len_val[0] + if type_val: + addr_details["type"] = type_val[0] + if addr_details["start"] != "N/A": + device_info["io_addresses"].append(addr_details) + interfaces = elem.xpath("./*[local-name()='ExternalInterface']") + for interface in interfaces: + device_info["interfaces"].append( + { + "name": interface.get("Name", "N/A"), + "id": interface.get("ID", "N/A"), + "ref_base_class": interface.get("RefBaseClassPath", "N/A"), + } + ) + network_node_elements = elem.xpath( + "./*[local-name()='InternalElement'][*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]]" + ) + if not network_node_elements and elem.xpath( + "./*[local-name()='SupportedRoleClass'][contains(@RefRoleClassPath, 'Node')]" + ): + network_node_elements = [elem] + for node_elem in network_node_elements: + node_id = node_elem.get("ID") + if not node_id: + continue + node_info = { + "id": node_id, + "name": node_elem.get("Name", device_info["name"]), + "type": "N/A", + "address": "N/A", + } + type_attr = node_elem.xpath( + "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()" + ) + addr_attr = node_elem.xpath( + "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()" + ) + if type_attr: + node_info["type"] = type_attr[0] + if 
+            if addr_attr:
+                node_info["address"] = addr_attr[0]
+            if node_info["address"] == "N/A":
+                parent_addr_attr = elem.xpath(
+                    "./*[local-name()='Attribute'][@Name='NetworkAddress']/*[local-name()='Value']/text()"
+                )
+                if parent_addr_attr:
+                    node_info["address"] = parent_addr_attr[0]
+            if node_info["type"] == "N/A":
+                parent_type_attr = elem.xpath(
+                    "./*[local-name()='Attribute'][@Name='Type']/*[local-name()='Value']/text()"
+                )
+                if parent_type_attr:
+                    node_info["type"] = parent_type_attr[0]
+            if node_info["address"] != "N/A":
+                len_attr = node_elem.xpath(
+                    "./*[local-name()='Attribute'][@Name='Length']/*[local-name()='Value']/text()"
+                )
+                node_info["length"] = len_attr[0] if len_attr else "N/A"
+            device_info["network_nodes"].append(node_info)
+            device_id_to_parent_device[node_id] = elem_id
+        data["devices"][elem_id] = device_info
+
+    print("Pass 2: Identifying PLCs and Networks (Refined v2)...")
+    plc_ids_found = set()
+    elem_map = {elem.get("ID"): elem for elem in all_elements if elem.get("ID")}
+    for dev_id, device in data["devices"].items():
+        is_plc = False
+        plc_order_prefixes = [
+            "6ES7 516-3FP03",
+            "6ES7 151",
+            "6ES7 31",
+            "6ES7 41",
+            "6ES7 51",
+        ]
+        if any(
+            device.get("order_number", "N/A").startswith(prefix)
+            for prefix in plc_order_prefixes
+        ):
+            is_plc = True
+        elif (
+            "CPU" in device.get("type_name", "").upper()
+            or "PLC" in device.get("type_name", "").upper()
+        ):
+            is_plc = True
+        if is_plc:
+            parent_id = device.get("parent_id")
+            is_child_of_plc = False
+            current_parent = parent_id
+            while current_parent:
+                if current_parent in plc_ids_found:
+                    is_child_of_plc = True
+                    break
+                current_parent = (
+                    data["devices"].get(current_parent, {}).get("parent_id")
+                )
+            if not is_child_of_plc:
+                if dev_id not in plc_ids_found:
+                    print(
+                        f"  Identified PLC: {device['name']} ({dev_id}) - Type: {device.get('type_name', 'N/A')} OrderNo: {device.get('order_number', 'N/A')}"
+                    )
+                    device["connected_networks"] = {}
+                    data["plcs"][dev_id] = device
+                    plc_ids_found.add(dev_id)
+        is_network = False
+        net_type = "Unknown"
+        elem = elem_map.get(dev_id)
+        if elem is not None:
+            role_classes = elem.xpath(
+                "./*[local-name()='SupportedRoleClass']/@RefRoleClassPath"
+            )
+            is_subnet_by_role = any("SUBNET" in rc.upper() for rc in role_classes)
+            if is_subnet_by_role:
+                is_network = True
+                for rc in role_classes:
+                    rc_upper = rc.upper()
+                    if "PROFINET" in rc_upper or "ETHERNET" in rc_upper:
+                        net_type = "Ethernet/Profinet"
+                        break
+                    elif "PROFIBUS" in rc_upper:
+                        net_type = "Profibus"
+                        break
+                if net_type == "Unknown":
+                    if "PROFIBUS" in device["name"].upper():
+                        net_type = "Profibus"
+                    elif (
+                        "ETHERNET" in device["name"].upper()
+                        or "PROFINET" in device["name"].upper()
+                    ):
+                        net_type = "Ethernet/Profinet"
+        if is_network:
+            if dev_id not in data["networks"]:
+                print(
+                    f"  Identified Network: {device['name']} ({dev_id}) Type: {net_type}"
+                )
+                data["networks"][dev_id] = {
+                    "name": device["name"],
+                    "type": net_type,
+                    "devices_on_net": {},
+                }
+
+    print("Pass 3: Processing InternalLinks (Robust Network Mapping & IO)...")
+    internal_links = root.xpath(".//*[local-name()='InternalLink']")
+    print(f"Found {len(internal_links)} InternalLink(s).")
+    conn_id_counter = 0
+    for link in internal_links:
+        conn_id_counter += 1
+        link_name = link.get("Name", f"Link_{conn_id_counter}")
+        side_a_ref = link.get("RefPartnerSideA", "")
+        side_b_ref = link.get("RefPartnerSideB", "")
+        side_a_match = re.match(r"([^:]+):?(.*)", side_a_ref)
+        side_b_match = re.match(r"([^:]+):?(.*)", side_b_ref)
+        side_a_id = side_a_match.group(1) if side_a_match else "N/A"
+        side_a_suffix = (
+            side_a_match.group(2)
+            if side_a_match and side_a_match.group(2)
+            else side_a_id
+        )
+        side_b_id = side_b_match.group(1) if side_b_match else "N/A"
+        side_b_suffix = (
+            side_b_match.group(2)
+            if side_b_match and side_b_match.group(2)
+            else side_b_id
+        )
+        network_id, device_node_id = None, None
+        side_a_is_network = side_a_id in data["networks"]
+        side_b_is_network = side_b_id in data["networks"]
+        if side_a_is_network and not side_b_is_network:
+            network_id, device_node_id = side_a_id, side_b_id
+        elif side_b_is_network and not side_a_is_network:
+            network_id, device_node_id = side_b_id, side_a_id
+        elif side_a_is_network and side_b_is_network:
+            continue
+        elif not side_a_is_network and not side_b_is_network:
+            pass
+        if network_id and device_node_id:
+            linked_device_id = device_id_to_parent_device.get(device_node_id)
+            if not linked_device_id and device_node_id in data["devices"]:
+                linked_device_id = device_node_id
+            if linked_device_id and linked_device_id in data["devices"]:
+                device_info = data["devices"].get(linked_device_id)
+                if not device_info:
+                    continue
+                address = "N/A"
+                node_info_for_addr = data["devices"].get(device_node_id, {})
+                for node in node_info_for_addr.get("network_nodes", []):
+                    if node.get("id") == device_node_id:
+                        address = node.get("address", "N/A")
+                        break
+                if address == "N/A":
+                    address = node_info_for_addr.get("attributes", {}).get(
+                        "NetworkAddress", "N/A"
+                    )
+                if address == "N/A":
+                    address = device_info.get("attributes", {}).get(
+                        "NetworkAddress", "N/A"
+                    )
+                node_name_for_log = node_info_for_addr.get("name", device_node_id)
+                print(
+                    f"  Mapping Device/Node '{node_name_for_log}' (NodeID:{device_node_id}, Addr:{address}) to Network '{data['networks'][network_id]['name']}'"
+                )
+                data["networks"][network_id]["devices_on_net"][device_node_id] = address
+                potential_plc_id = None
+                interface_id = None
+                interface_info = None
+                node_check_info = data["devices"].get(device_node_id)
+                if node_check_info:
+                    if device_node_id in data["plcs"]:
+                        potential_plc_id = device_node_id
+                    else:
+                        interface_id = node_check_info.get("parent_id")
+                        if interface_id and interface_id in data["devices"]:
+                            interface_info = data["devices"].get(interface_id)
+                            if interface_info:
+                                if interface_id in data["plcs"]:
+                                    potential_plc_id = interface_id
+                                elif (
+                                    interface_info.get("parent_id")
+                                    and interface_info["parent_id"] in data["plcs"]
+                                ):
+                                    potential_plc_id = interface_info["parent_id"]
+                if potential_plc_id:
+                    plc_object = data["plcs"][potential_plc_id]
+                    if "connected_networks" not in plc_object:
+                        plc_object["connected_networks"] = {}
+                    if network_id not in plc_object.get("connected_networks", {}):
+                        print(
+                            f"    --> Associating Network '{data['networks'][network_id]['name']}' with PLC '{plc_object.get('name', 'Unknown PLC')}' (via Node '{node_name_for_log}' Addr: {address})"
+                        )
+                        data["plcs"][potential_plc_id]["connected_networks"][
+                            network_id
+                        ] = address
+                    elif (
+                        plc_object["connected_networks"][network_id] == "N/A"
+                        and address != "N/A"
+                    ):
+                        print(
+                            f"    --> Updating address for Network '{data['networks'][network_id]['name']}' on PLC '{plc_object.get('name', 'Unknown PLC')}' to: {address}"
+                        )
+                        data["plcs"][potential_plc_id]["connected_networks"][
+                            network_id
+                        ] = address
+            else:
+                print(
+                    f"  Warning: Could not map linked device ID {linked_device_id} (from NodeID {device_node_id}) to any known device."
+                )
+ ) + continue + if not network_id: # Generic links + source_id, source_suffix, target_id, target_suffix = ( + side_a_id, + side_a_suffix, + side_b_id, + side_b_suffix, + ) + if ("Channel" in side_b_suffix or "Parameter" in side_b_suffix) and ( + "Channel" not in side_a_suffix and "Parameter" not in side_a_suffix + ): + source_id, source_suffix, target_id, target_suffix = ( + side_b_id, + side_b_suffix, + side_a_id, + side_a_suffix, + ) + if source_id != "N/A" and target_id != "N/A": + if source_id not in data["links_by_source"]: + data["links_by_source"][source_id] = [] + if target_id not in data["links_by_target"]: + data["links_by_target"][target_id] = [] + link_detail = { + "name": link_name, + "source_id": source_id, + "source_suffix": source_suffix, + "target_id": target_id, + "target_suffix": target_suffix, + "source_device_name": data["devices"] + .get(source_id, {}) + .get("name", source_id), + "target_device_name": data["devices"] + .get(target_id, {}) + .get("name", target_id), + } + data["links_by_source"][source_id].append(link_detail) + data["links_by_target"][target_id].append(link_detail) + data["connections"].append(link_detail) + print("Data extraction and structuring complete.") + return data + + +# --- Helper Function for Recursive IO Search (Unchanged from v20) --- +def find_io_recursively(device_id, project_data): + """Recursively finds all IO addresses under a given device ID.""" + io_list = [] + device_info = project_data.get("devices", {}).get(device_id) + if not device_info: + return io_list + if device_info.get("io_addresses"): + for addr in device_info["io_addresses"]: + io_list.append( + { + "module_name": device_info.get("name", device_id), + "module_pos": device_info.get("position", "N/A"), + **addr, + } + ) + children_ids = device_info.get("children_ids", []) + for child_id in children_ids: + if child_id != device_id: # Basic loop prevention + io_list.extend(find_io_recursively(child_id, project_data)) + return io_list + + +# --- generate_markdown_tree function (v26 - Final Cleaned Version) --- +def generate_markdown_tree(project_data, md_file_path): + """(v26) Generates final hierarchical Markdown with aesthetic improvements.""" + markdown_lines = ["# Project Hardware & IO Summary (Tree View v26)", ""] + + if not project_data or not project_data.get("plcs"): + markdown_lines.append("*No PLC identified in the project data.*") + try: + with open(md_file_path, "w", encoding="utf-8") as f: + f.write("\n".join(markdown_lines)) + print(f"\nMarkdown summary written to: {md_file_path}") + except Exception as e: + print(f"ERROR writing Markdown file {md_file_path}: {e}") + return + + markdown_lines.append(f"Identified {len(project_data['plcs'])} PLC(s).") + + for plc_id, plc_info in project_data.get("plcs", {}).items(): + markdown_lines.append(f"\n## PLC: {plc_info.get('name', plc_id)}") + type_name = plc_info.get("type_name", "N/A") + order_num = plc_info.get("order_number", "N/A") + firmware = plc_info.get("firmware_version", "N/A") + if type_name and type_name != "N/A": + markdown_lines.append(f"- **Type Name:** `{type_name}`") + if order_num and order_num != "N/A": + markdown_lines.append(f"- **Order Number:** `{order_num}`") + if firmware and firmware != "N/A": + markdown_lines.append(f"- **Firmware:** `{firmware}`") + # ID removed + + plc_networks = plc_info.get("connected_networks", {}) + markdown_lines.append("\n- **Networks:**") + if not plc_networks: + markdown_lines.append( + " - *No network connections found associated with this PLC object.*" + ) + else: 
+
+
+# --- generate_markdown_tree function (v26 - Final Cleaned Version) ---
+def generate_markdown_tree(project_data, md_file_path):
+    """(v26) Generates final hierarchical Markdown with aesthetic improvements."""
+    markdown_lines = ["# Project Hardware & IO Summary (Tree View v26)", ""]
+
+    if not project_data or not project_data.get("plcs"):
+        markdown_lines.append("*No PLC identified in the project data.*")
+        try:
+            with open(md_file_path, "w", encoding="utf-8") as f:
+                f.write("\n".join(markdown_lines))
+            print(f"\nMarkdown summary written to: {md_file_path}")
+        except Exception as e:
+            print(f"ERROR writing Markdown file {md_file_path}: {e}")
+        return
+
+    markdown_lines.append(f"Identified {len(project_data['plcs'])} PLC(s).")
+
+    for plc_id, plc_info in project_data.get("plcs", {}).items():
+        markdown_lines.append(f"\n## PLC: {plc_info.get('name', plc_id)}")
+        type_name = plc_info.get("type_name", "N/A")
+        order_num = plc_info.get("order_number", "N/A")
+        firmware = plc_info.get("firmware_version", "N/A")
+        if type_name and type_name != "N/A":
+            markdown_lines.append(f"- **Type Name:** `{type_name}`")
+        if order_num and order_num != "N/A":
+            markdown_lines.append(f"- **Order Number:** `{order_num}`")
+        if firmware and firmware != "N/A":
+            markdown_lines.append(f"- **Firmware:** `{firmware}`")
+        # ID removed
+
+        plc_networks = plc_info.get("connected_networks", {})
+        markdown_lines.append("\n- **Networks:**")
+        if not plc_networks:
+            markdown_lines.append(
+                "  - *No network connections found associated with this PLC object.*"
+            )
+        else:
+            sorted_network_items = sorted(
+                plc_networks.items(),
+                key=lambda item: project_data.get("networks", {})
+                .get(item[0], {})
+                .get("name", item[0]),
+            )
+
+            for net_id, plc_addr_on_net in sorted_network_items:
+                net_info = project_data.get("networks", {}).get(net_id)
+                if not net_info:
+                    markdown_lines.append(
+                        f"  - !!! Error: Network info missing for ID {net_id} !!!"
+                    )
+                    continue
+
+                markdown_lines.append(
+                    f"  - ### {net_info.get('name', net_id)} ({net_info.get('type', 'Unknown')})"
+                )
+                markdown_lines.append(
+                    f"    - PLC Address on this Net: `{plc_addr_on_net}`"
+                )
+                markdown_lines.append(f"    - **Devices on Network:**")
+
+                devices_on_this_net = net_info.get("devices_on_net", {})
+
+                def sort_key(item):
+                    node_id, node_addr = item
+                    try:
+                        parts = [int(p) for p in re.findall(r"\d+", node_addr)]
+                        return parts
+                    except Exception:
+                        return [float("inf")]
+
+                plc_interface_and_node_ids = set()
+                for node in plc_info.get("network_nodes", []):
+                    plc_interface_and_node_ids.add(node["id"])
+                    interface_id = (
+                        project_data["devices"].get(node["id"], {}).get("parent_id")
+                    )
+                    if interface_id:
+                        plc_interface_and_node_ids.add(interface_id)
+                plc_interface_and_node_ids.add(plc_id)
+
+                other_device_items = sorted(
+                    [
+                        (node_id, node_addr)
+                        for node_id, node_addr in devices_on_this_net.items()
+                        if node_id not in plc_interface_and_node_ids
+                    ],
+                    key=sort_key,
+                )
+
+                if not other_device_items:
+                    markdown_lines.append("      - *None (besides PLC interfaces)*")
+                else:
+                    # --- Display Logic with Sibling IO Aggregation & Aesthetics ---
+                    for node_id, node_addr in other_device_items:
+                        node_info = project_data.get("devices", {}).get(node_id)
+                        if not node_info:
+                            markdown_lines.append(
+                                f"      - !!! Error: Node info missing for ID {node_id} Addr: {node_addr} !!!"
+                            )
+                            continue
+
+                        interface_id = node_info.get("parent_id")
+                        interface_info = None
+                        actual_device_id = None
+                        actual_device_info = None
+                        rack_id = None
+                        rack_info = None
+                        if interface_id:
+                            interface_info = project_data.get("devices", {}).get(
+                                interface_id
+                            )
+                            if interface_info:
+                                actual_device_id = interface_info.get("parent_id")
+                                if actual_device_id:
+                                    actual_device_info = project_data.get(
+                                        "devices", {}
+                                    ).get(actual_device_id)
+                                    if actual_device_info:
+                                        potential_rack_id = actual_device_info.get(
+                                            "parent_id"
+                                        )
+                                        if potential_rack_id:
+                                            potential_rack_info = project_data.get(
+                                                "devices", {}
+                                            ).get(potential_rack_id)
+                                            if potential_rack_info and (
+                                                "Rack"
+                                                in potential_rack_info.get("name", "")
+                                                or potential_rack_info.get("position")
+                                                is None
+                                            ):
+                                                rack_id = potential_rack_id
+                                                rack_info = potential_rack_info
+
+                        display_info_title = (
+                            actual_device_info
+                            if actual_device_info
+                            else (interface_info if interface_info else node_info)
+                        )
+                        display_id_title = (
+                            actual_device_id
+                            if actual_device_info
+                            else (interface_id if interface_info else node_id)
+                        )
+
+                        io_search_root_id = (
+                            actual_device_id
+                            if actual_device_info
+                            else (interface_id if interface_info else node_id)
+                        )
+                        io_search_root_info = project_data.get("devices", {}).get(
+                            io_search_root_id
+                        )
+
+                        # Construct Title
+                        display_name = display_info_title.get("name", display_id_title)
+                        via_node_name = node_info.get("name", node_id)
+                        title_str = f"#### {display_name}"
+                        if display_id_title != node_id:
+                            title_str += f" (via {via_node_name} @ `{node_addr}`)"
+                        else:
+                            title_str += f" (@ `{node_addr}`)"
+                        markdown_lines.append(f"      - {title_str}")
+
+                        # Display Basic Details
+                        markdown_lines.append(
+                            f"        - Address (on net): `{node_addr}`"
+                        )
+                        type_name_disp = display_info_title.get("type_name", "N/A")
+                        order_num_disp = display_info_title.get("order_number", "N/A")
+                        pos_disp = display_info_title.get("position", "N/A")
+                        if type_name_disp and type_name_disp != "N/A":
+                            markdown_lines.append(
+                                f"        - Type Name: `{type_name_disp}`"
+                            )
+                        if order_num_disp and order_num_disp != "N/A":
+                            markdown_lines.append(
+                                f"        - Order No: `{order_num_disp}`"
+                            )
+                        if pos_disp and pos_disp != "N/A":
+                            markdown_lines.append(
+                                f"        - Pos (in parent): `{pos_disp}`"
+                            )
+                        ultimate_parent_id = rack_id
+                        if not ultimate_parent_id and actual_device_info:
+                            ultimate_parent_id = actual_device_info.get("parent_id")
+                        if (
+                            ultimate_parent_id
+                            and ultimate_parent_id != display_id_title
+                        ):
+                            ultimate_parent_info = project_data.get("devices", {}).get(
+                                ultimate_parent_id
+                            )
+                            ultimate_parent_name = (
+                                ultimate_parent_info.get("name", "?")
+                                if ultimate_parent_info
+                                else "?"
+                            )
+                            markdown_lines.append(
+                                f"        - Parent Structure: `{ultimate_parent_name}`"
+                            )  # Removed ID here
+
+                        # --- IO Aggregation Logic (from v24) ---
+                        aggregated_io_addresses = []
+                        parent_structure_id = (
+                            io_search_root_info.get("parent_id")
+                            if io_search_root_info
+                            else None
+                        )
+                        io_search_root_name_disp = (
+                            io_search_root_info.get("name", "?")
+                            if io_search_root_info
+                            else "?"
+                        )
+
+                        if parent_structure_id:
+                            parent_structure_info = project_data.get("devices", {}).get(
+                                parent_structure_id
+                            )
+                            parent_structure_name = (
+                                parent_structure_info.get("name", "?")
+                                if parent_structure_info
+                                else "?"
+                            )
+                            search_title = f"parent '{parent_structure_name}'"
+                            sibling_found_io = False
+                            for dev_scan_id, dev_scan_info in project_data.get(
+                                "devices", {}
+                            ).items():
+                                if (
+                                    dev_scan_info.get("parent_id")
+                                    == parent_structure_id
+                                ):
+                                    io_from_sibling = find_io_recursively(
+                                        dev_scan_id, project_data
+                                    )
+                                    if io_from_sibling:
+                                        aggregated_io_addresses.extend(io_from_sibling)
+                                        sibling_found_io = True
+                            if (
+                                not sibling_found_io and not aggregated_io_addresses
+                            ):  # Only show message if list still empty
+                                markdown_lines.append(
+                                    f"        - *No IO Addresses found in modules under {search_title} (ID: {parent_structure_id}).*"
+                                )
+
+                        elif io_search_root_id:
+                            search_title = f"'{io_search_root_name_disp}'"
+                            aggregated_io_addresses = find_io_recursively(
+                                io_search_root_id, project_data
+                            )
+                            if not aggregated_io_addresses:
+                                markdown_lines.append(
+                                    f"        - *No IO Addresses found in modules under {search_title} (ID: {io_search_root_id}).*"
+                                )
+                        else:
+                            markdown_lines.append(
+                                f"        - *Could not determine structure to search for IO addresses.*"
+                            )
+                        # --- End IO Aggregation ---
+
+                        # Display aggregated IO Addresses with Siemens format (Cleaned)
+                        if aggregated_io_addresses:
+                            markdown_lines.append(
+                                f"        - **IO Addresses (Aggregated from Structure):**"
+                            )  # Removed redundant search root name
+                            sorted_agg_io = sorted(
+                                aggregated_io_addresses,
+                                key=lambda x: (
+                                    (
+                                        int(x.get("module_pos", "9999"))
+                                        if x.get("module_pos", "9999").isdigit()
+                                        else 9999
+                                    ),
+                                    x.get("module_name", ""),
+                                    x.get("type", ""),
+                                    (
+                                        int(x.get("start", "0"))
+                                        if x.get("start", "0").isdigit()
+                                        else float("inf")
+                                    ),
+                                ),
+                            )
+                            last_module_id_key = None
+                            for addr_info in sorted_agg_io:
+                                current_module_id_key = (
+                                    addr_info.get("module_name", "?"),
+                                    addr_info.get("module_pos", "?"),
+                                )
+                                if current_module_id_key != last_module_id_key:
+                                    markdown_lines.append(
+                                        f"          - **From Module:** {addr_info.get('module_name','?')} (Pos: {addr_info.get('module_pos','?')})"
+                                    )
+                                    last_module_id_key = current_module_id_key
+
+                                # --- Siemens IO Formatting (from v25.1 - keep fixes) ---
+                                io_type = addr_info.get("type", "?")
+                                start_str = addr_info.get("start", "?")
+                                length_str = addr_info.get("length", "?")
+                                area_str = addr_info.get("area", "?")
+                                siemens_addr = f"FMT_ERROR"  # Default error
+                                length_bits = 0
+                                try:
+                                    start_byte = int(start_str)
+                                    length_bits = int(length_str)
+                                    length_bytes = math.ceil(
+                                        length_bits / 8.0
+                                    )  # Use float division
+                                    if length_bits > 0 and length_bytes == 0:
+                                        length_bytes = 1  # Handle len < 8 bits
+                                    end_byte = start_byte + length_bytes - 1
+                                    prefix = "P?"
+                                    if io_type.lower() == "input":
+                                        prefix = "PE"
+                                    elif io_type.lower() == "output":
+                                        prefix = "PA"
+                                    siemens_addr = f"{prefix} {start_byte}..{end_byte}"
+                                except Exception:  # Catch any error during calc/format
+                                    siemens_addr = (
+                                        f"FMT_ERROR({start_str},{length_str})"
+                                    )
+
+                                markdown_lines.append(
+                                    f"            - `{siemens_addr}` (Len={length_bits} bits)"  # Simplified output
+                                )
+                                # --- End Siemens IO Formatting ---
+
+                        # IO Connections logic remains the same...
+                        links_from = project_data.get("links_by_source", {}).get(
+                            display_id_title, []
+                        )
+                        links_to = project_data.get("links_by_target", {}).get(
+                            display_id_title, []
+                        )
+                        io_conns = []
+                        for link in links_from:
+                            if "channel" in link["source_suffix"].lower():
+                                target_str = f"{link.get('target_device_name', link['target_id'])}:{link['target_suffix']}"
+                                if link["target_id"] == display_id_title:
+                                    target_str = link["target_suffix"]
+                                io_conns.append(
+                                    f"`{link['source_suffix']}` → `{target_str}`"
+                                )
+                        for link in links_to:
+                            if "channel" in link["target_suffix"].lower():
+                                source_str = f"{link.get('source_device_name', link['source_id'])}:{link['source_suffix']}"
+                                if link["source_id"] == display_id_title:
+                                    source_str = link["source_suffix"]
+                                io_conns.append(
+                                    f"`{source_str}` → `{link['target_suffix']}`"
+                                )
+                        if io_conns:
+                            markdown_lines.append(
+                                f"        - **IO Connections (Channels):**"
+                            )
+                            for conn in sorted(list(set(io_conns))):
+                                markdown_lines.append(f"          - {conn}")
+                        markdown_lines.append("")  # Spacing
+                    # --- *** END Display Logic *** ---
+
+    try:
+        with open(md_file_path, "w", encoding="utf-8") as f:
+            f.write("\n".join(markdown_lines))
+        print(f"\nMarkdown summary written to: {md_file_path}")
+    except Exception as e:
+        print(f"ERROR writing Markdown file {md_file_path}: {e}")
+        traceback.print_exc()
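
[Editor's note] The PE/PA formatting above turns a bit length into an inclusive byte range, Siemens-style. The arithmetic, worked through on two assumed inputs:

# Input module at StartAddress=10 with Length=16 bits:
#   length_bytes = ceil(16 / 8) = 2
#   end_byte     = 10 + 2 - 1  = 11   ->  "PE 10..11"
# A 2-bit output at StartAddress=4:
#   length_bytes = ceil(2 / 8)  = 1
#   end_byte     = 4 + 1 - 1   = 4    ->  "PA 4..4"
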
+
+
+# --- generate_io_upward_tree function (Unchanged from v23) ---
+def generate_io_upward_tree(project_data, md_file_path):
+    """(v23) Generates a debug tree starting from IO addresses upwards."""
+    markdown_lines = ["# IO Address Upward Connection Trace (Debug v23)", ""]
+    if not project_data or not project_data.get("devices"):
+        markdown_lines.append("*No device data found.*")
+        try:
+            with open(md_file_path, "w", encoding="utf-8") as f:
+                f.write("\n".join(markdown_lines))
+            print(f"\nIO upward debug tree written to: {md_file_path}")
+        except Exception as e:
+            print(f"ERROR writing IO upward debug tree file {md_file_path}: {e}")
+        return
+    node_to_network_map = {}
+    for net_id, net_info in project_data.get("networks", {}).items():
+        net_name = net_info.get("name", "?")
+        for node_id, node_addr in net_info.get("devices_on_net", {}).items():
+            node_to_network_map[node_id] = (net_id, net_name, node_addr)
+    devices_with_io = []
+    for dev_id, dev_info in project_data.get("devices", {}).items():
+        if dev_info.get("io_addresses"):
+            devices_with_io.append((dev_id, dev_info))
+    if not devices_with_io:
+        markdown_lines.append("*No devices with defined IO Addresses found.*")
+    else:
+        markdown_lines.append(
+            f"Found {len(devices_with_io)} device(s)/module(s) with IO addresses. Tracing connections upwards:\n"
+        )
+        devices_with_io.sort(
+            key=lambda item: (
+                (
+                    int(item[1].get("position", "9999"))
+                    if item[1].get("position", "9999").isdigit()
+                    else 9999
+                ),
+                item[1].get("name", ""),
+            )
+        )
+        for dev_id, dev_info in devices_with_io:
+            markdown_lines.append(
+                f"## IO Module: {dev_info.get('name', dev_id)} (ID: {dev_id})"
+            )
+            markdown_lines.append(f"- Position: {dev_info.get('position', 'N/A')}")
+            markdown_lines.append("- IO Addresses:")
+            for addr in sorted(
+                dev_info["io_addresses"],
+                key=lambda x: (
+                    x.get("type", ""),
+                    (
+                        int(x.get("start", "0"))
+                        if x.get("start", "0").isdigit()
+                        else float("inf")
+                    ),
+                ),
+            ):
+                markdown_lines.append(
+                    f"  - `{addr.get('type','?').ljust(6)} Start={addr.get('start','?').ljust(4)} Len={addr.get('length','?').ljust(3)}` (Area: {addr.get('area','?')})"
+                )
+            markdown_lines.append("- Upward Path:")
+            current_id = dev_id
+            current_info = dev_info
+            indent = "  "
+            path_found = False
+            ancestor_limit = 15
+            count = 0
+            while current_id and count < ancestor_limit:
+                ancestor_name = current_info.get("name", "?") if current_info else "?"
+                ancestor_pos = (
+                    current_info.get("position", "N/A") if current_info else "N/A"
+                )
+                markdown_lines.append(
+                    f"{indent}└─ {ancestor_name} (ID: {current_id}, Pos: {ancestor_pos})"
+                )
+                if current_id in node_to_network_map:
+                    net_id, net_name, node_addr = node_to_network_map[current_id]
+                    markdown_lines.append(f"{indent}   └─ **Network Connection Point**")
+                    markdown_lines.append(
+                        f"{indent}      - Node: {ancestor_name} (ID: {current_id})"
+                    )
+                    markdown_lines.append(
+                        f"{indent}      - Network: {net_name} (ID: {net_id})"
+                    )
+                    markdown_lines.append(f"{indent}      - Address: `{node_addr}`")
+                    plc_connection_found = False
+                    for plc_id_check, plc_info_check in project_data.get(
+                        "plcs", {}
+                    ).items():
+                        if net_id in plc_info_check.get("connected_networks", {}):
+                            markdown_lines.append(
+                                f"{indent}      - **Network associated with PLC:** {plc_info_check.get('name','?')} (ID: {plc_id_check})"
+                            )
+                            plc_connection_found = True
+                            break
+                    if not plc_connection_found:
+                        markdown_lines.append(
+                            f"{indent}      - *Network not directly associated with a known PLC in data.*"
+                        )
+                    path_found = True
+                    break
+                if current_id in project_data.get("plcs", {}):
+                    markdown_lines.append(f"{indent}   └─ **Is PLC:** {ancestor_name}")
+                    path_found = True
+                    break
+                parent_id = current_info.get("parent_id") if current_info else None
+                if parent_id:
+                    current_info = project_data.get("devices", {}).get(parent_id)
+                    if not current_info:
+                        markdown_lines.append(
+                            f"{indent}   └─ Parent ID {parent_id} not found. Stopping trace."
+                        )
+                        break
+                    current_id = parent_id
+                    indent += "  "
+                else:
+                    markdown_lines.append(
+                        f"{indent}   └─ Reached top level (no parent)."
+                    )
+                    break
+                count += 1
+            if count >= ancestor_limit:
+                markdown_lines.append(
+                    f"{indent}   └─ Reached ancestor limit. Stopping trace."
+                )
+ ) + if not path_found: + markdown_lines.append( + f"{indent}└─ *Could not trace path to a known Network Node or PLC.*" + ) + markdown_lines.append("") + try: + with open(md_file_path, "w", encoding="utf-8") as f: + f.write("\n".join(markdown_lines)) + print(f"\nIO upward debug tree written to: {md_file_path}") + except Exception as e: + print(f"ERROR writing IO upward debug tree file {md_file_path}: {e}") + + +# --- process_aml_file function (unchanged from v22) --- +def process_aml_file( + aml_file_path, json_output_path, md_output_path, md_upward_output_path +): + # (Unchanged) + print(f"Processing AML file: {aml_file_path}") + if not os.path.exists(aml_file_path): + print(f"ERROR: Input AML file not found at {aml_file_path}") + return + try: + parser = ET.XMLParser(remove_blank_text=True, huge_tree=True) + tree = ET.parse(aml_file_path, parser) + root = tree.getroot() + project_data = extract_aml_data(root) # v15 extraction + print(f"Generating JSON output: {json_output_path}") + try: + with open(json_output_path, "w", encoding="utf-8") as f: + json.dump(project_data, f, indent=4, default=str) + print(f"JSON data written successfully.") + except Exception as e: + print(f"ERROR writing JSON file {json_output_path}: {e}") + traceback.print_exc() + generate_markdown_tree(project_data, md_output_path) # v26 MD generation + generate_io_upward_tree( + project_data, md_upward_output_path + ) # v23 upward generation + except ET.LxmlError as xml_err: + print(f"ERROR parsing XML file {aml_file_path} with lxml: {xml_err}") + traceback.print_exc() + except Exception as e: + print(f"ERROR processing AML file {aml_file_path}: {e}") + traceback.print_exc() + +def select_cax_file(): + """Opens a dialog to select a CAx (XML) export file.""" + root = tk.Tk() + root.withdraw() + file_path = filedialog.askopenfilename( + title="Select CAx Export File (XML)", + filetypes=[("XML Files", "*.xml"), ("All Files", "*.*")] # Changed filetypes + ) + root.destroy() + if not file_path: + print("No CAx file selected. Exiting.") + sys.exit(0) + return file_path + +def select_output_directory(): + """Opens a dialog to select the output directory.""" + root = tk.Tk() + root.withdraw() + dir_path = filedialog.askdirectory( + title="Select Output Directory for JSON and MD files" # Updated title slightly + ) + root.destroy() + if not dir_path: + print("No output directory selected. Exiting.") + sys.exit(0) + return dir_path + +# --- Main Execution --- +if __name__ == "__main__": + # configs = load_configuration() # Keep if needed, otherwise remove + + script_version = "v27 - User Input/Output Paths" # Updated version + print( + f"--- AML (CAx Export) to Hierarchical JSON and Obsidian MD Converter ({script_version}) ---" + ) + + # 1. 
+    cax_file_path = select_cax_file()
+    output_dir = select_output_directory()
+
+    # Convert paths to Path objects
+    input_path = Path(cax_file_path)
+    output_path = Path(output_dir)
+
+    # Check if input file exists
+    if not input_path.is_file():
+        print(f"ERROR: Input file '{input_path}' not found or is not a file.")
+        sys.exit(1)
+
+    # Ensure output directory exists
+    output_path.mkdir(parents=True, exist_ok=True)
+
+    # Construct output file paths within the selected output directory
+    output_json_file = output_path / input_path.with_suffix(".hierarchical.json").name
+    output_md_file = output_path / input_path.with_name(f"{input_path.stem}_Hardware_Tree.md").name
+    output_md_upward_file = output_path / input_path.with_name(f"{input_path.stem}_IO_Upward_Debug.md").name
+
+    print(f"Input AML: {input_path.resolve()}")
+    print(f"Output Directory: {output_path.resolve()}")
+    print(f"Output JSON: {output_json_file.resolve()}")
+    print(f"Output Main Tree MD: {output_md_file.resolve()}")
+    print(f"Output IO Debug Tree MD: {output_md_upward_file.resolve()}")
+
+    # Process the selected file and save outputs to the selected directory
+    process_aml_file(
+        str(input_path),
+        str(output_json_file),
+        str(output_md_file),
+        str(output_md_upward_file),
+    )
+
+    print("\nScript finished.")
\ No newline at end of file
diff --git a/backend/script_utils.py b/backend/script_utils.py
index ad52c41..a73a1c3 100644
--- a/backend/script_utils.py
+++ b/backend/script_utils.py
@@ -1,5 +1,6 @@
 import os
 import json
+import inspect
 from typing import Dict, Any
@@ -20,8 +21,11 @@ def load_configuration() -> Dict[str, Any]:
         working_dir = configs.get("working_directory", "")
     """
     try:
-        # Get directory of the calling script
-        script_dir = os.path.dirname(os.path.abspath(__file__))
+        # Get the caller's stack frame
+        caller_frame = inspect.stack()[1]
+        caller_file = caller_frame.filename
+        # Get the directory of the script that calls this function
+        script_dir = os.path.dirname(os.path.abspath(caller_file))
 
         # Path to the config file
         config_file_path = os.path.join(script_dir, "script_config.json")
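
[Editor's note] The load_configuration() change above resolves script_config.json relative to the caller's file instead of script_utils.py itself, which is why the new scripts can live in their own group folder. A minimal sketch of the mechanism, assuming two illustrative files in different directories:

# where_am_i.py (hypothetical helper)
import inspect
import os

def caller_dir():
    # stack()[0] is this frame; stack()[1] is whoever called caller_dir()
    caller_file = inspect.stack()[1].filename
    return os.path.dirname(os.path.abspath(caller_file))

# any_script.py (hypothetical caller)
# from where_am_i import caller_dir
# print(caller_dir())  # prints the directory containing any_script.py,
#                      # not the directory containing where_am_i.py
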
diff --git a/config_manager.py b/config_manager.py
index 4c4a301..0399976 100644
--- a/config_manager.py
+++ b/config_manager.py
@@ -423,11 +423,12 @@ class ConfigurationManager:
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE,
                 text=True,
-                bufsize=1,
+                encoding='utf-8',  # <--- Explicitly request UTF-8 decoding
+                errors='replace',  # Optional: replace malformed characters instead of failing
+                bufsize=1,  # Line buffered
                 env=dict(
                     os.environ,
-                    # SCRIPT_CONFIGS=json.dumps(configs), # Commented out as we now use a file
-                    PYTHONIOENCODING="utf-8",
+                    PYTHONIOENCODING="utf-8",  # Keeping this as well is good
                 ),
             )
diff --git a/data/log.txt b/data/log.txt
index b5a0582..f606c05 100644
--- a/data/log.txt
+++ b/data/log.txt
@@ -1,16 +1 @@
-[21:32:28] Configuraciones guardadas en d:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\example_group\script_config.json
-[21:32:28] Iniciando ejecución de x1.py
-[21:32:33] Configuration file not found: d:\Proyectos\Scripts\ParamManagerScripts\backend\script_config.json
-[21:32:33] === Ejecutando Script de Prueba 1 ===
-[21:32:33] Configuraciones cargadas:
-[21:32:33] Nivel 1: {}
-[21:32:33] Nivel 2: {}
-[21:32:33] Nivel 3: {}
-[21:32:33] Simulando procesamiento...
-[21:32:33] Progreso: 20%
-[21:32:33] Progreso: 40%
-[21:32:33] Progreso: 60%
-[21:32:33] Progreso: 80%
-[21:32:33] Progreso: 100%
-[21:32:33] ¡Proceso completado!
-[21:32:33] Ejecución completada
+[22:54:35] Error: Directorio de trabajo no configurado
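
[Editor's note] The config_manager.py hunk pins the child-process pipes to UTF-8 so non-ASCII script output (such as "¡Proceso completado!" in the log above) cannot crash the reader on a platform whose default codec is cp1252. A minimal sketch of the same Popen pattern, independent of this codebase:

import os
import subprocess
import sys

proc = subprocess.Popen(
    [sys.executable, "-c", "print('¡Proceso completado!')"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
    encoding="utf-8",   # decode the pipes as UTF-8 regardless of the OS default
    errors="replace",   # degrade gracefully on malformed bytes instead of raising
    bufsize=1,          # line buffered in text mode
    env=dict(os.environ, PYTHONIOENCODING="utf-8"),  # make the child emit UTF-8 too
)
out, _ = proc.communicate()
print(out.strip())
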