print("=== SCRIPT FILE ACCESSED BY PYTHON ===") #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Convertidor simplificado de LAD TwinCAT a pseudocódigo estructurado Versión mejorada con SymPy para optimización de expresiones lógicas Version con dos pasadas: 1) Recopilar funciones 2) Convertir con interfaz conocida """ import re import sympy import os import sys import glob from sympy import symbols, And, Or, Not, simplify from sympy.logic.boolalg import to_dnf # Configurar el path al directorio raíz del proyecto script_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(__file__))) ) sys.path.append(script_root) # Importar la función de configuración from backend.script_utils import load_configuration class FunctionInfo: """Información sobre una función extraída""" def __init__(self, name, return_type="BOOL"): self.name = name self.return_type = return_type self.inputs = [] # Lista de (nombre, tipo) self.outputs = [] # Lista de (nombre, tipo) self.source_file = "" self.function_type = "FUNCTION" # FUNCTION, FUNCTION_BLOCK, etc. def add_input(self, name, data_type): """Agregar parámetro de entrada""" self.inputs.append((name.strip(), data_type.strip())) def add_output(self, name, data_type): """Agregar parámetro de salida""" self.outputs.append((name.strip(), data_type.strip())) def get_call_signature(self): """Obtener signatura para llamada a función""" input_params = [name for name, _ in self.inputs] return f"{self.name}({', '.join(input_params)})" def __str__(self): inputs_str = ", ".join([f"{name}: {dtype}" for name, dtype in self.inputs]) outputs_str = ", ".join([f"{name}: {dtype}" for name, dtype in self.outputs]) return f"{self.function_type} {self.name}: {self.return_type} | IN: [{inputs_str}] | OUT: [{outputs_str}]" class FunctionRegistry: """Registro de funciones disponibles""" def __init__(self): self.functions = {} # nombre -> FunctionInfo self.function_blocks = {} # nombre -> FunctionInfo def add_function(self, func_info): """Agregar función al registro""" if func_info.function_type == "FUNCTION": self.functions[func_info.name] = func_info elif func_info.function_type == "FUNCTION_BLOCK": self.function_blocks[func_info.name] = func_info else: self.functions[func_info.name] = func_info def get_function(self, name): """Obtener información de función por nombre""" return self.functions.get(name, None) def get_function_block(self, name): """Obtener información de function block por nombre""" return self.function_blocks.get(name, None) def has_function(self, name): """Verificar si existe la función""" return name in self.functions def has_function_block(self, name): """Verificar si existe el function block""" return name in self.function_blocks def get_all_functions(self): """Obtener todas las funciones registradas""" return list(self.functions.values()) def get_all_function_blocks(self): """Obtener todos los function blocks registrados""" return list(self.function_blocks.values()) def print_summary(self): """Imprimir resumen del registro""" print(f"=== REGISTRO DE FUNCIONES ===") print(f"Funciones: {len(self.functions)}") for name, func_info in self.functions.items(): print(f" {func_info}") print(f"Function Blocks: {len(self.function_blocks)}") for name, func_info in self.function_blocks.items(): print(f" {func_info}") print() class SymbolManager: """Gestor de símbolos para SymPy""" def __init__(self): self._symbol_cache = {} def get_symbol(self, name): """Obtener o crear símbolo SymPy""" if name not in self._symbol_cache: # Limpiar nombre para SymPy (sin espacios, caracteres especiales) clean_name = re.sub(r"[^a-zA-Z0-9_]", "_", name) self._symbol_cache[name] = symbols(clean_name) return self._symbol_cache[name] class SimpleLadConverter: """Convertidor simplificado de LAD a código estructurado""" def __init__(self, function_registry=None): self.networks = [] self.current_network_id = 0 self.symbol_manager = SymbolManager() self.sympy_expressions = {} # Mapeo de network_id -> sympy expression self.function_registry = function_registry or FunctionRegistry() # Nuevas propiedades para estructura SCL completa self.program_name = "" self.program_type = ( "PROGRAM" # PROGRAM, FUNCTION, FUNCTION_BLOCK, TYPE, VAR_GLOBAL_LIST ) self.program_path = "" self.var_sections = {} # VAR, VAR_INPUT, VAR_OUTPUT, etc. self.actions = {} # Diccionario de ACTIONs self.st_main_code = None # Código ST del programa principal self.type_content = None # Contenido completo de TYPE def parse_file(self, file_path): """Parse el archivo LAD completo incluyendo variables y ACTIONs""" with open(file_path, "r", encoding="utf-8", errors="ignore") as f: content = f.read() # Guardar contenido original para extraer información adicional self._original_content = content # Extraer información del header self._parse_header_info(content) # Extraer declaraciones de variables self._parse_variable_declarations(content) # Verificar si es un TYPE (solo datos) o VAR_GLOBAL_LIST if hasattr(self, "program_type") and self.program_type == "TYPE": print("Archivo TYPE detectado - Extrayendo declaraciones") self._extract_type_content(content) elif hasattr(self, "program_type") and self.program_type == "VAR_GLOBAL_LIST": print( "Archivo de variables globales detectado - Extrayendo todas las secciones" ) self._extract_global_variables(content) else: # Encontrar sección LAD lad_start = content.find("_LD_BODY") if lad_start != -1: # Extraer contenido LAD hasta ACTION o END_PROGRAM action_start = content.find("\nACTION", lad_start) end_program = content.find("\nEND_PROGRAM", lad_start) lad_end = action_start if action_start != -1 else end_program if lad_end == -1: lad_end = len(content) lad_content = content[lad_start:lad_end] lines = lad_content.split("\n") self._parse_networks(lines) else: print("No se encontró _LD_BODY - Asumiendo código ST") # Extraer código ST del programa principal self._extract_st_code(content) # Extraer ACTIONs self._parse_actions(content) def parse_function_interface(self, file_path): """Parsear solo la interfaz de una función (primera pasada)""" with open(file_path, "r", encoding="utf-8", errors="ignore") as f: content = f.read() # Verificar si es una función func_match = re.search( r"(FUNCTION|FUNCTION_BLOCK)\s+(\w+)\s*:\s*(\w+)", content ) if not func_match: return None func_type = func_match.group(1) func_name = func_match.group(2) return_type = func_match.group(3) # Crear objeto FunctionInfo func_info = FunctionInfo(func_name, return_type) func_info.function_type = func_type func_info.source_file = os.path.basename(file_path) # Extraer parámetros de entrada var_input_match = re.search(r"VAR_INPUT(.*?)END_VAR", content, re.DOTALL) if var_input_match: input_section = var_input_match.group(1) for line in input_section.split("\n"): line = line.strip() if line and ":" in line and not line.startswith("(*"): # Limpiar comentarios inline if "(*" in line: line = line[: line.find("(*")] # Parsear declaración de variable line = line.replace(";", "").strip() if ":" in line: parts = line.split(":") if len(parts) >= 2: var_name = parts[0].strip() var_type = parts[1].strip() func_info.add_input(var_name, var_type) # Extraer parámetros de salida var_output_match = re.search(r"VAR_OUTPUT(.*?)END_VAR", content, re.DOTALL) if var_output_match: output_section = var_output_match.group(1) for line in output_section.split("\n"): line = line.strip() if line and ":" in line and not line.startswith("(*"): # Limpiar comentarios inline if "(*" in line: line = line[: line.find("(*")] # Parsear declaración de variable line = line.replace(";", "").strip() if ":" in line: parts = line.split(":") if len(parts) >= 2: var_name = parts[0].strip() var_type = parts[1].strip() func_info.add_output(var_name, var_type) print(f" Función encontrada: {func_info}") return func_info def _parse_header_info(self, content): """Extraer información del header del programa""" # Buscar PATH y nombre del programa path_match = re.search(r"\(\* @PATH := \'([^\']+)\' \*\)", content) if path_match: self.program_path = path_match.group(1) # Verificar si es un archivo de variables globales global_var_match = re.search( r"\(\*\s*@GLOBAL_VARIABLE_LIST\s*:=\s*(\w+)\s*\*\)", content ) if global_var_match: self.program_type = "VAR_GLOBAL_LIST" self.program_name = global_var_match.group(1) print(f"Archivo de variables globales detectado: {self.program_name}") else: # Buscar nombre del programa/function_block/function/type program_match = re.search( r"(PROGRAM|FUNCTION_BLOCK|FUNCTION|TYPE)\s+(\w+)", content ) if program_match: self.program_type = program_match.group(1) self.program_name = program_match.group(2) print(f"Programa encontrado: {self.program_name}") if self.program_path: print(f"Path: {self.program_path}") def _parse_variable_declarations(self, content): """Extraer todas las declaraciones de variables""" # Buscar todas las secciones VAR var_patterns = [ (r"VAR_INPUT(.*?)END_VAR", "VAR_INPUT"), (r"VAR_OUTPUT(.*?)END_VAR", "VAR_OUTPUT"), (r"VAR_IN_OUT(.*?)END_VAR", "VAR_IN_OUT"), (r"VAR\s+(.*?)END_VAR", "VAR"), ] for pattern, var_type in var_patterns: matches = re.findall(pattern, content, re.DOTALL | re.IGNORECASE) if matches: variables = [] for match in matches: # Parsear cada línea de variable for line in match.split("\n"): line = line.strip() if line and not line.startswith("(*") and ":" in line: # Limpiar comentarios inline if "(*" in line: line = line[: line.find("(*")] variables.append(line.strip()) if variables: self.var_sections[var_type] = variables print(f"Variables {var_type}: {len(variables)} encontradas") def _parse_networks(self, lines): """Parse todas las redes""" i = 0 expected_networks = 0 # Buscar cuántas redes se esperan for line in lines: if line.strip().startswith("_NETWORKS :"): expected_networks = int(line.strip().split(":")[1].strip()) print(f"Se esperan {expected_networks} redes según el archivo") break # LÍMITE DE SEGURIDAD: No procesar más de 50 redes para evitar colgarse if expected_networks > 50: print( f"⚠️ ADVERTENCIA: {expected_networks} redes es demasiado. Limitando a 50 redes para evitar colgarse." ) expected_networks = 50 while i < len(lines): if lines[i].strip() == "_NETWORK": self.current_network_id += 1 print(f"Procesando red {self.current_network_id}...") print(f" Comenzando en línea {i}: '{lines[i].strip()}'") # Mostrar las próximas líneas para debug print(f" Próximas líneas después de _NETWORK:") for j in range(i + 1, min(i + 10, len(lines))): print(f" Línea {j}: '{lines[j].strip()}'") i = self._parse_network(lines, i) else: i += 1 if len(self.networks) != expected_networks: print( f"ADVERTENCIA: Se esperaban {expected_networks} redes pero solo se parsearon {len(self.networks)}" ) print("Esto puede indicar redes con _EMPTY o estructuras no reconocidas") # DEBUG: Mostrar todas las líneas _NETWORK encontradas print("DEBUG: Buscando todas las instancias de _NETWORK...") for idx, line in enumerate(lines): if line.strip() == "_NETWORK": print(f" Línea {idx}: _NETWORK") if idx + 1 < len(lines): print(f" Siguiente línea: '{lines[idx + 1].strip()}'") if idx + 2 < len(lines): print(f" Línea +2: '{lines[idx + 2].strip()}'") print() def _parse_network(self, lines, start_idx): """Parse una red individual con soporte mejorado para operadores LAD""" network = { "id": self.current_network_id, "comment": "", "logic": None, "target": "", "calls": [], # NUEVO: para almacenar llamadas a FB/FUN "function_blocks": [], } i = start_idx + 1 # Parse comentario if i < len(lines) and lines[i].strip() == "_COMMENT": i, comment = self._parse_comment(lines, i) network["comment"] = comment # Parse contenido de la red while i < len(lines): line = lines[i].strip() if line == "_NETWORK": break elif line == "_LD_ASSIGN": i += 1 print(f" 🔍 Procesando _LD_ASSIGN en línea {i}") # Parsear la lógica LAD después de _LD_ASSIGN i, logic = self._parse_lad_expression(lines, i) network["logic"] = logic print(f" 📋 Lógica parseada: {logic}") # Después de la lógica, buscar una lista de ejecución (ENABLELIST) # Continuar buscando ENABLELIST desde donde terminó el parser de lógica while i < len(lines): line_content = lines[i].strip() print(f" 🔎 Buscando ENABLELIST en línea {i}: '{line_content}'") if line_content.startswith("ENABLELIST"): print(f" ✅ Encontrado ENABLELIST en línea {i}") i, calls = self._parse_enable_list(lines, i) network["calls"] = calls print( f" ✅ Red {network['id']}: Encontradas {len(calls)} llamadas en ENABLELIST" ) break elif line_content == "_NETWORK": print(f" 🔚 Fin de red encontrado en línea {i}") break elif line_content.startswith("_OUTPUTS"): print(f" 📤 Sección de outputs encontrada en línea {i}") break else: # Continuar buscando i += 1 elif line.startswith("_OUTPUT"): # Buscar variable de salida i += 1 while i < len(lines) and lines[i].strip().startswith("_"): i += 1 if i < len(lines) and lines[i].strip() and "ENABLELIST" not in lines[i]: network["target"] = lines[i].strip() i += 1 else: i += 1 self.networks.append(network) print(f" Red {network['id']} agregada. Total redes: {len(self.networks)}") if network["logic"]: print( f" Con lógica: {network['logic'].get('type')} - {network['logic'].get('name', 'Sin nombre')}" ) if network["calls"]: print(f" Con {len(network['calls'])} llamadas de ejecución.") # Mostrar detalles de las llamadas for call in network["calls"]: if call["name"] and self.function_registry.has_function(call["name"]): print(f" 🎯 Función reconocida: {call['name']}") else: print(f" 📋 Llamada: {call['type']} - {call['name']}") print(f" Target: '{network['target']}'") return i def _parse_lad_expression(self, lines, start_idx): """Parse una expresión LAD recursivamente""" i = start_idx while i < len(lines): line = lines[i].strip() if line == "_LD_AND": return self._parse_and_expression(lines, i + 1) elif line == "_LD_OR": return self._parse_or_expression(lines, i + 1) elif line == "_LD_CONTACT": return self._parse_contact(lines, i + 1) elif line.startswith("_FUNCTIONBLOCK"): return self._parse_function_block(lines, i) elif line == "_EMPTY": # Red vacía que puede contener funciones después print(f" Encontrada _EMPTY dentro de _LD_ASSIGN en línea {i}") i += 1 # Buscar funciones en las siguientes líneas i, empty_logic = self._parse_empty_network(lines, i) return i, empty_logic elif ( line.startswith("_OUTPUT") or line == "ENABLELIST : 0" or line.startswith("ENABLELIST :") ): break else: i += 1 return i, None def _parse_and_expression(self, lines, start_idx): """Parse una expresión AND""" i = start_idx operands = [] # Buscar operadores if i < len(lines) and lines[i].strip().startswith("_LD_OPERATOR"): # Extraer número de operandos operator_line = lines[i].strip() num_operands = ( int(operator_line.split(":")[-1].strip()) if ":" in operator_line else 2 ) i += 1 # Parse cada operando for _ in range(num_operands): i, operand = self._parse_lad_expression(lines, i) if operand: operands.append(operand) return i, {"type": "AND", "operands": operands} def _parse_or_expression(self, lines, start_idx): """Parse una expresión OR""" i = start_idx operands = [] # Buscar operadores if i < len(lines) and lines[i].strip().startswith("_LD_OPERATOR"): # Extraer número de operandos operator_line = lines[i].strip() num_operands = ( int(operator_line.split(":")[-1].strip()) if ":" in operator_line else 2 ) i += 1 # Parse cada operando for _ in range(num_operands): i, operand = self._parse_lad_expression(lines, i) if operand: operands.append(operand) return i, {"type": "OR", "operands": operands} def _parse_contact(self, lines, start_idx): """Parse un contacto LAD""" i = start_idx contact_name = "" negated = False # Obtener nombre del contacto if i < len(lines): contact_name = lines[i].strip() i += 1 # Verificar si hay expresión if i < len(lines) and lines[i].strip() == "_EXPRESSION": i += 1 # Verificar si está negado if i < len(lines): if lines[i].strip() == "_NEGATIV": negated = True i += 1 elif lines[i].strip() == "_POSITIV": i += 1 return i, {"type": "CONTACT", "name": contact_name, "negated": negated} def _parse_function_block(self, lines, start_idx): """Parse un bloque de función o llamada a ACTION""" i = start_idx + 1 fb_name = "" inputs = [] action_call = None max_iterations = 50 # Protección contra bucles infinitos iterations = 0 if i < len(lines): fb_name = lines[i].strip() i += 1 # PATRÓN IDENTIFICADO: ??? indica llamada a ACTION if fb_name == "???": print( f" Detectado patrón ??? en línea {i-1} - Buscando ACTION call..." ) # Buscar el nombre completo de la ACTION más adelante action_call = self._find_action_call_name(lines, i) if action_call: print(f" ✓ ACTION call encontrada: {action_call}") return i, {"type": "ACTION_CALL", "name": action_call, "inputs": []} else: print(f" ⚠ Patrón ??? pero no se encontró nombre de ACTION") else: print(f" Detectado Function Block directo: {fb_name}") # Parse inputs del function block normal while ( i < len(lines) and not lines[i].strip().startswith("_OUTPUT") and iterations < max_iterations ): line = lines[i].strip() iterations += 1 if line.startswith("_OPERAND"): i += 2 # Saltar _EXPRESSION if i < len(lines): inputs.append(lines[i].strip()) i += 1 elif line and not line.startswith("_"): # Para function blocks normales, recopilar inputs # Para ACTION calls perdidas, último intento if "." in line and fb_name == "???" and action_call is None: action_call = line print(f" ✓ ACTION call encontrada (fallback): {action_call}") return i + 1, { "type": "ACTION_CALL", "name": action_call, "inputs": inputs, } elif line.startswith("ENABLELIST"): # Fin del bloque break else: i += 1 # Salida de emergencia del bucle if iterations >= max_iterations: print( f" ADVERTENCIA: Bucle infinito evitado en function block en línea {start_idx}" ) break return i, { "type": "FUNCTION_BLOCK", "name": fb_name, "instance_name": fb_name, # Usar el mismo nombre como instancia "inputs": inputs, "outputs": [], } def _find_action_call_name(self, lines, start_idx): """Buscar el nombre completo de la ACTION después de ???""" i = start_idx print(f" Buscando ACTION name desde línea {start_idx}...") while i < len(lines) and i < start_idx + 15: # Buscar en las próximas 15 líneas line = lines[i].strip() print(f" Línea {i}: '{line}'") # Buscar patrón namespace.actionname (ej: _Filling_Head_PID_Ctrl.Read_Analog) if ( "." in line and "ENABLELIST" not in line and "EXPRESSION" not in line and "BOX_EXPR" not in line and "ENABLED" not in line and "OUTPUTS" not in line and "NO_SET" not in line and "OUTPUT" not in line and line != "_POSITIV" and line != "_NEGATIV" and line != "_NO_SET" ): print(f" ✓ ACTION name encontrado: {line}") return line i += 1 print( f" ⚠ No se encontró ACTION name en {start_idx + 15 - start_idx} líneas" ) return None def _parse_comment(self, lines, start_idx): """Parse comentario""" i = start_idx + 1 comment_lines = [] while i < len(lines): line = lines[i].strip() if line == "_END_COMMENT": break if line and not line.startswith("_"): comment_lines.append(line) i += 1 return i + 1, " ".join(comment_lines) def _parse_enable_list(self, lines, start_idx): """Parsear una lista de ejecución ENABLELIST""" i = start_idx calls = [] # Buscar línea ENABLELIST while i < len(lines) and not lines[i].strip().startswith("ENABLELIST"): i += 1 if i < len(lines) and lines[i].strip().startswith("ENABLELIST"): enable_line = lines[i].strip() num_enabled = ( int(enable_line.split(":")[1].strip()) if ":" in enable_line else 0 ) print(f" 📋 Procesando ENABLELIST con {num_enabled} llamadas") i += 1 # Moverse a la siguiente línea después de ENABLELIST for call_idx in range(num_enabled): if i >= len(lines): break print( f" 🔍 Buscando llamada {call_idx+1}/{num_enabled} en línea {i}" ) # Buscar el próximo _ASSIGN assign_found = False while i < len(lines) and not assign_found: line_content = lines[i].strip() print(f" 🔎 Línea {i}: '{line_content}'") if line_content == "_ASSIGN": assign_found = True i += 1 print(f" ✅ Encontrado _ASSIGN en línea {i-1}") i, call = self._parse_execution_block(lines, i) if call: calls.append(call) print( f" ✅ Llamada parseada: {call['type']} - {call['name']}" ) else: print(f" ❌ No se pudo parsear la llamada") break elif line_content.startswith("ENABLELIST_END"): print(f" 🔚 Encontrado ENABLELIST_END prematuro") break else: i += 1 if not assign_found: print(f" ❌ No se encontró _ASSIGN para llamada {call_idx+1}") break # Avanzar hasta el final de la lista de habilitación while i < len(lines) and not lines[i].strip().startswith("ENABLELIST_END"): i += 1 if i < len(lines) and lines[i].strip() == "ENABLELIST_END": print(f" 🔚 Encontrado ENABLELIST_END en línea {i}") i += 1 print(f" 📊 Total de llamadas parseadas: {len(calls)}") return i, calls def _parse_execution_block(self, lines, start_idx): """Parsear un bloque de ejecución (_FUNCTION o _FUNCTIONBLOCK)""" i = start_idx call_data = { "type": None, # FUNCTION, FUNCTION_BLOCK, OPERATOR "instance_name": None, # Para FB "name": None, # Para FUNCTION o OPERATOR "inputs": [], "outputs": [], # Lista de variables de salida } # Encontrar el tipo de bloque (_FUNCTION, _FUNCTIONBLOCK, etc.) block_type_line_idx = -1 temp_i = i while temp_i < len(lines) and temp_i < i + 15: line = lines[temp_i].strip() if ( line.startswith("_FUNCTIONBLOCK") or line.startswith("_FUNCTION") or line.startswith("_OPERATOR") ): block_type_line_idx = temp_i break temp_i += 1 if block_type_line_idx == -1: return i, None i = block_type_line_idx line = lines[i].strip() if line.startswith("_FUNCTIONBLOCK"): call_data["type"] = "FUNCTION_BLOCK" i += 1 call_data["instance_name"] = lines[i].strip() elif line.startswith("_FUNCTION"): call_data["type"] = "FUNCTION" elif line.startswith("_OPERATOR"): call_data["type"] = "OPERATOR" i += 1 # Saltar _BOX_EXPR y _ENABLED while i < len(lines) and ( lines[i].strip().startswith("_BOX_EXPR") or lines[i].strip().startswith("_ENABLED") ): i += 1 # Parsear inputs (_OPERAND) while i < len(lines): line = lines[i].strip() if line.startswith("_OPERAND"): i += 1 # Saltar _EXPRESSION y polaridad si existen if i < len(lines) and lines[i].strip() == "_EXPRESSION": i += 2 # Salta _EXPRESSION y _POSITIV/_NEGATIV if i < len(lines): call_data["inputs"].append(lines[i].strip()) i += 1 else: break elif line.startswith("ENABLELIST"): # Algunos FBs anidados i += 1 else: break # El nombre de la función/bloque viene después de los operandos name_found = False while i < len(lines) and not name_found: line = lines[i].strip() if ( line and not line.startswith("_") and not line.startswith("ENABLELIST") and "FB41_PID" not in line # Excepción para PID ): call_data["name"] = line name_found = True # Verificar si es una función conocida en el registry if call_data[ "type" ] == "FUNCTION" and self.function_registry.has_function(line): print(f" ✓ Función conocida detectada: {line}") elif call_data[ "type" ] == "FUNCTION_BLOCK" and self.function_registry.has_function_block( line ): print(f" ✓ Function Block conocido detectado: {line}") elif line.startswith("_OUTPUTS"): break i += 1 # Si el nombre no se encontró (caso de FBs como PID), usar la instancia if not call_data["name"] and call_data["type"] == "FUNCTION_BLOCK": # El nombre del tipo de FB está después de los outputs en algunos casos pass # Parsear outputs while i < len(lines): line = lines[i].strip() if line.startswith("_OUTPUTS"): i += 1 # Saltar línea _OUTPUTS # Bucle para cada _OUTPUT while i < len(lines) and lines[i].strip().startswith("_OUTPUT"): i += 1 # Saltar línea _OUTPUT # Saltar polaridad y _NO_SET while i < len(lines) and lines[i].strip() in [ "_POSITIV", "_NEGATIV", "_NO_SET", ]: i += 1 if i < len(lines): output_var = lines[i].strip() if output_var != "_EMPTY": call_data["outputs"].append(output_var) i += 1 else: break elif line.startswith("ENABLELIST_END"): # Parar aquí - fin del ENABLELIST break elif line == "_ASSIGN": # Parar aquí - otra llamada en el ENABLELIST print( f" 🔚 Encontrado siguiente _ASSIGN en línea {i}, terminando parsing de llamada actual" ) break else: # El nombre del tipo de FB puede estar aquí if call_data["type"] == "FUNCTION_BLOCK" and not call_data["name"]: if line and not line.startswith("_"): call_data["name"] = line # Verificar si es un FB conocido if self.function_registry.has_function_block(line): print(f" ✓ Function Block conocido detectado: {line}") i += 1 return i, call_data def _parse_empty_network(self, lines, start_idx): """Parse una red que empieza con _EMPTY y puede contener funciones""" i = start_idx max_iterations = 20 iterations = 0 function_found = None target_found = None print(f" Entrando a _parse_empty_network desde línea {start_idx}") while ( i < len(lines) and iterations < max_iterations and not lines[i].strip().startswith("_OUTPUT") and not lines[i].strip() == "_NETWORK" ): line = lines[i].strip() iterations += 1 print(f" Línea {i}: '{line}'") if line.startswith("_FUNCTIONBLOCK"): # Encontrada llamada a función/ACTION print(f" ENCONTRADO _FUNCTIONBLOCK en línea {i}") i, logic = self._parse_function_block(lines, i) function_found = logic elif line.startswith("_FUNCTION"): # Otra variante de función print(f" ENCONTRADO _FUNCTION en línea {i}") i += 1 # Buscar el nombre de la función while i < len(lines) and i < start_idx + 10: func_line = lines[i].strip() print(f" Buscando nombre función línea {i}: '{func_line}'") if ( func_line and not func_line.startswith("_") and "ENABLELIST" not in func_line ): print(f" ENCONTRADO nombre función: {func_line}") function_found = { "type": "FUNCTION_CALL", "name": func_line, "inputs": [], } break i += 1 elif line.startswith("ENABLELIST"): print(f" Encontrado ENABLELIST, continuando búsqueda...") # No terminar aquí, continuar buscando _ASSIGN i += 1 elif line.startswith("_ASSIGN"): print(f" ENCONTRADO _ASSIGN en línea {i}") # Buscar función dentro de _ASSIGN i += 1 i, assign_logic = self._parse_assign_section(lines, i) if assign_logic: function_found = assign_logic elif line.startswith("_OUTPUT"): # Buscar el target después de _OUTPUT print(f" ENCONTRADO _OUTPUT, buscando target...") i += 1 while i < len(lines) and lines[i].strip().startswith("_"): i += 1 if i < len(lines) and lines[i].strip() and "ENABLELIST" not in lines[i]: target_found = lines[i].strip() print(f" ENCONTRADO target: {target_found}") break else: i += 1 # Si encontramos una función, crear red independiente if function_found and target_found: print( f" Creando red independiente para función con target: {target_found}" ) self._create_function_network(function_found, target_found) return i, function_found elif function_found: print(f" Función encontrada pero sin target específico") # Crear target por defecto basado en el tipo de función if function_found["type"] == "ACTION_CALL": default_target = "mDummy" # Variable dummy típica para ACTION calls elif function_found["type"] == "FUNCTION_CALL": # Para funciones como ProductLiterInTank, usar la variable global correspondiente if function_found["name"] == "gProductTankLevel": default_target = ( "gTankProdAmount" # Variable que se asigna según el contexto ) else: default_target = "mDummy" else: # Caso por defecto para otros tipos de función (FUNCTION_BLOCK, etc.) default_target = "mDummy" print(f" Usando target por defecto: {default_target}") self._create_function_network(function_found, default_target) return i, function_found print(f" _parse_empty_network terminó sin encontrar función") return i, None def _parse_assign_section(self, lines, start_idx): """Parse una sección _ASSIGN que puede contener funciones""" i = start_idx max_iterations = 15 iterations = 0 print(f" Entrando a _parse_assign_section desde línea {start_idx}") while ( i < len(lines) and iterations < max_iterations and not lines[i].strip() == "_NETWORK" and not lines[i].strip() == "ENABLELIST_END" ): line = lines[i].strip() iterations += 1 print(f" Línea {i}: '{line}'") if line.startswith("_FUNCTIONBLOCK"): print(f" ENCONTRADO _FUNCTIONBLOCK en _ASSIGN: línea {i}") i, logic = self._parse_function_block(lines, i) return i, logic elif line.startswith("_FUNCTION"): print(f" ENCONTRADO _FUNCTION en _ASSIGN: línea {i}") # Usar la misma lógica que _parse_execution_block i, call_info = self._parse_execution_block(lines, i) if call_info: return i, call_info else: i += 1 elif line == "ENABLELIST_END": print(f" Encontrado ENABLELIST_END, terminando") break else: i += 1 print(f" _parse_assign_section terminó sin encontrar función") return i, None def _create_function_network(self, function_logic, target_name): """Crear una red independiente para una función o ACTION call""" self.current_network_id += 1 function_network = { "id": self.current_network_id, "comment": f'Llamada a función: {function_logic.get("name", "unknown")}', "logic": function_logic, "target": target_name, "function_blocks": [], "calls": [function_logic], # ✅ Añadir la llamada al array calls } self.networks.append(function_network) print( f" Red de función {function_network['id']} creada para {function_logic['type']}: {function_logic.get('name', 'Sin nombre')}" ) print(f" Target: '{target_name}'") return function_network def convert_to_structured(self): """Convertir a código SCL completo con variables y ACTIONs""" output = [] # Header del archivo output.append("(* Código SCL generado desde LAD TwinCAT *)") output.append("(* Convertidor mejorado con SymPy - Estructura DNF preferida *)") if self.program_path: output.append(f"(* Path original: {self.program_path} *)") output.append("") # Verificar si es un TYPE o VAR_GLOBAL_LIST - manejarlos diferente program_type = self.program_type if hasattr(self, "program_type") else "PROGRAM" if program_type == "TYPE": return self._convert_type_to_scl(output) elif program_type == "VAR_GLOBAL_LIST": return self._convert_global_vars_to_scl(output) # Declaración del programa/función program_name = self.program_name if self.program_name else "ConvertedProgram" # Para FUNCTIONs, necesitamos detectar el tipo de retorno if program_type == "FUNCTION": # Buscar tipo de retorno en el código original (ej: FUNCTION ArrayToReal : REAL) return_type = self._extract_function_return_type() if return_type: output.append(f"FUNCTION {program_name} : {return_type}") else: output.append(f"FUNCTION {program_name} : BOOL") # Tipo por defecto else: output.append(f"{program_type} {program_name}") # Declaraciones de variables self._add_variable_sections(output) # Inicio del código principal output.append("") output.append("(* === CÓDIGO PRINCIPAL === *)") output.append("") # Código principal - LAD o ST if self.networks: # Hay redes LAD - generar código desde redes output.append(" (* Código LAD convertido *)") for network in self.networks: output.append(f" // Red {network['id']}") if network["comment"]: output.append(f" // {network['comment']}") # Procesar lógica si existe condition_str = "" if network["logic"]: # Usar expresión DNF optimizada si está disponible if network["id"] in self.sympy_expressions: sympy_expr = self.sympy_expressions[network["id"]] condition_str = self._format_dnf_for_lad(sympy_expr) else: # Fallback al método original condition_str = self._convert_logic_to_string(network["logic"]) # Asegurar que el campo 'calls' existe antes de accederlo network_calls = network.get("calls", []) # ✅ PROCESAR LLAMADAS INDEPENDIENTEMENTE DE LA LÓGICA if network_calls: print( f" 🔧 GENERANDO CÓDIGO - Red {network['id']} tiene {len(network_calls)} llamadas" ) # Si hay condición, generar IF if condition_str: if "\n" in condition_str: output.append(f" IF {condition_str} THEN") else: output.append(f" IF {condition_str} THEN") # Generar las llamadas a función/FB print( f" 🔧 Generando {len(network_calls)} llamadas para Red {network['id']}" ) for call in network_calls: print(f" 🔧 Procesando: {call['type']} - {call['name']}") call_str = self._convert_call_to_string( call, " " if condition_str else " " ) print(f" 🔧 Código generado: {call_str}") output.append(call_str) # Si había condición, cerrar IF if condition_str: output.append(" END_IF;") elif network["target"]: # Red sin llamadas pero con target if condition_str: output.append(f" IF {condition_str} THEN") output.append(f" {network['target']} := TRUE;") output.append(" ELSE") output.append(f" {network['target']} := FALSE;") output.append(" END_IF;") else: output.append( f" {network['target']} := TRUE; // Sin condición" ) output.append("") elif self.st_main_code: # Hay código ST - copiarlo directamente preservando indentación output.append(" (* Código ST original *)") for line in self.st_main_code.split("\n"): if line.strip(): # Preservar indentación original pero agregar indentación base SCL output.append(f" {line}") else: output.append("") else: # No hay código principal output.append(" (* Sin código principal detectado *)") output.append("") # Agregar ACTIONs como subfunciones self._add_actions_as_procedures(output, program_name) # Fin del programa principal program_type = self.program_type if hasattr(self, "program_type") else "PROGRAM" output.append(f"END_{program_type}") return "\n".join(output) def _add_variable_sections(self, output): """Agregar todas las secciones de variables al código""" # Orden preferido de las secciones section_order = ["VAR_INPUT", "VAR_OUTPUT", "VAR_IN_OUT", "VAR"] for section_type in section_order: if section_type in self.var_sections: output.append(f"{section_type}") for var_line in self.var_sections[section_type]: # Asegurar que termine con punto y coma if not var_line.endswith(";"): var_line += " ;" output.append(f" {var_line}") output.append("END_VAR") output.append("") def _convert_call_to_string(self, call, indent=""): """Convertir una llamada de ejecución a string SCL""" if not call: return "" if call["type"] == "FUNCTION": # Buscar información de la función en el registry func_info = self.function_registry.get_function(call["name"]) if func_info: # Usar información del registry para generar llamada correcta # Mapear inputs disponibles a parámetros de la función param_assignments = [] for i, (param_name, param_type) in enumerate(func_info.inputs): if i < len(call["inputs"]): param_assignments.append(f"{param_name} := {call['inputs'][i]}") inputs_str = ", ".join(param_assignments) output_var = call["outputs"][0] if call["outputs"] else "mDummy" print( f" 🎯 Generando llamada a función con interfaz conocida: {call['name']}" ) print(f" Parámetros: {inputs_str}") print(f" Salida: {output_var}") # Generar llamada con parámetros nombrados return f"{indent}{output_var} := {call['name']}({inputs_str});" else: # Fallback al método original si no hay información inputs_str = ", ".join(call["inputs"]) output_var = call["outputs"][0] if call["outputs"] else "mDummy" return f"{indent}{output_var} := {call['name']}({inputs_str}); // Sin info de interfaz" elif call["type"] == "FUNCTION_BLOCK": # Buscar información del FB en el registry fb_info = self.function_registry.get_function_block(call["name"]) instance_name = call["instance_name"] if fb_info: # Usar información del registry para generar llamada correcta param_assignments = [] for i, (param_name, param_type) in enumerate(fb_info.inputs): if i < len(call["inputs"]): param_assignments.append(f"{param_name} := {call['inputs'][i]}") inputs_str = ", ".join(param_assignments) fb_call = f"{indent}{instance_name}({inputs_str});" # Generar asignaciones de salida usando información del registry if call["outputs"] and fb_info.outputs: for i, output_var in enumerate(call["outputs"]): if i < len(fb_info.outputs): output_param_name = fb_info.outputs[i][0] fb_call += f"\n{indent}{output_var} := {instance_name}.{output_param_name};" return fb_call else: # Fallback al método original inputs_str = ", ".join(call["inputs"]) fb_call = ( f"{indent}{instance_name}({inputs_str}); // Sin info de interfaz" ) if call["outputs"]: output_var = call["outputs"][0] # Heurística: el nombre de la salida suele ser 'FilterOut' o similar output_pin_name = "FilterOut" # Suposición basada en LowPassFilter if "stat" in instance_name.lower(): output_pin_name = ( "MeanValue" # Suposición para StatisticalAnalisys ) fb_call += ( f"\n{indent}{output_var} := {instance_name}.{output_pin_name};" ) return fb_call elif call["type"] == "OPERATOR": # Formato: output := Operator(input1, input2, ...) inputs_str = ", ".join(call["inputs"]) output_var = call["outputs"][0] if call["outputs"] else "mDummy" return f"{indent}{output_var} := {call['name']}({inputs_str});" return f"{indent}(* Llamada no reconocida: {call['type']} - {call['name']} *)" def _add_actions_as_procedures(self, output, program_name): """Agregar ACTIONs como procedimientos independientes""" if not self.actions: return output.append("") output.append("(* === SUBFUNCIONES (ACTIONs convertidas) === *)") output.append("") for action_name, action_data in self.actions.items(): # Nombre completo como se usa en TwinCAT (sin doble underscore) full_name = f"{program_name}_{action_name}" output.append(f"PROCEDURE {full_name}") output.append("(* Convertida desde ACTION *)") output.append("") if isinstance(action_data, dict) and action_data["type"] == "LAD": # ACTION con código LAD - convertir a SCL output.append(" (* Código LAD convertido a SCL *)") for network in action_data["networks"]: # Asegurar que el campo 'calls' existe if "calls" not in network: network["calls"] = [] output.append(f" // Red {network['id']}") if network["comment"]: output.append(f" // {network['comment']}") # Generar código para la red (condición + llamadas) condition_str = self._convert_logic_to_string(network["logic"]) # Asegurar que el campo 'calls' existe antes de accederlo network_calls = network.get("calls", []) if condition_str or (not network["logic"] and network_calls): if condition_str: if "\n" in condition_str: output.append(f" IF {condition_str} THEN") else: output.append(f" IF {condition_str} THEN") indent = " " if condition_str else " " if network_calls: for call in network_calls: call_str = self._convert_call_to_string(call, indent) output.append(call_str) elif network["target"]: output.append(f"{indent}{network['target']} := TRUE;") if condition_str: if not network_calls and network["target"]: output.append(" ELSE") output.append(f" {network['target']} := FALSE;") output.append(" END_IF;") elif network["target"]: output.append( f" {network['target']} := TRUE; // Sin condición, solo target" ) output.append("") elif isinstance(action_data, dict) and action_data["type"] == "ST": # ACTION con código ST - copiar directamente preservando indentación output.append(" (* Código ST original *)") for line in action_data["code"].split("\n"): if line.strip(): # Preservar indentación original pero agregar indentación base SCL output.append(f" {line}") else: output.append("") else: # Compatibilidad hacia atrás - código directo (legacy) output.append(" (* Código original - modo compatibilidad *)") for line in str(action_data).split("\n"): if line.strip(): # Preservar indentación original pero agregar indentación base SCL output.append(f" {line}") else: output.append("") output.append("") output.append("END_PROCEDURE") output.append("") def _convert_logic_to_string(self, logic): """Convertir lógica LAD a string estructurado""" if not logic: return "" if logic["type"] == "CONTACT": if logic["negated"]: return f"NOT {logic['name']}" else: return logic["name"] elif logic["type"] == "AND": operand_strings = [] for operand in logic["operands"]: operand_str = self._convert_logic_to_string(operand) if operand_str: operand_strings.append(operand_str) if len(operand_strings) > 1: return "(" + " AND ".join(operand_strings) + ")" elif len(operand_strings) == 1: return operand_strings[0] else: return "" elif logic["type"] == "OR": operand_strings = [] for operand in logic["operands"]: operand_str = self._convert_logic_to_string(operand) if operand_str: operand_strings.append(operand_str) if len(operand_strings) > 1: return "(" + " OR ".join(operand_strings) + ")" elif len(operand_strings) == 1: return operand_strings[0] else: return "" elif logic["type"] == "FUNCTION_BLOCK": # Buscar información del FB en el registry fb_info = self.function_registry.get_function_block(logic["name"]) if fb_info: # Generar llamada con parámetros conocidos param_assignments = [] for i, (param_name, param_type) in enumerate(fb_info.inputs): if i < len(logic["inputs"]): param_assignments.append( f"{param_name} := {logic['inputs'][i]}" ) inputs_str = ", ".join(param_assignments) return f"{logic['name']}({inputs_str})" else: # Fallback al método original inputs_str = ", ".join(logic["inputs"]) if logic["inputs"] else "" return f"{logic['name']}({inputs_str})" elif logic["type"] == "ACTION_CALL": return logic["name"] # Solo devolver el nombre, no CALL aquí elif logic["type"] == "FUNCTION_CALL": # Buscar información de la función en el registry func_info = self.function_registry.get_function(logic["name"]) if func_info: # Generar llamada con parámetros conocidos param_assignments = [] for i, (param_name, param_type) in enumerate(func_info.inputs): if i < len(logic.get("inputs", [])): param_assignments.append( f"{param_name} := {logic['inputs'][i]}" ) inputs_str = ", ".join(param_assignments) return f"{logic['name']}({inputs_str})" else: # Fallback al método original return f"{logic['name']}()" return "" def save_to_file(self, output_path): """Guardar código estructurado""" structured_code = self.convert_to_structured() with open(output_path, "w", encoding="utf-8") as f: f.write(structured_code) return structured_code def print_debug_info(self): """Mostrar información de debug sobre los networks parseados""" print(f"\n=== DEBUG INFO - {len(self.networks)} networks encontrados ===") for network in self.networks: print(f"\nRed {network['id']}:") if network["comment"]: print(f" Comentario: {network['comment']}") print(f" Target: {network['target']}") if network["logic"]: print(f" Lógica: {self._debug_logic_string(network['logic'])}") condition_str = self._convert_logic_to_string(network["logic"]) print(f" Condición: {condition_str}") else: print(" Sin lógica") # ¡AGREGAR DEBUGGING PARA LLAMADAS! if "calls" in network and network["calls"]: print(f" ✅ Llamadas: {len(network['calls'])}") for i, call in enumerate(network["calls"]): print(f" {i+1}. {call['type']}: {call['name']}") if call["inputs"]: print(f" Inputs: {call['inputs']}") if call.get("outputs"): print(f" Outputs: {call['outputs']}") else: print(" ❌ Sin llamadas") def _debug_logic_string(self, logic, indent=0): """Crear string de debug para la lógica""" if not logic: return "None" prefix = " " * indent if logic["type"] == "CONTACT": neg_str = " (NEGADO)" if logic["negated"] else "" return f"{prefix}CONTACT: {logic['name']}{neg_str}" elif logic["type"] == "AND": result = f"{prefix}AND:\n" for operand in logic["operands"]: result += self._debug_logic_string(operand, indent + 1) + "\n" return result.rstrip() elif logic["type"] == "OR": result = f"{prefix}OR:\n" for operand in logic["operands"]: result += self._debug_logic_string(operand, indent + 1) + "\n" return result.rstrip() elif logic["type"] == "FUNCTION_BLOCK": return f"{prefix}FUNCTION_BLOCK: {logic['name']} inputs: {logic['inputs']}" elif logic["type"] == "ACTION_CALL": return f"{prefix}ACTION_CALL: {logic['name']}" elif logic["type"] == "FUNCTION_CALL": return f"{prefix}FUNCTION_CALL: {logic['name']}" return f"{prefix}UNKNOWN: {logic}" def _logic_to_sympy(self, logic): """Convertir lógica LAD a expresión SymPy""" if not logic: return None if logic["type"] == "CONTACT": symbol = self.symbol_manager.get_symbol(logic["name"]) if logic["negated"]: return Not(symbol) else: return symbol elif logic["type"] == "AND": sympy_operands = [] for operand in logic["operands"]: operand_sympy = self._logic_to_sympy(operand) if operand_sympy is not None: sympy_operands.append(operand_sympy) if len(sympy_operands) > 1: return And(*sympy_operands) elif len(sympy_operands) == 1: return sympy_operands[0] else: return None elif logic["type"] == "OR": sympy_operands = [] for operand in logic["operands"]: operand_sympy = self._logic_to_sympy(operand) if operand_sympy is not None: sympy_operands.append(operand_sympy) if len(sympy_operands) > 1: return Or(*sympy_operands) elif len(sympy_operands) == 1: return sympy_operands[0] else: return None elif logic["type"] == "FUNCTION_BLOCK": # Para function blocks, creamos un símbolo especial fb_name = f"{logic['name']}({', '.join(logic['inputs'])})" return self.symbol_manager.get_symbol(fb_name) elif logic["type"] == "ACTION_CALL": # Para llamadas a ACTION, creamos un símbolo especial action_name = f"CALL_{logic['name'].replace('.', '_')}" return self.symbol_manager.get_symbol(action_name) return None def _sympy_to_structured_string(self, sympy_expr): """Convertir expresión SymPy a string estructurado""" if sympy_expr is None: return "" # Crear un mapeo inverso de símbolos a nombres originales symbol_to_name = {} for original_name, symbol in self.symbol_manager._symbol_cache.items(): symbol_to_name[str(symbol)] = original_name # Convertir la expresión a string str_expr = str(sympy_expr) # Reemplazar símbolos por nombres originales for symbol_str, original_name in symbol_to_name.items(): str_expr = str_expr.replace(symbol_str, original_name) # Reemplazar operadores SymPy por operadores IEC61131-3 str_expr = str_expr.replace("&", " AND ") str_expr = str_expr.replace("|", " OR ") str_expr = str_expr.replace("~", "NOT ") # Limpiar espacios múltiples str_expr = re.sub(r"\s+", " ", str_expr) return str_expr.strip() def optimize_expressions(self): """Optimizar todas las expresiones usando SymPy - Preferir DNF para LAD""" print("\n=== Optimizando expresiones con SymPy (forzando DNF para LAD) ===") for network in self.networks: if network["logic"]: print(f"\nOptimizando Red {network['id']}:") # Convertir a SymPy sympy_expr = self._logic_to_sympy(network["logic"]) if sympy_expr: print(f" Expresión original: {sympy_expr}") # Simplificar primero try: simplified = simplify(sympy_expr) print(f" Simplificada: {simplified}") # Verificar complejidad antes de DNF num_symbols = len(simplified.free_symbols) complexity = self._estimate_expression_complexity(simplified) if num_symbols > 12 or complexity > 800: print( f" ADVERTENCIA: Expresión muy compleja ({num_symbols} símbolos, complejidad {complexity})" ) print( f" Saltando conversión DNF por rendimiento - usando simplificación básica" ) final_expr = simplified else: # SIEMPRE convertir a DNF para LAD (forma natural: (AND) OR (AND) OR (AND)) dnf_expr = to_dnf(simplified) print(f" DNF (forma LAD preferida): {dnf_expr}") # Post-procesar para eliminar contradicciones final_expr = self._post_process_expression(dnf_expr) # Verificar si el post-procesamiento cambió algo if str(final_expr) != str(dnf_expr): print(f" Post-procesada: {final_expr}") self.sympy_expressions[network["id"]] = final_expr except Exception as e: print(f" Error optimizando: {e}") self.sympy_expressions[network["id"]] = sympy_expr def group_common_conditions(self): """Agrupar networks con condiciones similares o complementarias""" print("\n=== Analizando agrupación de condiciones ===") # Buscar networks que podrían agruparse groupable_networks = [] for network in self.networks: if ( network["logic"] and network["target"] and network["id"] in self.sympy_expressions ): groupable_networks.append(network) if len(groupable_networks) < 2: print("No hay suficientes networks para agrupar") return # Analizar condiciones comunes print(f"Analizando {len(groupable_networks)} networks para agrupación:") for i, net1 in enumerate(groupable_networks): for j, net2 in enumerate(groupable_networks[i + 1 :], i + 1): expr1 = self.sympy_expressions[net1["id"]] expr2 = self.sympy_expressions[net2["id"]] # Buscar factores comunes try: # Extraer factores comunes si es posible common_factors = self._find_common_factors(expr1, expr2) if common_factors: print( f" Red {net1['id']} y Red {net2['id']} comparten: {common_factors}" ) # Verificar si son complementarias (útil para SET/RESET) if self._are_complementary(expr1, expr2): print( f" Red {net1['id']} y Red {net2['id']} son complementarias" ) except Exception as e: print( f" Error analizando Red {net1['id']} y Red {net2['id']}: {e}" ) def _find_common_factors(self, expr1, expr2): """Encontrar factores comunes entre dos expresiones""" try: # Convertir a conjuntos de símbolos para análisis básico symbols1 = expr1.free_symbols symbols2 = expr2.free_symbols common_symbols = symbols1.intersection(symbols2) if len(common_symbols) > 1: return f"{len(common_symbols)} símbolos comunes" return None except: return None def _are_complementary(self, expr1, expr2): """Verificar si dos expresiones son complementarias""" try: # Verificar si expr1 == NOT(expr2) simplificado complement = Not(expr2) simplified_complement = simplify(complement) simplified_expr1 = simplify(expr1) return simplified_expr1.equals(simplified_complement) except: return False def _estimate_expression_complexity(self, expr): """Estimar la complejidad de una expresión para evitar explosión exponencial""" try: # Contar operadores AND/OR anidados complexity = 0 expr_str = str(expr) # Contar número de operadores and_count = expr_str.count("&") or_count = expr_str.count("|") not_count = expr_str.count("~") complexity += and_count * 2 # AND complexity += or_count * 4 # OR (más costoso en DNF) complexity += not_count * 1 # NOT # Penalizar anidamiento profundo depth = expr_str.count("(") complexity += depth * 6 # Número de símbolos únicos num_symbols = len(expr.free_symbols) complexity += num_symbols * 15 # Detectar patrones problemáticos específicos # CNF con muchos términos (patrón típico que causa explosión) if and_count > 10 and or_count > 15: complexity += 2000 # Penalización severa para CNF complejas # Expresiones con muchos términos negativos (difíciles para DNF) if not_count > 8: complexity += 500 # Longitud total como indicador de complejidad if len(expr_str) > 1000: complexity += len(expr_str) // 2 return complexity except: return 999999 # Asumir muy complejo si hay error def _post_process_expression(self, expr): """Post-procesar expresión para eliminar contradicciones obvias""" try: # Detectar contradicciones como X & ~X que deberían ser False cleaned_expr = expr # Aplicar simplificaciones adicionales cleaned_expr = simplify(cleaned_expr) # Si la expresión contiene contradicciones obvias, intentar limpiar free_symbols = cleaned_expr.free_symbols for symbol in free_symbols: # Verificar si tenemos symbol & ~symbol en alguna parte contradiction = And(symbol, Not(symbol)) if cleaned_expr.has(contradiction): print( f" Detectada contradicción eliminable: {symbol} AND NOT {symbol}" ) # Reemplazar contradicción por False cleaned_expr = cleaned_expr.replace(contradiction, sympy.false) cleaned_expr = simplify(cleaned_expr) return cleaned_expr except: return expr def _format_dnf_for_lad(self, sympy_expr): """Formatear expresión DNF para código LAD más legible""" if sympy_expr is None: return "" # Crear mapeo de símbolos a nombres originales symbol_to_name = {} for original_name, symbol in self.symbol_manager._symbol_cache.items(): symbol_to_name[str(symbol)] = original_name # Convertir a string y analizar la estructura str_expr = str(sympy_expr) # Reemplazar símbolos por nombres originales for symbol_str, original_name in symbol_to_name.items(): str_expr = str_expr.replace(symbol_str, original_name) # Reemplazar operadores SymPy por IEC61131-3 primero str_expr = str_expr.replace("&", " AND ") str_expr = str_expr.replace("|", " OR ") str_expr = str_expr.replace("~", "NOT ") # Limpiar espacios múltiples str_expr = re.sub(r"\s+", " ", str_expr).strip() # Si es una expresión OR de términos AND principales, formatear cada término # Buscar el patrón de OR principales (no anidados en paréntesis) if " OR " in str_expr and not str_expr.startswith("("): # Dividir por OR de nivel principal # Esto es más complejo debido a paréntesis anidados parts = self._split_main_or_terms(str_expr) if len(parts) > 1: formatted_terms = [] for part in parts: part = part.strip() # Asegurar que cada término tenga paréntesis si es complejo if " AND " in part and not ( part.startswith("(") and part.endswith(")") ): part = f"({part})" formatted_terms.append(part) # Unir con OR y saltos de línea para mejor legibilidad return "\n OR ".join(formatted_terms) return str_expr def _split_main_or_terms(self, expr): """Dividir expresión por OR de nivel principal, respetando paréntesis""" parts = [] current_part = "" paren_level = 0 i = 0 while i < len(expr): char = expr[i] if char == "(": paren_level += 1 current_part += char elif char == ")": paren_level -= 1 current_part += char elif paren_level == 0 and expr[i : i + 4] == " OR ": # OR de nivel principal encontrado parts.append(current_part.strip()) current_part = "" i += 3 # Saltar ' OR ' else: current_part += char i += 1 # Agregar la última parte if current_part.strip(): parts.append(current_part.strip()) return parts if len(parts) > 1 else [expr] def _parse_actions(self, content): """Extraer todas las ACTIONs del programa""" # Buscar patrón ACTION nombre: ... END_ACTION action_pattern = r"ACTION\s+(\w+)\s*:(.*?)END_ACTION" action_matches = re.findall(action_pattern, content, re.DOTALL | re.IGNORECASE) for action_name, action_code in action_matches: # Limpiar el código de la ACTION clean_code = action_code.strip() # Verificar si es código LAD o ST if "_LD_BODY" in clean_code: # Es código LAD - necesita parsing print( f"ACTION LAD encontrada: {action_name} ({len(clean_code)} caracteres)" ) self.actions[action_name] = self._parse_action_lad( action_name, clean_code ) else: # Es código ST - copiar directamente print( f"ACTION ST encontrada: {action_name} ({len(clean_code)} caracteres)" ) self.actions[action_name] = {"type": "ST", "code": clean_code} print(f"Total ACTIONs: {len(self.actions)}") def _parse_action_lad(self, action_name, action_content): """Parsear una ACTION que contiene código LAD""" # Crear un convertidor temporal para esta ACTION action_converter = SimpleLadConverter() action_converter.symbol_manager = self.symbol_manager # Compartir símbolos # Encontrar sección LAD lad_start = action_content.find("_LD_BODY") if lad_start != -1: # Extraer contenido LAD hasta el final lad_content = action_content[lad_start:] lines = lad_content.split("\n") action_converter._parse_networks(lines) return { "type": "LAD", "networks": action_converter.networks, "raw_content": action_content, } def _extract_st_code(self, content): """Extraer código ST del programa principal""" # Buscar desde después de END_VAR hasta ACTION o END_PROGRAM var_end = content.rfind("END_VAR") if var_end == -1: return # Buscar el final del código principal action_start = content.find("\nACTION", var_end) end_program = content.find("\nEND_PROGRAM", var_end) code_end = action_start if action_start != -1 else end_program if code_end == -1: code_end = len(content) # Extraer código ST st_code = content[var_end + 7 : code_end].strip() # +7 para saltar "END_VAR" if st_code: # Almacenar código ST para usar en la generación self.st_main_code = st_code print(f"Código ST principal extraído: {len(st_code)} caracteres") else: self.st_main_code = None def _extract_function_return_type(self): """Extraer el tipo de retorno de una FUNCTION desde el header""" if not hasattr(self, "_original_content"): return None # Buscar patrón FUNCTION nombre : TIPO match = re.search(r"FUNCTION\s+\w+\s*:\s*(\w+)", self._original_content) if match: return match.group(1) return None def _extract_type_content(self, content): """Extraer el contenido completo de un TYPE""" # Buscar desde TYPE hasta END_TYPE type_start = content.find("TYPE") if type_start == -1: return # Buscar END_TYPE type_end = content.find("END_TYPE", type_start) if type_end == -1: type_end = len(content) else: type_end += len("END_TYPE") # Extraer contenido del TYPE type_content = content[type_start:type_end].strip() if type_content: self.type_content = type_content print(f"Contenido TYPE extraído: {len(type_content)} caracteres") else: self.type_content = None def _convert_type_to_scl(self, output): """Convertir TYPE a SCL - simplemente copiar el contenido original""" if self.type_content: # Copiar directamente el contenido del TYPE preservando formato for line in self.type_content.split("\n"): if line.strip(): output.append(line) else: output.append("") else: # Si no hay contenido TYPE, generar estructura básica program_name = self.program_name if self.program_name else "UnknownType" output.append(f"TYPE {program_name}:") output.append("STRUCT") output.append(" (* Contenido TYPE no detectado *)") output.append(" Dummy : INT;") output.append("END_STRUCT") output.append("END_TYPE") return "\n".join(output) def _extract_global_variables(self, content): """Extraer todas las secciones de variables globales""" # Buscar todas las secciones VAR_GLOBAL var_global_patterns = [ (r"VAR_GLOBAL\s+PERSISTENT(.*?)END_VAR", "VAR_GLOBAL PERSISTENT"), (r"VAR_GLOBAL((?:(?!PERSISTENT)(?!END_VAR).)*?)END_VAR", "VAR_GLOBAL"), ] for pattern, var_type in var_global_patterns: matches = re.findall(pattern, content, re.DOTALL | re.IGNORECASE) if matches: variables = [] for match in matches: # Parsear cada línea de variable for line in match.split("\n"): line = line.strip() if ( line and not line.startswith("(*") and not line.startswith("(************") and ":" in line and not line.startswith("*") ): # Limpiar comentarios inline if "(*" in line: # Mantener el comentario inline pass # No limpiar comentarios para preservar información variables.append(line.strip()) if variables: if var_type not in self.var_sections: self.var_sections[var_type] = [] self.var_sections[var_type].extend(variables) # Mostrar resumen for var_type, variables in self.var_sections.items(): print(f"Variables {var_type}: {len(variables)} encontradas") def _convert_global_vars_to_scl(self, output): """Convertir archivo de variables globales a SCL""" # Para archivos de variables globales, simplemente copiar las declaraciones # Agregar secciones de variables encontradas section_order = ["VAR_GLOBAL", "VAR_GLOBAL PERSISTENT"] for section_type in section_order: if section_type in self.var_sections: output.append(f"{section_type}") for var_line in self.var_sections[section_type]: # Asegurar que termine con punto y coma if not var_line.endswith(";"): var_line += " ;" output.append(f" {var_line}") output.append("END_VAR") output.append("") # Si no se encontraron variables, agregar estructura básica if not self.var_sections: output.append("VAR_GLOBAL") output.append(" (* No se detectaron variables globales *)") output.append(" Dummy : INT ;") output.append("END_VAR") return "\n".join(output) def is_function_only_file(file_path): """Verificar si un archivo contiene solo una función (sin código LAD)""" try: with open(file_path, "r", encoding="utf-8", errors="ignore") as f: content = f.read() # Buscar patrón de función func_match = re.search( r"(FUNCTION|FUNCTION_BLOCK)\s+(\w+)\s*:\s*(\w+)", content ) if not func_match: return False # Verificar que NO tenga código LAD has_lad = "_LD_BODY" in content # Verificar que tenga principalmente declaraciones de variables has_vars = "VAR_INPUT" in content or "VAR_OUTPUT" in content or "VAR" in content # Es un archivo de función si tiene función, variables, pero no LAD return func_match and has_vars and not has_lad except: return False def collect_function_interfaces(input_directory, debug_mode=False): """Primera pasada: recopilar información de todas las funciones""" print("=== PRIMERA PASADA: RECOPILANDO INTERFACES DE FUNCIONES ===") function_registry = FunctionRegistry() # Obtener todos los archivos .EXP exp_pattern = os.path.join(input_directory, "*.EXP") exp_files = glob.glob(exp_pattern) function_files = [] for exp_file in exp_files: filename = os.path.basename(exp_file) # Verificar si es un archivo de función if is_function_only_file(exp_file): function_files.append(exp_file) print(f" Analizando función: {filename}") # Crear convertidor temporal solo para extraer interfaz temp_converter = SimpleLadConverter() func_info = temp_converter.parse_function_interface(exp_file) if func_info: function_registry.add_function(func_info) else: if debug_mode: print(f" Saltando (no es función pura): {filename}") print(f"\nFunciones encontradas: {len(function_registry.functions)}") print(f"Function Blocks encontrados: {len(function_registry.function_blocks)}") if debug_mode: function_registry.print_summary() return function_registry, function_files def main(): """Función principal - Convierte todos los archivos .EXP a .SCL con dos pasadas""" try: import time timestamp = time.strftime("%Y-%m-%d %H:%M:%S") print("=== SCRIPT VERIFICADO Y EJECUTÁNDOSE CORRECTAMENTE ===") print(f"🕒 Timestamp: {timestamp}") print("=== INICIANDO CONVERTIDOR SCRIPT (2 PASADAS) ===") # Verificar si se pasó un archivo específico como parámetro para debug debug_file = None if len(sys.argv) > 1: debug_file = sys.argv[1] print(f"=== MODO DEBUG: Procesando archivo específico ===") print(f"Archivo: {debug_file}") print("🔧 VERIFICACIÓN: Script modificado y ejecutándose en tiempo real") else: print("=== Convertidor Masivo LAD a SCL con SymPy (2 Pasadas) ===") # Cargar configuración configs = load_configuration() # Verificar que se cargó correctamente if not configs: print( "Advertencia: No se pudo cargar la configuración, usando valores por defecto" ) working_directory = "./" scl_output_dir = "scl" debug_mode = True show_optimizations = True show_generated_code = False max_display_lines = 50 force_regenerate = False level1_config = {} level2_config = {} level3_config = {} else: # Obtener configuraciones working_directory = configs.get("working_directory", "./") level1_config = configs.get("level1", {}) level2_config = configs.get("level2", {}) level3_config = configs.get("level3", {}) # Parámetros de configuración debug_mode = level1_config.get("debug_mode", True) show_optimizations = level1_config.get("show_optimizations", True) scl_output_dir = level2_config.get("scl_output_dir", "scl") backup_existing = level2_config.get("backup_existing", True) show_generated_code = level2_config.get("show_generated_code", False) max_display_lines = level2_config.get("max_display_lines", 50) sympy_optimization = level3_config.get("sympy_optimization", True) group_analysis = level3_config.get("group_analysis", True) force_regenerate = level2_config.get( "force_regenerate", False ) # Nueva opción # Directorio de entrada para archivos .EXP # En modo debug, usar el directorio actual si no hay configuración específica if debug_file: # Modo debug: usar directorio actual donde están los archivos EXP input_directory = os.path.dirname(os.path.abspath(__file__)) # Buscar directorio ExportTwinCat si existe export_dir = os.path.join(input_directory, "../ExportTwinCat") if os.path.exists(export_dir): input_directory = os.path.abspath(export_dir) print(f"Detectado directorio ExportTwinCat: {input_directory}") else: print(f"Usando directorio actual: {input_directory}") else: # Modo normal: usar configuración input_directory = level3_config.get( "twincat_exp_directory", working_directory ) # Verificar directorio de trabajo if not os.path.exists(working_directory): print(f"Error: El directorio de trabajo no existe: {working_directory}") return # Verificar directorio de entrada if not os.path.exists(input_directory): print(f"Error: El directorio de entrada no existe: {input_directory}") return # Crear directorio de salida SCL full_scl_path = os.path.join(working_directory, scl_output_dir) if not os.path.exists(full_scl_path): os.makedirs(full_scl_path) print(f"Directorio creado: {full_scl_path}") # PRIMERA PASADA: Recopilar interfaces de funciones # IMPORTANTE: Siempre hacer primera pasada, incluso en modo debug print(f"Directorio para primera pasada: {input_directory}") function_registry, function_files = collect_function_interfaces( input_directory, debug_mode ) # Determinar archivos a procesar if debug_file: # Modo debug - archivo específico if not debug_file.endswith(".EXP"): debug_file += ".EXP" debug_file_path = os.path.join(input_directory, debug_file) if not os.path.exists(debug_file_path): print(f"Error: No se encontró el archivo {debug_file_path}") return exp_files = [debug_file_path] print(f"🔍 MODO DEBUG ACTIVADO") print(f"Procesando archivo específico: {debug_file}") print(f"Directorio de entrada: {input_directory}") print(f"Directorio de salida SCL: {full_scl_path}") print(f"Funciones en registry: {len(function_registry.functions)}") print( f"Function Blocks en registry: {len(function_registry.function_blocks)}" ) # En modo debug, forzar regeneración y mostrar más información force_regenerate = True debug_mode = True show_generated_code = True max_display_lines = 100 # Mostrar información detallada del registry en modo debug if function_registry.functions: print("\n🔧 FUNCIONES DETECTADAS:") for name, func_info in function_registry.functions.items(): print(f" ✓ {name}: {func_info.return_type}") if func_info.inputs: print(f" IN: {[f'{n}:{t}' for n, t in func_info.inputs]}") if func_info.outputs: print(f" OUT: {[f'{n}:{t}' for n, t in func_info.outputs]}") print() else: # Modo normal - todos los archivos exp_pattern = os.path.join(input_directory, "*.EXP") exp_files = glob.glob(exp_pattern) if not exp_files: print(f"No se encontraron archivos .EXP en: {input_directory}") return print(f"Encontrados {len(exp_files)} archivos .EXP en: {input_directory}") print(f"Directorio de salida SCL: {full_scl_path}") print() print("=== SEGUNDA PASADA: CONVERSIÓN CON INTERFACES CONOCIDAS ===") # Procesar cada archivo successful_conversions = 0 failed_conversions = 0 for exp_file in exp_files: filename = os.path.basename(exp_file) base_name = os.path.splitext(filename)[0] scl_filename = f"{base_name}.scl" scl_output_path = os.path.join(full_scl_path, scl_filename) # Verificar si ya existe el archivo SCL (exportación progresiva) if os.path.exists(scl_output_path) and not force_regenerate: print(f"{'='*60}") print(f"SALTANDO: {filename} - Ya existe {scl_filename}") print( f" (usa force_regenerate: true en configuración para forzar regeneración)" ) successful_conversions += 1 # Contar como exitoso continue print(f"{'='*60}") print(f"Procesando: {filename}") print(f"Salida: {scl_filename}") try: # Crear nuevo convertidor para cada archivo CON el registry de funciones converter = SimpleLadConverter(function_registry) # Parsear archivo converter.parse_file(exp_file) print(f" ✓ Redes encontradas: {len(converter.networks)}") print( f" ✓ Secciones de variables: {list(converter.var_sections.keys())}" ) print(f" ✓ ACTIONs encontradas: {list(converter.actions.keys())}") # Mostrar información de debug si está habilitado if debug_mode: converter.print_debug_info() # Optimizar expresiones con SymPy si está habilitado if sympy_optimization and show_optimizations: converter.optimize_expressions() # Analizar agrupación de condiciones si está habilitado if group_analysis and show_optimizations: converter.group_common_conditions() # Convertir y guardar print(f" Generando código SCL...") structured_code = converter.save_to_file(scl_output_path) # Mostrar parte del código generado si está habilitado if show_generated_code: lines = structured_code.split("\n") display_lines = min(max_display_lines, len(lines)) print( f" Código SCL generado ({len(lines)} líneas, mostrando {display_lines}):" ) for i in range(display_lines): print(f" {i+1:3d}: {lines[i]}") if len(lines) > display_lines: print(f" ... ({len(lines) - display_lines} líneas más)") print(f" ✓ Guardado en: {scl_output_path}") successful_conversions += 1 except Exception as e: print(f" ✗ Error procesando {filename}: {e}") if debug_mode: import traceback traceback.print_exc() failed_conversions += 1 print() # Resumen final print(f"{'='*60}") print(f"RESUMEN DE CONVERSIÓN:") print(f" 📋 Funciones registradas: {len(function_registry.functions)}") print( f" 📋 Function Blocks registrados: {len(function_registry.function_blocks)}" ) print(f" ✓ Exitosas: {successful_conversions}") print(f" ✗ Fallidas: {failed_conversions}") print(f" 📁 Directorio salida: {full_scl_path}") if successful_conversions > 0: print(f"\n✓ Conversión masiva completada!") except Exception as e: print(f"Error general: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()