""" LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL Este script convierte archivos XML de Siemens LAD/FUP a un formato JSON simplificado. """ # ToUpload/x1_to_json.py # -*- coding: utf-8 -*- import json import argparse import os import sys import traceback import importlib from lxml import etree from lxml.etree import XMLSyntaxError as etree_XMLSyntaxError # Alias para evitar conflicto from collections import defaultdict import copy import time # <-- NUEVO: Para obtener metadatos script_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(__file__))) ) sys.path.append(script_root) from backend.script_utils import load_configuration # Importar funciones comunes y namespaces desde el nuevo módulo de utils try: from parsers.parser_utils import ns, get_multilingual_text, parse_interface_members, adapt_namespaces except ImportError as e: print( f"Error crítico: No se pudieron importar funciones desde parsers.parser_utils: {e}" ) print( "Asegúrate de que el directorio 'parsers' y 'parsers/parser_utils.py' existen y son correctos." ) sys.exit(1) # --- NUEVAS FUNCIONES DE PARSEO para UDT y Tag Table (Sin cambios) --- def parse_udt(udt_element): """Parsea un elemento (UDT).""" print(" -> Detectado: PlcStruct (UDT)") block_data = { "block_name": "UnknownUDT", "block_type": "PlcUDT", # Identificador para x3 "language": "UDT", # Lenguaje específico "interface": {}, "networks": [], # Los UDTs no tienen redes "block_comment": "", } # (Resto de la lógica sin cambios) attribute_list_node = udt_element.xpath("./AttributeList") if attribute_list_node: attr_list = attribute_list_node[0] name_node = attr_list.xpath("./Name/text()") block_data["block_name"] = name_node[0].strip() if name_node else "UnknownUDT" comment_node_list = udt_element.xpath( "./ObjectList/MultilingualText[@CompositionName='Comment']" ) if comment_node_list: block_data["block_comment"] = get_multilingual_text(comment_node_list[0]) else: comment_attr_node = attr_list.xpath( "../ObjectList/MultilingualText[@CompositionName='Comment']" ) if comment_attr_node: block_data["block_comment"] = get_multilingual_text( comment_attr_node[0] ) interface_node_list = udt_element.xpath( "./AttributeList/Interface/iface:Sections/iface:Section[@Name='None']", namespaces=ns, ) if interface_node_list: section_node = interface_node_list[0] members_in_section = section_node.xpath("./iface:Member", namespaces=ns) if members_in_section: block_data["interface"]["None"] = parse_interface_members( members_in_section ) else: print( f"Advertencia: Sección 'None' encontrada en UDT '{block_data['block_name']}' pero sin miembros." ) else: interface_node_direct = udt_element.xpath( ".//iface:Interface/iface:Sections/iface:Section[@Name='None']", namespaces=ns, ) if interface_node_direct: section_node = interface_node_direct[0] members_in_section = section_node.xpath("./iface:Member", namespaces=ns) if members_in_section: block_data["interface"]["None"] = parse_interface_members( members_in_section ) else: print( f"Advertencia: Sección 'None' encontrada directamente en UDT '{block_data['block_name']}' pero sin miembros." ) else: print( f"Advertencia: No se encontró la sección 'None' de la interfaz para UDT '{block_data['block_name']}'." ) if not block_data["interface"]: print( f"Advertencia: No se pudo extraer la interfaz del UDT '{block_data['block_name']}'." ) return block_data def parse_tag_table(tag_table_element): """Parsea un elemento .""" print(" -> Detectado: PlcTagTable") table_data = { "block_name": "UnknownTagTable", "block_type": "PlcTagTable", # Identificador para x3 "language": "TagTable", # Lenguaje específico "tags": [], "networks": [], # Las Tag Tables no tienen redes "block_comment": "", # Las tablas de tags no suelen tener comentario de bloque } # (Resto de la lógica sin cambios) attribute_list_node = tag_table_element.xpath("./AttributeList") if attribute_list_node: name_node = attribute_list_node[0].xpath("./Name/text()") table_data["block_name"] = ( name_node[0].strip() if name_node else "UnknownTagTable" ) tag_elements = tag_table_element.xpath("./ObjectList/SW.Tags.PlcTag") print(f" - Encontrados {len(tag_elements)} tags.") for tag_elem in tag_elements: tag_info = { "name": "UnknownTag", "datatype": "Unknown", "address": None, "comment": "", } tag_attr_list = tag_elem.xpath("./AttributeList") if tag_attr_list: attr_list = tag_attr_list[0] name_node = attr_list.xpath("./Name/text()") tag_info["name"] = name_node[0].strip() if name_node else "UnknownTag" dtype_node = attr_list.xpath("./DataTypeName/text()") tag_info["datatype"] = dtype_node[0].strip() if dtype_node else "Unknown" addr_node = attr_list.xpath("./LogicalAddress/text()") tag_info["address"] = addr_node[0].strip() if addr_node else None comment_node_list = tag_elem.xpath( "./ObjectList/MultilingualText[@CompositionName='Comment']" ) if comment_node_list: tag_info["comment"] = get_multilingual_text(comment_node_list[0]) table_data["tags"].append(tag_info) return table_data # --- Cargador Dinámico de Parsers (sin cambios) --- def load_parsers(parsers_dir="parsers"): parser_map = {} script_dir = os.path.dirname(__file__) parsers_dir_path = os.path.join(script_dir, parsers_dir) if not os.path.isdir(parsers_dir_path): print(f"Error: Directorio de parsers no encontrado: '{parsers_dir_path}'") return parser_map print(f"Cargando parsers desde: '{parsers_dir_path}'") parsers_package = os.path.basename(parsers_dir) for filename in os.listdir(parsers_dir_path): if ( filename.startswith("parse_") and filename.endswith(".py") and filename not in ["__init__.py", "parser_utils.py"] ): module_name_rel = filename[:-3] full_module_name = f"{parsers_package}.{module_name_rel}" try: module = importlib.import_module(full_module_name) if hasattr(module, "get_parser_info") and callable( module.get_parser_info ): parser_info = module.get_parser_info() if ( isinstance(parser_info, dict) and "language" in parser_info and "parser_func" in parser_info ): languages = parser_info["language"] parser_func = parser_info["parser_func"] if isinstance(languages, list) and callable(parser_func): for lang in languages: lang_upper = lang.upper() if lang_upper in parser_map: print( f" Advertencia: Parser para '{lang_upper}' en {full_module_name} sobrescribe definición anterior." ) parser_map[lang_upper] = parser_func #print( # f" - Cargado parser para '{lang_upper}' desde {module_name_rel}.py" #) else: print( f" Advertencia: Formato inválido en get_parser_info de {full_module_name}." ) else: print( f" Advertencia: get_parser_info en {full_module_name} no devolvió el diccionario esperado." ) else: print( f" Advertencia: Módulo {module_name_rel}.py no tiene la función 'get_parser_info'." ) except ImportError as e: print(f"Error importando {full_module_name}: {e}") except Exception as e: print(f"Error procesando {full_module_name}: {e}") traceback.print_exc() print(f"\nTotal de lenguajes con parser cargado: {len(parser_map)}") print(f"Lenguajes soportados: {list(parser_map.keys())}") return parser_map # <-- MODIFICADO: parser_map ya no es un argumento, se carga dentro --> def convert_xml_to_json(xml_filepath, json_filepath): """ Convierte XML a JSON, detectando tipo, añadiendo metadatos del XML y extrayendo comentarios/títulos de red de forma centralizada. (v3) Carga los parsers necesarios internamente. """ print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...") # <-- NUEVO: Cargar parsers aquí --> print("Cargando parsers de red...") parser_map = load_parsers() # <-- FIN NUEVO --> if not os.path.exists(xml_filepath): print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'") return False # Obtener metadatos del XML xml_mod_time = None xml_size = None try: xml_mod_time = os.path.getmtime(xml_filepath) xml_size = os.path.getsize(xml_filepath) print(f"Metadatos XML: ModTime={xml_mod_time}, Size={xml_size}") except OSError as e: print(f"Advertencia: No se pudieron obtener metadatos de '{xml_filepath}': {e}") try: print("Paso 1: Parseando archivo XML...") # Usar recover=True para intentar parsear incluso con errores menores parser = etree.XMLParser(remove_blank_text=True, recover=True) tree = etree.parse(xml_filepath, parser) root = tree.getroot() # Ajustar namespaces dinámicamente para soportar distintas versiones de TIA try: adapt_namespaces(root) except Exception as e_ns: print(f"Advertencia: No se pudo adaptar namespaces dinámicamente: {e_ns}") print("Paso 1: Parseo XML completado.") result = None the_block = None # Nodo principal (UDT, TagTable, o SW.Blocks.*) print("Paso 2: Detectando tipo de objeto principal...") # Intentar UDT udt_element = root.find(".//SW.Types.PlcStruct", namespaces=root.nsmap) if udt_element is not None: the_block = udt_element result = parse_udt(the_block) # Llamar a la función de parseo de UDT # Intentar Tag Table si no es UDT if result is None: tag_table_element = root.find(".//SW.Tags.PlcTagTable", namespaces=root.nsmap) if tag_table_element is not None: the_block = tag_table_element result = parse_tag_table(the_block) # Llamar a la función de parseo de TagTable # Intentar Bloques (OB, FC, FB, DB) si no es UDT ni TagTable if result is None: print("Paso 2: No es UDT ni Tag Table. Buscando SW.Blocks.* ...") # Busca cualquier bloque que empiece con SW.Blocks. y tenga un nodo hijo AttributeList block_list = root.xpath( "//*[starts-with(local-name(), 'SW.Blocks.') and ./AttributeList]" ) if block_list: the_block = block_list[0] # Tomar el primer bloque encontrado block_tag_name = etree.QName(the_block.tag).localname # Nombre del tag (ej. SW.Blocks.OB) block_type_map = { "SW.Blocks.FC": "FC", "SW.Blocks.FB": "FB", "SW.Blocks.GlobalDB": "GlobalDB", "SW.Blocks.OB": "OB", "SW.Blocks.InstanceDB": "InstanceDB" # <-- ADDED: Recognize InstanceDB } block_type_found = block_type_map.get(block_tag_name, "UnknownBlockType") print(f"Paso 2b: Bloque {block_tag_name} (Tipo: {block_type_found}) encontrado (ID={the_block.get('ID')}).") # --- Extraer información del Bloque (FC, FB, OB, DB) --- print("Paso 3: Extrayendo atributos del bloque...") attribute_list_node = the_block.xpath("./AttributeList") # Buscar hijo directo block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown" instance_of_name_val = None # <-- NUEVO: Para InstanceDB instance_of_type_val = None # <-- NUEVO: Para InstanceDB block_comment_val = "" if attribute_list_node: attr_list = attribute_list_node[0] name_node = attr_list.xpath("./Name/text()") block_name_val = name_node[0].strip() if name_node else block_name_val num_node = attr_list.xpath("./Number/text()") try: block_number_val = int(num_node[0]) if num_node else None except (ValueError, TypeError): block_number_val = None lang_node = attr_list.xpath("./ProgrammingLanguage/text()") # Asignar lenguaje por defecto si no se encuentra block_lang_val = lang_node[0].strip() if lang_node else \ ("DB" if block_type_found in ["GlobalDB", "InstanceDB"] else "Unknown") # <-- MODIFIED: Include InstanceDB for DB language default # <-- NUEVO: Extraer info de instancia si es InstanceDB --> if block_type_found == "InstanceDB": inst_name_node = attr_list.xpath("./InstanceOfName/text()") instance_of_name_val = inst_name_node[0].strip() if inst_name_node else None inst_type_node = attr_list.xpath("./InstanceOfType/text()") # Generalmente 'FB' instance_of_type_val = inst_type_node[0].strip() if inst_type_node else None print(f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje Bloque='{block_lang_val}'") # Extraer comentario del bloque (puede estar en AttributeList o ObjectList) comment_node_attr = attr_list.xpath("./Comment") if comment_node_attr: block_comment_val = get_multilingual_text(comment_node_attr[0]) else: comment_node_obj = the_block.xpath("./ObjectList/MultilingualText[@CompositionName='Comment']") if comment_node_obj: block_comment_val = get_multilingual_text(comment_node_obj[0]) print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'") else: print(f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}.") if block_type_found in ["GlobalDB", "InstanceDB"]: block_lang_val = "DB" # Default para DB/InstanceDB # <-- MODIFIED: Include InstanceDB # Inicializar diccionario de resultado para el bloque result = { "block_name": block_name_val, "block_number": block_number_val, "language": block_lang_val, "block_type": block_type_found, "block_comment": block_comment_val, "interface": {}, "networks": [] } # --- Extraer Interfaz del Bloque --- print("Paso 4: Extrayendo la interfaz del bloque...") interface_node = None if attribute_list_node: interface_node_list = attribute_list_node[0].xpath("./Interface") if interface_node_list: interface_node = interface_node_list[0] if interface_node is not None: # Buscar secciones dentro de la interfaz usando el namespace 'iface' all_sections = interface_node.xpath(".//iface:Section", namespaces=ns) if all_sections: processed_sections = set() for section in all_sections: section_name = section.get("Name") if not section_name or section_name in processed_sections: continue members_in_section = section.xpath("./iface:Member", namespaces=ns) if members_in_section: result["interface"][section_name] = parse_interface_members(members_in_section) processed_sections.add(section_name) else: print("Advertencia: Nodo Interface no contiene secciones .") if not result["interface"]: print("Advertencia: Interface encontrada pero sin secciones procesables.") elif block_type_found == "GlobalDB": # Buscar Static directamente si es DB y no hay nodo Interface static_members = the_block.xpath(".//iface:Section[@Name='Static']/iface:Member", namespaces=ns) if static_members: print("Paso 4: Encontrada sección Static para GlobalDB (sin nodo Interface explícito).") result["interface"]["Static"] = parse_interface_members(static_members) else: print("Advertencia: No se encontró sección 'Static' para GlobalDB.") else: print(f"Advertencia: No se encontró para bloque {block_type_found}.") if not result["interface"]: print("Advertencia: No se pudo extraer información de la interfaz.") # --- Procesar Redes (CompileUnits) --- if block_type_found not in ["GlobalDB", "InstanceDB"]: # DBs/InstanceDBs no tienen redes ejecutables # <-- MODIFIED: Include InstanceDB print("Paso 5: Buscando y PROCESANDO redes (CompileUnits)...") networks_processed_count = 0 result["networks"] = [] # Asegurar que esté inicializado object_list_node = the_block.xpath("./ObjectList") if object_list_node: compile_units = object_list_node[0].xpath("./SW.Blocks.CompileUnit") print(f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit.") for network_elem in compile_units: # network_elem es el nodo networks_processed_count += 1 network_id = network_elem.get("ID") network_lang = "Unknown" if not network_id: print("Advertencia: CompileUnit sin ID, saltando.") continue # Determinar lenguaje net_attr_list = network_elem.xpath("./AttributeList") if net_attr_list: lang_node = net_attr_list[0].xpath("./ProgrammingLanguage/text()") if lang_node: network_lang = lang_node[0].strip() elif result["language"] != "Unknown": network_lang = result["language"] print(f" - Procesando Red ID={network_id}, Lenguaje Red={network_lang}") parser_func = parser_map.get(network_lang.upper()) parsed_network_data = None if parser_func: try: parsed_network_data = parser_func(network_elem) except Exception as e_parse: print(f" ERROR durante el parseo de Red {network_id} ({network_lang}): {e_parse}") parsed_network_data = {"id": network_id, "language": network_lang, "logic": [], "error": f"Parser failed: {e_parse}"} else: print(f" Advertencia: Lenguaje de red '{network_lang}' no soportado.") parsed_network_data = {"id": network_id, "language": network_lang, "logic": [], "error": f"Unsupported language: {network_lang}"} # --- LÓGICA CORREGIDA: Asegurar que se procese incluso si el parser falló --- if parsed_network_data is None: # Si el parser falló TANTO que ni devolvió un dict parsed_network_data = {"id": network_id, "language": network_lang, "logic": [], "error": "Parser function returned None"} # Extraer Título y Comentario de la red SIEMPRE try: title_element = network_elem.xpath("./ObjectList/MultilingualText[@CompositionName='Title']") parsed_network_data["title"] = get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}" comment_elem_net = network_elem.xpath("./ObjectList/MultilingualText[@CompositionName='Comment']") parsed_network_data["comment"] = get_multilingual_text(comment_elem_net[0]) if comment_elem_net else "" except Exception as e_comment: print(f" ERROR extrayendo Título/Comentario para Red {network_id}: {e_comment}") # Añadir valores por defecto si falla la extracción parsed_network_data["title"] = f"Network {network_id}" # Asegurar que title exista parsed_network_data["comment"] = f"// Error extrayendo comentario: {e_comment}" # Asegurar que comment exista result["networks"].append(parsed_network_data) # Añadir SIEMPRE a la listaa if networks_processed_count == 0: print(f"Advertencia: ObjectList para {block_type_found} sin SW.Blocks.CompileUnit.") else: print(f"Advertencia: No se encontró ObjectList para el bloque {block_type_found}.") else: print(f"Paso 5: Saltando procesamiento de redes para {block_type_found}.") # <-- MODIFIED: Updated message else: # No se encontró ningún bloque SW.Blocks.* print("Error Crítico: No se encontró el elemento raíz del bloque () después de descartar UDT/TagTable.") # <-- MODIFIED: Updated message # --- Fin del manejo de Bloques --- # --- Escritura del JSON Final --- if result: # Añadir metadatos XML al diccionario final if xml_mod_time is not None: result["source_xml_mod_time"] = xml_mod_time if xml_size is not None: result["source_xml_size"] = xml_size print("Paso 6: Escribiendo el resultado en el archivo JSON...") # Advertencias finales si faltan partes clave if result.get("block_type") not in ["PlcUDT", "PlcTagTable"] and not result.get("interface"): print("ADVERTENCIA FINAL: 'interface' está vacía en el JSON.") if result.get("block_type") not in ["PlcUDT", "PlcTagTable", "GlobalDB", "InstanceDB"] and not result.get("networks"): print("ADVERTENCIA FINAL: 'networks' está vacía en el JSON.") # <-- MODIFIED: Include InstanceDB # Escribir el archivo JSON try: with open(json_filepath, "w", encoding="utf-8") as f: json.dump(result, f, indent=4, ensure_ascii=False) print("Paso 6: Escritura JSON completada.") print(f"Conversión finalizada. JSON guardado en: '{os.path.relpath(json_filepath)}'") return True # Indicar éxito except IOError as e: print(f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"); return False except TypeError as e: print(f"Error Crítico: Problema al serializar a JSON. Error: {e}"); return False else: # Esto no debería ocurrir si se manejaron todos los tipos o hubo error antes print("Error Crítico: No se generó ningún resultado para el archivo XML.") return False except etree_XMLSyntaxError as e: # Usar alias print(f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}") return False except Exception as e: print(f"Error Crítico: Error inesperado durante la conversión: {e}") traceback.print_exc() return False # --- Punto de Entrada Principal (__main__) --- if __name__ == "__main__": # Lógica para ejecución standalone try: import tkinter as tk from tkinter import filedialog except ImportError: print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr) # No salimos, podríamos intentar obtener el path de otra forma o fallar más adelante tk = None # Marcar como no disponible xml_input_file = "" if tk: root = tk.Tk() root.withdraw() # Ocultar la ventana principal de Tkinter print("Por favor, selecciona el archivo XML de entrada...") xml_input_file = filedialog.askopenfilename( title="Selecciona el archivo XML de entrada", filetypes=[("XML files", "*.xml"), ("All files", "*.*")] ) root.destroy() # Cerrar Tkinter if not xml_input_file: print("No se seleccionó ningún archivo. Saliendo.", file=sys.stderr) # sys.exit(1) # No usar sys.exit aquí else: print( f"Archivo XML seleccionado: {xml_input_file}" ) # Calcular ruta de salida JSON xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0] base_dir = os.path.dirname(xml_input_file) output_dir = os.path.join(base_dir, "parsing") os.makedirs(output_dir, exist_ok=True) json_output_file = os.path.join(output_dir, f"{xml_filename_base}.json") print( f"(x1 - Standalone) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'" ) # Llamar a la función principal (que ahora carga los parsers) success = convert_xml_to_json(xml_input_file, json_output_file) if success: print("\nConversión completada exitosamente.") else: print(f"\nError durante la conversión de '{os.path.relpath(xml_input_file)}'.", file=sys.stderr)