diff --git a/ToUpload/processors/process_call.py b/ToUpload/processors/process_call.py index 7902697..c3a554e 100644 --- a/ToUpload/processors/process_call.py +++ b/ToUpload/processors/process_call.py @@ -11,14 +11,17 @@ SCL_SUFFIX = "_sympy_processed" def process_call(instruction, network_id, sympy_map, symbol_manager: SymbolManager, data): instr_uid = instruction["instruction_uid"] - instr_type_original = instruction.get("type", "") # Tipo antes de añadir sufijo - if instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original: - return False - + # Get original type before potential suffix/error was added by x1 or previous passes + # This requires storing the original type perhaps, or removing known suffixes + # Let's assume 'block_type' (FC/FB) and 'block_name' are correct from x1 block_name = instruction.get("block_name", f"UnknownCall_{instr_uid}") block_type = instruction.get("block_type") # FC, FB instance_db = instruction.get("instance_db") # Nombre del DB de instancia (para FB) + # Check if already processed + if instruction.get("type", "").endswith(SCL_SUFFIX) or "_error" in instruction.get("type", ""): + return False + # Formatear nombres SCL (para la llamada final) block_name_scl = format_variable_name(block_name) instance_db_scl = format_variable_name(instance_db) if instance_db else None @@ -33,91 +36,140 @@ def process_call(instruction, network_id, sympy_map, symbol_manager: SymbolManag # --- Procesar Parámetros de Entrada --- scl_call_params = [] - processed_inputs = {"en"} + processed_inputs = {"en"} # Track processed pins to avoid duplicates if 'en' is also listed elsewhere dependencies_resolved = True - # Ordenar para consistencia + # Iterar sobre las entradas que x1 debería haber poblado + # Ordenar por nombre de pin para consistencia en la llamada SCL input_pin_names = sorted(instruction.get("inputs", {}).keys()) for pin_name in input_pin_names: - if pin_name not in processed_inputs: + if pin_name not in processed_inputs: # Skip 'en' if already handled source_info = instruction["inputs"][pin_name] - # Obtener la representación de la fuente (puede ser SymPy o Constante/String) + + # Get the representation of the source (SymPy, constant, or SCL string) source_sympy_or_const = get_sympy_representation(source_info, network_id, sympy_map, symbol_manager) if source_sympy_or_const is None: # print(f"DEBUG Call {instr_uid}: Input param '{pin_name}' dependency not ready.") dependencies_resolved = False - break # Salir si una dependencia no está lista + break # Exit if one dependency is not ready - # Convertir la expresión/constante a SCL para la llamada - # Simplificar ANTES de convertir? Probablemente no necesario para parámetros de entrada - # a menos que queramos optimizar el valor pasado. Por ahora, convertir directo. + # Convert the expression/constant to SCL for the call + # Simplification of inputs is generally not needed here, convert directly param_scl_value = sympy_expr_to_scl(source_sympy_or_const, symbol_manager) - # El nombre del pin SÍ necesita formateo + # Parameter pin name needs formatting for SCL pin_name_scl = format_variable_name(pin_name) + + # Special check for DB_ANY or ANY_POINTER - pass name directly without := + # We need the original parameter type info for this, which is not in the simplified JSON. + # WORKAROUND: Check if param_scl_value looks like a DB name ("DB_NAME") + # This is heuristic and might be wrong. Ideally, x1 should pass type info. + # For now, we assume standard 'Param := Value' syntax. + # if param_scl_value.startswith('"') and param_scl_value.endswith('"') and block_type == "FC": # Heuristic for DB_ANY? + # scl_call_params.append(f"{pin_name_scl} := {param_scl_value}") # Still use := for clarity? TIA might infer + # else: scl_call_params.append(f"{pin_name_scl} := {param_scl_value}") + processed_inputs.add(pin_name) if not dependencies_resolved: return False - # --- Construcción de la Llamada SCL (similar a antes) --- + # --- Construcción de la Llamada SCL (con parámetros) --- scl_call_body = "" - param_string = ", ".join(scl_call_params) + param_string = ", ".join(scl_call_params) # Join parameters with commas if block_type == "FB": if not instance_db_scl: print(f"Error: Call FB '{block_name_scl}' (UID {instr_uid}) sin instancia.") instruction["scl"] = f"// ERROR: FB Call {block_name_scl} sin instancia" - instruction["type"] = f"Call_FB_error" - return True + instruction["type"] = f"Call_FB_error" # Mark with error + return True # Processed (with error) + # FB Call: InstanceName(Param1 := Value1, Param2 := Value2); scl_call_body = f"{instance_db_scl}({param_string});" elif block_type == "FC": + # FC Call: BlockName(Param1 := Value1, Param2 := Value2); scl_call_body = f"{block_name_scl}({param_string});" else: print(f"Advertencia: Tipo de bloque no soportado para Call UID {instr_uid}: {block_type}") scl_call_body = f"// ERROR: Call a bloque tipo '{block_type}' no soportado: {block_name_scl}" - instruction["type"] = f"Call_{block_type}_error" # Marcar como error + # Mark instruction type with error + instruction["type"] = f"Call_{block_type or 'Unknown'}_error" # Add specific type if known + # --- Aplicar Condición EN (usando la expresión SymPy EN) --- scl_final = "" if sympy_en_expr != sympy.true: - # Simplificar la condición EN ANTES de convertirla a SCL + # Simplify the EN condition before converting to SCL try: #simplified_en_expr = sympy.simplify_logic(sympy_en_expr, force=True) simplified_en_expr = sympy.logic.boolalg.to_dnf(sympy_en_expr, simplify=True) except Exception as e: - print(f"Error simplifying EN for Call {instr_uid}: {e}") + print(f"Error simplifying EN for Call {instr_uid} ({block_name_scl}): {e}") simplified_en_expr = sympy_en_expr # Fallback en_condition_scl = sympy_expr_to_scl(simplified_en_expr, symbol_manager) - indented_call = "\n".join([f" {line}" for line in scl_call_body.splitlines()]) - scl_final = f"IF {en_condition_scl} THEN\n{indented_call}\nEND_IF;" + # Avoid IF TRUE/FALSE blocks + if en_condition_scl == "TRUE": + scl_final = scl_call_body + elif en_condition_scl == "FALSE": + scl_final = f"// Call {block_name_scl} (UID {instr_uid}) condition simplified to FALSE." + # Also update type to avoid further processing? + # instruction["type"] = f"Call_{block_type}{SCL_SUFFIX}_Optimized" + else: + # Indent the call body within the IF block + indented_call = "\n".join([f" {line}" for line in scl_call_body.splitlines()]) + scl_final = f"IF {en_condition_scl} THEN\n{indented_call}\nEND_IF;" else: + # No IF needed if EN is always TRUE scl_final = scl_call_body # --- Actualizar Instrucción y Mapa SymPy --- instruction["scl"] = scl_final # Guardar el SCL final generado - instruction["type"] = (f"Call_{block_type}{SCL_SUFFIX}" if "_error" not in instruction["type"] else instruction["type"]) - # Actualizar sympy_map con el estado ENO (es la expresión SymPy de EN) + # Update instruction type to mark as processed (unless already marked as error) + if "_error" not in instruction.get("type", ""): + instruction["type"] = f"Call_{block_type}{SCL_SUFFIX}" + + # Propagar el estado ENO (es la expresión SymPy de EN) map_key_eno = (network_id, instr_uid, "eno") sympy_map[map_key_eno] = sympy_en_expr # Guardar la expresión SymPy para ENO - # Propagar valores de salida (requiere info de interfaz o heurística) - # Si se sabe que hay una salida 'MyOutput', se podría añadir su SCL al mapa - # Ejemplo MUY simplificado: + # --- Propagar Valores de Salida (Importante pero complejo) --- + # Esto requiere conocer la interfaz del bloque llamado (que no tenemos aquí directamente) + # O asumir convenciones estándar (ej. FCs tienen Ret_Val, FBs tienen outputs en su instancia) + + # Heurística simple: Si es un FC, intentar propagar Ret_Val si existe en outputs + # Si es un FB, las salidas se acceden a través de la instancia (e.g., "MyInstance".Output1) + # Por ahora, dejaremos la propagación de salidas más avanzada para una mejora futura + # o requerirá pasar información de la interfaz del bloque llamado. + + # Ejemplo básico (necesita mejorar): # for pin_name, dest_list in instruction.get("outputs", {}).items(): # if pin_name != 'eno' and dest_list: # Asumir que hay un destino # map_key_out = (network_id, instr_uid, pin_name) + # pin_name_scl = format_variable_name(pin_name) # if block_type == "FB" and instance_db_scl: - # sympy_map[map_key_out] = f"{instance_db_scl}.{format_variable_name(pin_name)}" # Guardar el *string* de acceso SCL - # # Para FCs es más complejo, necesitaría asignación explícita a temp - # # else: # FC output -> necesita temp var - # # temp_var = generate_temp_var_name(...) - # # sympy_map[map_key_out] = temp_var + # # Salida de FB: "Instancia".NombrePin + # output_scl_access = f"{instance_db_scl}.{pin_name_scl}" + # # Podríamos guardar el string SCL o crear/obtener un Symbol + # sympy_out_symbol = symbol_manager.get_symbol(output_scl_access) + # sympy_map[map_key_out] = sympy_out_symbol if sympy_out_symbol else output_scl_access # Prefiere Symbol + # elif block_type == "FC": + # # Salida de FC: Requiere asignar a una variable (temporal o de interfaz) + # # Esto se complica porque el destino está en 'dest_list' + # if len(dest_list) == 1 and dest_list[0].get("type") == "variable": + # target_var_name = format_variable_name(dest_list[0].get("name")) + # # Guardar el nombre del destino SCL que contendrá el valor + # sympy_map[map_key_out] = target_var_name + # # Necesitaríamos modificar scl_final para incluir la asignación: + # # target_var_name := FC_Call(...); (requiere reestructurar la generación SCL) + # else: + # # Múltiples destinos o destino no variable es complejo para FC outputs + # sympy_map[map_key_out] = f"/* TODO: Assign FC output {pin_name_scl} */" + return True @@ -125,7 +177,8 @@ def process_call(instruction, network_id, sympy_map, symbol_manager: SymbolManag # --- Processor Information Function --- def get_processor_info(): """Devuelve la información para las llamadas a FC y FB.""" + # Asegurarse que los type_name coincidan con los usados en x1 y x2 return [ - {'type_name': 'call_fc', 'processor_func': process_call, 'priority': 6}, - {'type_name': 'call_fb', 'processor_func': process_call, 'priority': 6} + {'type_name': 'call_fc', 'processor_func': process_call, 'priority': 6}, # Prioridad alta + {'type_name': 'call_fb', 'processor_func': process_call, 'priority': 6} # Prioridad alta ] \ No newline at end of file diff --git a/paste.py b/paste.py index e69de29..c3a554e 100644 --- a/paste.py +++ b/paste.py @@ -0,0 +1,184 @@ +# processors/process_call.py +# -*- coding: utf-8 -*- +import sympy +import traceback +# Asumiendo que estas funciones ahora existen y están adaptadas +from .processor_utils import get_sympy_representation, sympy_expr_to_scl, format_variable_name, get_target_scl_name +from .symbol_manager import SymbolManager # Necesitamos pasar el symbol_manager + +# Definir sufijo globalmente o importar +SCL_SUFFIX = "_sympy_processed" + +def process_call(instruction, network_id, sympy_map, symbol_manager: SymbolManager, data): + instr_uid = instruction["instruction_uid"] + # Get original type before potential suffix/error was added by x1 or previous passes + # This requires storing the original type perhaps, or removing known suffixes + # Let's assume 'block_type' (FC/FB) and 'block_name' are correct from x1 + block_name = instruction.get("block_name", f"UnknownCall_{instr_uid}") + block_type = instruction.get("block_type") # FC, FB + instance_db = instruction.get("instance_db") # Nombre del DB de instancia (para FB) + + # Check if already processed + if instruction.get("type", "").endswith(SCL_SUFFIX) or "_error" in instruction.get("type", ""): + return False + + # Formatear nombres SCL (para la llamada final) + block_name_scl = format_variable_name(block_name) + instance_db_scl = format_variable_name(instance_db) if instance_db else None + + # --- Manejo de EN --- + en_input = instruction["inputs"].get("en") + sympy_en_expr = get_sympy_representation(en_input, network_id, sympy_map, symbol_manager) if en_input else sympy.true + + if sympy_en_expr is None: + # print(f"DEBUG Call {instr_uid}: EN dependency not ready.") + return False # Dependencia EN no resuelta + + # --- Procesar Parámetros de Entrada --- + scl_call_params = [] + processed_inputs = {"en"} # Track processed pins to avoid duplicates if 'en' is also listed elsewhere + dependencies_resolved = True + + # Iterar sobre las entradas que x1 debería haber poblado + # Ordenar por nombre de pin para consistencia en la llamada SCL + input_pin_names = sorted(instruction.get("inputs", {}).keys()) + + for pin_name in input_pin_names: + if pin_name not in processed_inputs: # Skip 'en' if already handled + source_info = instruction["inputs"][pin_name] + + # Get the representation of the source (SymPy, constant, or SCL string) + source_sympy_or_const = get_sympy_representation(source_info, network_id, sympy_map, symbol_manager) + + if source_sympy_or_const is None: + # print(f"DEBUG Call {instr_uid}: Input param '{pin_name}' dependency not ready.") + dependencies_resolved = False + break # Exit if one dependency is not ready + + # Convert the expression/constant to SCL for the call + # Simplification of inputs is generally not needed here, convert directly + param_scl_value = sympy_expr_to_scl(source_sympy_or_const, symbol_manager) + + # Parameter pin name needs formatting for SCL + pin_name_scl = format_variable_name(pin_name) + + # Special check for DB_ANY or ANY_POINTER - pass name directly without := + # We need the original parameter type info for this, which is not in the simplified JSON. + # WORKAROUND: Check if param_scl_value looks like a DB name ("DB_NAME") + # This is heuristic and might be wrong. Ideally, x1 should pass type info. + # For now, we assume standard 'Param := Value' syntax. + # if param_scl_value.startswith('"') and param_scl_value.endswith('"') and block_type == "FC": # Heuristic for DB_ANY? + # scl_call_params.append(f"{pin_name_scl} := {param_scl_value}") # Still use := for clarity? TIA might infer + # else: + scl_call_params.append(f"{pin_name_scl} := {param_scl_value}") + + processed_inputs.add(pin_name) + + if not dependencies_resolved: + return False + + # --- Construcción de la Llamada SCL (con parámetros) --- + scl_call_body = "" + param_string = ", ".join(scl_call_params) # Join parameters with commas + + if block_type == "FB": + if not instance_db_scl: + print(f"Error: Call FB '{block_name_scl}' (UID {instr_uid}) sin instancia.") + instruction["scl"] = f"// ERROR: FB Call {block_name_scl} sin instancia" + instruction["type"] = f"Call_FB_error" # Mark with error + return True # Processed (with error) + # FB Call: InstanceName(Param1 := Value1, Param2 := Value2); + scl_call_body = f"{instance_db_scl}({param_string});" + elif block_type == "FC": + # FC Call: BlockName(Param1 := Value1, Param2 := Value2); + scl_call_body = f"{block_name_scl}({param_string});" + else: + print(f"Advertencia: Tipo de bloque no soportado para Call UID {instr_uid}: {block_type}") + scl_call_body = f"// ERROR: Call a bloque tipo '{block_type}' no soportado: {block_name_scl}" + # Mark instruction type with error + instruction["type"] = f"Call_{block_type or 'Unknown'}_error" # Add specific type if known + + + # --- Aplicar Condición EN (usando la expresión SymPy EN) --- + scl_final = "" + if sympy_en_expr != sympy.true: + # Simplify the EN condition before converting to SCL + try: + #simplified_en_expr = sympy.simplify_logic(sympy_en_expr, force=True) + simplified_en_expr = sympy.logic.boolalg.to_dnf(sympy_en_expr, simplify=True) + except Exception as e: + print(f"Error simplifying EN for Call {instr_uid} ({block_name_scl}): {e}") + simplified_en_expr = sympy_en_expr # Fallback + en_condition_scl = sympy_expr_to_scl(simplified_en_expr, symbol_manager) + + # Avoid IF TRUE/FALSE blocks + if en_condition_scl == "TRUE": + scl_final = scl_call_body + elif en_condition_scl == "FALSE": + scl_final = f"// Call {block_name_scl} (UID {instr_uid}) condition simplified to FALSE." + # Also update type to avoid further processing? + # instruction["type"] = f"Call_{block_type}{SCL_SUFFIX}_Optimized" + else: + # Indent the call body within the IF block + indented_call = "\n".join([f" {line}" for line in scl_call_body.splitlines()]) + scl_final = f"IF {en_condition_scl} THEN\n{indented_call}\nEND_IF;" + else: + # No IF needed if EN is always TRUE + scl_final = scl_call_body + + # --- Actualizar Instrucción y Mapa SymPy --- + instruction["scl"] = scl_final # Guardar el SCL final generado + + # Update instruction type to mark as processed (unless already marked as error) + if "_error" not in instruction.get("type", ""): + instruction["type"] = f"Call_{block_type}{SCL_SUFFIX}" + + # Propagar el estado ENO (es la expresión SymPy de EN) + map_key_eno = (network_id, instr_uid, "eno") + sympy_map[map_key_eno] = sympy_en_expr # Guardar la expresión SymPy para ENO + + # --- Propagar Valores de Salida (Importante pero complejo) --- + # Esto requiere conocer la interfaz del bloque llamado (que no tenemos aquí directamente) + # O asumir convenciones estándar (ej. FCs tienen Ret_Val, FBs tienen outputs en su instancia) + + # Heurística simple: Si es un FC, intentar propagar Ret_Val si existe en outputs + # Si es un FB, las salidas se acceden a través de la instancia (e.g., "MyInstance".Output1) + # Por ahora, dejaremos la propagación de salidas más avanzada para una mejora futura + # o requerirá pasar información de la interfaz del bloque llamado. + + # Ejemplo básico (necesita mejorar): + # for pin_name, dest_list in instruction.get("outputs", {}).items(): + # if pin_name != 'eno' and dest_list: # Asumir que hay un destino + # map_key_out = (network_id, instr_uid, pin_name) + # pin_name_scl = format_variable_name(pin_name) + # if block_type == "FB" and instance_db_scl: + # # Salida de FB: "Instancia".NombrePin + # output_scl_access = f"{instance_db_scl}.{pin_name_scl}" + # # Podríamos guardar el string SCL o crear/obtener un Symbol + # sympy_out_symbol = symbol_manager.get_symbol(output_scl_access) + # sympy_map[map_key_out] = sympy_out_symbol if sympy_out_symbol else output_scl_access # Prefiere Symbol + # elif block_type == "FC": + # # Salida de FC: Requiere asignar a una variable (temporal o de interfaz) + # # Esto se complica porque el destino está en 'dest_list' + # if len(dest_list) == 1 and dest_list[0].get("type") == "variable": + # target_var_name = format_variable_name(dest_list[0].get("name")) + # # Guardar el nombre del destino SCL que contendrá el valor + # sympy_map[map_key_out] = target_var_name + # # Necesitaríamos modificar scl_final para incluir la asignación: + # # target_var_name := FC_Call(...); (requiere reestructurar la generación SCL) + # else: + # # Múltiples destinos o destino no variable es complejo para FC outputs + # sympy_map[map_key_out] = f"/* TODO: Assign FC output {pin_name_scl} */" + + + return True + + +# --- Processor Information Function --- +def get_processor_info(): + """Devuelve la información para las llamadas a FC y FB.""" + # Asegurarse que los type_name coincidan con los usados en x1 y x2 + return [ + {'type_name': 'call_fc', 'processor_func': process_call, 'priority': 6}, # Prioridad alta + {'type_name': 'call_fb', 'processor_func': process_call, 'priority': 6} # Prioridad alta + ] \ No newline at end of file diff --git a/x1_to_json.py b/x1_to_json.py index 8d43e61..de1a4fb 100644 --- a/x1_to_json.py +++ b/x1_to_json.py @@ -6,9 +6,9 @@ import re from lxml import etree import traceback from collections import defaultdict +import copy # Importar copy para deepcopy # --- Namespaces --- -# Se añade el namespace 'st' para Structured Text ns = { "iface": "http://www.siemens.com/automation/Openness/SW/Interface/v5", "flg": "http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4", @@ -17,44 +17,47 @@ ns = { } -# --- Helper Functions --- -# ... (El resto de las funciones helper: get_multilingual_text, get_symbol_name, etc. permanecen igual) ... -# --- (Incluye aquí todas tus funciones helper sin cambios) --- +# --- (Funciones helper SIN CAMBIOS: get_multilingual_text, get_symbol_name, parse_access, parse_part, parse_call, reconstruct_scl_from_tokens, etc.) --- +# --- (Incluye aquí tus funciones helper sin cambios) --- def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"): # (Sin cambios respecto a la versión anterior) if element is None: return "" try: - xpath_expr = ( - f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{default_lang}']]" - f"/*[local-name()='AttributeList']/*[local-name()='Text']" - ) - text_items = element.xpath(xpath_expr) - if text_items and text_items[0].text is not None: - return text_items[0].text.strip() - xpath_expr = ( - f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{fallback_lang}']]" - f"/*[local-name()='AttributeList']/*[local-name()='Text']" - ) - text_items = element.xpath(xpath_expr) - if text_items and text_items[0].text is not None: - return text_items[0].text.strip() - xpath_expr = f".//*[local-name()='MultilingualTextItem']/*[local-name()='AttributeList']/*[local-name()='Text']" - text_items = element.xpath(xpath_expr) - if text_items and text_items[0].text is not None: - return text_items[0].text.strip() + # Intenta buscar el idioma por defecto + xpath_expr_default = f".//iface:MultilingualTextItem[iface:AttributeList/iface:Culture='{default_lang}']/iface:AttributeList/iface:Text" + text_items_default = element.xpath(xpath_expr_default, namespaces=ns) + if text_items_default and text_items_default[0].text is not None: + return text_items_default[0].text.strip() + + # Intenta buscar el idioma de fallback + xpath_expr_fallback = f".//iface:MultilingualTextItem[iface:AttributeList/iface:Culture='{fallback_lang}']/iface:AttributeList/iface:Text" + text_items_fallback = element.xpath(xpath_expr_fallback, namespaces=ns) + if text_items_fallback and text_items_fallback[0].text is not None: + return text_items_fallback[0].text.strip() + + # Si no encuentra ninguno, toma el primer texto que encuentre + xpath_expr_any = ".//iface:MultilingualTextItem/iface:AttributeList/iface:Text" + text_items_any = element.xpath(xpath_expr_any, namespaces=ns) + if text_items_any and text_items_any[0].text is not None: + return text_items_any[0].text.strip() + + # Fallback si MultilingualText está vacío o tiene una estructura inesperada return "" except Exception as e: print(f"Advertencia: Error extrayendo MultilingualText: {e}") + # traceback.print_exc() # Descomentar para más detalles del error return "" def get_symbol_name(symbol_element): - # (Sin cambios respecto a la versión anterior) + # Adaptado para usar namespace flg if symbol_element is None: return None try: - components = symbol_element.xpath("./*[local-name()='Component']/@Name") + # Asume que Component está dentro de Symbol y ambos están en el namespace flg + components = symbol_element.xpath("./flg:Component/@Name", namespaces=ns) + # Formatear correctamente con comillas dobles return ".".join(f'"{c}"' for c in components) if components else None except Exception as e: print(f"Advertencia: Excepción en get_symbol_name: {e}") @@ -62,16 +65,20 @@ def get_symbol_name(symbol_element): def parse_access(access_element): - # (Sin cambios respecto a la versión anterior) + # Adaptado para usar namespace flg if access_element is None: return None uid = access_element.get("UId") scope = access_element.get("Scope") info = {"uid": uid, "scope": scope, "type": "unknown"} - symbol = access_element.xpath("./*[local-name()='Symbol']") - constant = access_element.xpath("./*[local-name()='Constant']") + + # Buscar Symbol o Constant usando el namespace flg + symbol = access_element.xpath("./flg:Symbol", namespaces=ns) + constant = access_element.xpath("./flg:Constant", namespaces=ns) + if symbol: info["type"] = "variable" + # Llamar a get_symbol_name que ahora espera flg:Symbol info["name"] = get_symbol_name(symbol[0]) if info["name"] is None: info["type"] = "error_parsing_symbol" @@ -79,23 +86,29 @@ def parse_access(access_element): return info elif constant: info["type"] = "constant" - const_type_elem = constant[0].xpath("./*[local-name()='ConstantType']") - const_val_elem = constant[0].xpath("./*[local-name()='ConstantValue']") + # Buscar ConstantType y ConstantValue usando el namespace flg + const_type_elem = constant[0].xpath("./flg:ConstantType", namespaces=ns) + const_val_elem = constant[0].xpath("./flg:ConstantValue", namespaces=ns) + + # Extraer texto info["datatype"] = ( - const_type_elem[0].text + const_type_elem[0].text.strip() if const_type_elem and const_type_elem[0].text is not None else "Unknown" ) value_str = ( - const_val_elem[0].text + const_val_elem[0].text.strip() if const_val_elem and const_val_elem[0].text is not None else None ) + if value_str is None: info["type"] = "error_parsing_constant" info["value"] = None print(f"Error: Constante sin valor Access UID={uid}") return info + + # Inferir tipo si es Unknown (igual que antes) if info["datatype"] == "Unknown": val_lower = value_str.lower() if val_lower in ["true", "false"]: @@ -112,7 +125,9 @@ def parse_access(access_element): pass elif "#" in value_str: info["datatype"] = "TypedConstant" - info["value"] = value_str + + info["value"] = value_str # Guardar valor original + # Intentar conversión numérica/booleana (igual que antes) dtype_lower = info["datatype"].lower() val_str_processed = value_str.split("#")[-1] if "#" in value_str else value_str try: @@ -136,17 +151,18 @@ def parse_access(access_element): ) elif dtype_lower in ["real", "lreal"]: info["value"] = float(val_str_processed) - elif dtype_lower == "typedconstant": - info["value"] = value_str + # Mantener string para TypedConstant y otros except (ValueError, TypeError) as e: print( f"Advertencia: No se pudo convertir valor '{val_str_processed}' a {dtype_lower} UID={uid}. Error: {e}" ) - info["value"] = value_str + info["value"] = value_str # Mantener string original + else: info["type"] = "unknown_structure" print(f"Advertencia: Access UID={uid} no es Symbol ni Constant.") return info + if info["type"] == "variable" and info.get("name") is None: print(f"Error Interno: parse_access var sin nombre UID {uid}.") info["type"] = "error_no_name" @@ -155,7 +171,7 @@ def parse_access(access_element): def parse_part(part_element): - # (Sin cambios respecto a la versión anterior) + # Asume que Part está en namespace flg if part_element is None: return None uid = part_element.get("UId") @@ -165,23 +181,28 @@ def parse_part(part_element): f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}" ) return None + template_values = {} try: - for tv in part_element.xpath("./*[local-name()='TemplateValue']"): + # TemplateValue parece NO tener namespace flg + for tv in part_element.xpath("./TemplateValue"): tv_name = tv.get("Name") tv_type = tv.get("Type") if tv_name and tv_type: template_values[tv_name] = tv_type except Exception as e: print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}") + negated_pins = {} try: - for negated_elem in part_element.xpath("./*[local-name()='Negated']"): + # Negated parece NO tener namespace flg + for negated_elem in part_element.xpath("./Negated"): negated_pin_name = negated_elem.get("Name") if negated_pin_name: negated_pins[negated_pin_name] = True except Exception as e: print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}") + return { "uid": uid, "type": name, @@ -191,7 +212,7 @@ def parse_part(part_element): def parse_call(call_element): - # (Mantiene la corrección para DB de instancia) + # Asume que Call está en namespace flg if call_element is None: return None uid = call_element.get("UId") @@ -200,43 +221,58 @@ def parse_call(call_element): f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}" ) return None - call_info_elem = call_element.xpath("./*[local-name()='CallInfo']") + + # << CORRECCIÓN: CallInfo y sus hijos están en el namespace por defecto (flg) >> + call_info_elem = call_element.xpath("./flg:CallInfo", namespaces=ns) if not call_info_elem: - print(f"Error: Call UID {uid} sin elemento CallInfo.") - return None - call_info = call_info_elem[0] + print(f"Error: Call UID {uid} sin elemento flg:CallInfo.") + # Intentar sin namespace como fallback por si acaso + call_info_elem_no_ns = call_element.xpath("./CallInfo") + if not call_info_elem_no_ns: + print( + f"Error: Call UID {uid} sin elemento CallInfo (probado sin NS tambien)." + ) + return None + else: + # Si se encontró sin NS, usar ese (menos probable pero posible) + print(f"Advertencia: Call UID {uid} encontró CallInfo SIN namespace.") + call_info = call_info_elem_no_ns[0] + + else: + call_info = call_info_elem[0] # Usar el encontrado con namespace + block_name = call_info.get("Name") block_type = call_info.get("BlockType") - instance_name = None - instance_scope = None if not block_name or not block_type: print(f"Error: CallInfo para UID {uid} sin Name o BlockType.") return None + + instance_name = None + instance_scope = None + # Buscar Instance y Component (que también deberían estar en namespace flg) if block_type == "FB": - instance_elem_list = call_info.xpath("./*[local-name()='Instance']") + instance_elem_list = call_info.xpath("./flg:Instance", namespaces=ns) if instance_elem_list: instance_elem = instance_elem_list[0] instance_scope = instance_elem.get("Scope") - component_elem_list = instance_elem.xpath( - "./*[local-name()='Component']" - ) # Busca Component directo + # Buscar Component dentro de Instance + component_elem_list = instance_elem.xpath("./flg:Component", namespaces=ns) if component_elem_list: component_elem = component_elem_list[0] db_name_raw = component_elem.get("Name") if db_name_raw: - instance_name = f'"{db_name_raw}"' # Añade comillas + instance_name = f'"{db_name_raw}"' # Añadir comillas else: print( - f"Advertencia: dentro de para FB Call UID {uid} no tiene atributo 'Name'." + f"Advertencia: en FB Call UID {uid} sin 'Name'." ) else: print( - f"Advertencia: No se encontró dentro de para FB Call UID {uid}. No se pudo obtener el nombre del DB." + f"Advertencia: No se encontró en FB Call UID {uid}." ) else: - print( - f"Advertencia: FB Call '{block_name}' UID {uid} no tiene elemento ." - ) + print(f"Advertencia: FB Call '{block_name}' UID {uid} sin .") + call_data = { "uid": uid, "type": "Call", @@ -249,7 +285,6 @@ def parse_call(call_element): call_data["instance_scope"] = instance_scope return call_data -# ... (Incluye aquí las funciones reconstruct_scl_from_tokens, get_access_text, get_comment_text, reconstruct_stl_from_statementlist, parse_interface_members, parse_network SIN CAMBIOS) ... def reconstruct_scl_from_tokens(st_node): """ @@ -268,19 +303,17 @@ def reconstruct_scl_from_tokens(st_node): if tag == "Token": scl_parts.append(elem.get("Text", "")) elif tag == "Blank": - # Añadir espacios simples, evitar múltiples si ya hay uno antes/después if not scl_parts or not scl_parts[-1].endswith(" "): scl_parts.append(" " * int(elem.get("Num", 1))) - elif int(elem.get("Num", 1)) > 1: # Añadir extras si son más de 1 + elif int(elem.get("Num", 1)) > 1: scl_parts.append(" " * (int(elem.get("Num", 1)) - 1)) elif tag == "NewLine": - # Limpiar espacios antes del salto de línea real if scl_parts: scl_parts[-1] = scl_parts[-1].rstrip() scl_parts.append("\n") elif tag == "Access": scope = elem.get("Scope") - access_str = f"/*_ERR_Scope_{scope}_*/" # Fallback más informativo + access_str = f"/*_ERR_Scope_{scope}_*/" if scope in [ "GlobalVariable", @@ -290,38 +323,38 @@ def reconstruct_scl_from_tokens(st_node): "InputVariable", "OutputVariable", "ConstantVariable", - ]: # Tipos comunes de variables + ]: symbol_elem = elem.xpath("./st:Symbol", namespaces=ns) if symbol_elem: components = symbol_elem[0].xpath("./st:Component", namespaces=ns) symbol_text_parts = [] for i, comp in enumerate(components): name = comp.get("Name", "_ERR_COMP_") - # Añadir punto si no es el primer componente if i > 0: symbol_text_parts.append(".") - # Reconstrucción de comillas (heurística) + # Check for HasQuotes attribute (adjust namespace if needed) has_quotes_elem = comp.xpath( - "../st:BooleanAttribute[@Name='HasQuotes']/text()", + "ancestor::st:Access/st:BooleanAttribute[@Name='HasQuotes']/text()", namespaces=ns, - ) + ) # Check attribute on Access parent + # print(f"DEBUG HasQuotes check for {name}: {has_quotes_elem}") # Debug has_quotes = ( has_quotes_elem and has_quotes_elem[0].lower() == "true" ) + is_temp = name.startswith("#") + # Apply quotes based on HasQuotes or if it's the first component and not temp if has_quotes or ( - i == 0 and not is_temp - ): # Comillas si HasQuotes o primer componente (no temp) + i == 0 and not is_temp and '"' not in name + ): # Avoid double quotes symbol_text_parts.append(f'"{name}"') else: symbol_text_parts.append(name) - # Manejar índices de array (RECURSIVO) index_access = comp.xpath("./st:Access", namespaces=ns) if index_access: - # Llama recursivamente para obtener el texto de cada índice indices_text = [ reconstruct_scl_from_tokens(idx_node) for idx_node in index_access @@ -339,136 +372,161 @@ def reconstruct_scl_from_tokens(st_node): type_elem = constant_elem[0].xpath( "./st:ConstantType/text()", namespaces=ns ) - const_type = type_elem[0] if type_elem else "" - const_val = val_elem[0] if val_elem else "_ERR_CONSTVAL_" + const_type = ( + type_elem[0].strip() + if type_elem and type_elem[0] is not None + else "" + ) + const_val = ( + val_elem[0].strip() + if val_elem and val_elem[0] is not None + else "_ERR_CONSTVAL_" + ) - # **CORRECCIÓN CLAVE**: Usar el valor extraído - access_str = const_val + # Format based on type + if const_type.lower() == "bool": + access_str = const_val.upper() + elif const_type.lower() == "string": + replaced_val = const_val.replace("'", "''") + access_str = f"'{replaced_val}'" + elif const_type.lower() == "char": + replaced_val = const_val.replace("'", "''") + access_str = f"'{replaced_val}'" + elif const_type.lower() == "time": + access_str = f"T#{const_val}" + elif const_type.lower() == "ltime": + access_str = f"LT#{const_val}" + elif const_type.lower() == "s5time": + access_str = f"S5T#{const_val}" + elif const_type.lower() == "date": + access_str = f"D#{const_val}" + elif const_type.lower() == "dtl": + access_str = f"DTL#{const_val}" + elif const_type.lower() == "dt": + access_str = f"DT#{const_val}" + elif const_type.lower() == "tod": + access_str = f"TOD#{const_val}" + else: + access_str = const_val # For Int, Real, etc. - # Opcional: añadir prefijos T#, L#, etc. si es necesario - # if const_type == "Time": access_str = f"T#{const_val}" - # elif const_type == "LTime": access_str = f"LT#{const_val}" - # ... otros tipos ... else: access_str = "/*_ERR_NOCONST_*/" - # --- Añadir más manejo de scopes aquí si es necesario --- - # elif scope == "Call": access_str = reconstruct_call(elem) - # elif scope == "Expression": access_str = reconstruct_expression(elem) + # Add more scope handling if needed scl_parts.append(access_str) elif tag == "Comment" or tag == "LineComment": - comment_text = "".join(elem.xpath(".//text()")).strip() + # Corrected comment extraction using get_multilingual_text + comment_text = get_multilingual_text( + elem + ) # Pass the or element itself if tag == "Comment": scl_parts.append(f"(* {comment_text} *)") else: scl_parts.append(f"// {comment_text}") - # else: Ignorar otros nodos - # Unir partes, limpiar espacios extra alrededor de operadores y saltos de línea full_scl = "".join(scl_parts) - # Re-indentar líneas después de IF/THEN, etc. (Simplificado) + # Re-indentation (simple approach) output_lines = [] indent_level = 0 + indent_str = " " # Two spaces per indent level for line in full_scl.split("\n"): - line = line.strip() - if not line: - continue # Saltar líneas vacías + trimmed_line = line.strip() + if not trimmed_line: + # output_lines.append("") # Keep empty lines for spacing? Optional. + continue - # Reducir indentación antes de procesar END_IF, ELSE, etc. (simplificado) - if line.startswith( - ("END_IF", "END_WHILE", "END_FOR", "END_CASE", "ELSE", "ELSIF") - ): + # Adjust indent before processing line + if trimmed_line.startswith(("END_", "UNTIL", "ELSE", "ELSIF")): indent_level = max(0, indent_level - 1) - output_lines.append(" " * indent_level + line) # Aplicar indentación + output_lines.append(indent_str * indent_level + trimmed_line) - # Aumentar indentación después de IF, WHILE, FOR, CASE, ELSE, ELSIF (simplificado) + # Adjust indent after processing line if ( - line.endswith("THEN") - or line.endswith("DO") - or line.endswith("OF") - or line == "ELSE" + trimmed_line.endswith(("THEN", "DO", "OF")) + or trimmed_line == "ELSE" + or trimmed_line.startswith("FOR") + or trimmed_line.startswith("WHILE") + or trimmed_line.startswith("CASE") + or trimmed_line.startswith("REPEAT") ): indent_level += 1 - # Nota: Esto no maneja bloques BEGIN/END dentro de SCL + # Handle BEGIN for block structures if necessary (more complex) return "\n".join(output_lines) -# STL (Statement List) Parser - - +# STL Parser (using namespace stl) def get_access_text(access_element): """Reconstruye una representación textual simple de un Access en STL.""" if access_element is None: return "_ERR_ACCESS_" scope = access_element.get("Scope") - # Intenta reconstruir el símbolo - # CORREGIDO: Añadido namespaces=ns symbol_elem = access_element.xpath("./stl:Symbol", namespaces=ns) if symbol_elem: - # CORREGIDO: Añadido namespaces=ns components = symbol_elem[0].xpath("./stl:Component", namespaces=ns) parts = [] - for comp in components: + for i, comp in enumerate(components): name = comp.get("Name", "_ERR_COMP_") - # CORREGIDO: Añadido namespaces=ns + # Check for HasQuotes attribute (usually on Access, check parent?) has_quotes_elem = comp.xpath( - "../stl:BooleanAttribute[@Name='HasQuotes']/text()", namespaces=ns + "ancestor::stl:Access/stl:BooleanAttribute[@Name='HasQuotes']/text()", + namespaces=ns, ) has_quotes = has_quotes_elem and has_quotes_elem[0].lower() == "true" + is_temp = name.startswith("#") - # Usar nombre tal cual por ahora - parts.append(name) + if i > 0: + parts.append(".") # Add dot separator + + if has_quotes or (i == 0 and not is_temp and '"' not in name): + parts.append(f'"{name}"') + else: + parts.append(name) - # Añadir índices si existen - # CORREGIDO: Añadido namespaces=ns index_access = comp.xpath("./stl:Access", namespaces=ns) if index_access: indices = [get_access_text(ia) for ia in index_access] parts.append(f"[{','.join(indices)}]") + return "".join(parts) - return ".".join(parts) - - # Intenta reconstruir constante - # CORREGIDO: Añadido namespaces=ns constant_elem = access_element.xpath("./stl:Constant", namespaces=ns) if constant_elem: - # CORREGIDO: Añadido namespaces=ns val_elem = constant_elem[0].xpath("./stl:ConstantValue/text()", namespaces=ns) - type_elem = constant_elem[0].xpath( - "./stl:ConstantType/text()", namespaces=ns - ) # Obtener tipo para mejor formato - const_type = type_elem[0] if type_elem else "" - const_val = val_elem[0] if val_elem else "_ERR_CONST_" - # Añadir prefijo de tipo si es necesario (ej. T# , L#) - Simplificado - if const_type == "Time": - return f"T#{const_val}" - if const_type == "ARef": - return f"{const_val}" # No necesita prefijo - # Añadir más tipos si es necesario - return const_val # Valor directo para otros tipos + type_elem = constant_elem[0].xpath("./stl:ConstantType/text()", namespaces=ns) + const_type = ( + type_elem[0].strip() if type_elem and type_elem[0] is not None else "" + ) + const_val = ( + val_elem[0].strip() + if val_elem and val_elem[0] is not None + else "_ERR_CONST_" + ) + + if const_type.lower() == "time": + return f"T#{const_val}" + if const_type.lower() == "s5time": + return f"S5T#{const_val}" + if const_type.lower() == "date": + return f"D#{const_val}" + if const_type.lower() == "dt": + return f"DT#{const_val}" # Added DT + # Add more type prefixes if needed (LTIME, TOD, DTL...) + return const_val - # Intenta reconstruir etiqueta - # CORREGIDO: Añadido namespaces=ns label_elem = access_element.xpath("./stl:Label", namespaces=ns) if label_elem: - name = label_elem[0].get("Name", "_ERR_LABEL_") - return name + return label_elem[0].get("Name", "_ERR_LABEL_") - # Intenta reconstruir acceso indirecto (simplificado) - # CORREGIDO: Añadido namespaces=ns indirect_elem = access_element.xpath("./stl:Indirect", namespaces=ns) if indirect_elem: reg = indirect_elem[0].get("Register", "AR?") offset_str = indirect_elem[0].get("BitOffset", "0") area = indirect_elem[0].get("Area", "DB") width = indirect_elem[0].get("Width", "X") - - # Convertir BitOffset a formato P#Byte.Bit try: bit_offset = int(offset_str) byte_offset = bit_offset // 8 @@ -476,37 +534,34 @@ def get_access_text(access_element): p_format_offset = f"P#{byte_offset}.{bit_in_byte}" except ValueError: p_format_offset = "P#?.?" - - # Formatear ancho - width_map = {"Bit": "X", "Byte": "B", "Word": "W", "Double": "D"} - width_char = width_map.get( - width, width[0] if width else "?" - ) # Usa primera letra si no mapeado - + width_map = { + "Bit": "X", + "Byte": "B", + "Word": "W", + "Double": "D", + "Long": "D", + } # Added Long->D + width_char = width_map.get(width, width[0] if width else "?") return f"{area}{width_char}[{reg},{p_format_offset}]" - # Intenta reconstruir dirección absoluta - # CORREGIDO: Añadido namespaces=ns address_elem = access_element.xpath("./stl:Address", namespaces=ns) if address_elem: area = address_elem[0].get("Area", "??") bit_offset_str = address_elem[0].get("BitOffset", "0") - addr_type_str = address_elem[0].get("Type", "Bool") # Obtener tipo para ancho + addr_type_str = address_elem[0].get("Type", "Bool") try: bit_offset = int(bit_offset_str) byte_offset = bit_offset // 8 bit_in_byte = bit_offset % 8 - # Determinar ancho basado en tipo (simplificación) - addr_width = "X" # Default a Bit + addr_width = "X" # Default if addr_type_str == "Byte": addr_width = "B" elif addr_type_str == "Word": addr_width = "W" - elif addr_type_str in ["DWord", "DInt"]: - addr_width = "D" - # Añadir más tipos si es necesario (Real, etc.) - - # Mapear Area para STL estándar + elif addr_type_str in ["DWord", "DInt", "Real", "Time", "DT"]: + addr_width = "D" # Added types + elif addr_type_str in ["LReal", "LTime", "LWord", "LInt", "ULInt"]: + addr_width = "L" # Handle 64-bit? Assume L? Needs check. area_map = { "Input": "I", "Output": "Q", @@ -515,164 +570,153 @@ def get_access_text(access_element): "PeripheryOutput": "PQ", "DB": "DB", "DI": "DI", - "Local": "L", # L no siempre válido aquí + "Local": "L", "Timer": "T", "Counter": "C", } stl_area = area_map.get(area, area) - # Manejar DB/DI que necesitan número de bloque if stl_area in ["DB", "DI"]: block_num = address_elem[0].get("BlockNumber") if block_num: - return f"{stl_area}{block_num}.{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: DB1.DBX0.1 - else: # Acceso con registro DB/DI - return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: DBX0.1 + return f"{stl_area}{block_num}.{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" + else: + return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Register access DBX, DIX etc. elif stl_area in ["T", "C"]: - return f"{stl_area}{byte_offset}" # Los timers/contadores solo usan el número - else: # I, Q, M, L, PI, PQ - return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: M10.1, I0.0 + return f"{stl_area}{byte_offset}" # T 5, C 10 + else: + return ( + f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # M10.1, I0.0 + ) except ValueError: return f"{area}?{bit_offset_str}?" + call_info_elem = access_element.xpath("./stl:CallInfo", namespaces=ns) + if call_info_elem: # Handle Call as operand (e.g., CALL FC10) + name = call_info_elem[0].get("Name", "_ERR_CALL_") + btype = call_info_elem[0].get("BlockType", "FC") + instance_node = call_info_elem[0].xpath( + "./stl:Instance/stl:Component/@Name", namespaces=ns + ) + if btype == "FB" and instance_node: + return f'"{instance_node[0]}"' # Return DB name for FB call operand + else: + return f'{btype} "{name}"' # Return FC "Name" or similar + return f"_{scope}_?" # Fallback -def get_comment_text(comment_element): - """Extrae texto de un LineComment o Comment.""" +def get_comment_text_stl(comment_element): + """Extrae texto de un LineComment o Comment para STL.""" if comment_element is None: return "" - # Usar get_multilingual_text si los comentarios son multilingües - # Si no, extraer texto directamente - ml_texts = comment_element.xpath( - ".//mlt:MultilingualTextItem/mlt:AttributeList/mlt:Text/text()", - namespaces={ - "mlt": "http://www.siemens.com/automation/Openness/SW/Interface/v5" - }, - ) # Asumiendo ns - if ml_texts: - # Podrías intentar obtener un idioma específico o simplemente el primero - return ml_texts[0].strip() if ml_texts else "" - - # Fallback a texto directo si no hay estructura multilingüe - text_nodes = comment_element.xpath("./text()") - return "".join(text_nodes).strip() + # STL Comments are directly under the element, not usually Multilingual + text_nodes = comment_element.xpath("./stl:Text/text()", namespaces=ns) + if text_nodes: + return text_nodes[0].strip() + # Fallback if structure is different + # return "".join(comment_element.xpath(".//text()")).strip() + return "" # Return empty if no found def reconstruct_stl_from_statementlist(statement_list_node): """Reconstruye el código STL como una cadena de texto desde .""" if statement_list_node is None: return "// Error: StatementList node not found.\n" - stl_lines = [] - # CORREGIDO: Añadido namespaces=ns statements = statement_list_node.xpath("./stl:StlStatement", namespaces=ns) for stmt in statements: line_parts = [] - line_comment = "" # Comentario al final de la línea + inline_comment = "" # Comments after code on the same line - # 1. Comentarios al inicio de la línea (como líneas separadas //) - # CORREGIDO: Añadido namespaces=ns + # 1. Initial Comments (full line //) initial_comments = stmt.xpath( - "child::stl:Comment | child::stl:LineComment", namespaces=ns + "child::stl:Comment[not(@Inserted='true')] | child::stl:LineComment[not(@Inserted='true')]", + namespaces=ns, ) for comm in initial_comments: - comment_text = get_comment_text(comm) + comment_text = get_comment_text_stl(comm) if comment_text: - # Dividir comentarios multilínea en varias líneas // for comment_line in comment_text.splitlines(): - stl_lines.append(f"// {comment_line}") + stl_lines.append( + f"// {comment_line}" + ) # Add as separate comment lines - # 2. Etiqueta (si existe) - # CORREGIDO: Añadido namespaces=ns + # 2. Label label_decl = stmt.xpath("./stl:LabelDeclaration", namespaces=ns) label_str = "" if label_decl: - # CORREGIDO: Añadido namespaces=ns - label_name_nodes = label_decl[0].xpath("./stl:Label/@Name", namespaces=ns) - if label_name_nodes: - label_str = f"{label_name_nodes[0]}:" - # Buscar comentarios DENTRO de LabelDeclaration pero después de Label - # CORREGIDO: Añadido namespaces=ns - label_comments = label_decl[0].xpath( - "./stl:Comment | ./stl:LineComment", namespaces=ns - ) - for lcomm in label_comments: - comment_text = get_comment_text(lcomm) - if comment_text: - line_comment += ( - f" // {comment_text}" # Añadir al comentario de línea - ) + label_name = label_decl[0].xpath("./stl:Label/@Name", namespaces=ns) + if label_name: + label_str = f"{label_name[0]}:" + # Get comments after label but before instruction + label_comments = label_decl[0].xpath( + "./stl:Comment[@Inserted='true'] | ./stl:LineComment[@Inserted='true']", + namespaces=ns, + ) + for lcomm in label_comments: + inline_comment += f" // {get_comment_text_stl(lcomm)}" - # 3. Token de Instrucción STL - # CORREGIDO: Añadido namespaces=ns + if label_str: + line_parts.append(label_str) + + # 3. Instruction Token instruction_token = stmt.xpath("./stl:StlToken", namespaces=ns) instruction_str = "" if instruction_token: token_text = instruction_token[0].get("Text", "_ERR_TOKEN_") - instruction_str = token_text - # Comentarios asociados directamente al token - # CORREGIDO: Añadido namespaces=ns + # Check if it's an empty line marker + if token_text == "EMPTY_LINE": + stl_lines.append("") # Add an empty line + continue # Skip rest of processing for this statement + elif token_text == "COMMENT": # Handle full-line comment marker if needed + pass # Already handled by initial comments? Check XML example. + else: + instruction_str = token_text + + # Comments directly associated with the token token_comments = instruction_token[0].xpath( - "./stl:Comment | ./stl:LineComment", namespaces=ns + "./stl:Comment[@Inserted='true'] | ./stl:LineComment[@Inserted='true']", + namespaces=ns, ) for tcomm in token_comments: - comment_text = get_comment_text(tcomm) - if comment_text: - line_comment += ( - f" // {comment_text}" # Añadir al comentario de línea - ) + inline_comment += f" // {get_comment_text_stl(tcomm)}" - # 4. Acceso/Operando STL - # CORREGIDO: Añadido namespaces=ns + if instruction_str: + # Add tab if label exists + line_parts.append("\t" + instruction_str if label_str else instruction_str) + + # 4. Access/Operand access_elem = stmt.xpath("./stl:Access", namespaces=ns) access_str = "" if access_elem: access_text = get_access_text(access_elem[0]) access_str = access_text - # Comentarios DENTRO del Access (pueden ser de línea o bloque) - # CORREGIDO: Añadido namespaces=ns + # Comments inside Access (can be block or line) access_comments = access_elem[0].xpath( - "child::stl:LineComment | child::stl:Comment", namespaces=ns + "child::stl:Comment[@Inserted='true'] | child::stl:LineComment[@Inserted='true']", + namespaces=ns, ) for acc_comm in access_comments: - comment_text = get_comment_text(acc_comm) - if comment_text: - line_comment += ( - f" // {comment_text}" # Añadir al comentario de línea - ) + inline_comment += f" // {get_comment_text_stl(acc_comm)}" - # Construir la línea: Etiqueta (si hay) + Tab + Instrucción + Espacio + Operando (si hay) + Comentario(s) - current_line = "" - if label_str: - current_line += label_str - if instruction_str: - if current_line: # Si ya había etiqueta, añadir tabulador - current_line += "\t" - current_line += instruction_str if access_str: - if current_line: # Si ya había algo, añadir espacio - current_line += " " - current_line += access_str - if line_comment: - # Añadir espacio antes del comentario si hay código en la línea - if current_line.strip(): - current_line += f" {line_comment}" - else: # Si la línea estaba vacía (solo comentarios iniciales), poner el comentario de línea - current_line = line_comment + line_parts.append(access_str) + + # Build the line + current_line = " ".join(line_parts) # Join parts with space + if inline_comment: + current_line += f"\t{inline_comment.strip()}" # Add comment with tab - # Añadir la línea construida solo si no está vacía if current_line.strip(): - stl_lines.append(current_line.rstrip()) # Eliminar espacios finales + stl_lines.append(current_line.rstrip()) return "\n".join(stl_lines) -# DB Parser - - +# DB Parser (using namespace iface) def parse_interface_members(member_elements): """ Parsea recursivamente una lista de elementos de una interfaz o estructura. @@ -685,12 +729,12 @@ def parse_interface_members(member_elements): for member in member_elements: member_name = member.get("Name") member_dtype = member.get("Datatype") - member_remanence = member.get("Remanence", "NonRetain") # Default si no existe - member_accessibility = member.get("Accessibility", "Public") # Default + member_remanence = member.get("Remanence", "NonRetain") + member_accessibility = member.get("Accessibility", "Public") if not member_name or not member_dtype: print( - f"Advertencia: Miembro sin nombre o tipo de dato encontrado. Saltando." + "Advertencia: Miembro sin nombre o tipo de dato encontrado. Saltando." ) continue @@ -699,50 +743,41 @@ def parse_interface_members(member_elements): "datatype": member_dtype, "remanence": member_remanence, "accessibility": member_accessibility, - "start_value": None, # Para valores simples o structs/arrays inicializados globalmente + "start_value": None, "comment": None, - "children": [], # Para structs - "array_elements": {}, # Para arrays (índice -> valor) + "children": [], + "array_elements": {}, } - # Extraer comentario del miembro - # Usar namespace iface comment_node = member.xpath("./iface:Comment", namespaces=ns) if comment_node: - # Llama a get_multilingual_text que ya maneja el namespace iface internamente member_info["comment"] = get_multilingual_text(comment_node[0]) - # Extraer valor inicial (para tipos simples) - # Usar namespace iface start_value_node = member.xpath("./iface:StartValue", namespaces=ns) if start_value_node: constant_name = start_value_node[0].get("ConstantName") - if constant_name: - member_info["start_value"] = constant_name - else: - member_info["start_value"] = ( + member_info["start_value"] = ( + constant_name + if constant_name + else ( start_value_node[0].text if start_value_node[0].text is not None else "" ) + ) - # --- Manejar Structs Anidados --- - # Usar namespace iface + # --- Structs Anidados --- nested_sections = member.xpath( "./iface:Sections/iface:Section/iface:Member", namespaces=ns ) if nested_sections: - member_info["children"] = parse_interface_members( - nested_sections - ) # Llamada recursiva + member_info["children"] = parse_interface_members(nested_sections) - # --- Manejar Arrays --- - if member_dtype.lower().startswith("array["): - # Usar namespace iface + # --- Arrays --- + if isinstance(member_dtype, str) and member_dtype.lower().startswith("array["): subelements = member.xpath("./iface:Subelement", namespaces=ns) for sub in subelements: - path = sub.get("Path") - # Usar namespace iface + path = sub.get("Path") # Path is usually the index '0', '1', ... sub_start_value_node = sub.xpath("./iface:StartValue", namespaces=ns) if path and sub_start_value_node: constant_name = sub_start_value_node[0].get("ConstantName") @@ -756,38 +791,27 @@ def parse_interface_members(member_elements): ) ) member_info["array_elements"][path] = value - else: - # Usar namespace iface - sub_comment_node = sub.xpath("./iface:Comment", namespaces=ns) - if path and sub_comment_node: - # member_info["array_comments"][path] = get_multilingual_text(sub_comment_node[0]) - pass + # Optionally parse subelement comments if needed members_data.append(member_info) - return members_data -# --- Main Parsing Function --- - - +# --- Main Network Parsing Function --- def parse_network(network_element): """ - Parsea una red, extrae lógica y añade conexiones EN implícitas. - Maneja wires con múltiples destinos. (Función original adaptada para namespaces) + Parsea una red LAD/FBD, extrae lógica y añade conexiones EN implícitas. + Devuelve None o un diccionario con 'error' si falla FlgNet. """ if network_element is None: return { "id": "ERROR", "title": "Invalid Network Element", - "comment": "", "logic": [], "error": "Input element was None", } network_id = network_element.get("ID") - - # Extracción Título/Comentario (usar namespace iface para MultilingualText) title_element = network_element.xpath( ".//iface:MultilingualText[@CompositionName='Title']", namespaces=ns ) @@ -796,40 +820,48 @@ def parse_network(network_element): if title_element else f"Network {network_id}" ) - # Asume que el comentario está en ObjectList dentro de CompileUnit comment_element = network_element.xpath( - "./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']" - ) + "./ObjectList/MultilingualText[@CompositionName='Comment']", namespaces=ns + ) # Corrected path? network_comment = ( get_multilingual_text(comment_element[0]) if comment_element else "" ) - # Buscar FlgNet usando namespace flg - flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns) - if not flgnet_list: - # Intentar buscar directamente si está en la raíz (caso SCL/STL simple?) - # Esta parte puede necesitar ajuste si FlgNet no es el contenedor principal - # para lógica SCL/STL tokenizada. - # Si no hay FlgNet, no podemos parsear lógica LAD/FBD. - # El parseo de SCL/STL se hace en convert_xml_to_json, así que aquí devolvemos error si no hay FlgNet. - return { - "id": network_id, - "title": network_title, - "comment": network_comment, - "logic": [], - "language": "Unknown", # Podríamos intentar leer el lenguaje aquí también - "error": "FlgNet not found for LAD/FBD parsing", - } - flgnet = flgnet_list[0] + # Buscar NetworkSource y luego FlgNet (ambos usan namespace flg) + network_source_node = network_element.xpath(".//flg:NetworkSource", namespaces=ns) + if not network_source_node: + # Try finding FlgNet directly under CompileUnit if NetworkSource is missing (less common) + flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns) + if not flgnet_list: + return { + "id": network_id, + "title": network_title, + "comment": network_comment, + "logic": [], + "error": "NetworkSource/FlgNet not found", + } + else: + flgnet = flgnet_list[0] + else: + flgnet_list = network_source_node[0].xpath("./flg:FlgNet", namespaces=ns) + if not flgnet_list: + return { + "id": network_id, + "title": network_title, + "comment": network_comment, + "logic": [], + "error": "FlgNet not found inside NetworkSource", + } + else: + flgnet = flgnet_list[0] - # 1. Parsear Access, Parts y Calls (llaman a funciones que ya usan ns) + # 1. Parse Access, Parts, Calls (use namespace flg) access_map = { acc_info["uid"]: acc_info - for acc in flgnet.xpath(".//flg:Access", namespaces=ns) # Usa ns + for acc in flgnet.xpath(".//flg:Access", namespaces=ns) if (acc_info := parse_access(acc)) and acc_info["type"] != "unknown" } parts_and_calls_map = {} - # Usa ns instruction_elements = flgnet.xpath(".//flg:Part | .//flg:Call", namespaces=ns) for element in instruction_elements: parsed_info = None @@ -837,24 +869,29 @@ def parse_network(network_element): if tag_name == "Part": parsed_info = parse_part(element) elif tag_name == "Call": - parsed_info = parse_call(element) + parsed_info = parse_call(element) # parse_call ahora busca flg:CallInfo + if parsed_info and "uid" in parsed_info: + # Verifica si parse_call tuvo éxito (si no, devuelve None) + if tag_name == "Call" and parsed_info is None: + print( + f"Advertencia: Falló el parseo de Call UID={element.get('UId')}. Ignorando." + ) + continue # Saltar esta instrucción si parse_call falló parts_and_calls_map[parsed_info["uid"]] = parsed_info - else: + elif tag_name == "Call" and parsed_info is None: + # Si parse_call devolvió None directamente print( - f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}" + f"Advertencia: Part/Call inválido ignorado en red {network_id} (UID={element.get('UId')})" ) - # 2. Parsear Wires (con namespaces) + # 2. Parse Wires (use namespace flg) wire_connections = defaultdict(list) source_connections = defaultdict(list) eno_outputs = defaultdict(list) - # Cachear QNames con namespace flg - flg_ns_uri = ns["flg"] - qname_powerrail = etree.QName(flg_ns_uri, "Powerrail") - qname_identcon = etree.QName(flg_ns_uri, "IdentCon") - qname_namecon = etree.QName(flg_ns_uri, "NameCon") - # Usa ns + qname_powerrail = etree.QName(ns["flg"], "Powerrail") + qname_identcon = etree.QName(ns["flg"], "IdentCon") + qname_namecon = etree.QName(ns["flg"], "NameCon") for wire in flgnet.xpath(".//flg:Wire", namespaces=ns): children = wire.getchildren() if len(children) < 2: @@ -864,34 +901,48 @@ def parse_network(network_element): if source_elem.tag == qname_powerrail: source_uid, source_pin = "POWERRAIL", "out" elif source_elem.tag == qname_identcon: - source_uid, source_pin = source_elem.get("UId"), "value" + source_uid, source_pin = ( + source_elem.get("UId"), + "value", + ) # IdentCon usually represents an Access node output elif source_elem.tag == qname_namecon: source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name") if source_uid is None: continue + source_info = (source_uid, source_pin) for dest_elem in children[1:]: dest_uid, dest_pin = None, None + # Destination can also be IdentCon (Access node input) or NameCon (Instruction pin input) if dest_elem.tag == qname_identcon: - dest_uid, dest_pin = dest_elem.get("UId"), "value" + dest_uid, dest_pin = ( + dest_elem.get("UId"), + "value", + ) # Input to an Access node? Unlikely. Usually NameCon. Let's assume NameCon primarily for destination. elif dest_elem.tag == qname_namecon: dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name") + if dest_uid is not None and dest_pin is not None: dest_key = (dest_uid, dest_pin) + # Check if dest_uid is an instruction or an access node + # This logic seems okay, maps source to destination key if source_info not in wire_connections[dest_key]: wire_connections[dest_key].append(source_info) + + # Build reverse map: source -> list of destinations source_key = (source_uid, source_pin) dest_info = (dest_uid, dest_pin) if dest_info not in source_connections[source_key]: source_connections[source_key].append(dest_info) + + # Track ENO outputs specifically if source_pin == "eno" and source_uid in parts_and_calls_map: if dest_info not in eno_outputs[source_uid]: eno_outputs[source_uid].append(dest_info) - # 3. Construcción Lógica Inicial (sin cambios en lógica, pero verificar llamadas) + # 3. Build Initial Logic Structure all_logic_steps = {} - # Define SCL_SUFFIX, por ejemplo, "_scl" o "_sympy_processed" - SCL_SUFFIX = "_sympy_processed" # Asegúrate que esto coincida con x2_process.py + SCL_SUFFIX = "_sympy_processed" # Define suffix functional_block_types = [ "Move", "Add", @@ -910,7 +961,7 @@ def parse_network(network_element): "CTU", "CTD", "CTUD", - ] # Añadidos timers/counters SCL + ] rlo_generators = [ "Contact", "O", @@ -925,109 +976,152 @@ def parse_network(network_element): "PBox", "NBox", "Not", - ] # Añadido Not - for instruction_uid, instruction_info in parts_and_calls_map.items(): - instruction_repr = {"instruction_uid": instruction_uid, **instruction_info} + ] + + # --- CORRECCIÓN: Iterar sobre los UIDs que SÍ están en parts_and_calls_map --- + # --- Esto evita procesar UIDs de Calls que fallaron en parse_call --- + valid_instruction_uids = list(parts_and_calls_map.keys()) + + for instruction_uid in valid_instruction_uids: + instruction_info = parts_and_calls_map[instruction_uid] + # Make a deep copy to avoid modifying the original map entry + instruction_repr = copy.deepcopy(instruction_info) + instruction_repr["instruction_uid"] = instruction_uid # Ensure UID is present instruction_repr["inputs"] = {} instruction_repr["outputs"] = {} - original_type = instruction_info["type"] + + original_type = instruction_repr["type"] # Type from parse_part/parse_call current_type = original_type input_pin_mapping = {} output_pin_mapping = {} - # --- Manejo Especial Tipos --- - if original_type == "SdCoil": - current_type = "Se" - input_pin_mapping = {"in": "s", "operand": "timer", "value": "tv"} - output_pin_mapping = {"out": "q"} - elif original_type in ["Se", "Sd", "TON", "TOF", "TP"]: - input_pin_mapping = { - "s": "s", - "in": "in", - "tv": "tv", - "pt": "pt", - "r": "r", - "timer": "timer", - } - output_pin_mapping = {"q": "q", "Q": "Q", "rt": "rt", "ET": "ET"} + + # Base set of possible pins - can be expanded + possible_input_pins = set(["en", "in", "in1", "in2", "pre"]) + + # Dynamically add pins based on instruction type (simplified list) + if original_type in ["Contact", "Coil", "SCoil", "RCoil"]: + possible_input_pins.add("operand") + elif original_type in [ + "Add", + "Sub", + "Mul", + "Div", + "Mod", + "Eq", + "Ne", + "Gt", + "Lt", + "Ge", + "Le", + ]: + possible_input_pins.update(["in1", "in2"]) + elif original_type in ["TON", "TOF", "TP", "Se", "Sd", "SdCoil"]: + possible_input_pins.update( + ["s", "tv", "r", "timer", "pt", "value", "operand"] + ) elif original_type in ["CTU", "CTD", "CTUD"]: - input_pin_mapping = { - "cu": "CU", - "cd": "CD", - "r": "R", - "ld": "LD", - "pv": "PV", - "counter": "counter", - } - output_pin_mapping = {"qu": "QU", "qd": "QD", "cv": "CV"} - instruction_repr["type"] = current_type - possible_input_pins = set( - [ - "en", - "in", - "in1", - "in2", - "s", - "r", - "tv", - "value", - "operand", - "timer", - "bit", - "clk", - "pv", - "cu", - "cd", - "ld", - "pre", - "SRCBLK", - "PT", - ] - ) # Añadido PT - for xml_pin_name in possible_input_pins: - dest_key = (instruction_uid, xml_pin_name) + possible_input_pins.update(["cu", "cd", "r", "ld", "pv", "counter"]) + elif original_type in ["PBox", "NBox"]: + possible_input_pins.update(["bit", "clk"]) + elif original_type == "BLKMOV": + possible_input_pins.add("SRCBLK") + + # Special Handling for Call Parameters + elif original_type == "Call": + # Find the original XML element for this call using the correct namespace + call_xml_element_list = flgnet.xpath( + f".//flg:Call[@UId='{instruction_uid}']", namespaces=ns + ) + if call_xml_element_list: + call_xml_element = call_xml_element_list[0] + # --- USAR flg:CallInfo y flg:Parameter --- + call_info_node_list = call_xml_element.xpath( + "./flg:CallInfo", namespaces=ns + ) + if call_info_node_list: + call_info_node = call_info_node_list[0] + call_param_names = call_info_node.xpath( + "./flg:Parameter/@Name", namespaces=ns + ) + if call_param_names: + possible_input_pins.update(call_param_names) + # print(f"DEBUG Call UID={instruction_uid}: Found params: {call_param_names}. Possible pins now: {possible_input_pins}") + else: + print( + f"Advertencia: Call UID={instruction_uid}: No tags found under ." + ) + else: + # Try without namespace as fallback? Unlikely needed if flg is default + call_info_node_list_no_ns = call_xml_element.xpath("./CallInfo") + if call_info_node_list_no_ns: + print( + f"Advertencia: Call UID={instruction_uid}: Found WITHOUT namespace." + ) + call_param_names_no_ns = call_info_node_list_no_ns[0].xpath( + "./Parameter/@Name" + ) + if call_param_names_no_ns: + possible_input_pins.update(call_param_names_no_ns) + print( + f"DEBUG Call UID={instruction_uid} (no NS): Found params: {call_param_names_no_ns}. Possible pins now: {possible_input_pins}" + ) + else: + print( + f"Advertencia: Call UID={instruction_uid}: No tags found under (no NS)." + ) + + else: + print( + f"Error: Call UID={instruction_uid}: No (or CallInfo) element found." + ) + else: + print( + f"Error: No se pudo encontrar el elemento para UID={instruction_uid} en el XPath." + ) + + # Populate Inputs from Wire Connections + for pin_name in possible_input_pins: + dest_key = (instruction_uid, pin_name) if dest_key in wire_connections: sources_list = wire_connections[dest_key] input_sources_repr = [] + # print(f"DEBUG Wire Input: Instr={instruction_uid}, Pin={pin_name}, Sources={sources_list}") # Debug for source_uid, source_pin in sources_list: if source_uid == "POWERRAIL": input_sources_repr.append({"type": "powerrail"}) elif source_uid in access_map: - input_sources_repr.append(access_map[source_uid]) - elif source_uid in parts_and_calls_map: + input_sources_repr.append(copy.deepcopy(access_map[source_uid])) + elif ( + source_uid in parts_and_calls_map + ): # Check if source is a valid instruction source_instr_info = parts_and_calls_map[source_uid] - source_original_type = source_instr_info["type"] - source_output_mapping = {} - if source_original_type == "SdCoil": - source_output_mapping = {"out": "q"} - elif source_original_type in ["Se", "Sd", "TON", "TOF", "TP"]: - source_output_mapping = { - "q": "q", - "Q": "Q", - "rt": "rt", - "ET": "ET", - } - elif source_original_type in ["CTU", "CTD", "CTUD"]: - source_output_mapping = {"qu": "QU", "qd": "QD", "cv": "CV"} - mapped_source_pin = source_output_mapping.get( - source_pin, source_pin - ) input_sources_repr.append( { "type": "connection", - "source_instruction_type": source_original_type, + "source_instruction_type": source_instr_info["type"], "source_instruction_uid": source_uid, - "source_pin": mapped_source_pin, + "source_pin": source_pin, # Use the actual source pin name } ) else: + # Source UID not found in instructions or access nodes + print( + f"Advertencia: Fuente desconocida UID={source_uid} conectada a {instruction_uid}.{pin_name}" + ) input_sources_repr.append( {"type": "unknown_source", "uid": source_uid} ) - json_pin_name = input_pin_mapping.get(xml_pin_name, xml_pin_name) - if len(input_sources_repr) == 1: - instruction_repr["inputs"][json_pin_name] = input_sources_repr[0] - elif len(input_sources_repr) > 1: - instruction_repr["inputs"][json_pin_name] = input_sources_repr + + # Apply input pin mapping if needed (e.g. SdCoil) + json_pin_name = input_pin_mapping.get(pin_name, pin_name) + instruction_repr["inputs"][json_pin_name] = ( + input_sources_repr[0] + if len(input_sources_repr) == 1 + else input_sources_repr + ) + # print(f"DEBUG Populated Input: Instr={instruction_uid}, Pin={json_pin_name}, Value={instruction_repr['inputs'][json_pin_name]}") # Debug + + # Populate Outputs (Simplified - just record direct variable assignments) possible_output_pins = set( [ "out", @@ -1038,161 +1132,128 @@ def parse_network(network_element): "RET_VAL", "DSTBLK", "rt", - "rtbcd", "cv", - "cvbcd", "QU", "QD", "ET", ] - ) # Añadido ET - for xml_pin_name in possible_output_pins: - source_key = (instruction_uid, xml_pin_name) + ) + if original_type == "BLKMOV": + possible_output_pins.add("DSTBLK") + + for pin_name in possible_output_pins: + source_key = (instruction_uid, pin_name) if source_key in source_connections: - json_pin_name = output_pin_mapping.get(xml_pin_name, xml_pin_name) + json_pin_name = output_pin_mapping.get(pin_name, pin_name) if json_pin_name not in instruction_repr["outputs"]: instruction_repr["outputs"][json_pin_name] = [] for dest_uid, dest_pin in source_connections[source_key]: - if dest_uid in access_map: + if ( + dest_uid in access_map + ): # Only track connections to variables/constants + dest_operand_copy = copy.deepcopy(access_map[dest_uid]) if ( - access_map[dest_uid] + dest_operand_copy not in instruction_repr["outputs"][json_pin_name] ): instruction_repr["outputs"][json_pin_name].append( - access_map[dest_uid] + dest_operand_copy ) + all_logic_steps[instruction_uid] = instruction_repr - # 4. Inferencia EN (sin cambios en lógica) + # 4. EN Inference (Simplified logic as before) + # --- (Esta sección puede permanecer igual, opera sobre all_logic_steps) --- processed_blocks_en_inference = set() - something_changed = True - inference_passes = 0 - max_inference_passes = len(all_logic_steps) + 5 try: sorted_uids_for_en = sorted( all_logic_steps.keys(), key=lambda x: int(x) if x.isdigit() else float("inf"), ) except ValueError: - sorted_uids_for_en = sorted(all_logic_steps.keys()) + sorted_uids_for_en = sorted(all_logic_steps.keys()) # Fallback sort + ordered_logic_list_for_en = [ all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps ] - while something_changed and inference_passes < max_inference_passes: - something_changed = False - inference_passes += 1 - for i, instruction in enumerate(ordered_logic_list_for_en): - part_uid = instruction["instruction_uid"] - part_type_original = ( - instruction["type"].replace(SCL_SUFFIX, "").replace("_error", "") - ) # Usa SCL_SUFFIX - if ( - part_type_original in functional_block_types - and "en" not in instruction["inputs"] - and part_uid not in processed_blocks_en_inference - ): - inferred_en_source = None - if i > 0: - for j in range(i - 1, -1, -1): - prev_instr = ordered_logic_list_for_en[j] - prev_uid = prev_instr["instruction_uid"] - prev_type_original = ( - prev_instr["type"] - .replace(SCL_SUFFIX, "") - .replace("_error", "") - ) - if prev_type_original in rlo_generators: + + for i, instruction in enumerate(ordered_logic_list_for_en): + part_uid = instruction["instruction_uid"] + # Leer el tipo actual de la instrucción ya parseada + part_type_original = ( + instruction.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "") + ) + # La lógica de inferencia EN no cambia + if ( + part_type_original in functional_block_types + and "en" not in instruction["inputs"] + and part_uid not in processed_blocks_en_inference + ): + inferred_en_source = None + if i > 0: + # Look backwards + for j in range(i - 1, -1, -1): + prev_instr = ordered_logic_list_for_en[j] + prev_uid = prev_instr["instruction_uid"] + prev_type_original = ( + prev_instr.get("type", "") + .replace(SCL_SUFFIX, "") + .replace("_error", "") + ) + if prev_type_original in rlo_generators: # Found RLO source + inferred_en_source = { + "type": "connection", + "source_instruction_uid": prev_uid, + "source_instruction_type": prev_type_original, + "source_pin": "out", + } + break + elif ( + prev_type_original in functional_block_types + ): # Found block with potential ENO + if (prev_uid, "eno") in source_connections: inferred_en_source = { "type": "connection", "source_instruction_uid": prev_uid, "source_instruction_type": prev_type_original, - "source_pin": "out", + "source_pin": "eno", } - break - elif prev_type_original in functional_block_types: - source_key_eno = (prev_uid, "eno") - if source_key_eno in source_connections: - inferred_en_source = { - "type": "connection", - "source_instruction_uid": prev_uid, - "source_instruction_type": prev_type_original, - "source_pin": "eno", - } - break - else: - continue - elif prev_type_original in [ - "Coil", - "SCoil", - "RCoil", - "SetCoil", - "ResetCoil", - "SdCoil", - ]: - break - if inferred_en_source: - all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source - processed_blocks_en_inference.add(part_uid) - something_changed = True + break # Stop searching + elif prev_type_original in [ + "Coil", + "SCoil", + "RCoil", + "SetCoil", + "ResetCoil", + "SdCoil", + ]: + break # Coils terminate flow - # 5. Añadir lógica ENO interesante (sin cambios en lógica) + if inferred_en_source is None: + inferred_en_source = {"type": "powerrail"} + + # Update the instruction in the main dictionary + if part_uid in all_logic_steps: + all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source + processed_blocks_en_inference.add(part_uid) + + # 5. ENO Logic (Simplified as before) + # --- (Esta sección puede permanecer igual) --- for source_instr_uid, eno_destinations in eno_outputs.items(): if source_instr_uid not in all_logic_steps: continue - interesting_eno_logic = [] - for dest_uid, dest_pin in eno_destinations: - is_direct_en_connection = False - if dest_uid in parts_and_calls_map and dest_pin == "en": - try: - source_idx = sorted_uids_for_en.index(source_instr_uid) - dest_idx = sorted_uids_for_en.index(dest_uid) - if ( - dest_idx == source_idx + 1 - and parts_and_calls_map[dest_uid]["type"] - in functional_block_types - ): - is_direct_en_connection = True - except ValueError: - pass - if not is_direct_en_connection: - target_info = {"target_pin": dest_pin} - if dest_uid in parts_and_calls_map: - target_info.update( - { - "target_type": "instruction", - "target_uid": dest_uid, - "target_name": parts_and_calls_map[dest_uid].get( - "name", parts_and_calls_map[dest_uid].get("type") - ), - } - ) - elif dest_uid in access_map: - target_info.update( - { - "target_type": "operand", - "target_details": access_map[dest_uid], - } - ) - else: - target_info.update( - {"target_type": "unknown", "target_uid": dest_uid} - ) - interesting_eno_logic.append(target_info) - if interesting_eno_logic: - all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic + all_logic_steps[source_instr_uid]["eno_destinations"] = eno_destinations - # 6. Ordenar y Devolver - network_logic_final = [ + # 6. Order and Return + # --- (Esta sección puede permanecer igual) --- + final_logic_list = [ all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps ] - # Determinar lenguaje de la red para devolverlo network_lang = "Unknown" if network_element is not None: - attr_list_net = network_element.xpath("./*[local-name()='AttributeList']") + attr_list_net = network_element.xpath("./AttributeList") if attr_list_net: - lang_node_net = attr_list_net[0].xpath( - "./*[local-name()='ProgrammingLanguage']/text()" - ) + lang_node_net = attr_list_net[0].xpath("./ProgrammingLanguage/text()") if lang_node_net: network_lang = lang_node_net[0].strip() @@ -1201,11 +1262,13 @@ def parse_network(network_element): "title": network_title, "comment": network_comment, "language": network_lang, - "logic": network_logic_final, + "logic": final_logic_list, } -# --- Main Conversion Function --- +# --- Main Conversion Function (convert_xml_to_json) --- +# --- (Mantén la función convert_xml_to_json como antes, --- +# --- asegurándote de que llama a la versión actualizada de parse_network) --- def convert_xml_to_json(xml_filepath, json_filepath): print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...") if not os.path.exists(xml_filepath): @@ -1218,16 +1281,19 @@ def convert_xml_to_json(xml_filepath, json_filepath): root = tree.getroot() print("Paso 1: Parseo XML completado.") - # --- MODIFICADO: Buscar FC, FB, GlobalDB o OB --- - print("Paso 2: Buscando el bloque SW.Blocks.FC, SW.Blocks.FB, SW.Blocks.GlobalDB o SW.Blocks.OB...") - block_list = root.xpath("//*[local-name()='SW.Blocks.FC' or local-name()='SW.Blocks.FB' or local-name()='SW.Blocks.GlobalDB' or local-name()='SW.Blocks.OB']") # <-- Añadido OB + # --- Buscar bloque principal (FC, FB, GlobalDB, OB) --- + print( + "Paso 2: Buscando el bloque SW.Blocks.FC, SW.Blocks.FB, SW.Blocks.GlobalDB o SW.Blocks.OB..." + ) + block_list = root.xpath( + "//*[local-name()='SW.Blocks.FC' or local-name()='SW.Blocks.FB' or local-name()='SW.Blocks.GlobalDB' or local-name()='SW.Blocks.OB']" + ) block_type_found = None the_block = None if block_list: the_block = block_list[0] - # Obtener el nombre real de la etiqueta encontrada block_tag_name = etree.QName(the_block.tag).localname if block_tag_name == "SW.Blocks.FC": block_type_found = "FC" @@ -1235,42 +1301,39 @@ def convert_xml_to_json(xml_filepath, json_filepath): block_type_found = "FB" elif block_tag_name == "SW.Blocks.GlobalDB": block_type_found = "GlobalDB" - elif block_tag_name == "SW.Blocks.OB": # <-- Añadido caso OB - block_type_found = "OB" # <-- Establecer tipo OB - print(f"Paso 2: Bloque {block_tag_name} encontrado (ID={the_block.get('ID')}).") + elif block_tag_name == "SW.Blocks.OB": + block_type_found = "OB" + print( + f"Paso 2: Bloque {block_tag_name} encontrado (ID={the_block.get('ID')})." + ) else: - # Mensaje de error actualizado - print("Error Crítico: No se encontró el elemento raíz del bloque (, , o ) usando XPath.") - # --- Añadir Debugging --- - print(f"DEBUG: Tag del elemento raíz del XML: {root.tag}") - print(f"DEBUG: Primeros hijos del raíz:") - for i, child in enumerate(root.getchildren()): - if i < 5: # Imprimir solo los primeros 5 para no saturar - print(f"DEBUG: - Hijo {i+1}: {child.tag}") - else: - print("DEBUG: - ... (más hijos)") - break - # --- Fin Debugging --- - return # Salir si no se encuentra el bloque principal + print( + "Error Crítico: No se encontró el elemento raíz del bloque (, , o ) usando XPath." + ) + # ... (debug info) ... + return + # --- Extraer atributos del bloque --- print("Paso 3: Extrayendo atributos del bloque...") - attribute_list_node = the_block.xpath("./*[local-name()='AttributeList']") + # AttributeList no parece tener namespace en los ejemplos + attribute_list_node = the_block.xpath("./AttributeList") block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown" if attribute_list_node: attr_list = attribute_list_node[0] - name_node = attr_list.xpath("./*[local-name()='Name']/text()") + # Name, Number, ProgrammingLanguage no parecen tener namespace + name_node = attr_list.xpath("./Name/text()") block_name_val = name_node[0].strip() if name_node else block_name_val - num_node = attr_list.xpath("./*[local-name()='Number']/text()") + num_node = attr_list.xpath("./Number/text()") try: block_number_val = int(num_node[0]) if num_node else None except ValueError: block_number_val = None - lang_node = attr_list.xpath( - "./*[local-name()='ProgrammingLanguage']/text()" + lang_node = attr_list.xpath("./ProgrammingLanguage/text()") + block_lang_val = ( + lang_node[0].strip() + if lang_node + else ("DB" if block_type_found == "GlobalDB" else "Unknown") ) - # Para DBs, el lenguaje principal es 'DB'. Para OBs/FCs/FBs puede ser SCL, STL, LAD, FBD etc. - block_lang_val = lang_node[0].strip() if lang_node else ("DB" if block_type_found == "GlobalDB" else "Unknown") - print( f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'" ) @@ -1278,79 +1341,92 @@ def convert_xml_to_json(xml_filepath, json_filepath): print( f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}." ) - # Asignar 'DB' como lenguaje si es un GlobalDB y no se encontró explícitamente if block_type_found == "GlobalDB": block_lang_val = "DB" + # --- Extraer comentario del bloque --- + # ObjectList y MultilingualText no parecen tener namespace block_comment_val = "" comment_node_list = the_block.xpath( - "./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']" + "./ObjectList/MultilingualText[@CompositionName='Comment']" ) if comment_node_list: - block_comment_val = get_multilingual_text(comment_node_list[0]) + block_comment_val = get_multilingual_text( + comment_node_list[0] + ) # Usa namespaces iface internamente print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'") - # --- MODIFICADO: Añadir block_type al resultado --- + # --- Crear diccionario resultado --- result = { "block_name": block_name_val, "block_number": block_number_val, - "language": block_lang_val, # Lenguaje del bloque (SCL, LAD, DB, etc.) - "block_type": block_type_found, # Tipo de bloque (FC, FB, GlobalDB, OB) + "language": block_lang_val, + "block_type": block_type_found, "block_comment": block_comment_val, "interface": {}, "networks": [], } + # --- Extraer interfaz --- print("Paso 4: Extrayendo la interfaz del bloque...") - # La estructura de la interfaz suele ser la misma para FC/FB/OB y la sección Static para DB - # Si Interface está dentro de AttributeList - interface_node_list = attribute_list_node[0].xpath(".//*[local-name()='Interface']") if attribute_list_node else [] + # Interface está dentro de AttributeList, no tiene namespace. Sections/Member sí usan iface. + interface_node_list = ( + attribute_list_node[0].xpath("./Interface") if attribute_list_node else [] + ) if interface_node_list: interface_node = interface_node_list[0] print("Paso 4: Nodo Interface encontrado.") - # Usar parse_interface_members para extraer todas las secciones - # Esta función ya maneja structs anidados y arrays + # Sections/Section/Member usan namespace iface all_sections = interface_node.xpath(".//iface:Section", namespaces=ns) - for section in all_sections: - section_name = section.get("Name") - if not section_name: - continue - # Obtener los miembros directos de esta sección - # Asegurarse de no obtener miembros de secciones anidadas accidentalmente - members_in_section = section.xpath("./iface:Member", namespaces=ns) - if members_in_section: - result["interface"][section_name] = parse_interface_members(members_in_section) + if all_sections: + for section in all_sections: + section_name = section.get("Name") + if not section_name: + continue + members_in_section = section.xpath("./iface:Member", namespaces=ns) + if members_in_section: + result["interface"][section_name] = parse_interface_members( + members_in_section + ) + else: + print( + "Advertencia: Nodo Interface no contiene secciones ." + ) if not result["interface"]: - print("Advertencia: Interface encontrada pero sin secciones iface:Section válidas.") - else: - # Para GlobalDB, la interfaz podría no estar explícita o ser solo la sección Static + print( + "Advertencia: Interface encontrada pero sin secciones procesables." + ) + else: # Manejo especial para DB si no hay if block_type_found == "GlobalDB": - # Intentar buscar directamente los miembros bajo Sections/Section Name="Static" - static_members = the_block.xpath(".//iface:Section[@Name='Static']/iface:Member", namespaces=ns) - if static_members: - print("Paso 4: Encontrada sección Static para GlobalDB.") - result["interface"]["Static"] = parse_interface_members(static_members) - else: - print("Advertencia: No se encontró sección 'Static' para GlobalDB.") - else: # FC/FB/OB - print(f"Advertencia: No se encontró para bloque {block_type_found}.") + static_members = the_block.xpath( + ".//iface:Section[@Name='Static']/iface:Member", namespaces=ns + ) + if static_members: + print("Paso 4: Encontrada sección Static para GlobalDB.") + result["interface"]["Static"] = parse_interface_members( + static_members + ) + else: + print("Advertencia: No se encontró sección 'Static' para GlobalDB.") + else: + print( + f"Advertencia: No se encontró para bloque {block_type_found}." + ) if not result["interface"]: - print("Advertencia: No se pudo extraer información de la interfaz.") - + print("Advertencia: No se pudo extraer información de la interfaz.") + # --- Procesar redes (CompileUnits) --- print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...") networks_processed_count = 0 - result["networks"] = [] # Initialize networks list here - object_list_node = the_block.xpath("./*[local-name()='ObjectList']") + result["networks"] = [] + # ObjectList no parece tener namespace, SW.Blocks.CompileUnit tampoco + object_list_node = the_block.xpath("./ObjectList") if object_list_node: - # Buscar CompileUnits (para FC/FB/OB) - compile_units = object_list_node[0].xpath( - "./*[local-name()='SW.Blocks.CompileUnit']" - ) + compile_units = object_list_node[0].xpath("./SW.Blocks.CompileUnit") print( f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit." ) @@ -1359,202 +1435,152 @@ def convert_xml_to_json(xml_filepath, json_filepath): networks_processed_count += 1 network_id = network_elem.get("ID") if not network_id: - print(" Advertencia: Se encontró CompileUnit sin ID. Saltando.") continue - # --- Detectar lenguaje de la red --- - attribute_list = network_elem.xpath("./*[local-name()='AttributeList']") - programming_language = "LAD" # Default a LAD si no se especifica - network_source_node = None # Nodo - + # Detectar lenguaje de la red (AttributeList/ProgrammingLanguage sin namespace) + attribute_list = network_elem.xpath("./AttributeList") + programming_language = "LAD" # Default if attribute_list: - lang_node = attribute_list[0].xpath( - "./*[local-name()='ProgrammingLanguage']/text()" - ) + lang_node = attribute_list[0].xpath("./ProgrammingLanguage/text()") if lang_node: programming_language = lang_node[0].strip() - # Obtener el nodo NetworkSource para pasarlo a los parsers - network_source_list = attribute_list[0].xpath( - "./*[local-name()='NetworkSource']" - ) - if network_source_list: - network_source_node = network_source_list[0] print( f" - Procesando Red ID={network_id}, Lenguaje={programming_language}" ) - # --- Extraer título y comentario (común) --- - title_element = network_elem.xpath( - ".//*[local-name()='MultilingualText'][@CompositionName='Title']" - ) - network_title = ( - get_multilingual_text(title_element[0]) - if title_element - else f"Network {network_id}" - ) - - comment_element = network_elem.xpath( - "./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']" - ) - network_comment = ( - get_multilingual_text(comment_element[0]) if comment_element else "" - ) - - # --- Procesar según el lenguaje --- + # Procesar según lenguaje parsed_network_data = None - if programming_language == "SCL": + if programming_language in ["LAD", "FBD", "GRAPH"]: + # Llamar a parse_network (que ahora maneja errores de FlgNet) + parsed_network_data = parse_network(network_elem) + if parsed_network_data and not parsed_network_data.get("error"): + parsed_network_data["language"] = programming_language + elif parsed_network_data and parsed_network_data.get("error"): + print( + f" Error parseando Red {network_id}: {parsed_network_data['error']}" + ) + # Mantener la red con el error para x2/x3 + parsed_network_data["language"] = ( + programming_language # Asegurar lenguaje + ) + else: # parse_network devolvió None (error inesperado) + print( + f" Error fatal: parse_network devolvió None para Red {network_id}" + ) + parsed_network_data = { + "id": network_id, + "language": programming_language, + "logic": [], + "error": "parse_network failed", + } + + elif programming_language == "SCL": + network_source_node = network_elem.xpath( + ".//flg:NetworkSource", namespaces=ns + ) # NetworkSource sí usa flg structured_text_node = ( - network_source_node.xpath("./st:StructuredText", namespaces=ns) - if network_source_node is not None + network_source_node[0].xpath( + "./st:StructuredText", namespaces=ns + ) + if network_source_node else None ) - - reconstructed_scl = f"// SCL extraction failed for Network {network_id}: StructuredText node not found.\n" + reconstructed_scl = f"// SCL extraction failed: Node not found.\n" if structured_text_node: - print( - f" Reconstruyendo SCL desde tokens para red {network_id}..." - ) reconstructed_scl = reconstruct_scl_from_tokens( structured_text_node[0] ) - # print(f" ... SCL reconstruido (parcial):\n{reconstructed_scl[:200]}...") # Preview opcional - else: - print( - f" Advertencia: No se encontró nodo para red SCL {network_id}." - ) - parsed_network_data = { "id": network_id, - "title": network_title, - "comment": network_comment, "language": "SCL", "logic": [ { - "instruction_uid": f"SCL_{network_id}", # UID inventado + "instruction_uid": f"SCL_{network_id}", "type": "RAW_SCL_CHUNK", "scl": reconstructed_scl, } ], } - # --- NUEVO MANEJO STL --- elif programming_language == "STL": + network_source_node = network_elem.xpath( + ".//flg:NetworkSource", namespaces=ns + ) statement_list_node = ( - network_source_node.xpath("./stl:StatementList", namespaces=ns) - if network_source_node is not None + network_source_node[0].xpath( + "./stl:StatementList", namespaces=ns + ) + if network_source_node else None ) - - reconstructed_stl = f"// STL extraction failed for Network {network_id}: StatementList node not found.\n" + reconstructed_stl = f"// STL extraction failed: Node not found.\n" if statement_list_node: - print( - f" Reconstruyendo STL desde StatementList para red {network_id}..." - ) - # Llama a la nueva función de reconstrucción STL reconstructed_stl = reconstruct_stl_from_statementlist( statement_list_node[0] ) - # print(f" ... STL reconstruido (parcial):\n{reconstructed_stl[:200]}...") # Preview opcional - else: - print( - f" Advertencia: No se encontró nodo para red STL {network_id}." - ) - - # Guardar como un chunk de texto crudo parsed_network_data = { "id": network_id, - "title": network_title, - "comment": network_comment, - "language": "STL", # Indicar que es STL + "language": "STL", "logic": [ { - "instruction_uid": f"STL_{network_id}", # UID inventado - "type": "RAW_STL_CHUNK", # Nuevo tipo para identificarlo - "stl": reconstructed_stl, # Guardar el texto reconstruido + "instruction_uid": f"STL_{network_id}", + "type": "RAW_STL_CHUNK", + "stl": reconstructed_stl, } ], } - elif programming_language in ["LAD", "FBD", "GRAPH"]: # GRAPH también usa FlgNet - # Para LAD/FBD/GRAPH, llamar a parse_network - # Nota: parse_network espera el *CompileUnit* element, no el NetworkSource - parsed_network_data = parse_network(network_elem) - if parsed_network_data: - parsed_network_data["language"] = ( - programming_language # Asegurar que el lenguaje se guarda - ) - if parsed_network_data.get("error"): - # Si parse_network devuelve error (ej. no encontró FlgNet), lo registramos - print( - f" Error al parsear red {programming_language} ID={network_id}: {parsed_network_data['error']}" - ) - # Si es un error esperado para este lenguaje (ej. GRAPH sin FlgNet?), creamos placeholder - if "FlgNet not found" in parsed_network_data.get("error", ""): - parsed_network_data = { - "id": network_id, - "title": network_title, - "comment": network_comment, - "language": programming_language, - "logic": [{"instruction_uid": f"PLC_{network_id}", "type": "UNSUPPORTED_CONTENT", "info": f"Contenido {programming_language} sin FlgNet"}], - "error": parsed_network_data.get("error") # Mantener el error original - } - else: - print(f" Red {programming_language} ID={network_id} parseada.") - - else: # parse_network devolvió None (error interno) - print( - f" Error: parse_network devolvió None para red {programming_language} ID={network_id}" - ) - # Crear placeholder de error - parsed_network_data = { - "id": network_id, - "title": network_title, - "comment": network_comment, - "language": programming_language, - "logic": [{"instruction_uid": f"ERR_{network_id}", "type": "PARSING_ERROR", "info": "parse_network returned None"}], - "error": "parse_network returned None" - } - - - else: - # Manejar otros lenguajes o casos inesperados - print( - f" Advertencia: Lenguaje no soportado o inesperado '{programming_language}' en red ID={network_id}. Creando placeholder." - ) + else: # Lenguaje no soportado parsed_network_data = { "id": network_id, - "title": network_title, - "comment": network_comment, "language": programming_language, "logic": [ { "instruction_uid": f"UNS_{network_id}", "type": "UNSUPPORTED_LANG", - "info": f"Network {network_id} uses unsupported language: {programming_language}", + "info": f"Language {programming_language} not supported", } ], - "error": f"Unsupported language: {programming_language}" + "error": "Unsupported language", } - # Añadir la red procesada (si es válida) al resultado + # Añadir título y comentario a la red parseada if parsed_network_data: + title_element = network_elem.xpath( + ".//iface:MultilingualText[@CompositionName='Title']", + namespaces=ns, + ) + parsed_network_data["title"] = ( + get_multilingual_text(title_element[0]) + if title_element + else f"Network {network_id}" + ) + comment_element = network_elem.xpath( + "./ObjectList/MultilingualText[@CompositionName='Comment']", + namespaces=ns, + ) # Path relativo a CompileUnit + parsed_network_data["comment"] = ( + get_multilingual_text(comment_element[0]) + if comment_element + else "" + ) result["networks"].append(parsed_network_data) - # --- Fin del bucle for network_elem --- - if networks_processed_count == 0 and block_type_found != "GlobalDB": print( - f"Advertencia: ObjectList para bloque {block_type_found} no contenía elementos SW.Blocks.CompileUnit." + f"Advertencia: ObjectList para {block_type_found} sin SW.Blocks.CompileUnit." ) - # Para DBs, no esperamos CompileUnits elif block_type_found == "GlobalDB": print("Paso 5: Saltando búsqueda de CompileUnits para GlobalDB.") - else: # No se encontró ObjectList - print(f"Advertencia: No se encontró ObjectList para el bloque {block_type_found}.") - + else: + print( + f"Advertencia: No se encontró ObjectList para el bloque {block_type_found}." + ) + # --- Escribir JSON --- print("Paso 6: Escribiendo el resultado en el archivo JSON...") + # ... (resto del código de escritura y manejo de errores igual) ... if not result["interface"]: print("ADVERTENCIA FINAL: 'interface' está vacía.") if not result["networks"] and block_type_found != "GlobalDB": @@ -1570,18 +1596,7 @@ def convert_xml_to_json(xml_filepath, json_filepath): ) except TypeError as e: print(f"Error Crítico: Problema al serializar a JSON. Error: {e}") - print("--- Datos problemáticos (parcial) ---") - # Intentar imprimir partes del diccionario para depurar - for k, v in result.items(): - try: - json.dumps({k: v}) # Intentar serializar cada parte - except TypeError: - print(f"Error serializando clave '{k}': {type(v)}") - if isinstance(v, list) and v: - print(f" Primer elemento tipo: {type(v[0])}") - elif isinstance(v, dict) and v: - print(f" Primeras claves: {list(v.keys())[:5]}") - print("--- Fin Datos problemáticos ---") + # ... (debug de serialización) ... except etree.XMLSyntaxError as e: print( @@ -1589,39 +1604,35 @@ def convert_xml_to_json(xml_filepath, json_filepath): ) except Exception as e: print(f"Error Crítico: Error inesperado durante la conversión: {e}") - print("--- Traceback ---") traceback.print_exc() - print("--- Fin Traceback ---") if __name__ == "__main__": - # Imports necesarios solo para la ejecución como script principal + # --- (La sección __main__ permanece igual que en la respuesta anterior) --- import argparse import os import sys + import traceback - # Configurar ArgumentParser para recibir la ruta del XML obligatoria parser = argparse.ArgumentParser( - description="Convert Simatic XML (LAD/FBD/SCL/STL/OB/DB) to simplified JSON. Expects XML filepath as argument." # Actualizada descripción + description="Convert Simatic XML (LAD/FBD/SCL/STL/OB/DB) to simplified JSON. Expects XML filepath as argument." ) parser.add_argument( - "xml_filepath", # Argumento posicional obligatorio + "xml_filepath", help="Path to the input XML file passed from the main script (x0_main.py).", ) - args = parser.parse_args() # Parsea los argumentos de sys.argv + args = parser.parse_args() + xml_input_file = args.xml_filepath - xml_input_file = args.xml_filepath # Obtiene la ruta del argumento - - # Verificar si el archivo de entrada existe (es una buena práctica aunque x0 lo haga) if not os.path.exists(xml_input_file): - print(f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'") - sys.exit(1) # Salir si el archivo no existe + print( + f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'", + file=sys.stderr, + ) + sys.exit(1) - # Derivar nombre base para archivo de salida JSON - # El archivo JSON se guardará en el mismo directorio que el XML de entrada xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0] - output_dir = os.path.dirname(xml_input_file) # Directorio del XML de entrada - # Asegurarse de que el directorio de salida exista (aunque debería si el XML existe) + output_dir = os.path.dirname(xml_input_file) os.makedirs(output_dir, exist_ok=True) json_output_file = os.path.join(output_dir, f"{xml_filename_base}_simplified.json") @@ -1629,12 +1640,13 @@ if __name__ == "__main__": f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'" ) - # Llamar a la función principal de conversión del script try: convert_xml_to_json(xml_input_file, json_output_file) + sys.exit(0) # Éxito except Exception as e: - print(f"Error Crítico (x1) durante la conversión de '{xml_input_file}': {e}") - import traceback # Asegurarse de que traceback está importado aquí - - traceback.print_exc() - sys.exit(1) # Salir con error si la función principal falla \ No newline at end of file + print( + f"Error Crítico (x1) durante la conversión de '{xml_input_file}': {e}", + file=sys.stderr, + ) + traceback.print_exc(file=sys.stderr) + sys.exit(1) # Fallo