""" LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL Este script convierte un archivo JSON simplificado (resultado de un análisis de un XML de Siemens) a un JSON enriquecido con lógica SCL. Se enfoca en la lógica de programación y la agrupación de instrucciones IF. """ # -*- coding: utf-8 -*- import json import argparse import os import copy import traceback import re import importlib import sys import sympy script_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(__file__))) ) sys.path.append(script_root) from backend.script_utils import load_configuration # Import necessary components from processors directory from processors.processor_utils import format_variable_name, sympy_expr_to_scl from processors.symbol_manager import SymbolManager # --- Constantes y Configuración --- SCL_SUFFIX = "_sympy_processed" GROUPED_COMMENT = "// Logic included in grouped IF" SIMPLIFIED_IF_COMMENT = "// Simplified IF condition by script" # Global data dictionary data = {} # --- (process_group_ifs y load_processors SIN CAMBIOS) --- def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): instr_uid = instruction["instruction_uid"] instr_type_original = ( instruction.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "") ) made_change = False if ( not instruction.get("type", "").endswith(SCL_SUFFIX) or "_error" in instruction.get("type", "") or instruction.get("grouped", False) or instr_type_original not in [ "Contact", "O", "Eq", "Ne", "Gt", "Lt", "Ge", "Le", "PBox", "NBox", "And", "Xor", "Not", ] ): return False current_scl = instruction.get("scl", "") if ( current_scl.strip().startswith("IF") and "END_IF;" in current_scl and GROUPED_COMMENT not in current_scl ): return False map_key_out = (network_id, instr_uid, "out") sympy_condition_expr = sympy_map.get(map_key_out) if sympy_condition_expr is None or sympy_condition_expr in [ sympy.true, sympy.false, ]: return False grouped_instructions_cores = [] consumer_instr_list = [] network_logic = next( (net["logic"] for net in data["networks"] if net["id"] == network_id), [] ) if not network_logic: return False groupable_types = [ "Move", "Add", "Sub", "Mul", "Div", "Mod", "Convert", "Call_FC", "Call_FB", "SCoil", "RCoil", "BLKMOV", "TON", "TOF", "TP", "Se", "Sd", "CTU", "CTD", "CTUD", ] for consumer_instr in network_logic: consumer_uid = consumer_instr["instruction_uid"] if consumer_instr.get("grouped", False) or consumer_uid == instr_uid: continue consumer_en = consumer_instr.get("inputs", {}).get("en") consumer_type = consumer_instr.get("type", "") consumer_type_original = consumer_type.replace(SCL_SUFFIX, "").replace( "_error", "" ) is_enabled_by_us = False if ( isinstance(consumer_en, dict) and consumer_en.get("type") == "connection" and consumer_en.get("source_instruction_uid") == instr_uid and consumer_en.get("source_pin") == "out" ): is_enabled_by_us = True if ( is_enabled_by_us and consumer_type.endswith(SCL_SUFFIX) and consumer_type_original in groupable_types ): consumer_scl = consumer_instr.get("scl", "") core_scl = None if consumer_scl: if consumer_scl.strip().startswith("IF"): match = re.search( r"IF\s+.*?THEN\s*(.*?)\s*END_IF;", consumer_scl, re.DOTALL | re.IGNORECASE, ) core_scl = match.group(1).strip() if match else None elif not consumer_scl.strip().startswith("//"): core_scl = consumer_scl.strip() if core_scl: grouped_instructions_cores.append(core_scl) consumer_instr_list.append(consumer_instr) if len(grouped_instructions_cores) > 1: print( f"INFO: Agrupando {len(grouped_instructions_cores)} instr. bajo condición de {instr_type_original} UID {instr_uid}" ) try: simplified_expr = sympy.logic.boolalg.to_dnf( sympy_condition_expr, simplify=True ) except Exception as e: print(f"Error simplifying condition for grouping UID {instr_uid}: {e}") simplified_expr = sympy_condition_expr condition_scl_simplified = sympy_expr_to_scl(simplified_expr, symbol_manager) scl_grouped_lines = [f"IF {condition_scl_simplified} THEN"] for core_line in grouped_instructions_cores: indented_core = "\n".join( [f" {line.strip()}" for line in core_line.splitlines()] ) scl_grouped_lines.append(indented_core) scl_grouped_lines.append("END_IF;") final_grouped_scl = "\n".join(scl_grouped_lines) instruction["scl"] = final_grouped_scl for consumer_instr in consumer_instr_list: consumer_instr["scl"] = f"{GROUPED_COMMENT} (by UID {instr_uid})" consumer_instr["grouped"] = True made_change = True return made_change def load_processors(processors_dir="processors"): processor_map = {} processor_list_unsorted = [] default_priority = 10 if not os.path.isdir(processors_dir): print(f"Error: Directorio de procesadores no encontrado: '{processors_dir}'") return processor_map, [] print(f"Cargando procesadores desde: '{processors_dir}'") processors_package = os.path.basename(processors_dir) for filename in os.listdir(processors_dir): if filename.startswith("process_") and filename.endswith(".py"): module_name_rel = filename[:-3] full_module_name = f"{processors_package}.{module_name_rel}" try: module = importlib.import_module(full_module_name) if hasattr(module, "get_processor_info") and callable( module.get_processor_info ): processor_info = module.get_processor_info() info_list = [] if isinstance(processor_info, dict): info_list = [processor_info] elif isinstance(processor_info, list): info_list = processor_info else: print( f" Advertencia: get_processor_info en {full_module_name} devolvió tipo inesperado." ) continue for info in info_list: if ( isinstance(info, dict) and "type_name" in info and "processor_func" in info ): type_name = info["type_name"].lower() processor_func = info["processor_func"] priority = info.get("priority", default_priority) if callable(processor_func): if type_name in processor_map: print( f" Advertencia: '{type_name}' en {full_module_name} sobrescribe definición anterior." ) processor_map[type_name] = processor_func processor_list_unsorted.append( { "priority": priority, "type_name": type_name, "func": processor_func, } ) else: print( f" Advertencia: 'processor_func' para '{type_name}' en {full_module_name} no es callable." ) else: print( f" Advertencia: Entrada inválida en {full_module_name}: {info}" ) else: print( f" Advertencia: Módulo {module_name_rel}.py no tiene 'get_processor_info'." ) except ImportError as e: print(f"Error importando {full_module_name}: {e}") except Exception as e: print(f"Error procesando {full_module_name}: {e}") traceback.print_exc() processor_list_sorted = sorted(processor_list_unsorted, key=lambda x: x["priority"]) return processor_map, processor_list_sorted # --- Bucle Principal de Procesamiento (MODIFICADO para copiar metadatos) --- def process_json_to_scl(json_filepath, output_json_filepath): """ Lee JSON simplificado, copia metadatos, aplica procesadores dinámicos, y guarda JSON procesado en la ruta especificada. """ global data if not os.path.exists(json_filepath): print( f"Error Crítico (x2): JSON de entrada no encontrado: {json_filepath}", file=sys.stderr, ) return False print(f"Cargando JSON desde: {json_filepath}") try: with open(json_filepath, "r", encoding="utf-8") as f: data = json.load(f) # Carga el JSON de entrada except Exception as e: print(f"Error Crítico al cargar JSON de entrada: {e}", file=sys.stderr) traceback.print_exc(file=sys.stderr) return False # <-- NUEVO: Extraer metadatos del JSON de entrada (si existen) --> source_xml_mod_time = data.get("source_xml_mod_time") source_xml_size = data.get("source_xml_size") # <-- FIN NUEVO --> block_type = data.get("block_type", "Unknown") print(f"Procesando bloque tipo: {block_type}") if block_type in ["GlobalDB", "PlcUDT", "PlcTagTable"]: print(f"INFO: El bloque es {block_type}. Saltando procesamiento lógico de x2.") print( f"Guardando JSON de {block_type} (con metadatos) en: {output_json_filepath}" ) try: # <-- MODIFICADO: Asegurar que los metadatos se guarden aunque se salte --> data_to_save = copy.deepcopy(data) # Copiar datos originales if source_xml_mod_time is not None: data_to_save["source_xml_mod_time"] = source_xml_mod_time if source_xml_size is not None: data_to_save["source_xml_size"] = source_xml_size # <-- FIN MODIFICADO --> with open(output_json_filepath, "w", encoding="utf-8") as f_out: json.dump(data_to_save, f_out, indent=4, ensure_ascii=False) print(f"Guardado de {block_type} completado.") return True except Exception as e: print(f"Error Crítico al guardar JSON de {block_type}: {e}") traceback.print_exc(file=sys.stderr) return False print(f"INFO: El bloque es {block_type}. Iniciando procesamiento lógico...") script_dir = os.path.dirname(__file__) processors_dir_path = os.path.join(script_dir, "processors") processor_map, sorted_processors = load_processors(processors_dir_path) if not processor_map: print("Error crítico: No se cargaron procesadores. Abortando.", file=sys.stderr) return False # (Mapas de acceso y bucle iterativo SIN CAMBIOS relevantes, solo pasan 'data' que ya tiene metadatos) network_access_maps = {} for network in data.get("networks", []): net_id = network["id"] current_access_map = {} for instr in network.get("logic", []): for _, source in instr.get("inputs", {}).items(): sources_to_check = ( source if isinstance(source, list) else ([source] if isinstance(source, dict) else []) ) for src in sources_to_check: if ( isinstance(src, dict) and src.get("uid") and src.get("type") in ["variable", "constant"] ): current_access_map[src["uid"]] = src for _, dest_list in instr.get("outputs", {}).items(): if isinstance(dest_list, list): for dest in dest_list: if ( isinstance(dest, dict) and dest.get("uid") and dest.get("type") in ["variable", "constant"] ): current_access_map[dest["uid"]] = dest network_access_maps[net_id] = current_access_map symbol_manager = SymbolManager() sympy_map = {} max_passes = 30 passes = 0 processing_complete = False print(f"\n--- Iniciando Bucle de Procesamiento Iterativo ({block_type}) ---") while passes < max_passes and not processing_complete: passes += 1 made_change_in_base_pass = False made_change_in_group_pass = False print(f"\n--- Pase {passes} ---") num_sympy_processed_this_pass = 0 num_grouped_this_pass = 0 print(f" Fase 1 (SymPy Base - Orden por Prioridad):") num_sympy_processed_this_pass = 0 for processor_info in sorted_processors: current_type_name = processor_info["type_name"] func_to_call = processor_info["func"] for network in data.get("networks", []): network_id = network["id"] network_lang = network.get("language", "LAD") if network_lang == "STL": continue access_map = network_access_maps.get(network_id, {}) network_logic = network.get("logic", []) for instruction in network_logic: instr_uid = instruction.get("instruction_uid") instr_type_current = instruction.get("type", "Unknown") if ( instr_type_current.endswith(SCL_SUFFIX) or "_error" in instr_type_current or instruction.get("grouped", False) or instr_type_current in [ "RAW_STL_CHUNK", "RAW_SCL_CHUNK", "UNSUPPORTED_LANG", "UNSUPPORTED_CONTENT", "PARSING_ERROR", ] ): continue lookup_key = instr_type_current.lower() effective_type_name = lookup_key if instr_type_current == "Call": call_block_type = instruction.get("block_type", "").upper() if call_block_type == "FC": effective_type_name = "call_fc" elif call_block_type == "FB": effective_type_name = "call_fb" if effective_type_name == current_type_name: try: changed = func_to_call( instruction, network_id, sympy_map, symbol_manager, data ) # data se pasa aquí if changed: made_change_in_base_pass = True num_sympy_processed_this_pass += 1 except Exception as e: print( f"ERROR(SymPy Base) al procesar {instr_type_current} UID {instr_uid}: {e}" ) traceback.print_exc() instruction["scl"] = ( f"// ERROR en SymPy procesador base: {e}" ) instruction["type"] = instr_type_current + "_error" made_change_in_base_pass = True print( f" -> {num_sympy_processed_this_pass} instrucciones (no STL) procesadas con SymPy." ) if made_change_in_base_pass or passes == 1: print(f" Fase 2 (Agrupación IF con Simplificación):") num_grouped_this_pass = 0 for network in data.get("networks", []): network_id = network["id"] network_lang = network.get("language", "LAD") if network_lang == "STL": continue network_logic = network.get("logic", []) uids_in_network = sorted( [ instr.get("instruction_uid", "Z") for instr in network_logic if instr.get("instruction_uid") ] ) for uid_to_process in uids_in_network: instruction = next( ( instr for instr in network_logic if instr.get("instruction_uid") == uid_to_process ), None, ) if not instruction: continue if instruction.get("grouped") or "_error" in instruction.get( "type", "" ): continue if instruction.get("type", "").endswith(SCL_SUFFIX): try: group_changed = process_group_ifs( instruction, network_id, sympy_map, symbol_manager, data ) # data se pasa aquí if group_changed: made_change_in_group_pass = True num_grouped_this_pass += 1 except Exception as e: print( f"ERROR(GroupLoop) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}" ) traceback.print_exc() print( f" -> {num_grouped_this_pass} agrupaciones realizadas (en redes no STL)." ) if not made_change_in_base_pass and not made_change_in_group_pass: print( f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---" ) processing_complete = True else: print( f"--- Fin Pase {passes}: {num_sympy_processed_this_pass} proc SymPy, {num_grouped_this_pass} agrup. Continuando..." ) if passes == max_passes and not processing_complete: print(f"\n--- ADVERTENCIA: Límite de {max_passes} pases alcanzado...") # --- Verificación Final y Guardado JSON --- print(f"\n--- Verificación Final de Instrucciones No Procesadas ({block_type}) ---") unprocessed_count = 0 unprocessed_details = [] ignored_types = [ "raw_scl_chunk", "unsupported_lang", "raw_stl_chunk", "unsupported_content", "parsing_error", ] for network in data.get("networks", []): network_id = network.get("id", "Unknown ID") network_title = network.get("title", f"Network {network_id}") network_lang = network.get("language", "LAD") if network_lang == "STL": continue for instruction in network.get("logic", []): instr_uid = instruction.get("instruction_uid", "Unknown UID") instr_type = instruction.get("type", "Unknown Type") is_grouped = instruction.get("grouped", False) if ( not instr_type.endswith(SCL_SUFFIX) and "_error" not in instr_type and not is_grouped and instr_type.lower() not in ignored_types ): unprocessed_count += 1 unprocessed_details.append( f" - Red '{network_title}' (ID: {network_id}, Lang: {network_lang}), Instrucción UID: {instr_uid}, Tipo: '{instr_type}'" ) if unprocessed_count > 0: print( f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones (no STL) que parecen no haber sido procesadas:" ) [print(detail) for detail in unprocessed_details] else: print( "INFO: Todas las instrucciones relevantes (no STL) parecen haber sido procesadas o agrupadas." ) # <-- MODIFICADO: Asegurar que los metadatos se añadan al 'data' final antes de guardar --> if source_xml_mod_time is not None: data["source_xml_mod_time"] = source_xml_mod_time if source_xml_size is not None: data["source_xml_size"] = source_xml_size # <-- FIN MODIFICADO --> print(f"\nGuardando JSON procesado ({block_type}) en: {output_json_filepath}") try: with open(output_json_filepath, "w", encoding="utf-8") as f: json.dump( data, f, indent=4, ensure_ascii=False ) # Guardar el diccionario 'data' modificado print("Guardado completado.") return True except Exception as e: print(f"Error Crítico al guardar JSON procesado: {e}", file=sys.stderr) traceback.print_exc(file=sys.stderr) return False # --- Ejecución (MODIFICADO) --- if __name__ == "__main__": # Lógica para ejecución standalone try: import tkinter as tk from tkinter import filedialog except ImportError: print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr) tk = None input_json_file = "" if tk: root = tk.Tk() root.withdraw() print("Por favor, selecciona el archivo JSON de entrada (generado por x1)...") input_json_file = filedialog.askopenfilename( title="Selecciona el archivo JSON de entrada (.json)", filetypes=[("JSON files", "*.json"), ("All files", "*.*")] ) root.destroy() if not input_json_file: print("No se seleccionó ningún archivo. Saliendo.", file=sys.stderr) else: print(f"Archivo JSON de entrada seleccionado: {input_json_file}") # Calcular ruta de salida JSON procesado json_filename_base = os.path.splitext(os.path.basename(input_json_file))[0] # Asumimos que el _processed.json va al mismo directorio 'parsing' parsing_dir = os.path.dirname(input_json_file) output_json_file = os.path.join(parsing_dir, f"{json_filename_base}_processed.json") # Asegurarse de que el directorio de salida exista (aunque debería si el input existe) os.makedirs(parsing_dir, exist_ok=True) print( f"(x2 - Standalone) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'" ) try: success = process_json_to_scl(input_json_file, output_json_file) if success: print("\nProcesamiento completado exitosamente.") else: print(f"\nError durante el procesamiento de '{os.path.relpath(input_json_file)}'.", file=sys.stderr) # sys.exit(1) # No usar sys.exit except Exception as e: print( f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}", file=sys.stderr, ) traceback.print_exc(file=sys.stderr) # sys.exit(1) # No usar sys.exit