From 6a06f321760a526a901eb813964e2e123200df33 Mon Sep 17 00:00:00 2001 From: Miguel Date: Sun, 20 Apr 2025 19:42:59 +0200 Subject: [PATCH] . --- x0_main.py | 97 +++++++---- x1_to_json.py | 406 ++++++++++++++++++++++++++++++++------------- x2_process.py | 226 ++++++++++++++++++------- x3_generate_scl.py | 55 ++++-- 4 files changed, 565 insertions(+), 219 deletions(-) diff --git a/x0_main.py b/x0_main.py index 322bd7a..2feb30b 100644 --- a/x0_main.py +++ b/x0_main.py @@ -5,6 +5,7 @@ import sys import locale import glob + # (Función get_console_encoding y variable CONSOLE_ENCODING como antes) def get_console_encoding(): """Obtiene la codificación preferida de la consola, con fallback.""" @@ -12,11 +13,13 @@ def get_console_encoding(): return locale.getpreferredencoding(False) except Exception: # Fallback común en Windows si falla getpreferredencoding - return "cp1252" # O prueba con 'utf-8' si cp1252 da problemas + return "cp1252" # O prueba con 'utf-8' si cp1252 da problemas + CONSOLE_ENCODING = get_console_encoding() # print(f"Detected console encoding: {CONSOLE_ENCODING}") + # (Función run_script como antes, usando CONSOLE_ENCODING) def run_script(script_name, xml_arg): """Runs a given script with the specified XML file argument.""" @@ -24,17 +27,21 @@ def run_script(script_name, xml_arg): script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), script_name) # Usar la ruta absoluta al ejecutable de Python actual python_executable = sys.executable - command = [python_executable, script_path, xml_arg] # Usar la ruta absoluta de python + command = [ + python_executable, + script_path, + xml_arg, + ] # Usar la ruta absoluta de python print(f"\n--- Running {script_name} with argument: {xml_arg} ---") try: # Ejecutar el proceso hijo result = subprocess.run( command, - check=True, # Lanza excepción si el script falla (return code != 0) - capture_output=True,# Captura stdout y stderr - text=True, # Decodifica stdout/stderr como texto - encoding=CONSOLE_ENCODING, # Usa la codificación detectada - errors='replace' # Reemplaza caracteres no decodificables + check=True, # Lanza excepción si el script falla (return code != 0) + capture_output=True, # Captura stdout y stderr + text=True, # Decodifica stdout/stderr como texto + encoding=CONSOLE_ENCODING, # Usa la codificación detectada + errors="replace", # Reemplaza caracteres no decodificables ) # Imprimir stdout y stderr si no están vacíos @@ -45,20 +52,28 @@ def run_script(script_name, xml_arg): print(stdout_clean) if stderr_clean: # Imprimir stderr claramente para errores del script hijo - print(f"--- Stderr ({script_name}) ---", file=sys.stderr) # Imprimir en stderr + print( + f"--- Stderr ({script_name}) ---", file=sys.stderr + ) # Imprimir en stderr print(stderr_clean, file=sys.stderr) print("--------------------------", file=sys.stderr) print(f"--- {script_name} finished successfully ---") - return True # Indicar éxito + return True # Indicar éxito except FileNotFoundError: # Error si el script python o el ejecutable no se encuentran - print(f"Error: Script '{script_path}' or Python executable '{python_executable}' not found.", file=sys.stderr) + print( + f"Error: Script '{script_path}' or Python executable '{python_executable}' not found.", + file=sys.stderr, + ) return False except subprocess.CalledProcessError as e: # Error si el script hijo devuelve un código de error (ej., sys.exit(1)) - print(f"Error running {script_name}: Script returned non-zero exit code {e.returncode}.", file=sys.stderr) + print( + f"Error running {script_name}: Script returned non-zero exit code {e.returncode}.", + file=sys.stderr, + ) # Decodificar e imprimir stdout/stderr del proceso fallido stdout_decoded = e.stdout.strip() if e.stdout else "" @@ -71,14 +86,18 @@ def run_script(script_name, xml_arg): print(f"--- Stderr ({script_name}) ---", file=sys.stderr) print(stderr_decoded, file=sys.stderr) print("--------------------------", file=sys.stderr) - return False # Indicar fallo + return False # Indicar fallo except Exception as e: # Otros errores inesperados - print(f"An unexpected error occurred while running {script_name}: {e}", file=sys.stderr) + print( + f"An unexpected error occurred while running {script_name}: {e}", + file=sys.stderr, + ) # Imprimir traceback para depuración import traceback + traceback.print_exc(file=sys.stderr) - return False # Indicar fallo + return False # Indicar fallo # --- NO SE NECESITA select_xml_file() si procesamos todos --- @@ -95,20 +114,27 @@ if __name__ == "__main__": # Verificar si el directorio 'XML Project' existe if not os.path.isdir(xml_project_dir): - print(f"Error: El directorio '{xml_project_dir}' no existe o no es un directorio.", file=sys.stderr) - print("Por favor, crea el directorio 'XML Project' en la misma carpeta que este script y coloca tus archivos XML dentro.") - sys.exit(1) # Salir con error + print( + f"Error: El directorio '{xml_project_dir}' no existe o no es un directorio.", + file=sys.stderr, + ) + print( + "Por favor, crea el directorio 'XML Project' en la misma carpeta que este script y coloca tus archivos XML dentro." + ) + sys.exit(1) # Salir con error # Buscar todos los archivos .xml recursivamente search_pattern = os.path.join(xml_project_dir, "**", "*.xml") xml_files_found = glob.glob(search_pattern, recursive=True) if not xml_files_found: - print(f"No se encontraron archivos XML en '{xml_project_dir}' o sus subdirectorios.") - sys.exit(0) # Salir limpiamente si no hay archivos + print( + f"No se encontraron archivos XML en '{xml_project_dir}' o sus subdirectorios." + ) + sys.exit(0) # Salir limpiamente si no hay archivos print(f"Se encontraron {len(xml_files_found)} archivos XML para procesar:") - xml_files_found.sort() # Ordenar para consistencia + xml_files_found.sort() # Ordenar para consistencia for xml_file in xml_files_found: print(f" - {os.path.relpath(xml_file, script_dir)}") @@ -128,24 +154,35 @@ if __name__ == "__main__": # Usar la ruta absoluta para los scripts hijos absolute_xml_filepath = os.path.abspath(xml_filepath) - + # Derivar nombres esperados para archivos intermedios (para depuración) xml_base_name = os.path.splitext(os.path.basename(absolute_xml_filepath))[0] xml_dir = os.path.dirname(absolute_xml_filepath) parsing_dir = os.path.join(xml_dir, "parsing") expected_json_file = os.path.join(parsing_dir, f"{xml_base_name}.json") - expected_processed_json = os.path.join(parsing_dir, f"{xml_base_name}_processed.json") + expected_processed_json = os.path.join( + parsing_dir, f"{xml_base_name}_processed.json" + ) # Ejecutar los scripts en secuencia success = True if not run_script(script1, absolute_xml_filepath): - print(f"\nPipeline falló en el script '{script1}' para el archivo: {relative_path}", file=sys.stderr) + print( + f"\nPipeline falló en el script '{script1}' para el archivo: {relative_path}", + file=sys.stderr, + ) success = False elif not run_script(script2, absolute_xml_filepath): - print(f"\nPipeline falló en el script '{script2}' para el archivo: {relative_path}", file=sys.stderr) + print( + f"\nPipeline falló en el script '{script2}' para el archivo: {relative_path}", + file=sys.stderr, + ) success = False elif not run_script(script3, absolute_xml_filepath): - print(f"\nPipeline falló en el script '{script3}' para el archivo: {relative_path}", file=sys.stderr) + print( + f"\nPipeline falló en el script '{script3}' para el archivo: {relative_path}", + file=sys.stderr, + ) success = False # Actualizar contadores y mostrar estado @@ -154,12 +191,16 @@ if __name__ == "__main__": processed_count += 1 else: failed_count += 1 - print(f"--- Pipeline falló para: {relative_path} ---", file=sys.stderr) # Indicar fallo + print( + f"--- Pipeline falló para: {relative_path} ---", file=sys.stderr + ) # Indicar fallo # --- PARTE 3: RESUMEN FINAL --- print("\n--- Resumen Final del Procesamiento ---") print(f"Total de archivos XML encontrados: {len(xml_files_found)}") - print(f"Archivos procesados exitosamente por el pipeline completo: {processed_count}") + print( + f"Archivos procesados exitosamente por el pipeline completo: {processed_count}" + ) print(f"Archivos que fallaron en algún punto del pipeline: {failed_count}") print("---------------------------------------") @@ -169,4 +210,4 @@ if __name__ == "__main__": else: sys.exit(0) -# --- FIN: Se elimina la lógica redundante que venía después del bucle --- \ No newline at end of file +# --- FIN: Se elimina la lógica redundante que venía después del bucle --- diff --git a/x1_to_json.py b/x1_to_json.py index b909a01..aad782e 100644 --- a/x1_to_json.py +++ b/x1_to_json.py @@ -24,15 +24,16 @@ except ImportError as e: # --- NUEVAS FUNCIONES DE PARSEO para UDT y Tag Table --- + def parse_udt(udt_element): """Parsea un elemento (UDT).""" print(" -> Detectado: PlcStruct (UDT)") block_data = { "block_name": "UnknownUDT", - "block_type": "PlcUDT", # Identificador para x3 - "language": "UDT", # Lenguaje específico + "block_type": "PlcUDT", # Identificador para x3 + "language": "UDT", # Lenguaje específico "interface": {}, - "networks": [], # Los UDTs no tienen redes + "networks": [], # Los UDTs no tienen redes "block_comment": "", } @@ -43,66 +44,87 @@ def parse_udt(udt_element): name_node = attr_list.xpath("./Name/text()") block_data["block_name"] = name_node[0].strip() if name_node else "UnknownUDT" # Comentario del UDT - comment_node_list = udt_element.xpath("./ObjectList/MultilingualText[@CompositionName='Comment']") + comment_node_list = udt_element.xpath( + "./ObjectList/MultilingualText[@CompositionName='Comment']" + ) if comment_node_list: - block_data["block_comment"] = get_multilingual_text(comment_node_list[0]) - else: # Fallback - comment_attr_node = attr_list.xpath("../ObjectList/MultilingualText[@CompositionName='Comment']") # Buscar desde el padre - if comment_attr_node : - block_data["block_comment"] = get_multilingual_text(comment_attr_node[0]) - + block_data["block_comment"] = get_multilingual_text(comment_node_list[0]) + else: # Fallback + comment_attr_node = attr_list.xpath( + "../ObjectList/MultilingualText[@CompositionName='Comment']" + ) # Buscar desde el padre + if comment_attr_node: + block_data["block_comment"] = get_multilingual_text( + comment_attr_node[0] + ) # Extraer interfaz (miembros) # La interfaz de un UDT suele estar directamente en
interface_node_list = udt_element.xpath( - "./AttributeList/Interface/iface:Sections/iface:Section[@Name='None']", namespaces=ns + "./AttributeList/Interface/iface:Sections/iface:Section[@Name='None']", + namespaces=ns, ) if interface_node_list: section_node = interface_node_list[0] members_in_section = section_node.xpath("./iface:Member", namespaces=ns) if members_in_section: # Usar la función existente para parsear miembros - block_data["interface"]["None"] = parse_interface_members(members_in_section) + block_data["interface"]["None"] = parse_interface_members( + members_in_section + ) else: - print(f"Advertencia: Sección 'None' encontrada en UDT '{block_data['block_name']}' pero sin miembros.") + print( + f"Advertencia: Sección 'None' encontrada en UDT '{block_data['block_name']}' pero sin miembros." + ) else: - # Intentar buscar interfaz directamente si no está en AttributeList (menos común) - interface_node_direct = udt_element.xpath( - ".//iface:Interface/iface:Sections/iface:Section[@Name='None']", namespaces=ns - ) - if interface_node_direct: - section_node = interface_node_direct[0] - members_in_section = section_node.xpath("./iface:Member", namespaces=ns) - if members_in_section: - block_data["interface"]["None"] = parse_interface_members(members_in_section) - else: - print(f"Advertencia: Sección 'None' encontrada directamente en UDT '{block_data['block_name']}' pero sin miembros.") - else: - print(f"Advertencia: No se encontró la sección 'None' de la interfaz para UDT '{block_data['block_name']}'.") - + # Intentar buscar interfaz directamente si no está en AttributeList (menos común) + interface_node_direct = udt_element.xpath( + ".//iface:Interface/iface:Sections/iface:Section[@Name='None']", + namespaces=ns, + ) + if interface_node_direct: + section_node = interface_node_direct[0] + members_in_section = section_node.xpath("./iface:Member", namespaces=ns) + if members_in_section: + block_data["interface"]["None"] = parse_interface_members( + members_in_section + ) + else: + print( + f"Advertencia: Sección 'None' encontrada directamente en UDT '{block_data['block_name']}' pero sin miembros." + ) + else: + print( + f"Advertencia: No se encontró la sección 'None' de la interfaz para UDT '{block_data['block_name']}'." + ) if not block_data["interface"]: - print(f"Advertencia: No se pudo extraer la interfaz del UDT '{block_data['block_name']}'.") + print( + f"Advertencia: No se pudo extraer la interfaz del UDT '{block_data['block_name']}'." + ) return block_data + def parse_tag_table(tag_table_element): """Parsea un elemento .""" print(" -> Detectado: PlcTagTable") table_data = { "block_name": "UnknownTagTable", - "block_type": "PlcTagTable", # Identificador para x3 - "language": "TagTable", # Lenguaje específico + "block_type": "PlcTagTable", # Identificador para x3 + "language": "TagTable", # Lenguaje específico "tags": [], - "networks": [], # Las Tag Tables no tienen redes - "block_comment": "", # Las tablas de tags no suelen tener comentario de bloque + "networks": [], # Las Tag Tables no tienen redes + "block_comment": "", # Las tablas de tags no suelen tener comentario de bloque } # Extraer nombre de la tabla attribute_list_node = tag_table_element.xpath("./AttributeList") if attribute_list_node: name_node = attribute_list_node[0].xpath("./Name/text()") - table_data["block_name"] = name_node[0].strip() if name_node else "UnknownTagTable" + table_data["block_name"] = ( + name_node[0].strip() if name_node else "UnknownTagTable" + ) # Extraer tags tag_elements = tag_table_element.xpath("./ObjectList/SW.Tags.PlcTag") @@ -112,7 +134,7 @@ def parse_tag_table(tag_table_element): "name": "UnknownTag", "datatype": "Unknown", "address": None, - "comment": "" + "comment": "", } tag_attr_list = tag_elem.xpath("./AttributeList") if tag_attr_list: @@ -125,7 +147,9 @@ def parse_tag_table(tag_table_element): tag_info["address"] = addr_node[0].strip() if addr_node else None # Extraer comentario del tag - comment_node_list = tag_elem.xpath("./ObjectList/MultilingualText[@CompositionName='Comment']") + comment_node_list = tag_elem.xpath( + "./ObjectList/MultilingualText[@CompositionName='Comment']" + ) if comment_node_list: tag_info["comment"] = get_multilingual_text(comment_node_list[0]) @@ -133,6 +157,7 @@ def parse_tag_table(tag_table_element): return table_data + # --- Cargador Dinámico de Parsers (sin cambios) --- def load_parsers(parsers_dir="parsers"): """ @@ -145,7 +170,7 @@ def load_parsers(parsers_dir="parsers"): parsers_dir_path = os.path.join(script_dir, parsers_dir) if not os.path.isdir(parsers_dir_path): print(f"Error: Directorio de parsers no encontrado: '{parsers_dir_path}'") - return parser_map # Devuelve mapa vacío + return parser_map # Devuelve mapa vacío print(f"Cargando parsers desde: '{parsers_dir_path}'") parsers_package = os.path.basename(parsers_dir) @@ -158,8 +183,10 @@ def load_parsers(parsers_dir="parsers"): and filename.endswith(".py") and filename not in ["__init__.py", "parser_utils.py"] ): - module_name_rel = filename[:-3] # Nombre sin .py (e.g., parse_lad_fbd) - full_module_name = f"{parsers_package}.{module_name_rel}" # e.g., parsers.parse_lad_fbd + module_name_rel = filename[:-3] # Nombre sin .py (e.g., parse_lad_fbd) + full_module_name = ( + f"{parsers_package}.{module_name_rel}" # e.g., parsers.parse_lad_fbd + ) try: # Importar el módulo dinámicamente module = importlib.import_module(full_module_name) @@ -181,7 +208,7 @@ def load_parsers(parsers_dir="parsers"): if isinstance(languages, list) and callable(parser_func): # Añadir la función al mapa para cada lenguaje que soporta for lang in languages: - lang_upper = lang.upper() # Usar mayúsculas como clave + lang_upper = lang.upper() # Usar mayúsculas como clave if lang_upper in parser_map: print( f" Advertencia: Parser para '{lang_upper}' en {full_module_name} sobrescribe definición anterior." @@ -213,22 +240,25 @@ def load_parsers(parsers_dir="parsers"): print(f"Lenguajes soportados: {list(parser_map.keys())}") return parser_map + # --- Función Principal de Conversión (MODIFICADA) --- def convert_xml_to_json(xml_filepath, json_filepath, parser_map): """Convierte XML a JSON, detectando tipo de bloque (FC/FB/OB/DB/UDT/TagTable).""" print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...") if not os.path.exists(xml_filepath): print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'") - return False # Indicar fallo + return False # Indicar fallo try: print("Paso 1: Parseando archivo XML...") - parser = etree.XMLParser(remove_blank_text=True, recover=True) # recover=True puede ayudar + parser = etree.XMLParser( + remove_blank_text=True, recover=True + ) # recover=True puede ayudar tree = etree.parse(xml_filepath, parser) root = tree.getroot() print("Paso 1: Parseo XML completado.") - result = None # Inicializar resultado + result = None # Inicializar resultado # --- Detección del tipo de bloque/objeto principal --- print("Paso 2: Detectando tipo de objeto principal...") @@ -240,7 +270,9 @@ def convert_xml_to_json(xml_filepath, json_filepath, parser_map): # Buscar Tag Table si no es UDT if result is None: - tag_table_element = root.find(".//SW.Tags.PlcTagTable", namespaces=root.nsmap) + tag_table_element = root.find( + ".//SW.Tags.PlcTagTable", namespaces=root.nsmap + ) if tag_table_element is not None: result = parse_tag_table(tag_table_element) @@ -258,42 +290,73 @@ def convert_xml_to_json(xml_filepath, json_filepath, parser_map): if block_list: the_block = block_list[0] block_tag_name = etree.QName(the_block.tag).localname - if block_tag_name == "SW.Blocks.FC": block_type_found = "FC" - elif block_tag_name == "SW.Blocks.FB": block_type_found = "FB" - elif block_tag_name == "SW.Blocks.GlobalDB": block_type_found = "GlobalDB" - elif block_tag_name == "SW.Blocks.OB": block_type_found = "OB" - print(f"Paso 2b: Bloque {block_tag_name} (Tipo: {block_type_found}) encontrado (ID={the_block.get('ID')}).") + if block_tag_name == "SW.Blocks.FC": + block_type_found = "FC" + elif block_tag_name == "SW.Blocks.FB": + block_type_found = "FB" + elif block_tag_name == "SW.Blocks.GlobalDB": + block_type_found = "GlobalDB" + elif block_tag_name == "SW.Blocks.OB": + block_type_found = "OB" + print( + f"Paso 2b: Bloque {block_tag_name} (Tipo: {block_type_found}) encontrado (ID={the_block.get('ID')})." + ) else: - print("Error Crítico: No se encontró el elemento raíz del bloque () ni UDT ni Tag Table.") - return False # Fallo si no se encuentra ningún objeto principal + print( + "Error Crítico: No se encontró el elemento raíz del bloque () ni UDT ni Tag Table." + ) + return False # Fallo si no se encuentra ningún objeto principal # --- Si es FC/FB/OB/DB, continuar con el parseo original --- if the_block is not None: print("Paso 3: Extrayendo atributos del bloque...") # (Extracción de atributos Name, Number, Language como antes...) attribute_list_node = the_block.xpath("./AttributeList") - block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown" + block_name_val, block_number_val, block_lang_val = ( + "Unknown", + None, + "Unknown", + ) if attribute_list_node: attr_list = attribute_list_node[0] name_node = attr_list.xpath("./Name/text()") - block_name_val = name_node[0].strip() if name_node else block_name_val + block_name_val = ( + name_node[0].strip() if name_node else block_name_val + ) num_node = attr_list.xpath("./Number/text()") - try: block_number_val = int(num_node[0]) if num_node else None - except (ValueError, TypeError): block_number_val = None + try: + block_number_val = int(num_node[0]) if num_node else None + except (ValueError, TypeError): + block_number_val = None lang_node = attr_list.xpath("./ProgrammingLanguage/text()") - block_lang_val = (lang_node[0].strip() if lang_node else ("DB" if block_type_found == "GlobalDB" else "Unknown")) - print(f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje Bloque='{block_lang_val}'") + block_lang_val = ( + lang_node[0].strip() + if lang_node + else ("DB" if block_type_found == "GlobalDB" else "Unknown") + ) + print( + f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje Bloque='{block_lang_val}'" + ) else: - print(f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}.") - if block_type_found == "GlobalDB": block_lang_val = "DB" + print( + f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}." + ) + if block_type_found == "GlobalDB": + block_lang_val = "DB" # (Extracción de comentario como antes...) block_comment_val = "" - comment_node_list = the_block.xpath("./ObjectList/MultilingualText[@CompositionName='Comment']") - if comment_node_list: block_comment_val = get_multilingual_text(comment_node_list[0]) - else: # Fallback - comment_attr_node = the_block.xpath("./AttributeList/Comment") # Buscar desde AttributeList - if comment_attr_node : block_comment_val = get_multilingual_text(comment_attr_node[0]) + comment_node_list = the_block.xpath( + "./ObjectList/MultilingualText[@CompositionName='Comment']" + ) + if comment_node_list: + block_comment_val = get_multilingual_text(comment_node_list[0]) + else: # Fallback + comment_attr_node = the_block.xpath( + "./AttributeList/Comment" + ) # Buscar desde AttributeList + if comment_attr_node: + block_comment_val = get_multilingual_text(comment_attr_node[0]) print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'") @@ -305,34 +368,64 @@ def convert_xml_to_json(xml_filepath, json_filepath, parser_map): "block_type": block_type_found, "block_comment": block_comment_val, "interface": {}, - "networks": [], # Inicializar networks aquí + "networks": [], # Inicializar networks aquí } # (Extracción de interfaz como antes...) print("Paso 4: Extrayendo la interfaz del bloque...") - interface_node_list = attribute_list_node[0].xpath("./Interface") if attribute_list_node else [] + interface_node_list = ( + attribute_list_node[0].xpath("./Interface") + if attribute_list_node + else [] + ) if interface_node_list: interface_node = interface_node_list[0] - all_sections = interface_node.xpath(".//iface:Section", namespaces=ns) + all_sections = interface_node.xpath( + ".//iface:Section", namespaces=ns + ) if all_sections: processed_sections = set() for section in all_sections: section_name = section.get("Name") - if not section_name or section_name in processed_sections: continue - members_in_section = section.xpath("./iface:Member", namespaces=ns) + if not section_name or section_name in processed_sections: + continue + members_in_section = section.xpath( + "./iface:Member", namespaces=ns + ) if members_in_section: - result["interface"][section_name] = parse_interface_members(members_in_section) + result["interface"][section_name] = ( + parse_interface_members(members_in_section) + ) processed_sections.add(section_name) - else: print("Advertencia: Nodo Interface no contiene secciones .") - if not result["interface"]: print("Advertencia: Interface encontrada pero sin secciones procesables.") + else: + print( + "Advertencia: Nodo Interface no contiene secciones ." + ) + if not result["interface"]: + print( + "Advertencia: Interface encontrada pero sin secciones procesables." + ) elif block_type_found == "GlobalDB": - static_members = the_block.xpath(".//iface:Section[@Name='Static']/iface:Member", namespaces=ns) + static_members = the_block.xpath( + ".//iface:Section[@Name='Static']/iface:Member", namespaces=ns + ) if static_members: - print("Paso 4: Encontrada sección Static para GlobalDB (sin nodo Interface).") - result["interface"]["Static"] = parse_interface_members(static_members) - else: print("Advertencia: No se encontró sección 'Static' para GlobalDB.") - else: print(f"Advertencia: No se encontró para bloque {block_type_found}.") - if not result["interface"]: print("Advertencia: No se pudo extraer información de la interfaz.") + print( + "Paso 4: Encontrada sección Static para GlobalDB (sin nodo Interface)." + ) + result["interface"]["Static"] = parse_interface_members( + static_members + ) + else: + print( + "Advertencia: No se encontró sección 'Static' para GlobalDB." + ) + else: + print( + f"Advertencia: No se encontró para bloque {block_type_found}." + ) + if not result["interface"]: + print("Advertencia: No se pudo extraer información de la interfaz.") # (Procesamiento de redes como antes, SOLO si NO es GlobalDB) if block_type_found != "GlobalDB": @@ -341,82 +434,147 @@ def convert_xml_to_json(xml_filepath, json_filepath, parser_map): result["networks"] = [] object_list_node = the_block.xpath("./ObjectList") if object_list_node: - compile_units = object_list_node[0].xpath("./SW.Blocks.CompileUnit") - print(f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit.") + compile_units = object_list_node[0].xpath( + "./SW.Blocks.CompileUnit" + ) + print( + f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit." + ) # Bucle de parseo de redes (igual que antes) for network_elem in compile_units: networks_processed_count += 1 network_id = network_elem.get("ID") - if not network_id: continue + if not network_id: + continue network_lang = "LAD" net_attr_list = network_elem.xpath("./AttributeList") if net_attr_list: - lang_node = net_attr_list[0].xpath("./ProgrammingLanguage/text()") - if lang_node: network_lang = lang_node[0].strip() - print(f" - Procesando Red ID={network_id}, Lenguaje Red={network_lang}") + lang_node = net_attr_list[0].xpath( + "./ProgrammingLanguage/text()" + ) + if lang_node: + network_lang = lang_node[0].strip() + print( + f" - Procesando Red ID={network_id}, Lenguaje Red={network_lang}" + ) parser_func = parser_map.get(network_lang.upper()) parsed_network_data = None if parser_func: try: parsed_network_data = parser_func(network_elem) except Exception as e_parse: - print(f" ERROR durante el parseo de Red {network_id} ({network_lang}): {e_parse}") + print( + f" ERROR durante el parseo de Red {network_id} ({network_lang}): {e_parse}" + ) traceback.print_exc() - parsed_network_data = {"id": network_id, "language": network_lang, "logic": [], "error": f"Parser failed: {e_parse}"} + parsed_network_data = { + "id": network_id, + "language": network_lang, + "logic": [], + "error": f"Parser failed: {e_parse}", + } else: - print(f" Advertencia: Lenguaje de red '{network_lang}' no soportado.") - parsed_network_data = {"id": network_id, "language": network_lang, "logic": [], "error": f"Unsupported language: {network_lang}"} + print( + f" Advertencia: Lenguaje de red '{network_lang}' no soportado." + ) + parsed_network_data = { + "id": network_id, + "language": network_lang, + "logic": [], + "error": f"Unsupported language: {network_lang}", + } if parsed_network_data: - title_element = network_elem.xpath(".//iface:MultilingualText[@CompositionName='Title']",namespaces=ns) - parsed_network_data["title"] = (get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}") - comment_elem_net = network_elem.xpath("./ObjectList/MultilingualText[@CompositionName='Comment']", namespaces=ns) - if not comment_elem_net: comment_elem_net = network_elem.xpath(".//MultilingualText[@CompositionName='Comment']", namespaces=ns) # Fallback - parsed_network_data["comment"] = (get_multilingual_text(comment_elem_net[0]) if comment_elem_net else "") + title_element = network_elem.xpath( + ".//iface:MultilingualText[@CompositionName='Title']", + namespaces=ns, + ) + parsed_network_data["title"] = ( + get_multilingual_text(title_element[0]) + if title_element + else f"Network {network_id}" + ) + comment_elem_net = network_elem.xpath( + "./ObjectList/MultilingualText[@CompositionName='Comment']", + namespaces=ns, + ) + if not comment_elem_net: + comment_elem_net = network_elem.xpath( + ".//MultilingualText[@CompositionName='Comment']", + namespaces=ns, + ) # Fallback + parsed_network_data["comment"] = ( + get_multilingual_text(comment_elem_net[0]) + if comment_elem_net + else "" + ) result["networks"].append(parsed_network_data) - if networks_processed_count == 0: print(f"Advertencia: ObjectList para {block_type_found} sin SW.Blocks.CompileUnit.") - else: print(f"Advertencia: No se encontró ObjectList para el bloque {block_type_found}.") - else: # Es GlobalDB + if networks_processed_count == 0: + print( + f"Advertencia: ObjectList para {block_type_found} sin SW.Blocks.CompileUnit." + ) + else: + print( + f"Advertencia: No se encontró ObjectList para el bloque {block_type_found}." + ) + else: # Es GlobalDB print("Paso 5: Saltando procesamiento de redes para GlobalDB.") - # --- Escritura del JSON (si se encontró un objeto) --- if result: print("Paso 6: Escribiendo el resultado en el archivo JSON...") # Validaciones finales - if result.get("block_type") not in ["PlcUDT", "PlcTagTable"] and not result["interface"]: - print("ADVERTENCIA FINAL: 'interface' está vacía en el JSON.") - if result.get("block_type") not in ["PlcUDT", "PlcTagTable", "GlobalDB"] and not result["networks"]: - print("ADVERTENCIA FINAL: 'networks' está vacía en el JSON.") + if ( + result.get("block_type") not in ["PlcUDT", "PlcTagTable"] + and not result["interface"] + ): + print("ADVERTENCIA FINAL: 'interface' está vacía en el JSON.") + if ( + result.get("block_type") not in ["PlcUDT", "PlcTagTable", "GlobalDB"] + and not result["networks"] + ): + print("ADVERTENCIA FINAL: 'networks' está vacía en el JSON.") try: with open(json_filepath, "w", encoding="utf-8") as f: json.dump(result, f, indent=4, ensure_ascii=False) print("Paso 6: Escritura JSON completada.") - print(f"Conversión finalizada. JSON guardado en: '{os.path.relpath(json_filepath)}'") - return True # Indicar éxito + print( + f"Conversión finalizada. JSON guardado en: '{os.path.relpath(json_filepath)}'" + ) + return True # Indicar éxito - except IOError as e: print(f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"); return False - except TypeError as e: print(f"Error Crítico: Problema al serializar a JSON. Error: {e}"); return False + except IOError as e: + print( + f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}" + ) + return False + except TypeError as e: + print(f"Error Crítico: Problema al serializar a JSON. Error: {e}") + return False else: - print("Error Crítico: No se pudo determinar el tipo de objeto principal en el XML.") - return False - + print( + "Error Crítico: No se pudo determinar el tipo de objeto principal en el XML." + ) + return False except etree.XMLSyntaxError as e: - print(f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}") - return False # Indicar fallo + print( + f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}" + ) + return False # Indicar fallo except Exception as e: print(f"Error Crítico: Error inesperado durante la conversión: {e}") traceback.print_exc() - return False # Indicar fallo + return False # Indicar fallo + # --- Punto de Entrada Principal (__main__) --- if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Convert Simatic XML (FC/FB/OB/DB/UDT/TagTable) to simplified JSON using dynamic parsers." # Actualizado + description="Convert Simatic XML (FC/FB/OB/DB/UDT/TagTable) to simplified JSON using dynamic parsers." # Actualizado ) parser.add_argument( "xml_filepath", @@ -426,15 +584,20 @@ if __name__ == "__main__": xml_input_file = args.xml_filepath if not os.path.exists(xml_input_file): - print(f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'", file=sys.stderr) + print( + f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'", + file=sys.stderr, + ) sys.exit(1) # --- Cargar Parsers Dinámicamente --- - loaded_parsers = load_parsers() # Carga parsers LAD/FBD/STL/SCL + loaded_parsers = load_parsers() # Carga parsers LAD/FBD/STL/SCL if not loaded_parsers: # Continuar incluso sin parsers de red, ya que podríamos estar parseando UDT/TagTable - print("Advertencia (x1): No se cargaron parsers de red. Se continuará para UDT/TagTable/DB.") - #sys.exit(1) # Ya no salimos si no hay parsers de red + print( + "Advertencia (x1): No se cargaron parsers de red. Se continuará para UDT/TagTable/DB." + ) + # sys.exit(1) # Ya no salimos si no hay parsers de red # Derivar nombre de salida JSON xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0] @@ -443,14 +606,19 @@ if __name__ == "__main__": os.makedirs(output_dir, exist_ok=True) json_output_file = os.path.join(output_dir, f"{xml_filename_base}.json") - print(f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'") + print( + f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'" + ) # Llamar a la función de conversión principal success = convert_xml_to_json(xml_input_file, json_output_file, loaded_parsers) # Salir con código de error apropiado if success: - sys.exit(0) # Éxito + sys.exit(0) # Éxito else: - print(f"\nError durante la conversión de '{os.path.relpath(xml_input_file)}'.", file=sys.stderr) - sys.exit(1) # Fallo \ No newline at end of file + print( + f"\nError durante la conversión de '{os.path.relpath(xml_input_file)}'.", + file=sys.stderr, + ) + sys.exit(1) # Fallo diff --git a/x2_process.py b/x2_process.py index f3d5329..8c63903 100644 --- a/x2_process.py +++ b/x2_process.py @@ -7,15 +7,15 @@ import traceback import re import importlib import sys -import sympy # Import sympy +import sympy # Import sympy # Import necessary components from processors directory from processors.processor_utils import ( - format_variable_name, # Keep if used outside processors - sympy_expr_to_scl, # Needed for IF grouping and maybe others + format_variable_name, # Keep if used outside processors + sympy_expr_to_scl, # Needed for IF grouping and maybe others # get_target_scl_name might be used here? Unlikely. ) -from processors.symbol_manager import SymbolManager # Import the manager +from processors.symbol_manager import SymbolManager # Import the manager # --- Constantes y Configuración --- SCL_SUFFIX = "_sympy_processed" @@ -25,6 +25,7 @@ SIMPLIFIED_IF_COMMENT = "// Simplified IF condition by script" # Global data dictionary data = {} + # --- (process_group_ifs y load_processors SIN CAMBIOS) --- def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): """ @@ -109,9 +110,15 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): # SCoil/RCoil might also be groupable if their SCL is final assignment "SCoil", "RCoil", - "BLKMOV", # Added BLKMOV - "TON", "TOF", "TP", "Se", "Sd", # Added timers - "CTU", "CTD", "CTUD", # Added counters + "BLKMOV", # Added BLKMOV + "TON", + "TOF", + "TP", + "Se", + "Sd", # Added timers + "CTU", + "CTD", + "CTUD", # Added counters ] for consumer_instr in network_logic: @@ -137,7 +144,7 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): # Check if consumer is groupable AND has its final SCL generated if ( is_enabled_by_us - and consumer_type.endswith(SCL_SUFFIX) # Check if processed + and consumer_type.endswith(SCL_SUFFIX) # Check if processed and consumer_type_original in groupable_types ): @@ -148,7 +155,7 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): # If consumer SCL itself is an IF generated by EN, take the body if consumer_scl.strip().startswith("IF"): match = re.search( - r"IF\s+.*?THEN\s*(.*?)\s*END_IF;", # More robust regex + r"IF\s+.*?THEN\s*(.*?)\s*END_IF;", # More robust regex consumer_scl, re.DOTALL | re.IGNORECASE, ) @@ -203,18 +210,19 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): return made_change + def load_processors(processors_dir="processors"): """ Escanea el directorio, importa módulos, construye el mapa y una lista ordenada por prioridad. """ processor_map = {} - processor_list_unsorted = [] # Lista para guardar (priority, type_name, func) - default_priority = 10 # Prioridad si no se define en get_processor_info + processor_list_unsorted = [] # Lista para guardar (priority, type_name, func) + default_priority = 10 # Prioridad si no se define en get_processor_info if not os.path.isdir(processors_dir): print(f"Error: Directorio de procesadores no encontrado: '{processors_dir}'") - return processor_map, [] # Devuelve mapa vacío y lista vacía + return processor_map, [] # Devuelve mapa vacío y lista vacía print(f"Cargando procesadores desde: '{processors_dir}'") processors_package = os.path.basename(processors_dir) @@ -325,9 +333,15 @@ def process_json_to_scl(json_filepath, output_json_filepath): print(f"Procesando bloque tipo: {block_type}") # --- MODIFICADO: SALTAR PROCESAMIENTO PARA DB, UDT, TAG TABLE --- - if block_type in ["GlobalDB", "PlcUDT", "PlcTagTable"]: # <-- Comprobar tipos a saltar + if block_type in [ + "GlobalDB", + "PlcUDT", + "PlcTagTable", + ]: # <-- Comprobar tipos a saltar print(f"INFO: El bloque es {block_type}. Saltando procesamiento lógico de x2.") - print(f"Guardando JSON de {block_type} (sin cambios lógicos) en: {output_json_filepath}") + print( + f"Guardando JSON de {block_type} (sin cambios lógicos) en: {output_json_filepath}" + ) try: with open(output_json_filepath, "w", encoding="utf-8") as f: json.dump(data, f, indent=4, ensure_ascii=False) @@ -355,14 +369,26 @@ def process_json_to_scl(json_filepath, output_json_filepath): current_access_map = {} for instr in network.get("logic", []): for _, source in instr.get("inputs", {}).items(): - sources_to_check = (source if isinstance(source, list) else ([source] if isinstance(source, dict) else [])) + sources_to_check = ( + source + if isinstance(source, list) + else ([source] if isinstance(source, dict) else []) + ) for src in sources_to_check: - if (isinstance(src, dict) and src.get("uid") and src.get("type") in ["variable", "constant"]): + if ( + isinstance(src, dict) + and src.get("uid") + and src.get("type") in ["variable", "constant"] + ): current_access_map[src["uid"]] = src for _, dest_list in instr.get("outputs", {}).items(): if isinstance(dest_list, list): for dest in dest_list: - if (isinstance(dest, dict) and dest.get("uid") and dest.get("type") in ["variable", "constant"]): + if ( + isinstance(dest, dict) + and dest.get("uid") + and dest.get("type") in ["variable", "constant"] + ): current_access_map[dest["uid"]] = dest network_access_maps[net_id] = current_access_map @@ -391,7 +417,8 @@ def process_json_to_scl(json_filepath, output_json_filepath): for network in data.get("networks", []): network_id = network["id"] network_lang = network.get("language", "LAD") - if network_lang == "STL": continue + if network_lang == "STL": + continue access_map = network_access_maps.get(network_id, {}) network_logic = network.get("logic", []) @@ -399,30 +426,51 @@ def process_json_to_scl(json_filepath, output_json_filepath): instr_uid = instruction.get("instruction_uid") instr_type_current = instruction.get("type", "Unknown") - if (instr_type_current.endswith(SCL_SUFFIX) or "_error" in instr_type_current or instruction.get("grouped", False) or - instr_type_current in ["RAW_STL_CHUNK", "RAW_SCL_CHUNK", "UNSUPPORTED_LANG", "UNSUPPORTED_CONTENT", "PARSING_ERROR"]): + if ( + instr_type_current.endswith(SCL_SUFFIX) + or "_error" in instr_type_current + or instruction.get("grouped", False) + or instr_type_current + in [ + "RAW_STL_CHUNK", + "RAW_SCL_CHUNK", + "UNSUPPORTED_LANG", + "UNSUPPORTED_CONTENT", + "PARSING_ERROR", + ] + ): continue lookup_key = instr_type_current.lower() effective_type_name = lookup_key if instr_type_current == "Call": call_block_type = instruction.get("block_type", "").upper() - if call_block_type == "FC": effective_type_name = "call_fc" - elif call_block_type == "FB": effective_type_name = "call_fb" + if call_block_type == "FC": + effective_type_name = "call_fc" + elif call_block_type == "FB": + effective_type_name = "call_fb" if effective_type_name == current_type_name: try: - changed = func_to_call(instruction, network_id, sympy_map, symbol_manager, data) + changed = func_to_call( + instruction, network_id, sympy_map, symbol_manager, data + ) if changed: made_change_in_base_pass = True num_sympy_processed_this_pass += 1 except Exception as e: - print(f"ERROR(SymPy Base) al procesar {instr_type_current} UID {instr_uid}: {e}") + print( + f"ERROR(SymPy Base) al procesar {instr_type_current} UID {instr_uid}: {e}" + ) traceback.print_exc() - instruction["scl"] = f"// ERROR en SymPy procesador base: {e}" + instruction["scl"] = ( + f"// ERROR en SymPy procesador base: {e}" + ) instruction["type"] = instr_type_current + "_error" made_change_in_base_pass = True - print(f" -> {num_sympy_processed_this_pass} instrucciones (no STL) procesadas con SymPy.") + print( + f" -> {num_sympy_processed_this_pass} instrucciones (no STL) procesadas con SymPy." + ) # FASE 2: Agrupación IF (Ignorando STL) if made_change_in_base_pass or passes == 1: @@ -431,30 +479,58 @@ def process_json_to_scl(json_filepath, output_json_filepath): for network in data.get("networks", []): network_id = network["id"] network_lang = network.get("language", "LAD") - if network_lang == "STL": continue + if network_lang == "STL": + continue network_logic = network.get("logic", []) - uids_in_network = sorted([instr.get("instruction_uid", "Z") for instr in network_logic if instr.get("instruction_uid")]) + uids_in_network = sorted( + [ + instr.get("instruction_uid", "Z") + for instr in network_logic + if instr.get("instruction_uid") + ] + ) for uid_to_process in uids_in_network: - instruction = next((instr for instr in network_logic if instr.get("instruction_uid") == uid_to_process), None) - if not instruction: continue - if instruction.get("grouped") or "_error" in instruction.get("type", ""): continue - if instruction.get("type", "").endswith(SCL_SUFFIX): + instruction = next( + ( + instr + for instr in network_logic + if instr.get("instruction_uid") == uid_to_process + ), + None, + ) + if not instruction: + continue + if instruction.get("grouped") or "_error" in instruction.get( + "type", "" + ): + continue + if instruction.get("type", "").endswith(SCL_SUFFIX): try: - group_changed = process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data) + group_changed = process_group_ifs( + instruction, network_id, sympy_map, symbol_manager, data + ) if group_changed: made_change_in_group_pass = True num_grouped_this_pass += 1 except Exception as e: - print(f"ERROR(GroupLoop) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}") + print( + f"ERROR(GroupLoop) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}" + ) traceback.print_exc() - print(f" -> {num_grouped_this_pass} agrupaciones realizadas (en redes no STL).") + print( + f" -> {num_grouped_this_pass} agrupaciones realizadas (en redes no STL)." + ) # Comprobar si se completó if not made_change_in_base_pass and not made_change_in_group_pass: - print(f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---") + print( + f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---" + ) processing_complete = True else: - print(f"--- Fin Pase {passes}: {num_sympy_processed_this_pass} proc SymPy, {num_grouped_this_pass} agrup. Continuando...") + print( + f"--- Fin Pase {passes}: {num_sympy_processed_this_pass} proc SymPy, {num_grouped_this_pass} agrup. Continuando..." + ) if passes == max_passes and not processing_complete: print(f"\n--- ADVERTENCIA: Límite de {max_passes} pases alcanzado...") @@ -464,58 +540,92 @@ def process_json_to_scl(json_filepath, output_json_filepath): print(f"\n--- Verificación Final de Instrucciones No Procesadas ({block_type}) ---") unprocessed_count = 0 unprocessed_details = [] - ignored_types = ["raw_scl_chunk", "unsupported_lang", "raw_stl_chunk", "unsupported_content", "parsing_error"] + ignored_types = [ + "raw_scl_chunk", + "unsupported_lang", + "raw_stl_chunk", + "unsupported_content", + "parsing_error", + ] for network in data.get("networks", []): network_id = network.get("id", "Unknown ID") network_title = network.get("title", f"Network {network_id}") network_lang = network.get("language", "LAD") - if network_lang == "STL": continue + if network_lang == "STL": + continue for instruction in network.get("logic", []): instr_uid = instruction.get("instruction_uid", "Unknown UID") instr_type = instruction.get("type", "Unknown Type") is_grouped = instruction.get("grouped", False) - if (not instr_type.endswith(SCL_SUFFIX) and "_error" not in instr_type and not is_grouped and instr_type.lower() not in ignored_types): + if ( + not instr_type.endswith(SCL_SUFFIX) + and "_error" not in instr_type + and not is_grouped + and instr_type.lower() not in ignored_types + ): unprocessed_count += 1 - unprocessed_details.append(f" - Red '{network_title}' (ID: {network_id}, Lang: {network_lang}), Instrucción UID: {instr_uid}, Tipo: '{instr_type}'") + unprocessed_details.append( + f" - Red '{network_title}' (ID: {network_id}, Lang: {network_lang}), Instrucción UID: {instr_uid}, Tipo: '{instr_type}'" + ) if unprocessed_count > 0: - print(f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones (no STL) que parecen no haber sido procesadas:") - for detail in unprocessed_details: print(detail) - else: print("INFO: Todas las instrucciones relevantes (no STL) parecen haber sido procesadas o agrupadas.") + print( + f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones (no STL) que parecen no haber sido procesadas:" + ) + for detail in unprocessed_details: + print(detail) + else: + print( + "INFO: Todas las instrucciones relevantes (no STL) parecen haber sido procesadas o agrupadas." + ) print(f"\nGuardando JSON procesado ({block_type}) en: {output_json_filepath}") try: - with open(output_json_filepath, "w", encoding="utf-8") as f: + with open(output_json_filepath, "w", encoding="utf-8") as f: json.dump(data, f, indent=4, ensure_ascii=False) print("Guardado completado.") return True - except Exception as e: - print(f"Error Crítico al guardar JSON procesado: {e}"); + except Exception as e: + print(f"Error Crítico al guardar JSON procesado: {e}") traceback.print_exc() return False + # --- Ejecución (MODIFICADO) --- if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Process simplified JSON to embed SCL logic. Expects original XML filepath as argument.") - parser.add_argument("source_xml_filepath", help="Path to the original source XML file (passed from x0_main.py).") + parser = argparse.ArgumentParser( + description="Process simplified JSON to embed SCL logic. Expects original XML filepath as argument." + ) + parser.add_argument( + "source_xml_filepath", + help="Path to the original source XML file (passed from x0_main.py).", + ) args = parser.parse_args() source_xml_file = args.source_xml_filepath if not os.path.exists(source_xml_file): - print(f"Advertencia (x2): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON correspondiente.") + print( + f"Advertencia (x2): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON correspondiente." + ) xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0] base_dir = os.path.dirname(source_xml_file) parsing_dir = os.path.join(base_dir, "parsing") input_json_file = os.path.join(parsing_dir, f"{xml_filename_base}.json") output_json_file = os.path.join(parsing_dir, f"{xml_filename_base}_processed.json") - + os.makedirs(parsing_dir, exist_ok=True) - - print(f"(x2) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'") + + print( + f"(x2) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'" + ) if not os.path.exists(input_json_file): - print(f"Error Fatal (x2): El archivo de entrada JSON no existe: '{input_json_file}'") - print(f"Asegúrate de que 'x1_to_json.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'.") + print( + f"Error Fatal (x2): El archivo de entrada JSON no existe: '{input_json_file}'" + ) + print( + f"Asegúrate de que 'x1_to_json.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'." + ) sys.exit(1) else: try: @@ -525,6 +635,8 @@ if __name__ == "__main__": else: sys.exit(1) except Exception as e: - print(f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}") + print( + f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}" + ) traceback.print_exc() - sys.exit(1) \ No newline at end of file + sys.exit(1) diff --git a/x3_generate_scl.py b/x3_generate_scl.py index 4f5c2b1..f6dc8cd 100644 --- a/x3_generate_scl.py +++ b/x3_generate_scl.py @@ -13,6 +13,7 @@ try: from generators.generate_scl_code_block import generate_scl_for_code_block from generators.generate_md_udt import generate_udt_markdown from generators.generate_md_tag_table import generate_tag_table_markdown + # Importar format_variable_name (necesario para el nombre de archivo) from generators.generator_utils import format_variable_name except ImportError as e: @@ -20,6 +21,7 @@ except ImportError as e: print("Asegúrate de que el directorio 'generators' y sus archivos .py existen.") sys.exit(1) + # --- Función Principal de Generación (Despachador) --- def generate_scl_or_markdown(processed_json_filepath, output_directory): """ @@ -35,15 +37,19 @@ def generate_scl_or_markdown(processed_json_filepath, output_directory): with open(processed_json_filepath, "r", encoding="utf-8") as f: data = json.load(f) except Exception as e: - print(f"Error al cargar/parsear JSON: {e}"); traceback.print_exc(); return + print(f"Error al cargar/parsear JSON: {e}") + traceback.print_exc() + return block_name = data.get("block_name", "UnknownBlock") block_type = data.get("block_type", "Unknown") - scl_block_name = format_variable_name(block_name) # Nombre seguro para archivo + scl_block_name = format_variable_name(block_name) # Nombre seguro para archivo output_content = [] - output_extension = ".scl" # Default + output_extension = ".scl" # Default - print(f"Generando salida para: {block_type} '{scl_block_name}' (Original: {block_name})") + print( + f"Generando salida para: {block_type} '{scl_block_name}' (Original: {block_name})" + ) # --- Selección del Generador y Extensión --- generation_function = None @@ -63,8 +69,10 @@ def generate_scl_or_markdown(processed_json_filepath, output_directory): print(f" -> Modo de generación: {block_type} SCL") generation_function = generate_scl_for_code_block output_extension = ".scl" - else: # Tipo desconocido - print(f"Error: Tipo de bloque desconocido '{block_type}'. No se generará archivo.") + else: # Tipo desconocido + print( + f"Error: Tipo de bloque desconocido '{block_type}'. No se generará archivo." + ) return # --- Llamar a la función generadora --- @@ -72,9 +80,11 @@ def generate_scl_or_markdown(processed_json_filepath, output_directory): try: output_content = generation_function(data) except Exception as gen_e: - print(f"Error durante la generación de contenido para {block_type} '{scl_block_name}': {gen_e}") + print( + f"Error durante la generación de contenido para {block_type} '{scl_block_name}': {gen_e}" + ) traceback.print_exc() - return # No intentar escribir si la generación falla + return # No intentar escribir si la generación falla # --- Escritura del Archivo de Salida --- output_filename_base = f"{scl_block_name}{output_extension}" @@ -91,20 +101,35 @@ def generate_scl_or_markdown(processed_json_filepath, output_directory): print(f"Error al escribir el archivo {output_extension.upper()}: {e}") traceback.print_exc() + # --- Ejecución --- if __name__ == "__main__": parser = argparse.ArgumentParser(description="Generate final SCL or Markdown file.") - parser.add_argument("source_xml_filepath", help="Path to the original source XML file.") - args = parser.parse_args(); source_xml_file = args.source_xml_filepath - if not os.path.exists(source_xml_file): print(f"Advertencia (x3): Archivo XML original no encontrado: '{source_xml_file}'.") + parser.add_argument( + "source_xml_filepath", help="Path to the original source XML file." + ) + args = parser.parse_args() + source_xml_file = args.source_xml_filepath + if not os.path.exists(source_xml_file): + print( + f"Advertencia (x3): Archivo XML original no encontrado: '{source_xml_file}'." + ) xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0] base_dir = os.path.dirname(source_xml_file) parsing_dir = os.path.join(base_dir, "parsing") input_json_file = os.path.join(parsing_dir, f"{xml_filename_base}_processed.json") output_dir = base_dir - print(f"(x3) Generando SCL/MD desde: '{os.path.relpath(input_json_file)}' en directorio: '{os.path.relpath(output_dir)}'") + print( + f"(x3) Generando SCL/MD desde: '{os.path.relpath(input_json_file)}' en directorio: '{os.path.relpath(output_dir)}'" + ) if not os.path.exists(input_json_file): - print(f"Error Fatal (x3): JSON procesado no encontrado: '{input_json_file}'"); sys.exit(1) + print(f"Error Fatal (x3): JSON procesado no encontrado: '{input_json_file}'") + sys.exit(1) else: - try: generate_scl_or_markdown(input_json_file, output_dir); sys.exit(0) - except Exception as e: print(f"Error Crítico (x3): {e}"); traceback.print_exc(); sys.exit(1) \ No newline at end of file + try: + generate_scl_or_markdown(input_json_file, output_dir) + sys.exit(0) + except Exception as e: + print(f"Error Crítico (x3): {e}") + traceback.print_exc() + sys.exit(1)