From 6c604176a1cb4bfe776a30220373b0bc3a215ab3 Mon Sep 17 00:00:00 2001 From: Miguel Date: Sun, 20 Apr 2025 03:22:13 +0200 Subject: [PATCH] Exportando DBs tambien --- paste.py | 273 +++++++++++++++++ x0_main.py | 107 ++++--- x1_to_json.py | 683 +++++++++++++++++++++++++++---------------- x2_process.py | 485 ++++++++++++++++++++----------- x3_generate_scl.py | 707 +++++++++++++++++++++++++++++---------------- 5 files changed, 1549 insertions(+), 706 deletions(-) create mode 100644 paste.py diff --git a/paste.py b/paste.py new file mode 100644 index 0000000..3410e82 --- /dev/null +++ b/paste.py @@ -0,0 +1,273 @@ +# x3_generate_scl.py +# -*- coding: utf-8 -*- +import json +import os +import re +import argparse +import sys +import traceback + +# --- Importar Utilidades (mantener como estaba) --- +try: + from processors.processor_utils import format_variable_name + SCL_SUFFIX = "_sympy_processed" + GROUPED_COMMENT = "// Logic included in grouped IF" +except ImportError: + print("Advertencia: No se pudo importar 'format_variable_name'. Usando fallback.") + def format_variable_name(name): # Fallback BÁSICO + if not name: return "_INVALID_NAME_" + if name.startswith('"') and name.endswith('"'): return name + prefix = "#" if name.startswith("#") else "" + if prefix: name = name[1:] + if name and name[0].isdigit(): name = "_" + name + name = re.sub(r"[^a-zA-Z0-9_]", "_", name) + return prefix + name + SCL_SUFFIX = "_sympy_processed" + GROUPED_COMMENT = "// Logic included in grouped IF" + +# --- NUEVA FUNCIÓN para formatear valores iniciales SCL --- +def format_scl_start_value(value, datatype): + """Formatea un valor para la inicialización SCL según el tipo.""" + if value is None: return None + datatype_lower = datatype.lower() if datatype else "" + value_str = str(value) + + if "bool" in datatype_lower: + return "TRUE" if value_str.lower() == 'true' else "FALSE" + elif "string" in datatype_lower: + escaped_value = value_str.replace("'", "''") + if escaped_value.startswith("'") and escaped_value.endswith("'"): escaped_value = escaped_value[1:-1] + return f"'{escaped_value}'" + elif "char" in datatype_lower: # Añadido Char + escaped_value = value_str.replace("'", "''") + if escaped_value.startswith("'") and escaped_value.endswith("'"): escaped_value = escaped_value[1:-1] + return f"'{escaped_value}'" + elif any(t in datatype_lower for t in ["int", "byte", "word", "dint", "dword", "lint", "lword", "sint", "usint", "uint", "udint", "ulint"]): # Ampliado + try: return str(int(value_str)) + except ValueError: + if re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', value_str): return value_str + return f"'{value_str}'" # O como string si no es entero ni símbolo + elif "real" in datatype_lower or "lreal" in datatype_lower: + try: + f_val = float(value_str); s_val = str(f_val) + if '.' not in s_val and 'e' not in s_val.lower(): s_val += ".0" + return s_val + except ValueError: + if re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', value_str): return value_str + return f"'{value_str}'" + elif "time" in datatype_lower: # Añadido Time, S5Time, LTime + # Quitar T#, LT#, S5T# si existen + prefix = "" + if value_str.upper().startswith("T#"): prefix="T#"; value_str = value_str[2:] + elif value_str.upper().startswith("LT#"): prefix="LT#"; value_str = value_str[3:] + elif value_str.upper().startswith("S5T#"): prefix="S5T#"; value_str = value_str[4:] + # Devolver con el prefijo correcto o T# por defecto si no había + if prefix: return f"{prefix}{value_str}" + elif "s5time" in datatype_lower: return f"S5T#{value_str}" + elif "ltime" in datatype_lower: return f"LT#{value_str}" + else: return f"T#{value_str}" # Default a TIME + elif "date" in datatype_lower: # Añadido Date, DT, TOD + if value_str.upper().startswith("D#"): return value_str + elif "dt" in datatype_lower or "date_and_time" in datatype_lower: + if value_str.upper().startswith("DT#"): return value_str + else: return f"DT#{value_str}" # Añadir prefijo DT# + elif "tod" in datatype_lower or "time_of_day" in datatype_lower: + if value_str.upper().startswith("TOD#"): return value_str + else: return f"TOD#{value_str}" # Añadir prefijo TOD# + else: return f"D#{value_str}" # Default a Date + # Fallback genérico + else: + if re.match(r'^[a-zA-Z_][a-zA-Z0-9_."#\[\]]+$', value_str): # Permitir más caracteres en símbolos/tipos + # Si es un UDT o Struct complejo, podría venir con comillas, quitarlas + if value_str.startswith('"') and value_str.endswith('"'): + return value_str[1:-1] + return value_str + else: + escaped_value = value_str.replace("'", "''") + return f"'{escaped_value}'" + +# --- NUEVA FUNCIÓN RECURSIVA para generar declaraciones SCL (VAR/STRUCT/ARRAY) --- + +# --- Función Principal de Generación SCL (MODIFICADA) --- +def generate_scl(processed_json_filepath, output_scl_filepath): + """Genera un archivo SCL a partir del JSON procesado (FC/FB o DB).""" + + if not os.path.exists(processed_json_filepath): + print(f"Error: Archivo JSON procesado no encontrado en '{processed_json_filepath}'") + return + + print(f"Cargando JSON procesado desde: {processed_json_filepath}") + try: + with open(processed_json_filepath, 'r', encoding='utf-8') as f: + data = json.load(f) + except Exception as e: + print(f"Error al cargar o parsear JSON: {e}"); traceback.print_exc(); return + + # --- Extracción de Información del Bloque (Común) --- + block_name = data.get('block_name', 'UnknownBlock') + block_number = data.get('block_number') + block_lang_original = data.get('language', 'Unknown') # Será "DB" para Data Blocks + block_type = data.get('block_type', 'Unknown') # FC, FB, GlobalDB + block_comment = data.get('block_comment', '') + scl_block_name = format_variable_name(block_name) # Nombre SCL seguro + print(f"Generando SCL para: {block_type} '{scl_block_name}' (Original: {block_name}, Lang: {block_lang_original})") + scl_output = [] + + # --- GENERACIÓN PARA DATA BLOCK (DB) --- + if block_lang_original == "DB": + print("Modo de generación: DATA_BLOCK") + scl_output.append(f"// Block Type: {block_type}") + scl_output.append(f"// Block Name (Original): {block_name}") + if block_number: scl_output.append(f"// Block Number: {block_number}") + if block_comment: scl_output.append(f"// Block Comment: {block_comment}") + scl_output.append("") + scl_output.append(f"DATA_BLOCK \"{scl_block_name}\"") + scl_output.append("{ S7_Optimized_Access := 'TRUE' }") # Asumir optimizado + scl_output.append("VERSION : 0.1") + scl_output.append("") + interface_data = data.get('interface', {}) + static_vars = interface_data.get('Static', []) + if static_vars: + scl_output.append("VAR") + scl_output.extend(generate_scl_declarations(static_vars, indent_level=1)) + scl_output.append("END_VAR") + scl_output.append("") + else: + print("Advertencia: No se encontró sección 'Static' o está vacía en la interfaz del DB.") + scl_output.append("VAR"); scl_output.append("END_VAR"); scl_output.append("") + scl_output.append("BEGIN"); scl_output.append(""); scl_output.append("END_DATA_BLOCK") + + # --- GENERACIÓN PARA FUNCTION BLOCK / FUNCTION (FC/FB) --- + else: + print("Modo de generación: FUNCTION_BLOCK / FUNCTION") + scl_block_keyword = "FUNCTION_BLOCK" if block_type == "FB" else "FUNCTION" + # Cabecera del Bloque + scl_output.append(f"// Block Type: {block_type}") + scl_output.append(f"// Block Name (Original): {block_name}") + if block_number: scl_output.append(f"// Block Number: {block_number}") + scl_output.append(f"// Original Language: {block_lang_original}") + if block_comment: scl_output.append(f"// Block Comment: {block_comment}") + scl_output.append("") + # Manejar tipo de retorno para FUNCTION + return_type = "Void" # Default + interface_data = data.get('interface', {}) + if scl_block_keyword == "FUNCTION" and interface_data.get('Return'): + return_member = interface_data['Return'][0] # Asumir un solo valor de retorno + return_type_raw = return_member.get('datatype', 'Void') + return_type = return_type_raw.strip('"') if return_type_raw.startswith('"') and return_type_raw.endswith('"') else return_type_raw + # Añadir comillas si es UDT + if return_type != return_type_raw: return_type = f'"{return_type}"' + + scl_output.append(f"{scl_block_keyword} \"{scl_block_name}\" : {return_type}" if scl_block_keyword == "FUNCTION" else f"{scl_block_keyword} \"{scl_block_name}\"") + scl_output.append("{ S7_Optimized_Access := 'TRUE' }") + scl_output.append("VERSION : 0.1"); scl_output.append("") + + # Declaraciones de Interfaz FC/FB + section_order = ["Input", "Output", "InOut", "Static", "Temp", "Constant"] # Return ya está en cabecera + declared_temps = set() + for section_name in section_order: + vars_in_section = interface_data.get(section_name, []) + if vars_in_section: + scl_section_keyword = f"VAR_{section_name.upper()}" + if section_name == "Static": scl_section_keyword = "VAR_STAT" + if section_name == "Temp": scl_section_keyword = "VAR_TEMP" + if section_name == "Constant": scl_section_keyword = "CONSTANT" + scl_output.append(scl_section_keyword) + scl_output.extend(generate_scl_declarations(vars_in_section, indent_level=1)) + if section_name == "Temp": declared_temps.update(format_variable_name(v.get("name")) for v in vars_in_section if v.get("name")) + scl_output.append("END_VAR"); scl_output.append("") + + # Declaraciones VAR_TEMP adicionales detectadas + temp_vars = set() + temp_pattern = re.compile(r'"?#(_temp_[a-zA-Z0-9_]+)"?|"?(_temp_[a-zA-Z0-9_]+)"?') + for network in data.get('networks', []): + for instruction in network.get('logic', []): + scl_code = instruction.get('scl', ''); edge_update_code = instruction.get('_edge_mem_update_scl','') + code_to_scan = (scl_code if scl_code else '') + '\n' + (edge_update_code if edge_update_code else '') + if code_to_scan: + found_temps = temp_pattern.findall(code_to_scan) + for temp_tuple in found_temps: + temp_name = next((t for t in temp_tuple if t), None) + if temp_name: temp_vars.add("#"+temp_name if not temp_name.startswith("#") else temp_name) + additional_temps = sorted(list(temp_vars - declared_temps)) + if additional_temps: + if not interface_data.get("Temp"): scl_output.append("VAR_TEMP") + for var_name in additional_temps: + scl_name = format_variable_name(var_name); inferred_type = "Bool" # Asumir Bool + scl_output.append(f" {scl_name} : {inferred_type}; // Auto-generated temporary") + if not interface_data.get("Temp"): scl_output.append("END_VAR"); scl_output.append("") + + # Cuerpo del Bloque FC/FB + scl_output.append("BEGIN"); scl_output.append("") + # Iterar por redes y lógica (como antes, incluyendo manejo STL Markdown) + for i, network in enumerate(data.get('networks', [])): + network_title = network.get('title', f'Network {network.get("id")}') + network_comment = network.get('comment', '') + network_lang = network.get('language', 'LAD') + scl_output.append(f" // Network {i+1}: {network_title} (Original Language: {network_lang})") + if network_comment: + for line in network_comment.splitlines(): scl_output.append(f" // {line}") + scl_output.append("") + network_has_code = False + if network_lang == "STL": + network_has_code = True + if network.get('logic') and network['logic'][0].get("type") == "RAW_STL_CHUNK": + raw_stl_code = network['logic'][0].get("stl", "// ERROR: STL code missing") + scl_output.append(f" {'//'} ```STL") + for stl_line in raw_stl_code.splitlines(): scl_output.append(f" {stl_line}") + scl_output.append(f" {'//'} ```") + else: scl_output.append(" // ERROR: Contenido STL inesperado.") + else: # LAD, FBD, SCL, etc. + for instruction in network.get('logic', []): + instruction_type = instruction.get("type", ""); scl_code = instruction.get('scl', ''); is_grouped = instruction.get("grouped", False) + if is_grouped: continue + if (instruction_type.endswith(SCL_SUFFIX) or instruction_type in ["RAW_SCL_CHUNK", "UNSUPPORTED_LANG"]) and scl_code: + is_only_comment = all(line.strip().startswith("//") for line in scl_code.splitlines() if line.strip()) + is_if_block = scl_code.strip().startswith("IF") + if not is_only_comment or is_if_block: + network_has_code = True + for line in scl_code.splitlines(): scl_output.append(f" {line}") + if network_has_code: scl_output.append("") + else: scl_output.append(f" // Network did not produce printable SCL code."); scl_output.append("") + # Fin del bloque FC/FB + scl_output.append(f"END_{scl_block_keyword}") + + # --- Escritura del Archivo SCL (Común) --- + print(f"Escribiendo archivo SCL en: {output_scl_filepath}") + try: + with open(output_scl_filepath, 'w', encoding='utf-8') as f: + for line in scl_output: f.write(line + '\n') + print("Generación de SCL completada.") + except Exception as e: + print(f"Error al escribir el archivo SCL: {e}"); traceback.print_exc() + + +# --- Ejecución (sin cambios) --- +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Generate final SCL file from processed JSON (FC/FB/DB).") + parser.add_argument("source_xml_filepath", help="Path to the original source XML file.") + args = parser.parse_args() + source_xml_file = args.source_xml_filepath + + if not os.path.exists(source_xml_file): + print(f"Advertencia (x3): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON procesado.") + # Continuar, pero verificar existencia del JSON procesado + + xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0] + base_dir = os.path.dirname(source_xml_file) + input_json_file = os.path.join(base_dir, f"{xml_filename_base}_simplified_processed.json") + output_scl_file = os.path.join(base_dir, f"{xml_filename_base}_simplified_processed.scl") + + print(f"(x3) Generando SCL: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_scl_file)}'") + + if not os.path.exists(input_json_file): + print(f"Error Fatal (x3): Archivo JSON procesado no encontrado: '{input_json_file}'") + print(f"Asegúrate de que 'x1_to_json.py' y 'x2_process.py' se ejecutaron correctamente para '{os.path.relpath(source_xml_file)}'.") + sys.exit(1) + else: + try: + generate_scl(input_json_file, output_scl_file) + except Exception as e: + print(f"Error Crítico (x3) durante la generación de SCL desde '{input_json_file}': {e}") + traceback.print_exc() + sys.exit(1) \ No newline at end of file diff --git a/x0_main.py b/x0_main.py index f14f918..950a973 100644 --- a/x0_main.py +++ b/x0_main.py @@ -3,7 +3,8 @@ import subprocess import os import sys import locale -import glob # <--- Importar glob para buscar archivos +import glob # <--- Importar glob para buscar archivos + # (Función get_console_encoding y variable CONSOLE_ENCODING como en la respuesta anterior) def get_console_encoding(): @@ -11,12 +12,14 @@ def get_console_encoding(): try: return locale.getpreferredencoding(False) except Exception: - return 'cp1252' + return "cp1252" + CONSOLE_ENCODING = get_console_encoding() # Descomenta la siguiente línea si quieres ver la codificación detectada: # print(f"Detected console encoding: {CONSOLE_ENCODING}") + # (Función run_script como en la respuesta anterior, usando CONSOLE_ENCODING) def run_script(script_name, xml_arg): """Runs a given script with the specified XML file argument.""" @@ -24,12 +27,14 @@ def run_script(script_name, xml_arg): command = [sys.executable, script_path, xml_arg] print(f"\n--- Running {script_name} with argument: {xml_arg} ---") try: - result = subprocess.run(command, - check=True, - capture_output=True, - text=True, - encoding=CONSOLE_ENCODING, - errors='replace') # 'replace' para evitar errores + result = subprocess.run( + command, + check=True, + capture_output=True, + text=True, + encoding=CONSOLE_ENCODING, + errors="replace", + ) # 'replace' para evitar errores # Imprimir stdout y stderr # Eliminar saltos de línea extra al final si existen @@ -49,26 +54,35 @@ def run_script(script_name, xml_arg): except subprocess.CalledProcessError as e: print(f"Error running {script_name}:") print(f"Return code: {e.returncode}") - stdout_decoded = e.stdout.decode(CONSOLE_ENCODING, errors='replace').strip() if isinstance(e.stdout, bytes) else (e.stdout or "").strip() - stderr_decoded = e.stderr.decode(CONSOLE_ENCODING, errors='replace').strip() if isinstance(e.stderr, bytes) else (e.stderr or "").strip() + stdout_decoded = ( + e.stdout.decode(CONSOLE_ENCODING, errors="replace").strip() + if isinstance(e.stdout, bytes) + else (e.stdout or "").strip() + ) + stderr_decoded = ( + e.stderr.decode(CONSOLE_ENCODING, errors="replace").strip() + if isinstance(e.stderr, bytes) + else (e.stderr or "").strip() + ) if stdout_decoded: - print("--- Stdout ---") - print(stdout_decoded) + print("--- Stdout ---") + print(stdout_decoded) if stderr_decoded: - print("--- Stderr ---") - print(stderr_decoded) + print("--- Stderr ---") + print(stderr_decoded) print("--------------") return False except Exception as e: print(f"An unexpected error occurred while running {script_name}: {e}") return False + # --- NUEVA FUNCIÓN PARA SELECCIONAR ARCHIVO --- def select_xml_file(): """Busca archivos .xml, los lista y pide al usuario que elija uno.""" print("No XML file specified. Searching for XML files in current directory...") # Buscar archivos .xml en el directorio actual (.) - xml_files = sorted(glob.glob('*.xml')) # sorted para orden alfabético + xml_files = sorted(glob.glob("*.xml")) # sorted para orden alfabético if not xml_files: print("Error: No .xml files found in the current directory.") @@ -80,7 +94,9 @@ def select_xml_file(): while True: try: - choice = input(f"Enter the number of the file to process (1-{len(xml_files)}): ") + choice = input( + f"Enter the number of the file to process (1-{len(xml_files)}): " + ) choice_num = int(choice) if 1 <= choice_num <= len(xml_files): selected_file = xml_files[choice_num - 1] @@ -90,9 +106,11 @@ def select_xml_file(): print("Invalid choice. Please enter a number from the list.") except ValueError: print("Invalid input. Please enter a number.") - except EOFError: # Manejar si la entrada se cierra inesperadamente - print("\nSelection cancelled.") - sys.exit(1) + except EOFError: # Manejar si la entrada se cierra inesperadamente + print("\nSelection cancelled.") + sys.exit(1) + + # --- FIN NUEVA FUNCIÓN --- @@ -100,30 +118,36 @@ if __name__ == "__main__": # Imports necesarios para esta sección import os import sys - import glob # Asegúrate de que glob esté importado al principio del archivo + import glob # Asegúrate de que glob esté importado al principio del archivo # Directorio base donde buscar los archivos XML (relativo al script) base_search_dir = "XML Project" - script_dir = os.path.dirname(__file__) # Directorio donde está x0_main.py + script_dir = os.path.dirname(__file__) # Directorio donde está x0_main.py xml_project_dir = os.path.join(script_dir, base_search_dir) print(f"Buscando archivos XML recursivamente en: '{xml_project_dir}'") # Verificar si el directorio 'XML Project' existe if not os.path.isdir(xml_project_dir): - print(f"Error: El directorio '{xml_project_dir}' no existe o no es un directorio.") - print("Por favor, crea el directorio 'XML Project' en la misma carpeta que este script y coloca tus archivos XML dentro.") + print( + f"Error: El directorio '{xml_project_dir}' no existe o no es un directorio." + ) + print( + "Por favor, crea el directorio 'XML Project' en la misma carpeta que este script y coloca tus archivos XML dentro." + ) sys.exit(1) # Buscar todos los archivos .xml recursivamente dentro de xml_project_dir # Usamos os.path.join para construir la ruta de búsqueda correctamente # y '**/*.xml' para la recursividad con glob - search_pattern = os.path.join(xml_project_dir, '**', '*.xml') + search_pattern = os.path.join(xml_project_dir, "**", "*.xml") xml_files_found = glob.glob(search_pattern, recursive=True) if not xml_files_found: - print(f"No se encontraron archivos XML en '{xml_project_dir}' o sus subdirectorios.") - sys.exit(0) # Salir limpiamente si no hay archivos + print( + f"No se encontraron archivos XML en '{xml_project_dir}' o sus subdirectorios." + ) + sys.exit(0) # Salir limpiamente si no hay archivos print(f"Se encontraron {len(xml_files_found)} archivos XML para procesar:") # Ordenar para un procesamiento predecible (opcional) @@ -141,7 +165,9 @@ if __name__ == "__main__": processed_count = 0 failed_count = 0 for xml_filepath in xml_files_found: - print(f"\n--- Iniciando pipeline para: {os.path.relpath(xml_filepath, script_dir)} ---") + print( + f"\n--- Iniciando pipeline para: {os.path.relpath(xml_filepath, script_dir)} ---" + ) # Usar la ruta absoluta para evitar problemas si los scripts cambian de directorio absolute_xml_filepath = os.path.abspath(xml_filepath) @@ -150,26 +176,37 @@ if __name__ == "__main__": # La función run_script ya está definida en tu script x0_main.py success = True if not run_script(script1, absolute_xml_filepath): - print(f"\nPipeline falló en el script '{script1}' para el archivo: {os.path.relpath(xml_filepath, script_dir)}") + print( + f"\nPipeline falló en el script '{script1}' para el archivo: {os.path.relpath(xml_filepath, script_dir)}" + ) success = False elif not run_script(script2, absolute_xml_filepath): - print(f"\nPipeline falló en el script '{script2}' para el archivo: {os.path.relpath(xml_filepath, script_dir)}") + print( + f"\nPipeline falló en el script '{script2}' para el archivo: {os.path.relpath(xml_filepath, script_dir)}" + ) success = False elif not run_script(script3, absolute_xml_filepath): - print(f"\nPipeline falló en el script '{script3}' para el archivo: {os.path.relpath(xml_filepath, script_dir)}") + print( + f"\nPipeline falló en el script '{script3}' para el archivo: {os.path.relpath(xml_filepath, script_dir)}" + ) success = False if success: - print(f"--- Pipeline completado exitosamente para: {os.path.relpath(xml_filepath, script_dir)} ---") + print( + f"--- Pipeline completado exitosamente para: {os.path.relpath(xml_filepath, script_dir)} ---" + ) processed_count += 1 else: failed_count += 1 - print(f"--- Pipeline falló para: {os.path.relpath(xml_filepath, script_dir)} ---") - + print( + f"--- Pipeline falló para: {os.path.relpath(xml_filepath, script_dir)} ---" + ) print("\n--- Resumen Final del Procesamiento ---") print(f"Total de archivos XML encontrados: {len(xml_files_found)}") - print(f"Archivos procesados exitosamente por el pipeline completo: {processed_count}") + print( + f"Archivos procesados exitosamente por el pipeline completo: {processed_count}" + ) print(f"Archivos que fallaron en algún punto del pipeline: {failed_count}") print("---------------------------------------") xml_filename = None @@ -217,4 +254,4 @@ if __name__ == "__main__": else: print("\nPipeline failed at script:", script2) else: - print("\nPipeline failed at script:", script1) \ No newline at end of file + print("\nPipeline failed at script:", script1) diff --git a/x1_to_json.py b/x1_to_json.py index 74e78d8..71ac19b 100644 --- a/x1_to_json.py +++ b/x1_to_json.py @@ -46,6 +46,7 @@ def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"): print(f"Advertencia: Error extrayendo MultilingualText: {e}") return "" + def get_symbol_name(symbol_element): # (Sin cambios respecto a la versión anterior) if symbol_element is None: @@ -57,6 +58,7 @@ def get_symbol_name(symbol_element): print(f"Advertencia: Excepción en get_symbol_name: {e}") return None + def parse_access(access_element): # (Sin cambios respecto a la versión anterior) if access_element is None: @@ -149,6 +151,7 @@ def parse_access(access_element): return info return info + def parse_part(part_element): # (Sin cambios respecto a la versión anterior) if part_element is None: @@ -184,6 +187,7 @@ def parse_part(part_element): "negated_pins": negated_pins, } + def parse_call(call_element): # (Mantiene la corrección para DB de instancia) if call_element is None: @@ -243,8 +247,10 @@ def parse_call(call_element): call_data["instance_scope"] = instance_scope return call_data + # SCL (Structured Text) Parser + def reconstruct_scl_from_tokens(st_node): """ Reconstruye SCL desde , mejorando el manejo de @@ -263,10 +269,10 @@ def reconstruct_scl_from_tokens(st_node): scl_parts.append(elem.get("Text", "")) elif tag == "Blank": # Añadir espacios simples, evitar múltiples si ya hay uno antes/después - if not scl_parts or not scl_parts[-1].endswith(' '): - scl_parts.append(" " * int(elem.get("Num", 1))) - elif int(elem.get("Num", 1)) > 1: # Añadir extras si son más de 1 - scl_parts.append(" " * (int(elem.get("Num", 1))-1)) + if not scl_parts or not scl_parts[-1].endswith(" "): + scl_parts.append(" " * int(elem.get("Num", 1))) + elif int(elem.get("Num", 1)) > 1: # Añadir extras si son más de 1 + scl_parts.append(" " * (int(elem.get("Num", 1)) - 1)) elif tag == "NewLine": # Limpiar espacios antes del salto de línea real if scl_parts: @@ -274,9 +280,17 @@ def reconstruct_scl_from_tokens(st_node): scl_parts.append("\n") elif tag == "Access": scope = elem.get("Scope") - access_str = f"/*_ERR_Scope_{scope}_*/" # Fallback más informativo + access_str = f"/*_ERR_Scope_{scope}_*/" # Fallback más informativo - if scope in ["GlobalVariable", "LocalVariable", "TempVariable", "InOutVariable", "InputVariable", "OutputVariable", "ConstantVariable"]: # Tipos comunes de variables + if scope in [ + "GlobalVariable", + "LocalVariable", + "TempVariable", + "InOutVariable", + "InputVariable", + "OutputVariable", + "ConstantVariable", + ]: # Tipos comunes de variables symbol_elem = elem.xpath("./st:Symbol", namespaces=ns) if symbol_elem: components = symbol_elem[0].xpath("./st:Component", namespaces=ns) @@ -288,11 +302,18 @@ def reconstruct_scl_from_tokens(st_node): symbol_text_parts.append(".") # Reconstrucción de comillas (heurística) - has_quotes_elem = comp.xpath("../st:BooleanAttribute[@Name='HasQuotes']/text()", namespaces=ns) - has_quotes = has_quotes_elem and has_quotes_elem[0].lower() == "true" - is_temp = name.startswith('#') + has_quotes_elem = comp.xpath( + "../st:BooleanAttribute[@Name='HasQuotes']/text()", + namespaces=ns, + ) + has_quotes = ( + has_quotes_elem and has_quotes_elem[0].lower() == "true" + ) + is_temp = name.startswith("#") - if has_quotes or (i == 0 and not is_temp): # Comillas si HasQuotes o primer componente (no temp) + if has_quotes or ( + i == 0 and not is_temp + ): # Comillas si HasQuotes o primer componente (no temp) symbol_text_parts.append(f'"{name}"') else: symbol_text_parts.append(name) @@ -300,17 +321,24 @@ def reconstruct_scl_from_tokens(st_node): # Manejar índices de array (RECURSIVO) index_access = comp.xpath("./st:Access", namespaces=ns) if index_access: - # Llama recursivamente para obtener el texto de cada índice - indices_text = [reconstruct_scl_from_tokens(idx_node) for idx_node in index_access] - symbol_text_parts.append(f"[{','.join(indices_text)}]") + # Llama recursivamente para obtener el texto de cada índice + indices_text = [ + reconstruct_scl_from_tokens(idx_node) + for idx_node in index_access + ] + symbol_text_parts.append(f"[{','.join(indices_text)}]") access_str = "".join(symbol_text_parts) elif scope == "LiteralConstant": constant_elem = elem.xpath("./st:Constant", namespaces=ns) if constant_elem: - val_elem = constant_elem[0].xpath("./st:ConstantValue/text()", namespaces=ns) - type_elem = constant_elem[0].xpath("./st:ConstantType/text()", namespaces=ns) + val_elem = constant_elem[0].xpath( + "./st:ConstantValue/text()", namespaces=ns + ) + type_elem = constant_elem[0].xpath( + "./st:ConstantType/text()", namespaces=ns + ) const_type = type_elem[0] if type_elem else "" const_val = val_elem[0] if val_elem else "_ERR_CONSTVAL_" @@ -322,7 +350,7 @@ def reconstruct_scl_from_tokens(st_node): # elif const_type == "LTime": access_str = f"LT#{const_val}" # ... otros tipos ... else: - access_str = "/*_ERR_NOCONST_*/" + access_str = "/*_ERR_NOCONST_*/" # --- Añadir más manejo de scopes aquí si es necesario --- # elif scope == "Call": access_str = reconstruct_call(elem) # elif scope == "Expression": access_str = reconstruct_expression(elem) @@ -330,39 +358,48 @@ def reconstruct_scl_from_tokens(st_node): scl_parts.append(access_str) elif tag == "Comment" or tag == "LineComment": - comment_text = "".join(elem.xpath(".//text()")).strip() - if tag == "Comment": - scl_parts.append(f"(* {comment_text} *)") - else: - scl_parts.append(f"// {comment_text}") + comment_text = "".join(elem.xpath(".//text()")).strip() + if tag == "Comment": + scl_parts.append(f"(* {comment_text} *)") + else: + scl_parts.append(f"// {comment_text}") # else: Ignorar otros nodos # Unir partes, limpiar espacios extra alrededor de operadores y saltos de línea full_scl = "".join(scl_parts) - # Re-indentar líneas después de IF/THEN, etc. (Simplificado) output_lines = [] indent_level = 0 - for line in full_scl.split('\n'): + for line in full_scl.split("\n"): line = line.strip() - if not line: continue # Saltar líneas vacías + if not line: + continue # Saltar líneas vacías # Reducir indentación antes de procesar END_IF, ELSE, etc. (simplificado) - if line.startswith(('END_IF', 'END_WHILE', 'END_FOR', 'END_CASE', 'ELSE', 'ELSIF')): - indent_level = max(0, indent_level - 1) + if line.startswith( + ("END_IF", "END_WHILE", "END_FOR", "END_CASE", "ELSE", "ELSIF") + ): + indent_level = max(0, indent_level - 1) - output_lines.append(" " * indent_level + line) # Aplicar indentación + output_lines.append(" " * indent_level + line) # Aplicar indentación # Aumentar indentación después de IF, WHILE, FOR, CASE, ELSE, ELSIF (simplificado) - if line.endswith('THEN') or line.endswith('DO') or line.endswith('OF') or line == 'ELSE': - indent_level += 1 + if ( + line.endswith("THEN") + or line.endswith("DO") + or line.endswith("OF") + or line == "ELSE" + ): + indent_level += 1 # Nota: Esto no maneja bloques BEGIN/END dentro de SCL return "\n".join(output_lines) + # STL (Statement List) Parser + def get_access_text(access_element): """Reconstruye una representación textual simple de un Access en STL.""" if access_element is None: @@ -379,7 +416,9 @@ def get_access_text(access_element): for comp in components: name = comp.get("Name", "_ERR_COMP_") # CORREGIDO: Añadido namespaces=ns - has_quotes_elem = comp.xpath("../stl:BooleanAttribute[@Name='HasQuotes']/text()", namespaces=ns) + has_quotes_elem = comp.xpath( + "../stl:BooleanAttribute[@Name='HasQuotes']/text()", namespaces=ns + ) has_quotes = has_quotes_elem and has_quotes_elem[0].lower() == "true" # Usar nombre tal cual por ahora @@ -389,8 +428,8 @@ def get_access_text(access_element): # CORREGIDO: Añadido namespaces=ns index_access = comp.xpath("./stl:Access", namespaces=ns) if index_access: - indices = [get_access_text(ia) for ia in index_access] - parts.append(f"[{','.join(indices)}]") + indices = [get_access_text(ia) for ia in index_access] + parts.append(f"[{','.join(indices)}]") return ".".join(parts) @@ -400,14 +439,18 @@ def get_access_text(access_element): if constant_elem: # CORREGIDO: Añadido namespaces=ns val_elem = constant_elem[0].xpath("./stl:ConstantValue/text()", namespaces=ns) - type_elem = constant_elem[0].xpath("./stl:ConstantType/text()", namespaces=ns) # Obtener tipo para mejor formato + type_elem = constant_elem[0].xpath( + "./stl:ConstantType/text()", namespaces=ns + ) # Obtener tipo para mejor formato const_type = type_elem[0] if type_elem else "" const_val = val_elem[0] if val_elem else "_ERR_CONST_" # Añadir prefijo de tipo si es necesario (ej. T# , L#) - Simplificado - if const_type == "Time": return f"T#{const_val}" - if const_type == "ARef": return f"{const_val}" # No necesita prefijo + if const_type == "Time": + return f"T#{const_val}" + if const_type == "ARef": + return f"{const_val}" # No necesita prefijo # Añadir más tipos si es necesario - return const_val # Valor directo para otros tipos + return const_val # Valor directo para otros tipos # Intenta reconstruir etiqueta # CORREGIDO: Añadido namespaces=ns @@ -436,7 +479,9 @@ def get_access_text(access_element): # Formatear ancho width_map = {"Bit": "X", "Byte": "B", "Word": "W", "Double": "D"} - width_char = width_map.get(width, width[0] if width else "?") # Usa primera letra si no mapeado + width_char = width_map.get( + width, width[0] if width else "?" + ) # Usa primera letra si no mapeado return f"{area}{width_char}[{reg},{p_format_offset}]" @@ -446,49 +491,66 @@ def get_access_text(access_element): if address_elem: area = address_elem[0].get("Area", "??") bit_offset_str = address_elem[0].get("BitOffset", "0") - addr_type_str = address_elem[0].get("Type", "Bool") # Obtener tipo para ancho + addr_type_str = address_elem[0].get("Type", "Bool") # Obtener tipo para ancho try: - bit_offset = int(bit_offset_str) - byte_offset = bit_offset // 8 - bit_in_byte = bit_offset % 8 - # Determinar ancho basado en tipo (simplificación) - addr_width = "X" # Default a Bit - if addr_type_str == "Byte": addr_width = "B" - elif addr_type_str == "Word": addr_width = "W" - elif addr_type_str in ["DWord", "DInt"]: addr_width = "D" - # Añadir más tipos si es necesario (Real, etc.) + bit_offset = int(bit_offset_str) + byte_offset = bit_offset // 8 + bit_in_byte = bit_offset % 8 + # Determinar ancho basado en tipo (simplificación) + addr_width = "X" # Default a Bit + if addr_type_str == "Byte": + addr_width = "B" + elif addr_type_str == "Word": + addr_width = "W" + elif addr_type_str in ["DWord", "DInt"]: + addr_width = "D" + # Añadir más tipos si es necesario (Real, etc.) - # Mapear Area para STL estándar - area_map = { "Input": "I", "Output": "Q", "Memory": "M", - "PeripheryInput": "PI", "PeripheryOutput": "PQ", - "DB": "DB", "DI": "DI", "Local": "L", # L no siempre válido aquí - "Timer": "T", "Counter": "C" } - stl_area = area_map.get(area, area) + # Mapear Area para STL estándar + area_map = { + "Input": "I", + "Output": "Q", + "Memory": "M", + "PeripheryInput": "PI", + "PeripheryOutput": "PQ", + "DB": "DB", + "DI": "DI", + "Local": "L", # L no siempre válido aquí + "Timer": "T", + "Counter": "C", + } + stl_area = area_map.get(area, area) - # Manejar DB/DI que necesitan número de bloque - if stl_area in ["DB", "DI"]: - block_num = address_elem[0].get("BlockNumber") - if block_num: - return f"{stl_area}{block_num}.{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: DB1.DBX0.1 - else: # Acceso con registro DB/DI - return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: DBX0.1 - elif stl_area in ["T", "C"]: - return f"{stl_area}{byte_offset}" # Los timers/contadores solo usan el número - else: # I, Q, M, L, PI, PQ - return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: M10.1, I0.0 + # Manejar DB/DI que necesitan número de bloque + if stl_area in ["DB", "DI"]: + block_num = address_elem[0].get("BlockNumber") + if block_num: + return f"{stl_area}{block_num}.{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: DB1.DBX0.1 + else: # Acceso con registro DB/DI + return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: DBX0.1 + elif stl_area in ["T", "C"]: + return f"{stl_area}{byte_offset}" # Los timers/contadores solo usan el número + else: # I, Q, M, L, PI, PQ + return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: M10.1, I0.0 except ValueError: - return f"{area}?{bit_offset_str}?" + return f"{area}?{bit_offset_str}?" + + return f"_{scope}_?" # Fallback - return f"_{scope}_?" # Fallback def get_comment_text(comment_element): """Extrae texto de un LineComment o Comment.""" - if comment_element is None: return "" + if comment_element is None: + return "" # Usar get_multilingual_text si los comentarios son multilingües # Si no, extraer texto directamente - ml_texts = comment_element.xpath(".//mlt:MultilingualTextItem/mlt:AttributeList/mlt:Text/text()", - namespaces={'mlt': "http://www.siemens.com/automation/Openness/SW/Interface/v5"}) # Asumiendo ns + ml_texts = comment_element.xpath( + ".//mlt:MultilingualTextItem/mlt:AttributeList/mlt:Text/text()", + namespaces={ + "mlt": "http://www.siemens.com/automation/Openness/SW/Interface/v5" + }, + ) # Asumiendo ns if ml_texts: # Podrías intentar obtener un idioma específico o simplemente el primero return ml_texts[0].strip() if ml_texts else "" @@ -497,6 +559,7 @@ def get_comment_text(comment_element): text_nodes = comment_element.xpath("./text()") return "".join(text_nodes).strip() + def reconstruct_stl_from_statementlist(statement_list_node): """Reconstruye el código STL como una cadena de texto desde .""" if statement_list_node is None: @@ -508,11 +571,13 @@ def reconstruct_stl_from_statementlist(statement_list_node): for stmt in statements: line_parts = [] - line_comment = "" # Comentario al final de la línea + line_comment = "" # Comentario al final de la línea # 1. Comentarios al inicio de la línea (como líneas separadas //) # CORREGIDO: Añadido namespaces=ns - initial_comments = stmt.xpath("child::stl:Comment | child::stl:LineComment", namespaces=ns) + initial_comments = stmt.xpath( + "child::stl:Comment | child::stl:LineComment", namespaces=ns + ) for comm in initial_comments: comment_text = get_comment_text(comm) if comment_text: @@ -531,70 +596,185 @@ def reconstruct_stl_from_statementlist(statement_list_node): label_str = f"{label_name_nodes[0]}:" # Buscar comentarios DENTRO de LabelDeclaration pero después de Label # CORREGIDO: Añadido namespaces=ns - label_comments = label_decl[0].xpath("./stl:Comment | ./stl:LineComment", namespaces=ns) + label_comments = label_decl[0].xpath( + "./stl:Comment | ./stl:LineComment", namespaces=ns + ) for lcomm in label_comments: comment_text = get_comment_text(lcomm) - if comment_text: line_comment += f" // {comment_text}" # Añadir al comentario de línea + if comment_text: + line_comment += ( + f" // {comment_text}" # Añadir al comentario de línea + ) # 3. Token de Instrucción STL # CORREGIDO: Añadido namespaces=ns instruction_token = stmt.xpath("./stl:StlToken", namespaces=ns) instruction_str = "" if instruction_token: - token_text = instruction_token[0].get("Text", "_ERR_TOKEN_") - instruction_str = token_text - # Comentarios asociados directamente al token - # CORREGIDO: Añadido namespaces=ns - token_comments = instruction_token[0].xpath("./stl:Comment | ./stl:LineComment", namespaces=ns) - for tcomm in token_comments: - comment_text = get_comment_text(tcomm) - if comment_text: line_comment += f" // {comment_text}" # Añadir al comentario de línea + token_text = instruction_token[0].get("Text", "_ERR_TOKEN_") + instruction_str = token_text + # Comentarios asociados directamente al token + # CORREGIDO: Añadido namespaces=ns + token_comments = instruction_token[0].xpath( + "./stl:Comment | ./stl:LineComment", namespaces=ns + ) + for tcomm in token_comments: + comment_text = get_comment_text(tcomm) + if comment_text: + line_comment += ( + f" // {comment_text}" # Añadir al comentario de línea + ) # 4. Acceso/Operando STL # CORREGIDO: Añadido namespaces=ns access_elem = stmt.xpath("./stl:Access", namespaces=ns) access_str = "" if access_elem: - access_text = get_access_text(access_elem[0]) - access_str = access_text - # Comentarios DENTRO del Access (pueden ser de línea o bloque) - # CORREGIDO: Añadido namespaces=ns - access_comments = access_elem[0].xpath("child::stl:LineComment | child::stl:Comment", namespaces=ns) - for acc_comm in access_comments: - comment_text = get_comment_text(acc_comm) - if comment_text: line_comment += f" // {comment_text}" # Añadir al comentario de línea + access_text = get_access_text(access_elem[0]) + access_str = access_text + # Comentarios DENTRO del Access (pueden ser de línea o bloque) + # CORREGIDO: Añadido namespaces=ns + access_comments = access_elem[0].xpath( + "child::stl:LineComment | child::stl:Comment", namespaces=ns + ) + for acc_comm in access_comments: + comment_text = get_comment_text(acc_comm) + if comment_text: + line_comment += ( + f" // {comment_text}" # Añadir al comentario de línea + ) # Construir la línea: Etiqueta (si hay) + Tab + Instrucción + Espacio + Operando (si hay) + Comentario(s) current_line = "" if label_str: current_line += label_str if instruction_str: - if current_line: # Si ya había etiqueta, añadir tabulador + if current_line: # Si ya había etiqueta, añadir tabulador current_line += "\t" current_line += instruction_str if access_str: - if current_line: # Si ya había algo, añadir espacio + if current_line: # Si ya había algo, añadir espacio current_line += " " - current_line += access_str + current_line += access_str if line_comment: # Añadir espacio antes del comentario si hay código en la línea if current_line.strip(): current_line += f" {line_comment}" - else: # Si la línea estaba vacía (solo comentarios iniciales), poner el comentario de línea - current_line = line_comment + else: # Si la línea estaba vacía (solo comentarios iniciales), poner el comentario de línea + current_line = line_comment # Añadir la línea construida solo si no está vacía if current_line.strip(): - stl_lines.append(current_line.rstrip()) # Eliminar espacios finales + stl_lines.append(current_line.rstrip()) # Eliminar espacios finales return "\n".join(stl_lines) + +# DB Parser + + +def parse_interface_members(member_elements): + """ + Parsea recursivamente una lista de elementos de una interfaz o estructura. + Maneja miembros simples, structs anidados y arrays con valores iniciales. + """ + members_data = [] + if not member_elements: + return members_data + + for member in member_elements: + member_name = member.get("Name") + member_dtype = member.get("Datatype") + member_remanence = member.get("Remanence", "NonRetain") # Default si no existe + member_accessibility = member.get("Accessibility", "Public") # Default + + if not member_name or not member_dtype: + print( + f"Advertencia: Miembro sin nombre o tipo de dato encontrado. Saltando." + ) + continue + + member_info = { + "name": member_name, + "datatype": member_dtype, + "remanence": member_remanence, + "accessibility": member_accessibility, + "start_value": None, # Para valores simples o structs/arrays inicializados globalmente + "comment": None, + "children": [], # Para structs + "array_elements": {}, # Para arrays (índice -> valor) + } + + # Extraer comentario del miembro + # Usar namespace iface + comment_node = member.xpath("./iface:Comment", namespaces=ns) + if comment_node: + # Llama a get_multilingual_text que ya maneja el namespace iface internamente + member_info["comment"] = get_multilingual_text(comment_node[0]) + + # Extraer valor inicial (para tipos simples) + # Usar namespace iface + start_value_node = member.xpath("./iface:StartValue", namespaces=ns) + if start_value_node: + constant_name = start_value_node[0].get("ConstantName") + if constant_name: + member_info["start_value"] = constant_name + else: + member_info["start_value"] = ( + start_value_node[0].text + if start_value_node[0].text is not None + else "" + ) + + # --- Manejar Structs Anidados --- + # Usar namespace iface + nested_sections = member.xpath( + "./iface:Sections/iface:Section/iface:Member", namespaces=ns + ) + if nested_sections: + member_info["children"] = parse_interface_members( + nested_sections + ) # Llamada recursiva + + # --- Manejar Arrays --- + if member_dtype.lower().startswith("array["): + # Usar namespace iface + subelements = member.xpath("./iface:Subelement", namespaces=ns) + for sub in subelements: + path = sub.get("Path") + # Usar namespace iface + sub_start_value_node = sub.xpath("./iface:StartValue", namespaces=ns) + if path and sub_start_value_node: + constant_name = sub_start_value_node[0].get("ConstantName") + value = ( + constant_name + if constant_name + else ( + sub_start_value_node[0].text + if sub_start_value_node[0].text is not None + else "" + ) + ) + member_info["array_elements"][path] = value + else: + # Usar namespace iface + sub_comment_node = sub.xpath("./iface:Comment", namespaces=ns) + if path and sub_comment_node: + # member_info["array_comments"][path] = get_multilingual_text(sub_comment_node[0]) + pass + + members_data.append(member_info) + + return members_data + + # --- Main Parsing Function --- + def parse_network(network_element): """ Parsea una red, extrae lógica y añade conexiones EN implícitas. - Maneja wires con múltiples destinos. + Maneja wires con múltiples destinos. (Función original adaptada para namespaces) """ if network_element is None: return { @@ -607,15 +787,16 @@ def parse_network(network_element): network_id = network_element.get("ID") - # --- Extracción Título/Comentario (sin cambios respecto a la última versión) --- + # Extracción Título/Comentario (usar namespace iface para MultilingualText) title_element = network_element.xpath( - ".//*[local-name()='MultilingualText'][@CompositionName='Title']" + ".//iface:MultilingualText[@CompositionName='Title']", namespaces=ns ) network_title = ( get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}" ) + # Asume que el comentario está en ObjectList dentro de CompileUnit comment_element = network_element.xpath( "./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']" ) @@ -623,31 +804,31 @@ def parse_network(network_element): get_multilingual_text(comment_element[0]) if comment_element else "" ) + # Buscar FlgNet usando namespace flg flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns) if not flgnet_list: - # print(f"Advertencia: FlgNet no encontrado en Red ID={network_id}. Puede estar vacía o ser comentario.") return { "id": network_id, "title": network_title, "comment": network_comment, "logic": [], + "language": "Unknown", "error": "FlgNet not found", } flgnet = flgnet_list[0] - # 1. Parsear Access, Parts y Calls (sin cambios) + # 1. Parsear Access, Parts y Calls (llaman a funciones que ya usan ns) access_map = { acc_info["uid"]: acc_info - for acc in flgnet.xpath(".//flg:Access", namespaces=ns) + for acc in flgnet.xpath(".//flg:Access", namespaces=ns) # Usa ns if (acc_info := parse_access(acc)) and acc_info["type"] != "unknown" } parts_and_calls_map = {} + # Usa ns instruction_elements = flgnet.xpath(".//flg:Part | .//flg:Call", namespaces=ns) for element in instruction_elements: parsed_info = None - tag_name = etree.QName( - element.tag - ).localname # Obtener nombre local de la etiqueta + tag_name = etree.QName(element.tag).localname if tag_name == "Part": parsed_info = parse_part(element) elif tag_name == "Call": @@ -659,85 +840,50 @@ def parse_network(network_element): f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}" ) - # --- 2. Parsear Wires (MODIFICADO para multi-destino) --- - wire_connections = defaultdict( - list - ) # (dest_uid, dest_pin) -> [(src_uid, src_pin), ...] - source_connections = defaultdict( - list - ) # (src_uid, src_pin) -> [(dest_uid, dest_pin), ...] - eno_outputs = defaultdict( - list - ) # src_uid -> [(dest_uid, dest_pin), ...] (conexiones DESDE eno) - - flg_ns_uri = ns["flg"] # Cache namespace URI + # 2. Parsear Wires (con namespaces) + wire_connections = defaultdict(list) + source_connections = defaultdict(list) + eno_outputs = defaultdict(list) + # Cachear QNames con namespace flg + flg_ns_uri = ns["flg"] qname_powerrail = etree.QName(flg_ns_uri, "Powerrail") qname_identcon = etree.QName(flg_ns_uri, "IdentCon") qname_namecon = etree.QName(flg_ns_uri, "NameCon") - + # Usa ns for wire in flgnet.xpath(".//flg:Wire", namespaces=ns): children = wire.getchildren() if len(children) < 2: - continue # Ignorar wires sin fuente y al menos un destino - + continue source_elem = children[0] source_uid, source_pin = None, None - - # Determinar fuente if source_elem.tag == qname_powerrail: source_uid, source_pin = "POWERRAIL", "out" elif source_elem.tag == qname_identcon: - source_uid, source_pin = ( - source_elem.get("UId"), - "value", - ) # Acceso a variable/constante + source_uid, source_pin = source_elem.get("UId"), "value" elif source_elem.tag == qname_namecon: - source_uid, source_pin = source_elem.get("UId"), source_elem.get( - "Name" - ) # Salida de instrucción - + source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name") if source_uid is None: - continue # No se pudo determinar la fuente - - source_info = (source_uid, source_pin) # Par de fuente - - # Iterar sobre TODOS los posibles destinos (desde el segundo hijo en adelante) + continue + source_info = (source_uid, source_pin) for dest_elem in children[1:]: dest_uid, dest_pin = None, None - - # Determinar destino if dest_elem.tag == qname_identcon: - dest_uid, dest_pin = ( - dest_elem.get("UId"), - "value", - ) # Entrada a variable/constante (Coil, etc.) + dest_uid, dest_pin = dest_elem.get("UId"), "value" elif dest_elem.tag == qname_namecon: - dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get( - "Name" - ) # Entrada a instrucción - - # Guardar conexiones si son válidas + dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name") if dest_uid is not None and dest_pin is not None: - # Mapa de Conexiones (Destino -> [Fuentes]) dest_key = (dest_uid, dest_pin) if source_info not in wire_connections[dest_key]: wire_connections[dest_key].append(source_info) - - # Mapa de Fuentes (Fuente -> [Destinos]) source_key = (source_uid, source_pin) dest_info = (dest_uid, dest_pin) if dest_info not in source_connections[source_key]: source_connections[source_key].append(dest_info) - - # Registrar conexiones que SALEN de un pin 'eno' if source_pin == "eno" and source_uid in parts_and_calls_map: if dest_info not in eno_outputs[source_uid]: eno_outputs[source_uid].append(dest_info) - # else: # Debug opcional si un elemento no es destino válido - # print(f"Advertencia: Elemento en Wire {wire.get('UId')} no es destino válido: {etree.tostring(dest_elem)}") - # --- FIN MODIFICACIÓN Wire --- - # 3. Construcción Lógica Inicial (sin cambios) + # 3. Construcción Lógica Inicial (sin cambios en lógica, pero verificar llamadas) all_logic_steps = {} functional_block_types = [ "Move", @@ -751,7 +897,13 @@ def parse_network(network_element): "Se", "Sd", "BLKMOV", - ] + "TON", + "TOF", + "TP", + "CTU", + "CTD", + "CTUD", + ] # Añadidos timers/counters SCL rlo_generators = [ "Contact", "O", @@ -765,46 +917,42 @@ def parse_network(network_element): "Xor", "PBox", "NBox", - ] + "Not", + ] # Añadido Not for instruction_uid, instruction_info in parts_and_calls_map.items(): - # Copiar info básica instruction_repr = {"instruction_uid": instruction_uid, **instruction_info} instruction_repr["inputs"] = {} instruction_repr["outputs"] = {} - - # --- INICIO: Manejo Especial SdCoil y otros Timers --- original_type = instruction_info["type"] current_type = original_type - input_pin_mapping = {} # Mapa XML pin -> JSON pin - output_pin_mapping = {} # Mapa XML pin -> JSON pin - + input_pin_mapping = {} + output_pin_mapping = {} + # --- Manejo Especial Tipos --- if original_type == "SdCoil": - print( - f" Advertencia: Reinterpretando 'SdCoil' (UID: {instruction_uid}) como 'Se' (Pulse Timer)." - ) - current_type = "Se" # Tratarlo como Se (TP) + current_type = "Se" + input_pin_mapping = {"in": "s", "operand": "timer", "value": "tv"} + output_pin_mapping = {"out": "q"} + elif original_type in ["Se", "Sd", "TON", "TOF", "TP"]: input_pin_mapping = { - "in": "s", # Pin XML 'in' mapea a JSON 's' (Start) - "operand": "timer", # Pin XML 'operand' mapea a JSON 'timer' (Instance) - "value": "tv", # Pin XML 'value' mapea a JSON 'tv' (Time Value) + "s": "s", + "in": "in", + "tv": "tv", + "pt": "pt", + "r": "r", + "timer": "timer", } - output_pin_mapping = {"out": "q"} # Pin XML 'out' mapea a JSON 'q' (Output) - elif original_type in ["Se", "Sd"]: - # Mapear pines estándar de Se/Sd para consistencia con TP/TON - input_pin_mapping = {"s": "s", "tv": "tv", "r": "r", "timer": "timer"} - output_pin_mapping = { - "q": "q", - "rt": "rt", # "rtbcd": "rtbcd" (ignorar BCD) + output_pin_mapping = {"q": "q", "Q": "Q", "rt": "rt", "ET": "ET"} + elif original_type in ["CTU", "CTD", "CTUD"]: + input_pin_mapping = { + "cu": "CU", + "cd": "CD", + "r": "R", + "ld": "LD", + "pv": "PV", + "counter": "counter", } - # Añadir otros mapeos si son necesarios para otros bloques (ej. Contadores) - # elif original_type == "CTU": - # input_pin_mapping = {"cu": "cu", "r": "r", "pv": "pv", "counter": "counter"} # 'counter' es inventado para instancia - # output_pin_mapping = {"qu": "qu", "cv": "cv"} - # --- FIN Manejo Especial --- - - instruction_repr["type"] = current_type # Actualizar el tipo si se cambió - - # Mapear Entradas usando el mapeo de pines + output_pin_mapping = {"qu": "QU", "qd": "QD", "cv": "CV"} + instruction_repr["type"] = current_type possible_input_pins = set( [ "en", @@ -825,14 +973,14 @@ def parse_network(network_element): "ld", "pre", "SRCBLK", + "PT", ] - ) # Ampliar con pines conocidos + ) # Añadido PT for xml_pin_name in possible_input_pins: dest_key = (instruction_uid, xml_pin_name) if dest_key in wire_connections: sources_list = wire_connections[dest_key] input_sources_repr = [] - # ... (lógica existente para obtener input_sources_repr de sources_list) ... for source_uid, source_pin in sources_list: if source_uid == "POWERRAIL": input_sources_repr.append({"type": "powerrail"}) @@ -841,36 +989,38 @@ def parse_network(network_element): elif source_uid in parts_and_calls_map: source_instr_info = parts_and_calls_map[source_uid] source_original_type = source_instr_info["type"] - # Obtener el mapeo de salida para el tipo de la fuente (si existe) source_output_mapping = {} if source_original_type == "SdCoil": source_output_mapping = {"out": "q"} - elif source_original_type in ["Se", "Sd"]: - source_output_mapping = {"q": "q", "rt": "rt"} - - # Usar el pin mapeado si existe, sino el original - mapped_source_pin = source_output_mapping.get(source_pin, source_pin) - - input_sources_repr.append({ - "type": "connection", - "source_instruction_type": source_original_type, # Guardar tipo original puede ser útil - "source_instruction_uid": source_uid, - "source_pin": mapped_source_pin # <-- USAR PIN MAPEADO - }) + elif source_original_type in ["Se", "Sd", "TON", "TOF", "TP"]: + source_output_mapping = { + "q": "q", + "Q": "Q", + "rt": "rt", + "ET": "ET", + } + elif source_original_type in ["CTU", "CTD", "CTUD"]: + source_output_mapping = {"qu": "QU", "qd": "QD", "cv": "CV"} + mapped_source_pin = source_output_mapping.get( + source_pin, source_pin + ) + input_sources_repr.append( + { + "type": "connection", + "source_instruction_type": source_original_type, + "source_instruction_uid": source_uid, + "source_pin": mapped_source_pin, + } + ) else: input_sources_repr.append( {"type": "unknown_source", "uid": source_uid} ) - - # Usar el nombre de pin mapeado para el JSON json_pin_name = input_pin_mapping.get(xml_pin_name, xml_pin_name) - if len(input_sources_repr) == 1: instruction_repr["inputs"][json_pin_name] = input_sources_repr[0] elif len(input_sources_repr) > 1: instruction_repr["inputs"][json_pin_name] = input_sources_repr - - # Mapear Salidas usando el mapeo de pines possible_output_pins = set( [ "out", @@ -886,17 +1036,15 @@ def parse_network(network_element): "cvbcd", "QU", "QD", + "ET", ] - ) + ) # Añadido ET for xml_pin_name in possible_output_pins: source_key = (instruction_uid, xml_pin_name) if source_key in source_connections: - # Usar el nombre de pin mapeado para el JSON json_pin_name = output_pin_mapping.get(xml_pin_name, xml_pin_name) - if json_pin_name not in instruction_repr["outputs"]: instruction_repr["outputs"][json_pin_name] = [] - for dest_uid, dest_pin in source_connections[source_key]: if dest_uid in access_map: if ( @@ -906,10 +1054,9 @@ def parse_network(network_element): instruction_repr["outputs"][json_pin_name].append( access_map[dest_uid] ) - all_logic_steps[instruction_uid] = instruction_repr - # 4. Inferencia EN (sin cambios) + # 4. Inferencia EN (sin cambios en lógica) processed_blocks_en_inference = set() something_changed = True inference_passes = 0 @@ -930,8 +1077,8 @@ def parse_network(network_element): for i, instruction in enumerate(ordered_logic_list_for_en): part_uid = instruction["instruction_uid"] part_type_original = ( - instruction["type"].replace("_scl", "").replace("_error", "") - ) + instruction["type"].replace(SCL_SUFFIX, "").replace("_error", "") + ) # Usa SCL_SUFFIX if ( part_type_original in functional_block_types and "en" not in instruction["inputs"] @@ -943,7 +1090,9 @@ def parse_network(network_element): prev_instr = ordered_logic_list_for_en[j] prev_uid = prev_instr["instruction_uid"] prev_type_original = ( - prev_instr["type"].replace("_scl", "").replace("_error", "") + prev_instr["type"] + .replace(SCL_SUFFIX, "") + .replace("_error", "") ) if prev_type_original in rlo_generators: inferred_en_source = { @@ -979,7 +1128,7 @@ def parse_network(network_element): processed_blocks_en_inference.add(part_uid) something_changed = True - # 5. Añadir lógica ENO interesante (sin cambios) + # 5. Añadir lógica ENO interesante (sin cambios en lógica) for source_instr_uid, eno_destinations in eno_outputs.items(): if source_instr_uid not in all_logic_steps: continue @@ -1025,17 +1174,30 @@ def parse_network(network_element): if interesting_eno_logic: all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic - # 6. Ordenar y Devolver (sin cambios) + # 6. Ordenar y Devolver network_logic_final = [ all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps ] + # Determinar lenguaje de la red para devolverlo + network_lang = "Unknown" + if network_element is not None: + attr_list_net = network_element.xpath("./*[local-name()='AttributeList']") + if attr_list_net: + lang_node_net = attr_list_net[0].xpath( + "./*[local-name()='ProgrammingLanguage']/text()" + ) + if lang_node_net: + network_lang = lang_node_net[0].strip() + return { "id": network_id, "title": network_title, "comment": network_comment, + "language": network_lang, "logic": network_logic_final, } + def convert_xml_to_json(xml_filepath, json_filepath): print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...") if not os.path.exists(xml_filepath): @@ -1047,25 +1209,37 @@ def convert_xml_to_json(xml_filepath, json_filepath): tree = etree.parse(xml_filepath, parser) root = tree.getroot() print("Paso 1: Parseo XML completado.") - print("Paso 2: Buscando el bloque SW.Blocks.FC...") # Asume FC primero - block_list = root.xpath("//*[local-name()='SW.Blocks.FC']") - block_type_found = "FC" - if not block_list: - block_list = root.xpath( - "//*[local-name()='SW.Blocks.FB']" - ) # Busca FB si no hay FC - block_type_found = "FB" - if not block_list: - print("Error Crítico: No se encontró ni .") - return - else: - print( - "Advertencia: Se encontró en lugar de ." - ) - the_block = block_list[0] - print( - f"Paso 2: Bloque SW.Blocks.{block_type_found} encontrado (ID={the_block.get('ID')})." - ) + print("Paso 2: Buscando el bloque SW.Blocks.FC, SW.Blocks.FB o SW.Blocks.GlobalDB...") + # --- MODIFICADO: Buscar FC, FB o GlobalDB --- + block_list = root.xpath("//*[local-name()='SW.Blocks.FC' or local-name()='SW.Blocks.FB' or local-name()='SW.Blocks.GlobalDB']") + block_type_found = None + the_block = None + + if block_list: + the_block = block_list[0] + # Obtener el nombre real de la etiqueta encontrada + block_tag_name = etree.QName(the_block.tag).localname + if block_tag_name == "SW.Blocks.FC": + block_type_found = "FC" + elif block_tag_name == "SW.Blocks.FB": + block_type_found = "FB" + elif block_tag_name == "SW.Blocks.GlobalDB": + block_type_found = "GlobalDB" # Identificar el tipo DB + print(f"Paso 2: Bloque {block_tag_name} encontrado (ID={the_block.get('ID')}).") + else: + # Mensaje de error más específico y añadimos depuración + print("Error Crítico: No se encontró el elemento raíz del bloque (, o ) usando XPath.") + # --- Añadir Debugging --- + print(f"DEBUG: Tag del elemento raíz del XML: {root.tag}") + print(f"DEBUG: Primeros hijos del raíz:") + for i, child in enumerate(root.getchildren()): + if i < 5: # Imprimir solo los primeros 5 para no saturar + print(f"DEBUG: - Hijo {i+1}: {child.tag}") + else: + print("DEBUG: - ... (más hijos)") + break + # --- Fin Debugging --- + return # Salir si no se encuentra el bloque principal print("Paso 3: Extrayendo atributos del bloque...") attribute_list_node = the_block.xpath("./*[local-name()='AttributeList']") block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown" @@ -1241,24 +1415,30 @@ def convert_xml_to_json(xml_filepath, json_filepath): reconstructed_stl = f"// STL extraction failed for Network {network_id}: StatementList node not found.\n" if statement_list_node: - print(f" Reconstruyendo STL desde StatementList para red {network_id}...") + print( + f" Reconstruyendo STL desde StatementList para red {network_id}..." + ) # Llama a la nueva función de reconstrucción STL - reconstructed_stl = reconstruct_stl_from_statementlist(statement_list_node[0]) + reconstructed_stl = reconstruct_stl_from_statementlist( + statement_list_node[0] + ) # print(f" ... STL reconstruido (parcial):\n{reconstructed_stl[:200]}...") # Preview opcional else: - print(f" Advertencia: No se encontró nodo para red STL {network_id}.") + print( + f" Advertencia: No se encontró nodo para red STL {network_id}." + ) # Guardar como un chunk de texto crudo parsed_network_data = { "id": network_id, "title": network_title, "comment": network_comment, - "language": "STL", # Indicar que es STL + "language": "STL", # Indicar que es STL "logic": [ { - "instruction_uid": f"STL_{network_id}", # UID inventado - "type": "RAW_STL_CHUNK", # Nuevo tipo para identificarlo - "stl": reconstructed_stl, # Guardar el texto reconstruido + "instruction_uid": f"STL_{network_id}", # UID inventado + "type": "RAW_STL_CHUNK", # Nuevo tipo para identificarlo + "stl": reconstructed_stl, # Guardar el texto reconstruido } ], } @@ -1340,6 +1520,7 @@ def convert_xml_to_json(xml_filepath, json_filepath): traceback.print_exc() print("--- Fin Traceback ---") + if __name__ == "__main__": # Imports necesarios solo para la ejecución como script principal import argparse @@ -1351,27 +1532,29 @@ if __name__ == "__main__": description="Convert Simatic XML (LAD/FBD/SCL/STL) to simplified JSON. Expects XML filepath as argument." ) parser.add_argument( - "xml_filepath", # Argumento posicional obligatorio + "xml_filepath", # Argumento posicional obligatorio help="Path to the input XML file passed from the main script (x0_main.py).", ) - args = parser.parse_args() # Parsea los argumentos de sys.argv + args = parser.parse_args() # Parsea los argumentos de sys.argv - xml_input_file = args.xml_filepath # Obtiene la ruta del argumento + xml_input_file = args.xml_filepath # Obtiene la ruta del argumento # Verificar si el archivo de entrada existe (es una buena práctica aunque x0 lo haga) if not os.path.exists(xml_input_file): print(f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'") - sys.exit(1) # Salir si el archivo no existe + sys.exit(1) # Salir si el archivo no existe # Derivar nombre base para archivo de salida JSON # El archivo JSON se guardará en el mismo directorio que el XML de entrada xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0] - output_dir = os.path.dirname(xml_input_file) # Directorio del XML de entrada + output_dir = os.path.dirname(xml_input_file) # Directorio del XML de entrada # Asegurarse de que el directorio de salida exista (aunque debería si el XML existe) os.makedirs(output_dir, exist_ok=True) json_output_file = os.path.join(output_dir, f"{xml_filename_base}_simplified.json") - print(f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'") + print( + f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'" + ) # Llamar a la función principal de conversión del script # Asumiendo que tu función principal se llama convert_xml_to_json(input_path, output_path) @@ -1380,6 +1563,6 @@ if __name__ == "__main__": except Exception as e: print(f"Error Crítico (x1) durante la conversión de '{xml_input_file}': {e}") import traceback - traceback.print_exc() - sys.exit(1) # Salir con error si la función principal falla + traceback.print_exc() + sys.exit(1) # Salir con error si la función principal falla diff --git a/x2_process.py b/x2_process.py index fd124d8..a0f3f93 100644 --- a/x2_process.py +++ b/x2_process.py @@ -5,29 +5,30 @@ import os import copy import traceback import re -import importlib -import sys -import sympy # Import sympy +import importlib +import sys +import sympy # Import sympy # Import necessary components from processors directory from processors.processor_utils import ( - format_variable_name, # Keep if used outside processors - sympy_expr_to_scl, # Needed for IF grouping and maybe others + format_variable_name, # Keep if used outside processors + sympy_expr_to_scl, # Needed for IF grouping and maybe others # get_target_scl_name might be used here? Unlikely. ) -from processors.symbol_manager import SymbolManager # Import the manager +from processors.symbol_manager import SymbolManager # Import the manager # --- Constantes y Configuración --- # SCL_SUFFIX = "_scl" # Old suffix -SCL_SUFFIX = "_sympy_processed" # New suffix to indicate processing method +SCL_SUFFIX = "_sympy_processed" # New suffix to indicate processing method GROUPED_COMMENT = "// Logic included in grouped IF" -SIMPLIFIED_IF_COMMENT = "// Simplified IF condition by script" # May still be useful +SIMPLIFIED_IF_COMMENT = "// Simplified IF condition by script" # May still be useful # Global data dictionary (consider passing 'data' as argument if needed elsewhere) # It's currently used by process_group_ifs implicitly via the outer scope, # which works but passing it explicitly might be cleaner. data = {} + def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): """ Busca condiciones (ya procesadas -> tienen expr SymPy en sympy_map) @@ -37,24 +38,45 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): (Esta es la implementación de la función como la tenías en el archivo original) """ instr_uid = instruction["instruction_uid"] - instr_type_original = instruction.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "") + instr_type_original = ( + instruction.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "") + ) made_change = False # Check if this instruction *could* generate a condition suitable for grouping # It must have been processed by the new SymPy method if ( - not instruction.get("type", "").endswith(SCL_SUFFIX) # Check if processed by new method + not instruction.get("type", "").endswith( + SCL_SUFFIX + ) # Check if processed by new method or "_error" in instruction.get("type", "") or instruction.get("grouped", False) - or instr_type_original not in [ # Original types that produce boolean results - "Contact", "O", "Eq", "Ne", "Gt", "Lt", "Ge", "Le", "PBox", "NBox", "And", "Xor", "Not" # Add others like comparison + or instr_type_original + not in [ # Original types that produce boolean results + "Contact", + "O", + "Eq", + "Ne", + "Gt", + "Lt", + "Ge", + "Le", + "PBox", + "NBox", + "And", + "Xor", + "Not", # Add others like comparison ] ): return False # Avoid reagruping if SCL already contains a complex IF (less likely now) current_scl = instruction.get("scl", "") - if current_scl.strip().startswith("IF") and "END_IF;" in current_scl and GROUPED_COMMENT not in current_scl: + if ( + current_scl.strip().startswith("IF") + and "END_IF;" in current_scl + and GROUPED_COMMENT not in current_scl + ): return False # *** Get the SymPy expression for the condition *** @@ -62,20 +84,34 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): sympy_condition_expr = sympy_map.get(map_key_out) # No SymPy expression found or trivial conditions - if sympy_condition_expr is None or sympy_condition_expr in [sympy.true, sympy.false]: + if sympy_condition_expr is None or sympy_condition_expr in [ + sympy.true, + sympy.false, + ]: return False # --- Find consumer instructions (logic similar to before) --- grouped_instructions_cores = [] consumer_instr_list = [] - network_logic = next((net["logic"] for net in data["networks"] if net["id"] == network_id), []) - if not network_logic: return False + network_logic = next( + (net["logic"] for net in data["networks"] if net["id"] == network_id), [] + ) + if not network_logic: + return False - groupable_types = [ # Types whose *final SCL* we want to group - "Move", "Add", "Sub", "Mul", "Div", "Mod", "Convert", - "Call_FC", "Call_FB", # Assuming these generate final SCL in their processors now + groupable_types = [ # Types whose *final SCL* we want to group + "Move", + "Add", + "Sub", + "Mul", + "Div", + "Mod", + "Convert", + "Call_FC", + "Call_FB", # Assuming these generate final SCL in their processors now # SCoil/RCoil might also be groupable if their SCL is final assignment - "SCoil", "RCoil" + "SCoil", + "RCoil", ] for consumer_instr in network_logic: @@ -84,31 +120,45 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): continue consumer_en = consumer_instr.get("inputs", {}).get("en") - consumer_type = consumer_instr.get("type", "") # Current type suffix matters - consumer_type_original = consumer_type.replace(SCL_SUFFIX, "").replace("_error", "") + consumer_type = consumer_instr.get("type", "") # Current type suffix matters + consumer_type_original = consumer_type.replace(SCL_SUFFIX, "").replace( + "_error", "" + ) is_enabled_by_us = False - if ( isinstance(consumer_en, dict) and consumer_en.get("type") == "connection" and - consumer_en.get("source_instruction_uid") == instr_uid and - consumer_en.get("source_pin") == "out"): + if ( + isinstance(consumer_en, dict) + and consumer_en.get("type") == "connection" + and consumer_en.get("source_instruction_uid") == instr_uid + and consumer_en.get("source_pin") == "out" + ): is_enabled_by_us = True # Check if consumer is groupable AND has its final SCL generated # The suffix check needs adjustment based on how terminating processors set it. # Assuming processors like Move, Add, Call, SCoil, RCoil NOW generate final SCL and add a suffix. - if ( is_enabled_by_us and consumer_type.endswith(SCL_SUFFIX) and # Or a specific "final_scl" suffix - consumer_type_original in groupable_types ): + if ( + is_enabled_by_us + and consumer_type.endswith(SCL_SUFFIX) # Or a specific "final_scl" suffix + and consumer_type_original in groupable_types + ): consumer_scl = consumer_instr.get("scl", "") # Extract core SCL (logic is similar, maybe simpler if SCL is cleaner now) core_scl = None if consumer_scl: - # If consumer SCL itself is an IF generated by EN, take the body - if consumer_scl.strip().startswith("IF"): - match = re.search(r"THEN\s*(.*?)\s*END_IF;", consumer_scl, re.DOTALL | re.IGNORECASE) - core_scl = match.group(1).strip() if match else None - elif not consumer_scl.strip().startswith("//"): # Otherwise, take the whole line if not comment - core_scl = consumer_scl.strip() + # If consumer SCL itself is an IF generated by EN, take the body + if consumer_scl.strip().startswith("IF"): + match = re.search( + r"THEN\s*(.*?)\s*END_IF;", + consumer_scl, + re.DOTALL | re.IGNORECASE, + ) + core_scl = match.group(1).strip() if match else None + elif not consumer_scl.strip().startswith( + "//" + ): # Otherwise, take the whole line if not comment + core_scl = consumer_scl.strip() if core_scl: grouped_instructions_cores.append(core_scl) @@ -116,15 +166,19 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): # --- If groupable consumers found --- if len(grouped_instructions_cores) > 1: - print(f"INFO: Agrupando {len(grouped_instructions_cores)} instr. bajo condición de {instr_type_original} UID {instr_uid}") + print( + f"INFO: Agrupando {len(grouped_instructions_cores)} instr. bajo condición de {instr_type_original} UID {instr_uid}" + ) # *** Simplify the SymPy condition *** try: - #simplified_expr = sympy.simplify_logic(sympy_condition_expr, force=True) - simplified_expr = sympy.logic.boolalg.to_dnf(sympy_condition_expr, simplify=True) + # simplified_expr = sympy.simplify_logic(sympy_condition_expr, force=True) + simplified_expr = sympy.logic.boolalg.to_dnf( + sympy_condition_expr, simplify=True + ) except Exception as e: print(f"Error simplifying condition for grouping UID {instr_uid}: {e}") - simplified_expr = sympy_condition_expr # Fallback + simplified_expr = sympy_condition_expr # Fallback # *** Convert simplified condition to SCL string *** condition_scl_simplified = sympy_expr_to_scl(simplified_expr, symbol_manager) @@ -132,7 +186,9 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): # *** Build the grouped IF SCL *** scl_grouped_lines = [f"IF {condition_scl_simplified} THEN"] for core_line in grouped_instructions_cores: - indented_core = "\n".join([f" {line.strip()}" for line in core_line.splitlines()]) + indented_core = "\n".join( + [f" {line.strip()}" for line in core_line.splitlines()] + ) scl_grouped_lines.append(indented_core) scl_grouped_lines.append("END_IF;") final_grouped_scl = "\n".join(scl_grouped_lines) @@ -147,18 +203,19 @@ def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): return made_change + def load_processors(processors_dir="processors"): """ Escanea el directorio, importa módulos, construye el mapa y una lista ordenada por prioridad. """ processor_map = {} - processor_list_unsorted = [] # Lista para guardar (priority, type_name, func) - default_priority = 10 # Prioridad si no se define en get_processor_info + processor_list_unsorted = [] # Lista para guardar (priority, type_name, func) + default_priority = 10 # Prioridad si no se define en get_processor_info if not os.path.isdir(processors_dir): print(f"Error: Directorio de procesadores no encontrado: '{processors_dir}'") - return processor_map, [] # Devuelve mapa vacío y lista vacía + return processor_map, [] # Devuelve mapa vacío y lista vacía print(f"Cargando procesadores desde: '{processors_dir}'") processors_package = os.path.basename(processors_dir) @@ -170,7 +227,9 @@ def load_processors(processors_dir="processors"): try: module = importlib.import_module(full_module_name) - if hasattr(module, 'get_processor_info') and callable(module.get_processor_info): + if hasattr(module, "get_processor_info") and callable( + module.get_processor_info + ): processor_info = module.get_processor_info() info_list = [] if isinstance(processor_info, dict): @@ -178,29 +237,51 @@ def load_processors(processors_dir="processors"): elif isinstance(processor_info, list): info_list = processor_info else: - print(f" Advertencia: get_processor_info en {full_module_name} devolvió tipo inesperado. Se ignora.") + print( + f" Advertencia: get_processor_info en {full_module_name} devolvió tipo inesperado. Se ignora." + ) continue for info in info_list: - if isinstance(info, dict) and 'type_name' in info and 'processor_func' in info: - type_name = info['type_name'].lower() - processor_func = info['processor_func'] + if ( + isinstance(info, dict) + and "type_name" in info + and "processor_func" in info + ): + type_name = info["type_name"].lower() + processor_func = info["processor_func"] # Obtener prioridad, usar default si no existe - priority = info.get('priority', default_priority) + priority = info.get("priority", default_priority) if callable(processor_func): if type_name in processor_map: - print(f" Advertencia: '{type_name}' en {full_module_name} sobrescribe definición anterior.") + print( + f" Advertencia: '{type_name}' en {full_module_name} sobrescribe definición anterior." + ) processor_map[type_name] = processor_func # Añadir a la lista para ordenar - processor_list_unsorted.append({'priority': priority, 'type_name': type_name, 'func': processor_func}) - print(f" - Cargado '{type_name}' (Prio: {priority}) desde {module_name_rel}.py") + processor_list_unsorted.append( + { + "priority": priority, + "type_name": type_name, + "func": processor_func, + } + ) + print( + f" - Cargado '{type_name}' (Prio: {priority}) desde {module_name_rel}.py" + ) else: - print(f" Advertencia: 'processor_func' para '{type_name}' en {full_module_name} no es callable.") + print( + f" Advertencia: 'processor_func' para '{type_name}' en {full_module_name} no es callable." + ) else: - print(f" Advertencia: Entrada inválida en {full_module_name}: {info}") + print( + f" Advertencia: Entrada inválida en {full_module_name}: {info}" + ) else: - print(f" Advertencia: Módulo {module_name_rel}.py no tiene 'get_processor_info'.") + print( + f" Advertencia: Módulo {module_name_rel}.py no tiene 'get_processor_info'." + ) except ImportError as e: print(f"Error importando {full_module_name}: {e}") @@ -209,22 +290,24 @@ def load_processors(processors_dir="processors"): traceback.print_exc() # Ordenar la lista por prioridad (menor primero) - processor_list_sorted = sorted(processor_list_unsorted, key=lambda x: x['priority']) + processor_list_sorted = sorted(processor_list_unsorted, key=lambda x: x["priority"]) print(f"\nTotal de tipos de procesadores cargados: {len(processor_map)}") - print(f"Orden de procesamiento por prioridad: {[item['type_name'] for item in processor_list_sorted]}") + print( + f"Orden de procesamiento por prioridad: {[item['type_name'] for item in processor_list_sorted]}" + ) # Devolver el mapa (para lookup rápido si es necesario) y la lista ordenada return processor_map, processor_list_sorted + # --- Bucle Principal de Procesamiento (Modificado para STL) --- def process_json_to_scl(json_filepath): """ - Lee el JSON simplificado, aplica los procesadores dinámicamente cargados - siguiendo un orden de prioridad (ignorando redes STL), y guarda el JSON procesado. + Lee JSON simplificado, aplica procesadores dinámicos (ignorando redes STL y bloques DB), + y guarda JSON procesado. """ - global data # Necesario para que load_processors y process_group_ifs (definidas fuera) puedan acceder a ella. - # Considerar pasar 'data' como argumento si es posible refactorizar. + global data if not os.path.exists(json_filepath): print(f"Error: JSON no encontrado: {json_filepath}") @@ -232,48 +315,83 @@ def process_json_to_scl(json_filepath): print(f"Cargando JSON desde: {json_filepath}") try: with open(json_filepath, "r", encoding="utf-8") as f: - data = json.load(f) # Carga en 'data' global + data = json.load(f) except Exception as e: print(f"Error al cargar JSON: {e}") traceback.print_exc() return - # --- Carga dinámica de procesadores --- + # --- Obtener lenguaje del bloque principal --- + block_language = data.get("language", "Unknown") + block_type = data.get("block_type", "Unknown") # FC, FB, GlobalDB + print(f"Procesando bloque tipo: {block_type}, Lenguaje principal: {block_language}") + + # --- SI ES UN DB, SALTAR EL PROCESAMIENTO LÓGICO --- + if block_language == "DB": + print( + "INFO: El bloque es un Data Block (DB). Saltando procesamiento lógico de x2." + ) + # Simplemente guardamos una copia (o el mismo archivo si no se requiere sufijo) + output_filename = json_filepath.replace( + "_simplified.json", "_simplified_processed.json" + ) + print(f"Guardando JSON de DB (sin cambios lógicos) en: {output_filename}") + try: + with open(output_filename, "w", encoding="utf-8") as f: + json.dump(data, f, indent=4, ensure_ascii=False) + print("Guardado de DB completado.") + except Exception as e: + print(f"Error Crítico al guardar JSON del DB: {e}") + traceback.print_exc() + return # <<< SALIR TEMPRANO PARA DBs + + # --- SI NO ES DB, CONTINUAR CON EL PROCESAMIENTO LÓGICO (FC/FB) --- + print("INFO: El bloque es FC/FB. Iniciando procesamiento lógico...") + script_dir = os.path.dirname(__file__) - processors_dir_path = os.path.join(script_dir, 'processors') + processors_dir_path = os.path.join(script_dir, "processors") processor_map, sorted_processors = load_processors(processors_dir_path) if not processor_map: print("Error crítico: No se cargaron procesadores. Abortando.") return - # --- Crear mapas de acceso por red --- network_access_maps = {} - # (La lógica para llenar network_access_maps no cambia, puedes copiarla de tu original) + # Crear mapas de acceso por red (copiado/adaptado de versión anterior) for network in data.get("networks", []): net_id = network["id"] current_access_map = {} for instr in network.get("logic", []): - for _, source in instr.get("inputs", {}).items(): - sources_to_check = (source if isinstance(source, list) else ([source] if isinstance(source, dict) else [])) - for src in sources_to_check: - if (isinstance(src, dict) and src.get("uid") and src.get("type") in ["variable", "constant"]): - current_access_map[src["uid"]] = src - for _, dest_list in instr.get("outputs", {}).items(): - if isinstance(dest_list, list): - for dest in dest_list: - if (isinstance(dest, dict) and dest.get("uid") and dest.get("type") in ["variable", "constant"]): - current_access_map[dest["uid"]] = dest + for _, source in instr.get("inputs", {}).items(): + sources_to_check = ( + source + if isinstance(source, list) + else ([source] if isinstance(source, dict) else []) + ) + for src in sources_to_check: + if ( + isinstance(src, dict) + and src.get("uid") + and src.get("type") in ["variable", "constant"] + ): + current_access_map[src["uid"]] = src + for _, dest_list in instr.get("outputs", {}).items(): + if isinstance(dest_list, list): + for dest in dest_list: + if ( + isinstance(dest, dict) + and dest.get("uid") + and dest.get("type") in ["variable", "constant"] + ): + current_access_map[dest["uid"]] = dest network_access_maps[net_id] = current_access_map - # --- Inicializar mapa SymPy y SymbolManager --- symbol_manager = SymbolManager() sympy_map = {} - max_passes = 30 passes = 0 processing_complete = False - print("\n--- Iniciando Bucle de Procesamiento Iterativo (con SymPy y prioridad) ---") + print("\n--- Iniciando Bucle de Procesamiento Iterativo (FC/FB) ---") while passes < max_passes and not processing_complete: passes += 1 made_change_in_base_pass = False @@ -284,90 +402,99 @@ def process_json_to_scl(json_filepath): # --- FASE 1: Procesadores Base (Ignorando STL) --- print(f" Fase 1 (SymPy Base - Orden por Prioridad):") - num_sympy_processed_this_pass = 0 + num_sympy_processed_this_pass = 0 # Resetear contador para el pase for processor_info in sorted_processors: - current_type_name = processor_info['type_name'] - func_to_call = processor_info['func'] - + current_type_name = processor_info["type_name"] + func_to_call = processor_info["func"] for network in data.get("networks", []): network_id = network["id"] - network_lang = network.get("language", "LAD") # Obtener lenguaje de la red - - # *** IGNORAR REDES STL EN ESTA FASE *** + network_lang = network.get("language", "LAD") if network_lang == "STL": - continue # Saltar al siguiente network + continue # Saltar STL access_map = network_access_maps.get(network_id, {}) network_logic = network.get("logic", []) - for instruction in network_logic: instr_uid = instruction.get("instruction_uid") instr_type_original = instruction.get("type", "Unknown") - - # Saltar si ya procesado, error, agrupado o es chunk STL/SCL/Unsupported - if (instr_type_original.endswith(SCL_SUFFIX) + if ( + instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original or instruction.get("grouped", False) - or instr_type_original in ["RAW_STL_CHUNK", "RAW_SCL_CHUNK", "UNSUPPORTED_LANG"]): + or instr_type_original + in ["RAW_STL_CHUNK", "RAW_SCL_CHUNK", "UNSUPPORTED_LANG"] + ): continue - - # Determinar tipo efectivo (como antes) lookup_key = instr_type_original.lower() effective_type_name = lookup_key if instr_type_original == "Call": - block_type = instruction.get("block_type", "").upper() - if block_type == "FC": effective_type_name = "call_fc" - elif block_type == "FB": effective_type_name = "call_fb" + block_type = instruction.get("block_type", "").upper() + if block_type == "FC": + effective_type_name = "call_fc" + elif block_type == "FB": + effective_type_name = "call_fb" - # Llamar al procesador si coincide el tipo if effective_type_name == current_type_name: try: - # Pasa sympy_map, symbol_manager y data - changed = func_to_call(instruction, network_id, sympy_map, symbol_manager, data) + changed = func_to_call( + instruction, network_id, sympy_map, symbol_manager, data + ) if changed: made_change_in_base_pass = True num_sympy_processed_this_pass += 1 except Exception as e: - print(f"ERROR(SymPy Base) al procesar {instr_type_original} UID {instr_uid}: {e}") - traceback.print_exc() - instruction["scl"] = f"// ERROR en SymPy procesador base: {e}" - instruction["type"] = instr_type_original + "_error" - made_change_in_base_pass = True # Marcar cambio aunque sea error - print(f" -> {num_sympy_processed_this_pass} instrucciones (no STL) procesadas con SymPy.") - + print( + f"ERROR(SymPy Base) al procesar {instr_type_original} UID {instr_uid}: {e}" + ) + traceback.print_exc() + instruction["scl"] = ( + f"// ERROR en SymPy procesador base: {e}" + ) + instruction["type"] = instr_type_original + "_error" + made_change_in_base_pass = True + print( + f" -> {num_sympy_processed_this_pass} instrucciones (no STL) procesadas con SymPy." + ) # --- FASE 2: Agrupación IF (Ignorando STL) --- - if made_change_in_base_pass or passes == 1: + if ( + made_change_in_base_pass or passes == 1 + ): # Ejecutar siempre en el primer pase print(f" Fase 2 (Agrupación IF con Simplificación):") - num_grouped_this_pass = 0 + num_grouped_this_pass = 0 # Resetear contador para el pase for network in data.get("networks", []): network_id = network["id"] - network_lang = network.get("language", "LAD") # Obtener lenguaje - - # *** IGNORAR REDES STL EN ESTA FASE *** + network_lang = network.get("language", "LAD") if network_lang == "STL": - continue # Saltar red STL - + continue # Saltar STL network_logic = network.get("logic", []) for instruction in network_logic: try: - # Llama a process_group_ifs (que necesita acceso a 'data' global o pasado) - group_changed = process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data) - if group_changed: - made_change_in_group_pass = True - num_grouped_this_pass += 1 + group_changed = process_group_ifs( + instruction, network_id, sympy_map, symbol_manager, data + ) + if group_changed: + made_change_in_group_pass = True + num_grouped_this_pass += 1 except Exception as e: - print(f"ERROR(GroupLoop) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}") - traceback.print_exc() - print(f" -> {num_grouped_this_pass} agrupaciones realizadas (en redes no STL).") - + print( + f"ERROR(GroupLoop) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}" + ) + traceback.print_exc() + print( + f" -> {num_grouped_this_pass} agrupaciones realizadas (en redes no STL)." + ) # --- Comprobar si se completó el procesamiento --- if not made_change_in_base_pass and not made_change_in_group_pass: - print(f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---") + print( + f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---" + ) processing_complete = True else: - print(f"--- Fin Pase {passes}: {num_sympy_processed_this_pass} proc SymPy, {num_grouped_this_pass} agrup. Continuando...") + print( + f"--- Fin Pase {passes}: {num_sympy_processed_this_pass} proc SymPy, {num_grouped_this_pass} agrup. Continuando..." + ) # --- Comprobar límite de pases --- if passes == max_passes and not processing_complete: @@ -376,46 +503,51 @@ def process_json_to_scl(json_filepath): # --- FIN BUCLE ITERATIVO --- # --- Verificación Final (Ajustada para RAW_STL_CHUNK) --- - print("\n--- Verificación Final de Instrucciones No Procesadas ---") + print("\n--- Verificación Final de Instrucciones No Procesadas (FC/FB) ---") unprocessed_count = 0 unprocessed_details = [] - # Añadir RAW_STL_CHUNK a los tipos ignorados - ignored_types = ['raw_scl_chunk', 'unsupported_lang', 'raw_stl_chunk'] # Añadido raw_stl_chunk - + ignored_types = [ + "raw_scl_chunk", + "unsupported_lang", + "raw_stl_chunk", + ] # Añadido raw_stl_chunk for network in data.get("networks", []): - network_id = network.get("id", "Unknown ID") - network_title = network.get("title", f"Network {network_id}") - network_lang = network.get("language", "LAD") # Obtener lenguaje - - # No verificar instrucciones dentro de redes STL, ya que no se procesan - if network_lang == "STL": - continue - - for instruction in network.get("logic", []): - instr_uid = instruction.get("instruction_uid", "Unknown UID") - instr_type = instruction.get("type", "Unknown Type") - is_grouped = instruction.get("grouped", False) - - # Condición revisada para ignorar los chunks crudos - if (not instr_type.endswith(SCL_SUFFIX) and - "_error" not in instr_type and - not is_grouped and - instr_type.lower() not in ignored_types): # Verifica contra lista actualizada - unprocessed_count += 1 - unprocessed_details.append( - f" - Red '{network_title}' (ID: {network_id}, Lang: {network_lang}), " - f"Instrucción UID: {instr_uid}, Tipo: '{instr_type}'" - ) - + network_id = network.get("id", "Unknown ID") + network_title = network.get("title", f"Network {network_id}") + network_lang = network.get("language", "LAD") + if network_lang == "STL": + continue # No verificar redes STL + for instruction in network.get("logic", []): + instr_uid = instruction.get("instruction_uid", "Unknown UID") + instr_type = instruction.get("type", "Unknown Type") + is_grouped = instruction.get("grouped", False) + if ( + not instr_type.endswith(SCL_SUFFIX) + and "_error" not in instr_type + and not is_grouped + and instr_type.lower() not in ignored_types + ): + unprocessed_count += 1 + unprocessed_details.append( + f" - Red '{network_title}' (ID: {network_id}, Lang: {network_lang}), " + f"Instrucción UID: {instr_uid}, Tipo: '{instr_type}'" + ) if unprocessed_count > 0: - print(f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones (no STL) que parecen no haber sido procesadas:") - for detail in unprocessed_details: print(detail) + print( + f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones (no STL) que parecen no haber sido procesadas:" + ) + for detail in unprocessed_details: + print(detail) else: - print("INFO: Todas las instrucciones relevantes (no STL) parecen haber sido procesadas o agrupadas.") + print( + "INFO: Todas las instrucciones relevantes (no STL) parecen haber sido procesadas o agrupadas." + ) # --- Guardar JSON Final --- - output_filename = json_filepath.replace("_simplified.json", "_simplified_processed.json") - print(f"\nGuardando JSON procesado en: {output_filename}") + output_filename = json_filepath.replace( + "_simplified.json", "_simplified_processed.json" + ) + print(f"\nGuardando JSON procesado (FC/FB) en: {output_filename}") try: with open(output_filename, "w", encoding="utf-8") as f: json.dump(data, f, indent=4, ensure_ascii=False) @@ -424,6 +556,7 @@ def process_json_to_scl(json_filepath): print(f"Error Crítico al guardar JSON procesado: {e}") traceback.print_exc() + # --- Ejecución (sin cambios) --- if __name__ == "__main__": # Imports necesarios solo para la ejecución como script principal @@ -436,43 +569,55 @@ if __name__ == "__main__": description="Process simplified JSON (_simplified.json) to embed SCL logic (SymPy version). Expects original XML filepath as argument." ) parser.add_argument( - "source_xml_filepath", # Argumento posicional obligatorio + "source_xml_filepath", # Argumento posicional obligatorio help="Path to the original source XML file (passed from x0_main.py, used to derive JSON input name).", ) - args = parser.parse_args() # Parsea los argumentos de sys.argv + args = parser.parse_args() # Parsea los argumentos de sys.argv - source_xml_file = args.source_xml_filepath # Obtiene la ruta del XML original + source_xml_file = args.source_xml_filepath # Obtiene la ruta del XML original # Verificar si el archivo XML original existe (como referencia, útil para depuración) # No es estrictamente necesario para la lógica aquí, pero ayuda a confirmar if not os.path.exists(source_xml_file): - print(f"Advertencia (x2): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON correspondiente.") - # No salir necesariamente, pero es bueno saberlo. + print( + f"Advertencia (x2): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON correspondiente." + ) + # No salir necesariamente, pero es bueno saberlo. # Derivar nombre del archivo JSON de entrada (_simplified.json) xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0] # Asumir que el JSON simplificado está en el mismo directorio que el XML original - input_dir = os.path.dirname(source_xml_file) # Directorio del XML original + input_dir = os.path.dirname(source_xml_file) # Directorio del XML original input_json_file = os.path.join(input_dir, f"{xml_filename_base}_simplified.json") # Determinar el nombre esperado del archivo JSON procesado de salida - output_json_file = os.path.join(input_dir, f"{xml_filename_base}_simplified_processed.json") - - print(f"(x2) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'") + output_json_file = os.path.join( + input_dir, f"{xml_filename_base}_simplified_processed.json" + ) + print( + f"(x2) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'" + ) # Verificar si el archivo JSON de entrada (_simplified.json) EXISTE antes de procesar if not os.path.exists(input_json_file): - print(f"Error Fatal (x2): El archivo de entrada JSON simplificado no existe: '{input_json_file}'") - print(f"Asegúrate de que 'x1_to_json.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'.") - sys.exit(1) # Salir si el archivo necesario no está + print( + f"Error Fatal (x2): El archivo de entrada JSON simplificado no existe: '{input_json_file}'" + ) + print( + f"Asegúrate de que 'x1_to_json.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'." + ) + sys.exit(1) # Salir si el archivo necesario no está else: # Llamar a la función principal de procesamiento del script # Asumiendo que tu función principal se llama process_json_to_scl(input_json_path) try: process_json_to_scl(input_json_file) except Exception as e: - print(f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}") + print( + f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}" + ) import traceback + traceback.print_exc() - sys.exit(1) # Salir con error si la función principal falla \ No newline at end of file + sys.exit(1) # Salir con error si la función principal falla diff --git a/x3_generate_scl.py b/x3_generate_scl.py index 0551f51..e90d74e 100644 --- a/x3_generate_scl.py +++ b/x3_generate_scl.py @@ -5,290 +5,481 @@ import os import re import argparse import sys -import traceback # Importar traceback para errores +import traceback # Importar traceback para errores # --- Importar Utilidades y Constantes (Asumiendo ubicación) --- try: # Intenta importar desde el paquete de procesadores si está estructurado así from processors.processor_utils import format_variable_name + # Definir SCL_SUFFIX aquí o importarlo si está centralizado - SCL_SUFFIX = "_sympy_processed" # Asegúrate que coincida con x2_process.py - GROUPED_COMMENT = "// Logic included in grouped IF" # Opcional, si se usa para filtrar + SCL_SUFFIX = "_sympy_processed" # Asegúrate que coincida con x2_process.py + GROUPED_COMMENT = ( + "// Logic included in grouped IF" # Opcional, si se usa para filtrar + ) except ImportError: - print("Advertencia: No se pudo importar 'format_variable_name' desde processors.processor_utils.") - print("Usando una implementación local básica (¡PUEDE FALLAR CON NOMBRES COMPLEJOS!).") + print( + "Advertencia: No se pudo importar 'format_variable_name' desde processors.processor_utils." + ) + print( + "Usando una implementación local básica (¡PUEDE FALLAR CON NOMBRES COMPLEJOS!)." + ) + # Implementación local BÁSICA como fallback (MENOS RECOMENDADA) def format_variable_name(name): - if not name: return "_INVALID_NAME_" - if name.startswith('"') and name.endswith('"'): return name # Mantener comillas + if not name: + return "_INVALID_NAME_" + if name.startswith('"') and name.endswith('"'): + return name # Mantener comillas prefix = "#" if name.startswith("#") else "" - if prefix: name = name[1:] - if name and name[0].isdigit(): name = "_" + name + if prefix: + name = name[1:] + if name and name[0].isdigit(): + name = "_" + name name = re.sub(r"[^a-zA-Z0-9_]", "_", name) return prefix + name + SCL_SUFFIX = "_sympy_processed" GROUPED_COMMENT = "// Logic included in grouped IF" -# --- Función Principal de Generación SCL --- +# para formatear valores iniciales +def format_scl_start_value(value, datatype): + """Formatea un valor para la inicialización SCL según el tipo.""" + if value is None: + return None + datatype_lower = datatype.lower() if datatype else "" + value_str = str(value) + if "bool" in datatype_lower: + return "TRUE" if value_str.lower() == "true" else "FALSE" + elif "string" in datatype_lower: + escaped_value = value_str.replace("'", "''") + if escaped_value.startswith("'") and escaped_value.endswith("'"): + escaped_value = escaped_value[1:-1] + return f"'{escaped_value}'" + elif "char" in datatype_lower: # Añadido Char + escaped_value = value_str.replace("'", "''") + if escaped_value.startswith("'") and escaped_value.endswith("'"): + escaped_value = escaped_value[1:-1] + return f"'{escaped_value}'" + elif any( + t in datatype_lower + for t in [ + "int", + "byte", + "word", + "dint", + "dword", + "lint", + "lword", + "sint", + "usint", + "uint", + "udint", + "ulint", + ] + ): # Ampliado + try: + return str(int(value_str)) + except ValueError: + if re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", value_str): + return value_str + return f"'{value_str}'" # O como string si no es entero ni símbolo + elif "real" in datatype_lower or "lreal" in datatype_lower: + try: + f_val = float(value_str) + s_val = str(f_val) + if "." not in s_val and "e" not in s_val.lower(): + s_val += ".0" + return s_val + except ValueError: + if re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", value_str): + return value_str + return f"'{value_str}'" + elif "time" in datatype_lower: # Añadido Time, S5Time, LTime + # Quitar T#, LT#, S5T# si existen + prefix = "" + if value_str.upper().startswith("T#"): + prefix = "T#" + value_str = value_str[2:] + elif value_str.upper().startswith("LT#"): + prefix = "LT#" + value_str = value_str[3:] + elif value_str.upper().startswith("S5T#"): + prefix = "S5T#" + value_str = value_str[4:] + # Devolver con el prefijo correcto o T# por defecto si no había + if prefix: + return f"{prefix}{value_str}" + elif "s5time" in datatype_lower: + return f"S5T#{value_str}" + elif "ltime" in datatype_lower: + return f"LT#{value_str}" + else: + return f"T#{value_str}" # Default a TIME + elif "date" in datatype_lower: # Añadido Date, DT, TOD + if value_str.upper().startswith("D#"): + return value_str + elif "dt" in datatype_lower or "date_and_time" in datatype_lower: + if value_str.upper().startswith("DT#"): + return value_str + else: + return f"DT#{value_str}" # Añadir prefijo DT# + elif "tod" in datatype_lower or "time_of_day" in datatype_lower: + if value_str.upper().startswith("TOD#"): + return value_str + else: + return f"TOD#{value_str}" # Añadir prefijo TOD# + else: + return f"D#{value_str}" # Default a Date + # Fallback genérico + else: + if re.match( + r'^[a-zA-Z_][a-zA-Z0-9_."#\[\]]+$', value_str + ): # Permitir más caracteres en símbolos/tipos + # Si es un UDT o Struct complejo, podría venir con comillas, quitarlas + if value_str.startswith('"') and value_str.endswith('"'): + return value_str[1:-1] + return value_str + else: + escaped_value = value_str.replace("'", "''") + return f"'{escaped_value}'" + + +# --- NUEVA FUNCIÓN RECURSIVA para generar declaraciones SCL (VAR/STRUCT/ARRAY) --- +def generate_scl_declarations(variables, indent_level=1): + """Genera las líneas SCL para declarar variables, structs y arrays.""" + scl_lines = [] + indent = " " * indent_level + for var in variables: + var_name_scl = format_variable_name(var.get("name")) + var_dtype_raw = var.get("datatype", "VARIANT") + # Limpiar comillas de tipos de datos UDT ("MyType" -> MyType) + var_dtype = ( + var_dtype_raw.strip('"') + if var_dtype_raw.startswith('"') and var_dtype_raw.endswith('"') + else var_dtype_raw + ) + + var_comment = var.get("comment") + start_value = var.get("start_value") + children = var.get("children") # Para structs + array_elements = var.get("array_elements") # Para arrays + + # Manejar tipos de datos Array especiales + array_match = re.match(r"(Array\[.*\]\s+of\s+)(.*)", var_dtype, re.IGNORECASE) + base_type_for_init = var_dtype + declaration_dtype = var_dtype + if array_match: + array_prefix = array_match.group(1) + base_type_raw = array_match.group(2).strip() + # Limpiar comillas del tipo base del array + base_type_for_init = ( + base_type_raw.strip('"') + if base_type_raw.startswith('"') and base_type_raw.endswith('"') + else base_type_raw + ) + declaration_dtype = ( + f'{array_prefix}"{base_type_for_init}"' + if '"' not in base_type_raw + else f"{array_prefix}{base_type_raw}" + ) # Reconstruir con comillas si es UDT + + # Reconstruir declaración con comillas si es UDT y no array + elif ( + not array_match and var_dtype != base_type_for_init + ): # Es un tipo que necesita comillas (UDT) + declaration_dtype = f'"{var_dtype}"' + + declaration_line = f"{indent}{var_name_scl} : {declaration_dtype}" + init_value = None + + # ---- Arrays ---- + if array_elements: + # Ordenar índices (asumiendo que son numéricos) + try: + sorted_indices = sorted(array_elements.keys(), key=int) + except ValueError: + sorted_indices = sorted( + array_elements.keys() + ) # Fallback a orden alfabético + + init_values = [ + format_scl_start_value(array_elements[idx], base_type_for_init) + for idx in sorted_indices + ] + valid_inits = [v for v in init_values if v is not None] + if valid_inits: + init_value = f"[{', '.join(valid_inits)}]" + + # ---- Structs ---- + elif children: + # No añadir comentario // Struct aquí, es redundante + scl_lines.append(declaration_line) # Añadir línea de declaración base + scl_lines.append(f"{indent}STRUCT") + scl_lines.extend(generate_scl_declarations(children, indent_level + 1)) + scl_lines.append(f"{indent}END_STRUCT;") + if var_comment: + scl_lines.append(f"{indent}// {var_comment}") + scl_lines.append("") # Línea extra + continue # Saltar resto para Struct + + # ---- Tipos Simples ---- + else: + if start_value is not None: + init_value = format_scl_start_value(start_value, var_dtype) + + # Añadir inicialización si existe + if init_value: + declaration_line += f" := {init_value}" + declaration_line += ";" + if var_comment: + declaration_line += f" // {var_comment}" + scl_lines.append(declaration_line) + + return scl_lines + + +# --- Función Principal de Generación SCL --- def generate_scl(processed_json_filepath, output_scl_filepath): - """Genera un archivo SCL a partir del JSON procesado por x2_process (versión SymPy).""" + """Genera un archivo SCL a partir del JSON procesado (FC/FB o DB).""" if not os.path.exists(processed_json_filepath): - print(f"Error: Archivo JSON procesado no encontrado en '{processed_json_filepath}'") + print( + f"Error: Archivo JSON procesado no encontrado en '{processed_json_filepath}'" + ) return print(f"Cargando JSON procesado desde: {processed_json_filepath}") try: - with open(processed_json_filepath, 'r', encoding='utf-8') as f: + with open(processed_json_filepath, "r", encoding="utf-8") as f: data = json.load(f) except Exception as e: print(f"Error al cargar o parsear JSON: {e}") traceback.print_exc() return - # --- Extracción de Información del Bloque --- - block_name = data.get('block_name', 'UnknownBlock') - block_number = data.get('block_number') - block_lang_original = data.get('language', 'LAD') # Lenguaje original - # Determinar tipo de bloque SCL (Asumir FB si no se especifica) - # Idealmente, x1_to_json.py guardaría esto en data['block_type_scl'] = 'FC' o 'FB' - block_type_scl = data.get('block_type_scl', 'FUNCTION_BLOCK') - block_comment = data.get('block_comment', '') - - # Usar format_variable_name para el nombre del bloque en SCL - scl_block_name = format_variable_name(block_name) - print(f"Generando SCL para {block_type_scl}: {scl_block_name} (Original: {block_name})") - - # --- Identificación de Variables Temporales y Estáticas --- - # La detección basada en regex sobre el SCL final debería seguir funcionando - temp_vars = set() - stat_vars = set() - # Regex mejorado para capturar variables temporales que empiezan con # o _temp_ - # y estáticas (si usas un prefijo como 'stat_' o para bits de memoria de flanco) - temp_pattern = re.compile(r'"?#(_temp_[a-zA-Z0-9_]+)"?|"?(_temp_[a-zA-Z0-9_]+)"?') # Captura con o sin # - stat_pattern = re.compile(r'"?(stat_[a-zA-Z0-9_]+)"?') # Para memorias de flanco si usan prefijo 'stat_' - - edge_memory_bits = set() # Para detectar bits de memoria de flanco por nombre - - for network in data.get('networks', []): - for instruction in network.get('logic', []): - scl_code = instruction.get('scl', '') - # Buscar también en _edge_mem_update_scl si existe - edge_update_code = instruction.get('_edge_mem_update_scl','') - code_to_scan = (scl_code if scl_code else '') + '\n' + (edge_update_code if edge_update_code else '') - - if code_to_scan: - # Buscar #_temp_... o _temp_... - found_temps = temp_pattern.findall(code_to_scan) - for temp_tuple in found_temps: - # findall devuelve tuplas por los grupos de captura, tomar el no vacío - temp_name = next((t for t in temp_tuple if t), None) - if temp_name: - temp_vars.add("#"+temp_name if not temp_name.startswith("#") else temp_name) # Asegurar que empiece con # - - # Buscar estáticas (ej: stat_...) - found_stats = stat_pattern.findall(code_to_scan) - stat_vars.update(found_stats) - - # Identificar explícitamente bits de memoria usados por PBox/NBox - # Asumiendo que el nombre se guarda en el JSON (requiere ajuste en x1/x2) - # if instruction.get("type","").startswith(("PBox", "NBox")): - # mem_bit_info = instruction.get("inputs", {}).get("bit") - # if mem_bit_info and mem_bit_info.get("type") == "variable": - # edge_memory_bits.add(format_variable_name(mem_bit_info.get("name"))) - - - print(f"Variables temporales (#_temp_...) detectadas: {len(temp_vars)}") - # Si se detectan memorias de flanco, añadirlas a stat_vars si no tienen prefijo 'stat_' - # stat_vars.update(edge_memory_bits - stat_vars) # Añadir solo las nuevas - print(f"Variables estáticas (stat_...) detectadas: {len(stat_vars)}") - - # --- Construcción del String SCL --- + # --- Extracción de Información del Bloque (Común) --- + block_name = data.get("block_name", "UnknownBlock") + block_number = data.get("block_number") + block_lang_original = data.get("language", "Unknown") # Será "DB" para Data Blocks + block_type = data.get("block_type", "Unknown") # FC, FB, GlobalDB + block_comment = data.get("block_comment", "") + scl_block_name = format_variable_name(block_name) # Nombre SCL seguro + print( + f"Generando SCL para: {block_type} '{scl_block_name}' (Original: {block_name}, Lang: {block_lang_original})" + ) scl_output = [] - # Cabecera del Bloque - scl_output.append(f"// Block Name (Original): {block_name}") - if block_number: scl_output.append(f"// Block Number: {block_number}") - scl_output.append(f"// Original Language: {block_lang_original}") - if block_comment: scl_output.append(f"// Block Comment: {block_comment}") - scl_output.append("") - scl_output.append(f"{block_type_scl} \"{scl_block_name}\"") - scl_output.append("{ S7_Optimized_Access := 'TRUE' }") - scl_output.append("VERSION : 0.1") - scl_output.append("") + # --- GENERACIÓN PARA DATA BLOCK (DB) --- + if block_lang_original == "DB": + print("Modo de generación: DATA_BLOCK") + scl_output.append(f"// Block Type: {block_type}") + scl_output.append(f"// Block Name (Original): {block_name}") + if block_number: + scl_output.append(f"// Block Number: {block_number}") + if block_comment: + scl_output.append(f"// Block Comment: {block_comment}") + scl_output.append("") + scl_output.append(f'DATA_BLOCK "{scl_block_name}"') + scl_output.append("{ S7_Optimized_Access := 'TRUE' }") # Asumir optimizado + scl_output.append("VERSION : 0.1") + scl_output.append("") + interface_data = data.get("interface", {}) + static_vars = interface_data.get("Static", []) + if static_vars: + scl_output.append("VAR") + scl_output.extend(generate_scl_declarations(static_vars, indent_level=1)) + scl_output.append("END_VAR") + scl_output.append("") + else: + print( + "Advertencia: No se encontró sección 'Static' o está vacía en la interfaz del DB." + ) + scl_output.append("VAR") + scl_output.append("END_VAR") + scl_output.append("") + scl_output.append("BEGIN") + scl_output.append("") + scl_output.append("END_DATA_BLOCK") - # Declaraciones de Interfaz (Implementación básica) - interface_sections = ["Input", "Output", "InOut", "Static", "Temp", "Constant", "Return"] - interface_data = data.get('interface', {}) + # --- GENERACIÓN PARA FUNCTION BLOCK / FUNCTION (FC/FB) --- + else: + print("Modo de generación: FUNCTION_BLOCK / FUNCTION") + scl_block_keyword = "FUNCTION_BLOCK" if block_type == "FB" else "FUNCTION" + # Cabecera del Bloque + scl_output.append(f"// Block Type: {block_type}") + scl_output.append(f"// Block Name (Original): {block_name}") + if block_number: + scl_output.append(f"// Block Number: {block_number}") + scl_output.append(f"// Original Language: {block_lang_original}") + if block_comment: + scl_output.append(f"// Block Comment: {block_comment}") + scl_output.append("") + # Manejar tipo de retorno para FUNCTION + return_type = "Void" # Default + interface_data = data.get("interface", {}) + if scl_block_keyword == "FUNCTION" and interface_data.get("Return"): + return_member = interface_data["Return"][ + 0 + ] # Asumir un solo valor de retorno + return_type_raw = return_member.get("datatype", "Void") + return_type = ( + return_type_raw.strip('"') + if return_type_raw.startswith('"') and return_type_raw.endswith('"') + else return_type_raw + ) + # Añadir comillas si es UDT + if return_type != return_type_raw: + return_type = f'"{return_type}"' - for section_name in interface_sections: - scl_section_name = section_name - # Ajustar nombres de sección para SCL (Static -> STAT, Temp -> TEMP) - if section_name == "Static": scl_section_name = "STAT" - if section_name == "Temp": scl_section_name = "TEMP" # Usar VAR_TEMP para variables #temp - - vars_in_section = interface_data.get(section_name, []) - # No declarar VAR_TEMP aquí, se hará después con las detectadas/originales - if section_name == "Temp": continue - - # No declarar VAR_STAT aquí si ya lo hacemos abajo con las detectadas - if section_name == "Static" and stat_vars: continue - - - if vars_in_section or (section_name == "Static" and stat_vars): # Incluir STAT si hay detectadas - # Usar VAR para Input/Output/InOut/Constant/Return - var_keyword = "VAR" if section_name != "Static" else "VAR_STAT" - scl_output.append(f"{var_keyword}_{section_name.upper()}") - - for var in vars_in_section: - var_name = var.get('name') - var_dtype = var.get('datatype', 'VARIANT') # Default a VARIANT - if var_name: - # Usar format_variable_name CORRECTO - scl_name = format_variable_name(var_name) - scl_output.append(f" {scl_name} : {var_dtype};") - - # Declarar stat_vars detectadas si esta es la sección STAT - if section_name == "Static" and stat_vars: - for var_name in sorted(list(stat_vars)): - # Asumir Bool para stat_, podría necesitar inferencia - scl_output.append(f" {format_variable_name(var_name)} : Bool; // Auto-detected STAT") - - scl_output.append("END_VAR") - scl_output.append("") - - # Declaraciones Estáticas (Si no estaban en la interfaz y se detectaron) - # Esto es redundante si la sección VAR_STAT ya se generó arriba - # if stat_vars and not interface_data.get("Static"): - # scl_output.append("VAR_STAT") - # for var_name in sorted(list(stat_vars)): - # scl_output.append(f" {format_variable_name(var_name)} : Bool; // Auto-detected STAT") - # scl_output.append("END_VAR") - # scl_output.append("") - - - # Declaraciones Temporales (Interfaz Temp + _temp_ detectadas) - scl_output.append("VAR_TEMP") - declared_temps = set() - interface_temps = interface_data.get('Temp', []) - if interface_temps: - for var in interface_temps: - var_name = var.get('name') - var_dtype = var.get('datatype', 'VARIANT') - if var_name: - scl_name = format_variable_name(var_name) - scl_output.append(f" {scl_name} : {var_dtype};") - declared_temps.add(scl_name) # Marcar como declarada - - # Declarar las _temp_ generadas si no estaban ya en la interfaz Temp - if temp_vars: - for var_name in sorted(list(temp_vars)): - scl_name = format_variable_name(var_name) # #_temp_... - if scl_name not in declared_temps: - # Inferencia básica de tipo - inferred_type = "Bool" # Asumir Bool para la mayoría de temps de lógica - # Se podría mejorar si los procesadores añadieran info de tipo - scl_output.append(f" {scl_name} : {inferred_type}; // Auto-generated temporary") - declared_temps.add(scl_name) - scl_output.append("END_VAR") - scl_output.append("") - - # Cuerpo del Bloque - scl_output.append("BEGIN") - scl_output.append("") - - # Iterar por redes y lógica - for i, network in enumerate(data.get('networks', [])): - network_title = network.get('title', f'Network {network.get("id")}') - network_comment = network.get('comment', '') - network_lang = network.get('language', 'LAD') # O el lenguaje original - - scl_output.append(f" // Network {i+1}: {network_title} (Original Language: {network_lang})") - if network_comment: - for line in network_comment.splitlines(): - scl_output.append(f" // {line}") + scl_output.append( + f'{scl_block_keyword} "{scl_block_name}" : {return_type}' + if scl_block_keyword == "FUNCTION" + else f'{scl_block_keyword} "{scl_block_name}"' + ) + scl_output.append("{ S7_Optimized_Access := 'TRUE' }") + scl_output.append("VERSION : 0.1") scl_output.append("") - network_has_code = False + # Declaraciones de Interfaz FC/FB + section_order = [ + "Input", + "Output", + "InOut", + "Static", + "Temp", + "Constant", + ] # Return ya está en cabecera + declared_temps = set() + for section_name in section_order: + vars_in_section = interface_data.get(section_name, []) + if vars_in_section: + scl_section_keyword = f"VAR_{section_name.upper()}" + if section_name == "Static": + scl_section_keyword = "VAR_STAT" + if section_name == "Temp": + scl_section_keyword = "VAR_TEMP" + if section_name == "Constant": + scl_section_keyword = "CONSTANT" + scl_output.append(scl_section_keyword) + scl_output.extend( + generate_scl_declarations(vars_in_section, indent_level=1) + ) + if section_name == "Temp": + declared_temps.update( + format_variable_name(v.get("name")) + for v in vars_in_section + if v.get("name") + ) + scl_output.append("END_VAR") + scl_output.append("") - # --- NUEVO MANEJO STL con formato Markdown --- - if network_lang == "STL": - network_has_code = True # Marcar que la red tiene contenido - if network.get('logic') and isinstance(network['logic'], list) and len(network['logic']) > 0: - stl_chunk = network['logic'][0] - if stl_chunk.get("type") == "RAW_STL_CHUNK" and "stl" in stl_chunk: - raw_stl_code = stl_chunk["stl"] - # Añadir marcador de inicio (como comentario SCL para evitar errores) - scl_output.append(f" {'//'} ```STL") # Doble '//' para asegurar que sea comentario - # Escribir el código STL crudo, indentado + # Declaraciones VAR_TEMP adicionales detectadas + temp_vars = set() + temp_pattern = re.compile( + r'"?#(_temp_[a-zA-Z0-9_]+)"?|"?(_temp_[a-zA-Z0-9_]+)"?' + ) + for network in data.get("networks", []): + for instruction in network.get("logic", []): + scl_code = instruction.get("scl", "") + edge_update_code = instruction.get("_edge_mem_update_scl", "") + code_to_scan = ( + (scl_code if scl_code else "") + + "\n" + + (edge_update_code if edge_update_code else "") + ) + if code_to_scan: + found_temps = temp_pattern.findall(code_to_scan) + for temp_tuple in found_temps: + temp_name = next((t for t in temp_tuple if t), None) + if temp_name: + temp_vars.add( + "#" + temp_name + if not temp_name.startswith("#") + else temp_name + ) + additional_temps = sorted(list(temp_vars - declared_temps)) + if additional_temps: + if not interface_data.get("Temp"): + scl_output.append("VAR_TEMP") + for var_name in additional_temps: + scl_name = format_variable_name(var_name) + inferred_type = "Bool" # Asumir Bool + scl_output.append( + f" {scl_name} : {inferred_type}; // Auto-generated temporary" + ) + if not interface_data.get("Temp"): + scl_output.append("END_VAR") + scl_output.append("") + + # Cuerpo del Bloque FC/FB + scl_output.append("BEGIN") + scl_output.append("") + # Iterar por redes y lógica (como antes, incluyendo manejo STL Markdown) + for i, network in enumerate(data.get("networks", [])): + network_title = network.get("title", f'Network {network.get("id")}') + network_comment = network.get("comment", "") + network_lang = network.get("language", "LAD") + scl_output.append( + f" // Network {i+1}: {network_title} (Original Language: {network_lang})" + ) + if network_comment: + for line in network_comment.splitlines(): + scl_output.append(f" // {line}") + scl_output.append("") + network_has_code = False + if network_lang == "STL": + network_has_code = True + if ( + network.get("logic") + and network["logic"][0].get("type") == "RAW_STL_CHUNK" + ): + raw_stl_code = network["logic"][0].get( + "stl", "// ERROR: STL code missing" + ) + scl_output.append(f" {'//'} ```STL") for stl_line in raw_stl_code.splitlines(): - # Añadir indentación estándar de SCL - scl_output.append(f" {stl_line}") # <-- STL sin comentar - # Añadir marcador de fin (como comentario SCL) + scl_output.append(f" {stl_line}") scl_output.append(f" {'//'} ```") else: - scl_output.append(" // ERROR: Contenido STL inesperado en JSON.") - else: - scl_output.append(" // ERROR: No se encontró lógica STL en JSON para esta red.") - scl_output.append("") # Línea en blanco después de la red STL - # --- FIN NUEVO MANEJO STL con formato Markdown --- - else: - - # Iterar sobre la 'logica' de la red - for instruction in network.get('logic', []): - instruction_type = instruction.get("type", "") - scl_code = instruction.get('scl', "") # Obtener SCL generado por x2 - - # Saltar instrucciones agrupadas - if instruction.get("grouped", False): - continue - - # Escribir SCL si es un tipo procesado y tiene código relevante - # (Ignorar comentarios de depuración de SymPy) - if instruction_type.endswith(SCL_SUFFIX) and scl_code: - is_internal_sympy_comment_only = scl_code.strip().startswith("// SymPy") or \ - scl_code.strip().startswith("// PBox SymPy processed") or \ - scl_code.strip().startswith("// NBox SymPy processed") - # O podría ser más genérico: ignorar cualquier línea que solo sea comentario SCL - is_only_comment = all(line.strip().startswith("//") for line in scl_code.splitlines()) - - - # Escribir solo si NO es un comentario interno de SymPy O si es un bloque IF (que sí debe escribirse) - if not is_only_comment or scl_code.strip().startswith("IF"): - network_has_code = True - for line in scl_code.splitlines(): - # Añadir indentación estándar - scl_output.append(f" {line}") - - # Incluir también tipos especiales directamente - elif instruction_type in ["RAW_SCL_CHUNK", "UNSUPPORTED_LANG"] and scl_code: - network_has_code = True - for line in scl_code.splitlines(): - scl_output.append(f" {line}") # Indentar - - # Podríamos añadir comentarios para errores si se desea - # elif "_error" in instruction_type: - # network_has_code = True - # scl_output.append(f" // ERROR processing instruction UID {instruction.get('instruction_uid')}: {instruction.get('scl', 'No details')}") - - + scl_output.append(" // ERROR: Contenido STL inesperado.") + else: # LAD, FBD, SCL, etc. + for instruction in network.get("logic", []): + instruction_type = instruction.get("type", "") + scl_code = instruction.get("scl", "") + is_grouped = instruction.get("grouped", False) + if is_grouped: + continue + if ( + instruction_type.endswith(SCL_SUFFIX) + or instruction_type in ["RAW_SCL_CHUNK", "UNSUPPORTED_LANG"] + ) and scl_code: + is_only_comment = all( + line.strip().startswith("//") + for line in scl_code.splitlines() + if line.strip() + ) + is_if_block = scl_code.strip().startswith("IF") + if not is_only_comment or is_if_block: + network_has_code = True + for line in scl_code.splitlines(): + scl_output.append(f" {line}") if network_has_code: - scl_output.append("") # Línea en blanco después del código de la red + scl_output.append("") else: scl_output.append(f" // Network did not produce printable SCL code.") scl_output.append("") + # Fin del bloque FC/FB + scl_output.append(f"END_{scl_block_keyword}") - # Fin del bloque - scl_output.append("END_FUNCTION_BLOCK") # O END_FUNCTION si es FC - - # --- Escritura del Archivo SCL --- + # --- Escritura del Archivo SCL (Común) --- print(f"Escribiendo archivo SCL en: {output_scl_filepath}") try: - with open(output_scl_filepath, 'w', encoding='utf-8') as f: + with open(output_scl_filepath, "w", encoding="utf-8") as f: for line in scl_output: - f.write(line + '\n') + f.write(line + "\n") print("Generación de SCL completada.") except Exception as e: print(f"Error al escribir el archivo SCL: {e}") @@ -301,47 +492,61 @@ if __name__ == "__main__": import argparse import os import sys - import traceback # Asegurarse que traceback está importado si se usa en generate_scl + import traceback # Asegurarse que traceback está importado si se usa en generate_scl # Configurar ArgumentParser para recibir la ruta del XML original obligatoria parser = argparse.ArgumentParser( description="Generate final SCL file from processed JSON (_simplified_processed.json). Expects original XML filepath as argument." ) parser.add_argument( - "source_xml_filepath", # Argumento posicional obligatorio + "source_xml_filepath", # Argumento posicional obligatorio help="Path to the original source XML file (passed from x0_main.py, used to derive input/output names).", ) - args = parser.parse_args() # Parsea los argumentos de sys.argv + args = parser.parse_args() # Parsea los argumentos de sys.argv - source_xml_file = args.source_xml_filepath # Obtiene la ruta del XML original + source_xml_file = args.source_xml_filepath # Obtiene la ruta del XML original # Verificar si el archivo XML original existe (como referencia) if not os.path.exists(source_xml_file): - print(f"Advertencia (x3): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON procesado.") - # No salir necesariamente. + print( + f"Advertencia (x3): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON procesado." + ) + # No salir necesariamente. # Derivar nombres de archivos de entrada (JSON procesado) y salida (SCL) xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0] # Asumir que los archivos están en el mismo directorio que el XML original - base_dir = os.path.dirname(source_xml_file) # Directorio del XML original + base_dir = os.path.dirname(source_xml_file) # Directorio del XML original - input_json_file = os.path.join(base_dir, f"{xml_filename_base}_simplified_processed.json") - output_scl_file = os.path.join(base_dir, f"{xml_filename_base}_simplified_processed.scl") + input_json_file = os.path.join( + base_dir, f"{xml_filename_base}_simplified_processed.json" + ) + output_scl_file = os.path.join( + base_dir, f"{xml_filename_base}_simplified_processed.scl" + ) - print(f"(x3) Generando SCL: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_scl_file)}'") + print( + f"(x3) Generando SCL: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_scl_file)}'" + ) # Verificar si el archivo JSON procesado de entrada EXISTE if not os.path.exists(input_json_file): - print(f"Error Fatal (x3): Archivo JSON procesado no encontrado: '{input_json_file}'") - print(f"Asegúrate de que 'x2_process.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'.") - sys.exit(1) # Salir si el archivo necesario no está + print( + f"Error Fatal (x3): Archivo JSON procesado no encontrado: '{input_json_file}'" + ) + print( + f"Asegúrate de que 'x2_process.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'." + ) + sys.exit(1) # Salir si el archivo necesario no está else: # Llamar a la función principal de generación SCL del script # Asumiendo que tu función principal se llama generate_scl(input_json_path, output_scl_path) try: generate_scl(input_json_file, output_scl_file) except Exception as e: - print(f"Error Crítico (x3) durante la generación de SCL desde '{input_json_file}': {e}") + print( + f"Error Crítico (x3) durante la generación de SCL desde '{input_json_file}': {e}" + ) # traceback ya debería estar importado si generate_scl lo necesita traceback.print_exc() - sys.exit(1) # Salir con error si la función principal falla \ No newline at end of file + sys.exit(1) # Salir con error si la función principal falla