ParamManagerScripts/backend/script_groups/TwinCat/x1_lad_converter.py

2382 lines
94 KiB
Python

print("=== SCRIPT FILE ACCESSED BY PYTHON ===")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Convertidor simplificado de LAD TwinCAT a pseudocódigo estructurado
Versión mejorada con SymPy para optimización de expresiones lógicas
Version con dos pasadas: 1) Recopilar funciones 2) Convertir con interfaz conocida
"""
import re
import sympy
import os
import sys
import glob
from sympy import symbols, And, Or, Not, simplify
from sympy.logic.boolalg import to_dnf
# Configurar el path al directorio raíz del proyecto
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
# Importar la función de configuración
from backend.script_utils import load_configuration
class FunctionInfo:
"""Información sobre una función extraída"""
def __init__(self, name, return_type="BOOL"):
self.name = name
self.return_type = return_type
self.inputs = [] # Lista de (nombre, tipo)
self.outputs = [] # Lista de (nombre, tipo)
self.source_file = ""
self.function_type = "FUNCTION" # FUNCTION, FUNCTION_BLOCK, etc.
def add_input(self, name, data_type):
"""Agregar parámetro de entrada"""
self.inputs.append((name.strip(), data_type.strip()))
def add_output(self, name, data_type):
"""Agregar parámetro de salida"""
self.outputs.append((name.strip(), data_type.strip()))
def get_call_signature(self):
"""Obtener signatura para llamada a función"""
input_params = [name for name, _ in self.inputs]
return f"{self.name}({', '.join(input_params)})"
def __str__(self):
inputs_str = ", ".join([f"{name}: {dtype}" for name, dtype in self.inputs])
outputs_str = ", ".join([f"{name}: {dtype}" for name, dtype in self.outputs])
return f"{self.function_type} {self.name}: {self.return_type} | IN: [{inputs_str}] | OUT: [{outputs_str}]"
class FunctionRegistry:
"""Registro de funciones disponibles"""
def __init__(self):
self.functions = {} # nombre -> FunctionInfo
self.function_blocks = {} # nombre -> FunctionInfo
def add_function(self, func_info):
"""Agregar función al registro"""
if func_info.function_type == "FUNCTION":
self.functions[func_info.name] = func_info
elif func_info.function_type == "FUNCTION_BLOCK":
self.function_blocks[func_info.name] = func_info
else:
self.functions[func_info.name] = func_info
def get_function(self, name):
"""Obtener información de función por nombre"""
return self.functions.get(name, None)
def get_function_block(self, name):
"""Obtener información de function block por nombre"""
return self.function_blocks.get(name, None)
def has_function(self, name):
"""Verificar si existe la función"""
return name in self.functions
def has_function_block(self, name):
"""Verificar si existe el function block"""
return name in self.function_blocks
def get_all_functions(self):
"""Obtener todas las funciones registradas"""
return list(self.functions.values())
def get_all_function_blocks(self):
"""Obtener todos los function blocks registrados"""
return list(self.function_blocks.values())
def print_summary(self):
"""Imprimir resumen del registro"""
print(f"=== REGISTRO DE FUNCIONES ===")
print(f"Funciones: {len(self.functions)}")
for name, func_info in self.functions.items():
print(f" {func_info}")
print(f"Function Blocks: {len(self.function_blocks)}")
for name, func_info in self.function_blocks.items():
print(f" {func_info}")
print()
class SymbolManager:
"""Gestor de símbolos para SymPy"""
def __init__(self):
self._symbol_cache = {}
def get_symbol(self, name):
"""Obtener o crear símbolo SymPy"""
if name not in self._symbol_cache:
# Limpiar nombre para SymPy (sin espacios, caracteres especiales)
clean_name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
self._symbol_cache[name] = symbols(clean_name)
return self._symbol_cache[name]
class SimpleLadConverter:
"""Convertidor simplificado de LAD a código estructurado"""
def __init__(self, function_registry=None):
self.networks = []
self.current_network_id = 0
self.symbol_manager = SymbolManager()
self.sympy_expressions = {} # Mapeo de network_id -> sympy expression
self.function_registry = function_registry or FunctionRegistry()
# Nuevas propiedades para estructura SCL completa
self.program_name = ""
self.program_type = (
"PROGRAM" # PROGRAM, FUNCTION, FUNCTION_BLOCK, TYPE, VAR_GLOBAL_LIST
)
self.program_path = ""
self.var_sections = {} # VAR, VAR_INPUT, VAR_OUTPUT, etc.
self.actions = {} # Diccionario de ACTIONs
self.st_main_code = None # Código ST del programa principal
self.type_content = None # Contenido completo de TYPE
def parse_file(self, file_path):
"""Parse el archivo LAD completo incluyendo variables y ACTIONs"""
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
# Guardar contenido original para extraer información adicional
self._original_content = content
# Extraer información del header
self._parse_header_info(content)
# Extraer declaraciones de variables
self._parse_variable_declarations(content)
# Verificar si es un TYPE (solo datos) o VAR_GLOBAL_LIST
if hasattr(self, "program_type") and self.program_type == "TYPE":
print("Archivo TYPE detectado - Extrayendo declaraciones")
self._extract_type_content(content)
elif hasattr(self, "program_type") and self.program_type == "VAR_GLOBAL_LIST":
print(
"Archivo de variables globales detectado - Extrayendo todas las secciones"
)
self._extract_global_variables(content)
else:
# Encontrar sección LAD
lad_start = content.find("_LD_BODY")
if lad_start != -1:
# Extraer contenido LAD hasta ACTION o END_PROGRAM
action_start = content.find("\nACTION", lad_start)
end_program = content.find("\nEND_PROGRAM", lad_start)
lad_end = action_start if action_start != -1 else end_program
if lad_end == -1:
lad_end = len(content)
lad_content = content[lad_start:lad_end]
lines = lad_content.split("\n")
self._parse_networks(lines)
else:
print("No se encontró _LD_BODY - Asumiendo código ST")
# Extraer código ST del programa principal
self._extract_st_code(content)
# Extraer ACTIONs
self._parse_actions(content)
def parse_function_interface(self, file_path):
"""Parsear solo la interfaz de una función (primera pasada)"""
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
# Verificar si es una función
func_match = re.search(
r"(FUNCTION|FUNCTION_BLOCK)\s+(\w+)\s*:\s*(\w+)", content
)
if not func_match:
return None
func_type = func_match.group(1)
func_name = func_match.group(2)
return_type = func_match.group(3)
# Crear objeto FunctionInfo
func_info = FunctionInfo(func_name, return_type)
func_info.function_type = func_type
func_info.source_file = os.path.basename(file_path)
# Extraer parámetros de entrada
var_input_match = re.search(r"VAR_INPUT(.*?)END_VAR", content, re.DOTALL)
if var_input_match:
input_section = var_input_match.group(1)
for line in input_section.split("\n"):
line = line.strip()
if line and ":" in line and not line.startswith("(*"):
# Limpiar comentarios inline
if "(*" in line:
line = line[: line.find("(*")]
# Parsear declaración de variable
line = line.replace(";", "").strip()
if ":" in line:
parts = line.split(":")
if len(parts) >= 2:
var_name = parts[0].strip()
var_type = parts[1].strip()
func_info.add_input(var_name, var_type)
# Extraer parámetros de salida
var_output_match = re.search(r"VAR_OUTPUT(.*?)END_VAR", content, re.DOTALL)
if var_output_match:
output_section = var_output_match.group(1)
for line in output_section.split("\n"):
line = line.strip()
if line and ":" in line and not line.startswith("(*"):
# Limpiar comentarios inline
if "(*" in line:
line = line[: line.find("(*")]
# Parsear declaración de variable
line = line.replace(";", "").strip()
if ":" in line:
parts = line.split(":")
if len(parts) >= 2:
var_name = parts[0].strip()
var_type = parts[1].strip()
func_info.add_output(var_name, var_type)
print(f" Función encontrada: {func_info}")
return func_info
def _parse_header_info(self, content):
"""Extraer información del header del programa"""
# Buscar PATH y nombre del programa
path_match = re.search(r"\(\* @PATH := \'([^\']+)\' \*\)", content)
if path_match:
self.program_path = path_match.group(1)
# Verificar si es un archivo de variables globales
global_var_match = re.search(
r"\(\*\s*@GLOBAL_VARIABLE_LIST\s*:=\s*(\w+)\s*\*\)", content
)
if global_var_match:
self.program_type = "VAR_GLOBAL_LIST"
self.program_name = global_var_match.group(1)
print(f"Archivo de variables globales detectado: {self.program_name}")
else:
# Buscar nombre del programa/function_block/function/type
program_match = re.search(
r"(PROGRAM|FUNCTION_BLOCK|FUNCTION|TYPE)\s+(\w+)", content
)
if program_match:
self.program_type = program_match.group(1)
self.program_name = program_match.group(2)
print(f"Programa encontrado: {self.program_name}")
if self.program_path:
print(f"Path: {self.program_path}")
def _parse_variable_declarations(self, content):
"""Extraer todas las declaraciones de variables"""
# Buscar todas las secciones VAR
var_patterns = [
(r"VAR_INPUT(.*?)END_VAR", "VAR_INPUT"),
(r"VAR_OUTPUT(.*?)END_VAR", "VAR_OUTPUT"),
(r"VAR_IN_OUT(.*?)END_VAR", "VAR_IN_OUT"),
(r"VAR\s+(.*?)END_VAR", "VAR"),
]
for pattern, var_type in var_patterns:
matches = re.findall(pattern, content, re.DOTALL | re.IGNORECASE)
if matches:
variables = []
for match in matches:
# Parsear cada línea de variable
for line in match.split("\n"):
line = line.strip()
if line and not line.startswith("(*") and ":" in line:
# Limpiar comentarios inline
if "(*" in line:
line = line[: line.find("(*")]
variables.append(line.strip())
if variables:
self.var_sections[var_type] = variables
print(f"Variables {var_type}: {len(variables)} encontradas")
def _parse_networks(self, lines):
"""Parse todas las redes"""
i = 0
expected_networks = 0
# Buscar cuántas redes se esperan
for line in lines:
if line.strip().startswith("_NETWORKS :"):
expected_networks = int(line.strip().split(":")[1].strip())
print(f"Se esperan {expected_networks} redes según el archivo")
break
# LÍMITE DE SEGURIDAD: No procesar más de 50 redes para evitar colgarse
if expected_networks > 50:
print(
f"⚠️ ADVERTENCIA: {expected_networks} redes es demasiado. Limitando a 50 redes para evitar colgarse."
)
expected_networks = 50
while i < len(lines):
if lines[i].strip() == "_NETWORK":
self.current_network_id += 1
print(f"Procesando red {self.current_network_id}...")
print(f" Comenzando en línea {i}: '{lines[i].strip()}'")
# Mostrar las próximas líneas para debug
print(f" Próximas líneas después de _NETWORK:")
for j in range(i + 1, min(i + 10, len(lines))):
print(f" Línea {j}: '{lines[j].strip()}'")
i = self._parse_network(lines, i)
else:
i += 1
if len(self.networks) != expected_networks:
print(
f"ADVERTENCIA: Se esperaban {expected_networks} redes pero solo se parsearon {len(self.networks)}"
)
print("Esto puede indicar redes con _EMPTY o estructuras no reconocidas")
# DEBUG: Mostrar todas las líneas _NETWORK encontradas
print("DEBUG: Buscando todas las instancias de _NETWORK...")
for idx, line in enumerate(lines):
if line.strip() == "_NETWORK":
print(f" Línea {idx}: _NETWORK")
if idx + 1 < len(lines):
print(f" Siguiente línea: '{lines[idx + 1].strip()}'")
if idx + 2 < len(lines):
print(f" Línea +2: '{lines[idx + 2].strip()}'")
print()
def _parse_network(self, lines, start_idx):
"""Parse una red individual con soporte mejorado para operadores LAD"""
network = {
"id": self.current_network_id,
"comment": "",
"logic": None,
"targets": [],
"calls": [], # NUEVO: para almacenar llamadas a FB/FUN
"function_blocks": [],
}
i = start_idx + 1
# Parse comentario
if i < len(lines) and lines[i].strip() == "_COMMENT":
i, comment = self._parse_comment(lines, i)
network["comment"] = comment
# Parse contenido de la red
while i < len(lines):
line = lines[i].strip()
if line == "_NETWORK":
break
elif line == "_LD_ASSIGN":
i += 1
print(f" 🔍 Procesando _LD_ASSIGN en línea {i}")
# Parsear la lógica LAD después de _LD_ASSIGN
i, logic = self._parse_lad_expression(lines, i)
network["logic"] = logic
print(f" 📋 Lógica parseada: {logic}")
# Después de la lógica, buscar una lista de ejecución (ENABLELIST)
# Continuar buscando ENABLELIST desde donde terminó el parser de lógica
while i < len(lines):
line_content = lines[i].strip()
print(f" 🔎 Buscando ENABLELIST en línea {i}: '{line_content}'")
if line_content.startswith("ENABLELIST"):
print(f" ✅ Encontrado ENABLELIST en línea {i}")
i, calls = self._parse_enable_list(lines, i)
network["calls"] = calls
print(
f" ✅ Red {network['id']}: Encontradas {len(calls)} llamadas en ENABLELIST"
)
break
elif line_content == "_NETWORK":
print(f" 🔚 Fin de red encontrado en línea {i}")
break
elif line_content.startswith("_OUTPUTS"):
print(f" 📤 Sección de outputs encontrada en línea {i}")
break
else:
# Continuar buscando
i += 1
elif line.startswith("_OUTPUTS"):
num_outputs = int(line.split(":")[1].strip())
i += 1
for _ in range(num_outputs):
while i < len(lines) and not lines[i].strip().startswith("_OUTPUT"):
i += 1
if i >= len(lines):
break
i += 1 # past _OUTPUT
while i < len(lines) and lines[i].strip().startswith("_"): # flags
i += 1
if i < len(lines):
var_name = lines[i].strip()
if var_name:
network["targets"].append(var_name)
i += 1
elif line.startswith("_OUTPUT"):
# Buscar variable de salida
i += 1
while i < len(lines) and lines[i].strip().startswith("_"):
i += 1
if i < len(lines) and lines[i].strip() and "ENABLELIST" not in lines[i]:
network["targets"].append(lines[i].strip())
i += 1
else:
i += 1
self.networks.append(network)
print(f" Red {network['id']} agregada. Total redes: {len(self.networks)}")
if network["logic"]:
print(
f" Con lógica: {network['logic'].get('type')} - {network['logic'].get('name', 'Sin nombre')}"
)
if network["calls"]:
print(f" Con {len(network['calls'])} llamadas de ejecución.")
# Mostrar detalles de las llamadas
for call in network["calls"]:
if call["name"] and self.function_registry.has_function(call["name"]):
print(f" 🎯 Función reconocida: {call['name']}")
else:
print(f" 📋 Llamada: {call['type']} - {call['name']}")
print(f" Targets: '{network['targets']}'")
return i
def _parse_lad_expression(self, lines, start_idx):
"""Parse una expresión LAD recursivamente"""
i = start_idx
while i < len(lines):
line = lines[i].strip()
if line == "_LD_AND":
return self._parse_and_expression(lines, i + 1)
elif line == "_LD_OR":
return self._parse_or_expression(lines, i + 1)
elif line == "_LD_CONTACT":
return self._parse_contact(lines, i + 1)
elif line.startswith("_FUNCTIONBLOCK"):
return self._parse_function_block(lines, i)
elif line == "_EMPTY":
# Red vacía que puede contener funciones después
print(f" Encontrada _EMPTY dentro de _LD_ASSIGN en línea {i}")
i += 1
# Buscar funciones en las siguientes líneas
i, empty_logic = self._parse_empty_network(lines, i)
return i, empty_logic
elif (
line.startswith("_OUTPUT")
or line == "ENABLELIST : 0"
or line.startswith("ENABLELIST :")
):
break
else:
i += 1
return i, None
def _parse_and_expression(self, lines, start_idx):
"""Parse una expresión AND"""
i = start_idx
operands = []
# Buscar operadores
if i < len(lines) and lines[i].strip().startswith("_LD_OPERATOR"):
# Extraer número de operandos
operator_line = lines[i].strip()
num_operands = (
int(operator_line.split(":")[-1].strip()) if ":" in operator_line else 2
)
i += 1
# Parse cada operando
for _ in range(num_operands):
i, operand = self._parse_lad_expression(lines, i)
if operand:
operands.append(operand)
return i, {"type": "AND", "operands": operands}
def _parse_or_expression(self, lines, start_idx):
"""Parse una expresión OR"""
i = start_idx
operands = []
# Buscar operadores
if i < len(lines) and lines[i].strip().startswith("_LD_OPERATOR"):
# Extraer número de operandos
operator_line = lines[i].strip()
num_operands = (
int(operator_line.split(":")[-1].strip()) if ":" in operator_line else 2
)
i += 1
# Parse cada operando
for _ in range(num_operands):
i, operand = self._parse_lad_expression(lines, i)
if operand:
operands.append(operand)
return i, {"type": "OR", "operands": operands}
def _parse_contact(self, lines, start_idx):
"""Parse un contacto LAD"""
i = start_idx
contact_name = ""
negated = False
# Obtener nombre del contacto
if i < len(lines):
contact_name = lines[i].strip()
i += 1
# Verificar si hay expresión
if i < len(lines) and lines[i].strip() == "_EXPRESSION":
i += 1
# Verificar si está negado
if i < len(lines):
if lines[i].strip() == "_NEGATIV":
negated = True
i += 1
elif lines[i].strip() == "_POSITIV":
i += 1
return i, {"type": "CONTACT", "name": contact_name, "negated": negated}
def _parse_function_block(self, lines, start_idx):
"""Parse un bloque de función o llamada a ACTION"""
i = start_idx + 1
fb_name = ""
inputs = []
action_call = None
max_iterations = 50 # Protección contra bucles infinitos
iterations = 0
if i < len(lines):
fb_name = lines[i].strip()
i += 1
# PATRÓN IDENTIFICADO: ??? indica llamada a ACTION
if fb_name == "???":
print(
f" Detectado patrón ??? en línea {i-1} - Buscando ACTION call..."
)
# Buscar el nombre completo de la ACTION más adelante
action_call = self._find_action_call_name(lines, i)
if action_call:
print(f" ✓ ACTION call encontrada: {action_call}")
return i, {"type": "ACTION_CALL", "name": action_call, "inputs": []}
else:
print(f" ⚠ Patrón ??? pero no se encontró nombre de ACTION")
else:
print(f" Detectado Function Block directo: {fb_name}")
# Parse inputs del function block normal
while (
i < len(lines)
and not lines[i].strip().startswith("_OUTPUT")
and iterations < max_iterations
):
line = lines[i].strip()
iterations += 1
if line.startswith("_OPERAND"):
i += 2 # Saltar _EXPRESSION
if i < len(lines):
inputs.append(lines[i].strip())
i += 1
elif line and not line.startswith("_"):
# Para function blocks normales, recopilar inputs
# Para ACTION calls perdidas, último intento
if "." in line and fb_name == "???" and action_call is None:
action_call = line
print(f" ✓ ACTION call encontrada (fallback): {action_call}")
return i + 1, {
"type": "ACTION_CALL",
"name": action_call,
"inputs": inputs,
}
elif line.startswith("ENABLELIST"):
# Fin del bloque
break
else:
i += 1
# Salida de emergencia del bucle
if iterations >= max_iterations:
print(
f" ADVERTENCIA: Bucle infinito evitado en function block en línea {start_idx}"
)
break
return i, {
"type": "FUNCTION_BLOCK",
"name": fb_name,
"instance_name": fb_name, # Usar el mismo nombre como instancia
"inputs": inputs,
"outputs": [],
}
def _find_action_call_name(self, lines, start_idx):
"""Buscar el nombre completo de la ACTION después de ???"""
i = start_idx
print(f" Buscando ACTION name desde línea {start_idx}...")
while i < len(lines) and i < start_idx + 15: # Buscar en las próximas 15 líneas
line = lines[i].strip()
print(f" Línea {i}: '{line}'")
# Buscar patrón namespace.actionname (ej: _Filling_Head_PID_Ctrl.Read_Analog)
if (
"." in line
and "ENABLELIST" not in line
and "EXPRESSION" not in line
and "BOX_EXPR" not in line
and "ENABLED" not in line
and "OUTPUTS" not in line
and "NO_SET" not in line
and "OUTPUT" not in line
and line != "_POSITIV"
and line != "_NEGATIV"
and line != "_NO_SET"
):
print(f" ✓ ACTION name encontrado: {line}")
return line
i += 1
print(
f" ⚠ No se encontró ACTION name en {start_idx + 15 - start_idx} líneas"
)
return None
def _parse_comment(self, lines, start_idx):
"""Parse comentario"""
i = start_idx + 1
comment_lines = []
while i < len(lines):
line = lines[i].strip()
if line == "_END_COMMENT":
break
if line and not line.startswith("_"):
comment_lines.append(line)
i += 1
return i + 1, " ".join(comment_lines)
def _parse_enable_list(self, lines, start_idx):
"""Parsear una lista de ejecución ENABLELIST"""
i = start_idx
calls = []
# Buscar línea ENABLELIST
while i < len(lines) and not lines[i].strip().startswith("ENABLELIST"):
i += 1
if i < len(lines) and lines[i].strip().startswith("ENABLELIST"):
enable_line = lines[i].strip()
num_enabled = (
int(enable_line.split(":")[1].strip()) if ":" in enable_line else 0
)
print(f" 📋 Procesando ENABLELIST con {num_enabled} llamadas")
i += 1 # Moverse a la siguiente línea después de ENABLELIST
for call_idx in range(num_enabled):
if i >= len(lines):
break
print(
f" 🔍 Buscando llamada {call_idx+1}/{num_enabled} en línea {i}"
)
# Buscar el próximo _ASSIGN
assign_found = False
while i < len(lines) and not assign_found:
line_content = lines[i].strip()
print(f" 🔎 Línea {i}: '{line_content}'")
if line_content == "_ASSIGN":
assign_found = True
i += 1
print(f" ✅ Encontrado _ASSIGN en línea {i-1}")
i, call = self._parse_execution_block(lines, i)
if call:
calls.append(call)
print(
f" ✅ Llamada parseada: {call['type']} - {call['name']}"
)
else:
print(f" ❌ No se pudo parsear la llamada")
break
elif line_content.startswith("ENABLELIST_END"):
print(f" 🔚 Encontrado ENABLELIST_END prematuro")
break
else:
i += 1
if not assign_found:
print(f" ❌ No se encontró _ASSIGN para llamada {call_idx+1}")
break
# Avanzar hasta el final de la lista de habilitación
while i < len(lines) and not lines[i].strip().startswith("ENABLELIST_END"):
i += 1
if i < len(lines) and lines[i].strip() == "ENABLELIST_END":
print(f" 🔚 Encontrado ENABLELIST_END en línea {i}")
i += 1
print(f" 📊 Total de llamadas parseadas: {len(calls)}")
return i, calls
def _parse_execution_block(self, lines, start_idx):
"""Parsear un bloque de ejecución (_FUNCTION o _FUNCTIONBLOCK)"""
i = start_idx
call_data = {
"type": None, # FUNCTION, FUNCTION_BLOCK, OPERATOR
"instance_name": None, # Para FB
"name": None, # Para FUNCTION o OPERATOR
"inputs": [],
"outputs": [], # Lista de variables de salida
}
# Encontrar el tipo de bloque (_FUNCTION, _FUNCTIONBLOCK, etc.)
block_type_line_idx = -1
temp_i = i
while temp_i < len(lines) and temp_i < i + 15:
line = lines[temp_i].strip()
if (
line.startswith("_FUNCTIONBLOCK")
or line.startswith("_FUNCTION")
or line.startswith("_OPERATOR")
):
block_type_line_idx = temp_i
break
temp_i += 1
if block_type_line_idx == -1:
return i, None
i = block_type_line_idx
line = lines[i].strip()
if line.startswith("_FUNCTIONBLOCK"):
call_data["type"] = "FUNCTION_BLOCK"
i += 1
call_data["instance_name"] = lines[i].strip()
elif line.startswith("_FUNCTION"):
call_data["type"] = "FUNCTION"
elif line.startswith("_OPERATOR"):
call_data["type"] = "OPERATOR"
i += 1
# Saltar _BOX_EXPR y _ENABLED
while i < len(lines) and (
lines[i].strip().startswith("_BOX_EXPR")
or lines[i].strip().startswith("_ENABLED")
):
i += 1
# Parsear inputs (_OPERAND)
while i < len(lines):
line = lines[i].strip()
if line.startswith("_OPERAND"):
i += 1
# Saltar _EXPRESSION y polaridad si existen
if i < len(lines) and lines[i].strip() == "_EXPRESSION":
i += 2 # Salta _EXPRESSION y _POSITIV/_NEGATIV
if i < len(lines):
call_data["inputs"].append(lines[i].strip())
i += 1
else:
break
elif line.startswith("ENABLELIST"): # Algunos FBs anidados
i += 1
else:
break
# El nombre de la función/bloque viene después de los operandos
name_found = False
while i < len(lines) and not name_found:
line = lines[i].strip()
if (
line
and not line.startswith("_")
and not line.startswith("ENABLELIST")
and "FB41_PID" not in line # Excepción para PID
):
call_data["name"] = line
name_found = True
# Verificar si es una función conocida en el registry
if call_data[
"type"
] == "FUNCTION" and self.function_registry.has_function(line):
print(f" ✓ Función conocida detectada: {line}")
elif call_data[
"type"
] == "FUNCTION_BLOCK" and self.function_registry.has_function_block(
line
):
print(f" ✓ Function Block conocido detectado: {line}")
elif line.startswith("_OUTPUTS"):
break
i += 1
# Si el nombre no se encontró (caso de FBs como PID), usar la instancia
if not call_data["name"] and call_data["type"] == "FUNCTION_BLOCK":
# El nombre del tipo de FB está después de los outputs en algunos casos
pass
# Parsear outputs
while i < len(lines):
line = lines[i].strip()
if line.startswith("_OUTPUTS"):
i += 1 # Saltar línea _OUTPUTS
# Bucle para cada _OUTPUT
while i < len(lines) and lines[i].strip().startswith("_OUTPUT"):
i += 1 # Saltar línea _OUTPUT
# Saltar polaridad y _NO_SET
while i < len(lines) and lines[i].strip() in [
"_POSITIV",
"_NEGATIV",
"_NO_SET",
]:
i += 1
if i < len(lines):
output_var = lines[i].strip()
if output_var != "_EMPTY":
call_data["outputs"].append(output_var)
i += 1
else:
break
elif line.startswith("ENABLELIST_END"):
# Parar aquí - fin del ENABLELIST
break
elif line == "_ASSIGN":
# Parar aquí - otra llamada en el ENABLELIST
print(
f" 🔚 Encontrado siguiente _ASSIGN en línea {i}, terminando parsing de llamada actual"
)
break
else:
# El nombre del tipo de FB puede estar aquí
if call_data["type"] == "FUNCTION_BLOCK" and not call_data["name"]:
if line and not line.startswith("_"):
call_data["name"] = line
# Verificar si es un FB conocido
if self.function_registry.has_function_block(line):
print(f" ✓ Function Block conocido detectado: {line}")
i += 1
return i, call_data
def _parse_empty_network(self, lines, start_idx):
"""Parse una red que empieza con _EMPTY y puede contener funciones"""
i = start_idx
max_iterations = 20
iterations = 0
function_found = None
target_found = None
print(f" Entrando a _parse_empty_network desde línea {start_idx}")
while (
i < len(lines)
and iterations < max_iterations
and not lines[i].strip().startswith("_OUTPUT")
and not lines[i].strip() == "_NETWORK"
):
line = lines[i].strip()
iterations += 1
print(f" Línea {i}: '{line}'")
if line.startswith("_FUNCTIONBLOCK"):
# Encontrada llamada a función/ACTION
print(f" ENCONTRADO _FUNCTIONBLOCK en línea {i}")
i, logic = self._parse_function_block(lines, i)
function_found = logic
elif line.startswith("_FUNCTION"):
# Otra variante de función
print(f" ENCONTRADO _FUNCTION en línea {i}")
i += 1
# Buscar el nombre de la función
while i < len(lines) and i < start_idx + 10:
func_line = lines[i].strip()
print(f" Buscando nombre función línea {i}: '{func_line}'")
if (
func_line
and not func_line.startswith("_")
and "ENABLELIST" not in func_line
):
print(f" ENCONTRADO nombre función: {func_line}")
function_found = {
"type": "FUNCTION_CALL",
"name": func_line,
"inputs": [],
}
break
i += 1
elif line.startswith("ENABLELIST"):
print(f" Encontrado ENABLELIST, continuando búsqueda...")
# No terminar aquí, continuar buscando _ASSIGN
i += 1
elif line.startswith("_ASSIGN"):
print(f" ENCONTRADO _ASSIGN en línea {i}")
# Buscar función dentro de _ASSIGN
i += 1
i, assign_logic = self._parse_assign_section(lines, i)
if assign_logic:
function_found = assign_logic
elif line.startswith("_OUTPUT"):
# Buscar el target después de _OUTPUT
print(f" ENCONTRADO _OUTPUT, buscando target...")
i += 1
while i < len(lines) and lines[i].strip().startswith("_"):
i += 1
if i < len(lines) and lines[i].strip() and "ENABLELIST" not in lines[i]:
target_found = lines[i].strip()
print(f" ENCONTRADO target: {target_found}")
break
else:
i += 1
# Si encontramos una función, crear red independiente
if function_found and target_found:
print(
f" Creando red independiente para función con target: {target_found}"
)
self._create_function_network(function_found, target_found)
return i, function_found
elif function_found:
print(f" Función encontrada pero sin target específico")
# Crear target por defecto basado en el tipo de función
if function_found["type"] == "ACTION_CALL":
default_target = "mDummy" # Variable dummy típica para ACTION calls
elif function_found["type"] == "FUNCTION_CALL":
# Para funciones como ProductLiterInTank, usar la variable global correspondiente
if function_found["name"] == "gProductTankLevel":
default_target = (
"gTankProdAmount" # Variable que se asigna según el contexto
)
else:
default_target = "mDummy"
else:
# Caso por defecto para otros tipos de función (FUNCTION_BLOCK, etc.)
default_target = "mDummy"
print(f" Usando target por defecto: {default_target}")
self._create_function_network(function_found, default_target)
return i, function_found
print(f" _parse_empty_network terminó sin encontrar función")
return i, None
def _parse_assign_section(self, lines, start_idx):
"""Parse una sección _ASSIGN que puede contener funciones"""
i = start_idx
max_iterations = 15
iterations = 0
print(f" Entrando a _parse_assign_section desde línea {start_idx}")
while (
i < len(lines)
and iterations < max_iterations
and not lines[i].strip() == "_NETWORK"
and not lines[i].strip() == "ENABLELIST_END"
):
line = lines[i].strip()
iterations += 1
print(f" Línea {i}: '{line}'")
if line.startswith("_FUNCTIONBLOCK"):
print(f" ENCONTRADO _FUNCTIONBLOCK en _ASSIGN: línea {i}")
i, logic = self._parse_function_block(lines, i)
return i, logic
elif line.startswith("_FUNCTION"):
print(f" ENCONTRADO _FUNCTION en _ASSIGN: línea {i}")
# Usar la misma lógica que _parse_execution_block
i, call_info = self._parse_execution_block(lines, i)
if call_info:
return i, call_info
else:
i += 1
elif line == "ENABLELIST_END":
print(f" Encontrado ENABLELIST_END, terminando")
break
else:
i += 1
print(f" _parse_assign_section terminó sin encontrar función")
return i, None
def _create_function_network(self, function_logic, target_name):
"""Crear una red independiente para una función o ACTION call"""
self.current_network_id += 1
function_network = {
"id": self.current_network_id,
"comment": f'Llamada a función: {function_logic.get("name", "unknown")}',
"logic": function_logic,
"targets": [target_name],
"function_blocks": [],
"calls": [function_logic], # ✅ Añadir la llamada al array calls
}
self.networks.append(function_network)
print(
f" Red de función {function_network['id']} creada para {function_logic['type']}: {function_logic.get('name', 'Sin nombre')}"
)
print(f" Target: '{target_name}'")
return function_network
def convert_to_structured(self):
"""Convertir a código SCL completo con variables y ACTIONs"""
output = []
# Header del archivo
output.append("(* Código SCL generado desde LAD TwinCAT *)")
output.append("(* Convertidor mejorado con SymPy - Estructura DNF preferida *)")
if self.program_path:
output.append(f"(* Path original: {self.program_path} *)")
output.append("")
# Verificar si es un TYPE o VAR_GLOBAL_LIST - manejarlos diferente
program_type = self.program_type if hasattr(self, "program_type") else "PROGRAM"
if program_type == "TYPE":
return self._convert_type_to_scl(output)
elif program_type == "VAR_GLOBAL_LIST":
return self._convert_global_vars_to_scl(output)
# Declaración del programa/función
program_name = self.program_name if self.program_name else "ConvertedProgram"
# Para FUNCTIONs, necesitamos detectar el tipo de retorno
if program_type == "FUNCTION":
# Buscar tipo de retorno en el código original (ej: FUNCTION ArrayToReal : REAL)
return_type = self._extract_function_return_type()
if return_type:
output.append(f"FUNCTION {program_name} : {return_type}")
else:
output.append(f"FUNCTION {program_name} : BOOL") # Tipo por defecto
else:
output.append(f"{program_type} {program_name}")
# Declaraciones de variables
self._add_variable_sections(output)
# Inicio del código principal
output.append("")
output.append("(* === CÓDIGO PRINCIPAL === *)")
output.append("")
# Código principal - LAD o ST
if self.networks:
# Hay redes LAD - generar código desde redes
output.append(" (* Código LAD convertido *)")
for network in self.networks:
output.append(f" // Red {network['id']}")
if network["comment"]:
output.append(f" // {network['comment']}")
# Procesar lógica si existe
condition_str = ""
if network["logic"]:
# Usar expresión DNF optimizada si está disponible
if network["id"] in self.sympy_expressions:
sympy_expr = self.sympy_expressions[network["id"]]
condition_str = self._format_dnf_for_lad(sympy_expr)
else:
# Fallback al método original
condition_str = self._convert_logic_to_string(network["logic"])
# Asegurar que el campo 'calls' existe antes de accederlo
network_calls = network.get("calls", [])
# ✅ PROCESAR LLAMADAS INDEPENDIENTEMENTE DE LA LÓGICA
if network_calls:
print(
f" 🔧 GENERANDO CÓDIGO - Red {network['id']} tiene {len(network_calls)} llamadas"
)
# Si hay condición, generar IF
if condition_str:
if "\n" in condition_str:
output.append(f" IF {condition_str} THEN")
else:
output.append(f" IF {condition_str} THEN")
# Generar las llamadas a función/FB
print(
f" 🔧 Generando {len(network_calls)} llamadas para Red {network['id']}"
)
for call in network_calls:
print(f" 🔧 Procesando: {call['type']} - {call['name']}")
call_str = self._convert_call_to_string(
call, " " if condition_str else " "
)
print(f" 🔧 Código generado: {call_str}")
output.append(call_str)
# Si había condición, cerrar IF
if condition_str:
output.append(" END_IF;")
elif network["targets"]:
# Red sin llamadas pero con target(s)
if condition_str:
output.append(f" IF {condition_str} THEN")
for target in network["targets"]:
output.append(f" {target} := TRUE;")
output.append(" ELSE")
for target in network["targets"]:
output.append(f" {target} := FALSE;")
output.append(" END_IF;")
else:
for target in network["targets"]:
output.append(f" {target} := TRUE; // Sin condición")
output.append("")
elif self.st_main_code:
# Hay código ST - copiarlo directamente preservando indentación
output.append(" (* Código ST original *)")
for line in self.st_main_code.split("\n"):
if line.strip():
# Preservar indentación original pero agregar indentación base SCL
output.append(f" {line}")
else:
output.append("")
else:
# No hay código principal
output.append(" (* Sin código principal detectado *)")
output.append("")
# Agregar ACTIONs como subfunciones
self._add_actions_as_procedures(output, program_name)
# Fin del programa principal
program_type = self.program_type if hasattr(self, "program_type") else "PROGRAM"
output.append(f"END_{program_type}")
return "\n".join(output)
def _add_variable_sections(self, output):
"""Agregar todas las secciones de variables al código"""
# Orden preferido de las secciones
section_order = ["VAR_INPUT", "VAR_OUTPUT", "VAR_IN_OUT", "VAR"]
for section_type in section_order:
if section_type in self.var_sections:
output.append(f"{section_type}")
for var_line in self.var_sections[section_type]:
# Asegurar que termine con punto y coma
if not var_line.endswith(";"):
var_line += " ;"
output.append(f" {var_line}")
output.append("END_VAR")
output.append("")
def _convert_call_to_string(self, call, indent=""):
"""Convertir una llamada de ejecución a string SCL"""
if not call:
return ""
if call["type"] == "FUNCTION":
# Buscar información de la función en el registry
func_info = self.function_registry.get_function(call["name"])
if func_info:
# Usar información del registry para generar llamada correcta
# Mapear inputs disponibles a parámetros de la función
param_assignments = []
for i, (param_name, param_type) in enumerate(func_info.inputs):
if i < len(call["inputs"]):
param_assignments.append(f"{param_name} := {call['inputs'][i]}")
inputs_str = ", ".join(param_assignments)
output_var = call["outputs"][0] if call["outputs"] else "mDummy"
print(
f" 🎯 Generando llamada a función con interfaz conocida: {call['name']}"
)
print(f" Parámetros: {inputs_str}")
print(f" Salida: {output_var}")
# Generar llamada con parámetros nombrados
return f"{indent}{output_var} := {call['name']}({inputs_str});"
else:
# Fallback al método original si no hay información
inputs_str = ", ".join(call["inputs"])
output_var = call["outputs"][0] if call["outputs"] else "mDummy"
return f"{indent}{output_var} := {call['name']}({inputs_str}); // Sin info de interfaz"
elif call["type"] == "FUNCTION_BLOCK":
# Buscar información del FB en el registry
fb_info = self.function_registry.get_function_block(call["name"])
instance_name = call["instance_name"]
if fb_info:
# Usar información del registry para generar llamada correcta
param_assignments = []
for i, (param_name, param_type) in enumerate(fb_info.inputs):
if i < len(call["inputs"]):
param_assignments.append(f"{param_name} := {call['inputs'][i]}")
inputs_str = ", ".join(param_assignments)
fb_call = f"{indent}{instance_name}({inputs_str});"
# Generar asignaciones de salida usando información del registry
if call["outputs"] and fb_info.outputs:
for i, output_var in enumerate(call["outputs"]):
if i < len(fb_info.outputs):
output_param_name = fb_info.outputs[i][0]
fb_call += f"\n{indent}{output_var} := {instance_name}.{output_param_name};"
return fb_call
else:
# Fallback al método original
inputs_str = ", ".join(call["inputs"])
fb_call = (
f"{indent}{instance_name}({inputs_str}); // Sin info de interfaz"
)
if call["outputs"]:
output_var = call["outputs"][0]
# Heurística: el nombre de la salida suele ser 'FilterOut' o similar
output_pin_name = "FilterOut" # Suposición basada en LowPassFilter
if "stat" in instance_name.lower():
output_pin_name = (
"MeanValue" # Suposición para StatisticalAnalisys
)
fb_call += (
f"\n{indent}{output_var} := {instance_name}.{output_pin_name};"
)
return fb_call
elif call["type"] == "OPERATOR":
# Formato: output := Operator(input1, input2, ...)
inputs_str = ", ".join(call["inputs"])
output_var = call["outputs"][0] if call["outputs"] else "mDummy"
return f"{indent}{output_var} := {call['name']}({inputs_str});"
return f"{indent}(* Llamada no reconocida: {call['type']} - {call['name']} *)"
def _add_actions_as_procedures(self, output, program_name):
"""Agregar ACTIONs como procedimientos independientes"""
if not self.actions:
return
output.append("")
output.append("(* === SUBFUNCIONES (ACTIONs convertidas) === *)")
output.append("")
for action_name, action_data in self.actions.items():
# Nombre completo como se usa en TwinCAT (sin doble underscore)
full_name = f"{program_name}_{action_name}"
output.append(f"PROCEDURE {full_name}")
output.append("(* Convertida desde ACTION *)")
output.append("")
if isinstance(action_data, dict) and action_data["type"] == "LAD":
# ACTION con código LAD - convertir a SCL
output.append(" (* Código LAD convertido a SCL *)")
for network in action_data["networks"]:
# Asegurar que el campo 'calls' existe
if "calls" not in network:
network["calls"] = []
if "targets" not in network:
network["targets"] = []
output.append(f" // Red {network['id']}")
if network["comment"]:
output.append(f" // {network['comment']}")
# Generar código para la red (condición + llamadas)
condition_str = self._convert_logic_to_string(network["logic"])
# Asegurar que el campo 'calls' existe antes de accederlo
network_calls = network.get("calls", [])
if condition_str or (not network["logic"] and network_calls):
if condition_str:
if "\n" in condition_str:
output.append(f" IF {condition_str} THEN")
else:
output.append(f" IF {condition_str} THEN")
indent = " " if condition_str else " "
if network_calls:
for call in network_calls:
call_str = self._convert_call_to_string(call, indent)
output.append(call_str)
elif network["targets"]:
for target in network["targets"]:
output.append(f"{indent}{target} := TRUE;")
if condition_str:
if not network_calls and network["targets"]:
output.append(" ELSE")
for target in network["targets"]:
output.append(f" {target} := FALSE;")
output.append(" END_IF;")
elif network["targets"]:
for target in network["targets"]:
output.append(
f" {target} := TRUE; // Sin condición, solo target"
)
output.append("")
elif isinstance(action_data, dict) and action_data["type"] == "ST":
# ACTION con código ST - copiar directamente preservando indentación
output.append(" (* Código ST original *)")
for line in action_data["code"].split("\n"):
if line.strip():
# Preservar indentación original pero agregar indentación base SCL
output.append(f" {line}")
else:
output.append("")
else:
# Compatibilidad hacia atrás - código directo (legacy)
output.append(" (* Código original - modo compatibilidad *)")
for line in str(action_data).split("\n"):
if line.strip():
# Preservar indentación original pero agregar indentación base SCL
output.append(f" {line}")
else:
output.append("")
output.append("")
output.append("END_PROCEDURE")
output.append("")
def _convert_logic_to_string(self, logic):
"""Convertir lógica LAD a string estructurado"""
if not logic:
return ""
if logic["type"] == "CONTACT":
if logic["negated"]:
return f"NOT {logic['name']}"
else:
return logic["name"]
elif logic["type"] == "AND":
operand_strings = []
for operand in logic["operands"]:
operand_str = self._convert_logic_to_string(operand)
if operand_str:
operand_strings.append(operand_str)
if len(operand_strings) > 1:
return "(" + " AND ".join(operand_strings) + ")"
elif len(operand_strings) == 1:
return operand_strings[0]
else:
return ""
elif logic["type"] == "OR":
operand_strings = []
for operand in logic["operands"]:
operand_str = self._convert_logic_to_string(operand)
if operand_str:
operand_strings.append(operand_str)
if len(operand_strings) > 1:
return "(" + " OR ".join(operand_strings) + ")"
elif len(operand_strings) == 1:
return operand_strings[0]
else:
return ""
elif logic["type"] == "FUNCTION_BLOCK":
# Buscar información del FB en el registry
fb_info = self.function_registry.get_function_block(logic["name"])
if fb_info:
# Generar llamada con parámetros conocidos
param_assignments = []
for i, (param_name, param_type) in enumerate(fb_info.inputs):
if i < len(logic["inputs"]):
param_assignments.append(
f"{param_name} := {logic['inputs'][i]}"
)
inputs_str = ", ".join(param_assignments)
return f"{logic['name']}({inputs_str})"
else:
# Fallback al método original
inputs_str = ", ".join(logic["inputs"]) if logic["inputs"] else ""
return f"{logic['name']}({inputs_str})"
elif logic["type"] == "ACTION_CALL":
return logic["name"] # Solo devolver el nombre, no CALL aquí
elif logic["type"] == "FUNCTION_CALL":
# Buscar información de la función en el registry
func_info = self.function_registry.get_function(logic["name"])
if func_info:
# Generar llamada con parámetros conocidos
param_assignments = []
for i, (param_name, param_type) in enumerate(func_info.inputs):
if i < len(logic.get("inputs", [])):
param_assignments.append(
f"{param_name} := {logic['inputs'][i]}"
)
inputs_str = ", ".join(param_assignments)
return f"{logic['name']}({inputs_str})"
else:
# Fallback al método original
return f"{logic['name']}()"
return ""
def save_to_file(self, output_path):
"""Guardar código estructurado"""
structured_code = self.convert_to_structured()
with open(output_path, "w", encoding="utf-8") as f:
f.write(structured_code)
return structured_code
def print_debug_info(self):
"""Mostrar información de debug sobre los networks parseados"""
print(f"\n=== DEBUG INFO - {len(self.networks)} networks encontrados ===")
for network in self.networks:
print(f"\nRed {network['id']}:")
if network["comment"]:
print(f" Comentario: {network['comment']}")
print(f" Targets: {network['targets']}")
if network["logic"]:
print(f" Lógica: {self._debug_logic_string(network['logic'])}")
condition_str = self._convert_logic_to_string(network["logic"])
print(f" Condición: {condition_str}")
else:
print(" Sin lógica")
# ¡AGREGAR DEBUGGING PARA LLAMADAS!
if "calls" in network and network["calls"]:
print(f" ✅ Llamadas: {len(network['calls'])}")
for i, call in enumerate(network["calls"]):
print(f" {i+1}. {call['type']}: {call['name']}")
if call["inputs"]:
print(f" Inputs: {call['inputs']}")
if call.get("outputs"):
print(f" Outputs: {call['outputs']}")
else:
print(" ❌ Sin llamadas")
def _debug_logic_string(self, logic, indent=0):
"""Crear string de debug para la lógica"""
if not logic:
return "None"
prefix = " " * indent
if logic["type"] == "CONTACT":
neg_str = " (NEGADO)" if logic["negated"] else ""
return f"{prefix}CONTACT: {logic['name']}{neg_str}"
elif logic["type"] == "AND":
result = f"{prefix}AND:\n"
for operand in logic["operands"]:
result += self._debug_logic_string(operand, indent + 1) + "\n"
return result.rstrip()
elif logic["type"] == "OR":
result = f"{prefix}OR:\n"
for operand in logic["operands"]:
result += self._debug_logic_string(operand, indent + 1) + "\n"
return result.rstrip()
elif logic["type"] == "FUNCTION_BLOCK":
return f"{prefix}FUNCTION_BLOCK: {logic['name']} inputs: {logic['inputs']}"
elif logic["type"] == "ACTION_CALL":
return f"{prefix}ACTION_CALL: {logic['name']}"
elif logic["type"] == "FUNCTION_CALL":
return f"{prefix}FUNCTION_CALL: {logic['name']}"
return f"{prefix}UNKNOWN: {logic}"
def _logic_to_sympy(self, logic):
"""Convertir lógica LAD a expresión SymPy"""
if not logic:
return None
if logic["type"] == "CONTACT":
symbol = self.symbol_manager.get_symbol(logic["name"])
if logic["negated"]:
return Not(symbol)
else:
return symbol
elif logic["type"] == "AND":
sympy_operands = []
for operand in logic["operands"]:
operand_sympy = self._logic_to_sympy(operand)
if operand_sympy is not None:
sympy_operands.append(operand_sympy)
if len(sympy_operands) > 1:
return And(*sympy_operands)
elif len(sympy_operands) == 1:
return sympy_operands[0]
else:
return None
elif logic["type"] == "OR":
sympy_operands = []
for operand in logic["operands"]:
operand_sympy = self._logic_to_sympy(operand)
if operand_sympy is not None:
sympy_operands.append(operand_sympy)
if len(sympy_operands) > 1:
return Or(*sympy_operands)
elif len(sympy_operands) == 1:
return sympy_operands[0]
else:
return None
elif logic["type"] == "FUNCTION_BLOCK":
# Para function blocks, creamos un símbolo especial
fb_name = f"{logic['name']}({', '.join(logic['inputs'])})"
return self.symbol_manager.get_symbol(fb_name)
elif logic["type"] == "ACTION_CALL":
# Para llamadas a ACTION, creamos un símbolo especial
action_name = f"CALL_{logic['name'].replace('.', '_')}"
return self.symbol_manager.get_symbol(action_name)
return None
def _sympy_to_structured_string(self, sympy_expr):
"""Convertir expresión SymPy a string estructurado"""
if sympy_expr is None:
return ""
# Crear un mapeo inverso de símbolos a nombres originales
symbol_to_name = {}
for original_name, symbol in self.symbol_manager._symbol_cache.items():
symbol_to_name[str(symbol)] = original_name
# Convertir la expresión a string
str_expr = str(sympy_expr)
# Reemplazar símbolos por nombres originales
for symbol_str, original_name in symbol_to_name.items():
str_expr = str_expr.replace(symbol_str, original_name)
# Reemplazar operadores SymPy por operadores IEC61131-3
str_expr = str_expr.replace("&", " AND ")
str_expr = str_expr.replace("|", " OR ")
str_expr = str_expr.replace("~", "NOT ")
# Limpiar espacios múltiples
str_expr = re.sub(r"\s+", " ", str_expr)
return str_expr.strip()
def optimize_expressions(self):
"""Optimizar todas las expresiones usando SymPy - Preferir DNF para LAD"""
print("\n=== Optimizando expresiones con SymPy (forzando DNF para LAD) ===")
for network in self.networks:
if network["logic"]:
print(f"\nOptimizando Red {network['id']}:")
# Convertir a SymPy
sympy_expr = self._logic_to_sympy(network["logic"])
if sympy_expr:
print(f" Expresión original: {sympy_expr}")
# Simplificar primero
try:
simplified = simplify(sympy_expr)
print(f" Simplificada: {simplified}")
# Verificar complejidad antes de DNF
num_symbols = len(simplified.free_symbols)
complexity = self._estimate_expression_complexity(simplified)
if num_symbols > 12 or complexity > 800:
print(
f" ADVERTENCIA: Expresión muy compleja ({num_symbols} símbolos, complejidad {complexity})"
)
print(
f" Saltando conversión DNF por rendimiento - usando simplificación básica"
)
final_expr = simplified
else:
# SIEMPRE convertir a DNF para LAD (forma natural: (AND) OR (AND) OR (AND))
dnf_expr = to_dnf(simplified)
print(f" DNF (forma LAD preferida): {dnf_expr}")
# Post-procesar para eliminar contradicciones
final_expr = self._post_process_expression(dnf_expr)
# Verificar si el post-procesamiento cambió algo
if str(final_expr) != str(dnf_expr):
print(f" Post-procesada: {final_expr}")
self.sympy_expressions[network["id"]] = final_expr
except Exception as e:
print(f" Error optimizando: {e}")
self.sympy_expressions[network["id"]] = sympy_expr
def group_common_conditions(self):
"""Agrupar networks con condiciones similares o complementarias"""
print("\n=== Analizando agrupación de condiciones ===")
# Buscar networks que podrían agruparse
groupable_networks = []
for network in self.networks:
if (
network["logic"]
and network["targets"]
and network["id"] in self.sympy_expressions
):
groupable_networks.append(network)
if len(groupable_networks) < 2:
print("No hay suficientes networks para agrupar")
return
# Analizar condiciones comunes
print(f"Analizando {len(groupable_networks)} networks para agrupación:")
for i, net1 in enumerate(groupable_networks):
for j, net2 in enumerate(groupable_networks[i + 1 :], i + 1):
expr1 = self.sympy_expressions[net1["id"]]
expr2 = self.sympy_expressions[net2["id"]]
# Buscar factores comunes
try:
# Extraer factores comunes si es posible
common_factors = self._find_common_factors(expr1, expr2)
if common_factors:
print(
f" Red {net1['id']} y Red {net2['id']} comparten: {common_factors}"
)
# Verificar si son complementarias (útil para SET/RESET)
if self._are_complementary(expr1, expr2):
print(
f" Red {net1['id']} y Red {net2['id']} son complementarias"
)
except Exception as e:
print(
f" Error analizando Red {net1['id']} y Red {net2['id']}: {e}"
)
def _find_common_factors(self, expr1, expr2):
"""Encontrar factores comunes entre dos expresiones"""
try:
# Convertir a conjuntos de símbolos para análisis básico
symbols1 = expr1.free_symbols
symbols2 = expr2.free_symbols
common_symbols = symbols1.intersection(symbols2)
if len(common_symbols) > 1:
return f"{len(common_symbols)} símbolos comunes"
return None
except:
return None
def _are_complementary(self, expr1, expr2):
"""Verificar si dos expresiones son complementarias"""
try:
# Verificar si expr1 == NOT(expr2) simplificado
complement = Not(expr2)
simplified_complement = simplify(complement)
simplified_expr1 = simplify(expr1)
return simplified_expr1.equals(simplified_complement)
except:
return False
def _estimate_expression_complexity(self, expr):
"""Estimar la complejidad de una expresión para evitar explosión exponencial"""
try:
# Contar operadores AND/OR anidados
complexity = 0
expr_str = str(expr)
# Contar número de operadores
and_count = expr_str.count("&")
or_count = expr_str.count("|")
not_count = expr_str.count("~")
complexity += and_count * 2 # AND
complexity += or_count * 4 # OR (más costoso en DNF)
complexity += not_count * 1 # NOT
# Penalizar anidamiento profundo
depth = expr_str.count("(")
complexity += depth * 6
# Número de símbolos únicos
num_symbols = len(expr.free_symbols)
complexity += num_symbols * 15
# Detectar patrones problemáticos específicos
# CNF con muchos términos (patrón típico que causa explosión)
if and_count > 10 and or_count > 15:
complexity += 2000 # Penalización severa para CNF complejas
# Expresiones con muchos términos negativos (difíciles para DNF)
if not_count > 8:
complexity += 500
# Longitud total como indicador de complejidad
if len(expr_str) > 1000:
complexity += len(expr_str) // 2
return complexity
except:
return 999999 # Asumir muy complejo si hay error
def _post_process_expression(self, expr):
"""Post-procesar expresión para eliminar contradicciones obvias"""
try:
# Detectar contradicciones como X & ~X que deberían ser False
cleaned_expr = expr
# Aplicar simplificaciones adicionales
cleaned_expr = simplify(cleaned_expr)
# Si la expresión contiene contradicciones obvias, intentar limpiar
free_symbols = cleaned_expr.free_symbols
for symbol in free_symbols:
# Verificar si tenemos symbol & ~symbol en alguna parte
contradiction = And(symbol, Not(symbol))
if cleaned_expr.has(contradiction):
print(
f" Detectada contradicción eliminable: {symbol} AND NOT {symbol}"
)
# Reemplazar contradicción por False
cleaned_expr = cleaned_expr.replace(contradiction, sympy.false)
cleaned_expr = simplify(cleaned_expr)
return cleaned_expr
except:
return expr
def _format_dnf_for_lad(self, sympy_expr):
"""Formatear expresión DNF para código LAD más legible"""
if sympy_expr is None:
return ""
# Crear mapeo de símbolos a nombres originales
symbol_to_name = {}
for original_name, symbol in self.symbol_manager._symbol_cache.items():
symbol_to_name[str(symbol)] = original_name
# Convertir a string y analizar la estructura
str_expr = str(sympy_expr)
# Reemplazar símbolos por nombres originales
for symbol_str, original_name in symbol_to_name.items():
str_expr = str_expr.replace(symbol_str, original_name)
# Reemplazar operadores SymPy por IEC61131-3 primero
str_expr = str_expr.replace("&", " AND ")
str_expr = str_expr.replace("|", " OR ")
str_expr = str_expr.replace("~", "NOT ")
# Limpiar espacios múltiples
str_expr = re.sub(r"\s+", " ", str_expr).strip()
# Si es una expresión OR de términos AND principales, formatear cada término
# Buscar el patrón de OR principales (no anidados en paréntesis)
if " OR " in str_expr and not str_expr.startswith("("):
# Dividir por OR de nivel principal
# Esto es más complejo debido a paréntesis anidados
parts = self._split_main_or_terms(str_expr)
if len(parts) > 1:
formatted_terms = []
for part in parts:
part = part.strip()
# Asegurar que cada término tenga paréntesis si es complejo
if " AND " in part and not (
part.startswith("(") and part.endswith(")")
):
part = f"({part})"
formatted_terms.append(part)
# Unir con OR y saltos de línea para mejor legibilidad
return "\n OR ".join(formatted_terms)
return str_expr
def _split_main_or_terms(self, expr):
"""Dividir expresión por OR de nivel principal, respetando paréntesis"""
parts = []
current_part = ""
paren_level = 0
i = 0
while i < len(expr):
char = expr[i]
if char == "(":
paren_level += 1
current_part += char
elif char == ")":
paren_level -= 1
current_part += char
elif paren_level == 0 and expr[i : i + 4] == " OR ":
# OR de nivel principal encontrado
parts.append(current_part.strip())
current_part = ""
i += 3 # Saltar ' OR '
else:
current_part += char
i += 1
# Agregar la última parte
if current_part.strip():
parts.append(current_part.strip())
return parts if len(parts) > 1 else [expr]
def _parse_actions(self, content):
"""Extraer todas las ACTIONs del programa"""
# Buscar patrón ACTION nombre: ... END_ACTION
action_pattern = r"ACTION\s+(\w+)\s*:(.*?)END_ACTION"
action_matches = re.findall(action_pattern, content, re.DOTALL | re.IGNORECASE)
for action_name, action_code in action_matches:
# Limpiar el código de la ACTION
clean_code = action_code.strip()
# Verificar si es código LAD o ST
if "_LD_BODY" in clean_code:
# Es código LAD - necesita parsing
print(
f"ACTION LAD encontrada: {action_name} ({len(clean_code)} caracteres)"
)
self.actions[action_name] = self._parse_action_lad(
action_name, clean_code
)
else:
# Es código ST - copiar directamente
print(
f"ACTION ST encontrada: {action_name} ({len(clean_code)} caracteres)"
)
self.actions[action_name] = {"type": "ST", "code": clean_code}
print(f"Total ACTIONs: {len(self.actions)}")
def _parse_action_lad(self, action_name, action_content):
# We create a completely new, isolated instance of the converter to parse the action's LAD code.
# This prevents any state (like network counts) from leaking between the main program and the action parsing.
action_converter = SimpleLadConverter(self.function_registry)
# Extract just the LAD body from the action content
lad_body_match = re.search(r"_LD_BODY(.*)", action_content, re.DOTALL)
if lad_body_match:
lad_content = lad_body_match.group(1)
lines = lad_content.strip().split("\n")
# The parse_networks method will process all networks it finds in the provided lines.
action_converter._parse_networks(lines)
# The parsed networks are stored in the temporary converter's `networks` list.
# We return this list as the logic for the action.
return {"type": "LAD", "networks": action_converter.networks}
# If no LAD body is found, return an empty structure.
return {"type": "LAD", "networks": []}
def _extract_st_code(self, content):
"""Extraer código ST del programa principal"""
# Buscar desde después de END_VAR hasta ACTION o END_PROGRAM
var_end_match = re.search(r"END_VAR", content)
if not var_end_match:
self.st_main_code = None
return
start_index = var_end_match.end()
# Buscar el final del código principal
action_start_match = re.search(r"\nACTION", content[start_index:])
end_program_match = re.search(r"\nEND_PROGRAM", content[start_index:])
end_index = -1
if action_start_match:
end_index = start_index + action_start_match.start()
if end_program_match:
program_end = start_index + end_program_match.start()
if end_index == -1 or program_end < end_index:
end_index = program_end
if end_index == -1:
end_index = len(content)
# Extraer código ST
st_code = content[start_index:end_index].strip()
if st_code:
self.st_main_code = st_code
print(f"Código ST principal extraído: {len(st_code)} caracteres")
else:
self.st_main_code = None
def _extract_function_return_type(self):
"""Extraer el tipo de retorno de una FUNCTION desde el header"""
if not hasattr(self, "_original_content"):
return None
# Buscar patrón FUNCTION nombre : TIPO
match = re.search(r"FUNCTION\s+\w+\s*:\s*(\w+)", self._original_content)
if match:
return match.group(1)
return None
def _extract_type_content(self, content):
"""Extraer el contenido completo de un TYPE"""
# Buscar desde TYPE hasta END_TYPE
type_start = content.find("TYPE")
if type_start == -1:
return
# Buscar END_TYPE
type_end = content.find("END_TYPE", type_start)
if type_end == -1:
type_end = len(content)
else:
type_end += len("END_TYPE")
# Extraer contenido del TYPE
type_content = content[type_start:type_end].strip()
if type_content:
self.type_content = type_content
print(f"Contenido TYPE extraído: {len(type_content)} caracteres")
else:
self.type_content = None
def _convert_type_to_scl(self, output):
"""Convertir TYPE a SCL - simplemente copiar el contenido original"""
if self.type_content:
# Copiar directamente el contenido del TYPE preservando formato
for line in self.type_content.split("\n"):
if line.strip():
output.append(line)
else:
output.append("")
else:
# Si no hay contenido TYPE, generar estructura básica
program_name = self.program_name if self.program_name else "UnknownType"
output.append(f"TYPE {program_name}:")
output.append("STRUCT")
output.append(" (* Contenido TYPE no detectado *)")
output.append(" Dummy : INT;")
output.append("END_STRUCT")
output.append("END_TYPE")
return "\n".join(output)
def _extract_global_variables(self, content):
"""Extraer todas las secciones de variables globales"""
# Buscar todas las secciones VAR_GLOBAL
var_global_patterns = [
(r"VAR_GLOBAL\s+PERSISTENT(.*?)END_VAR", "VAR_GLOBAL PERSISTENT"),
(r"VAR_GLOBAL((?:(?!PERSISTENT)(?!END_VAR).)*?)END_VAR", "VAR_GLOBAL"),
]
for pattern, var_type in var_global_patterns:
matches = re.findall(pattern, content, re.DOTALL | re.IGNORECASE)
if matches:
variables = []
for match in matches:
# Parsear cada línea de variable
for line in match.split("\n"):
line = line.strip()
if (
line
and not line.startswith("(*")
and not line.startswith("(************")
and ":" in line
and not line.startswith("*")
):
# Limpiar comentarios inline
if "(*" in line:
# Mantener el comentario inline
pass # No limpiar comentarios para preservar información
variables.append(line.strip())
if variables:
if var_type not in self.var_sections:
self.var_sections[var_type] = []
self.var_sections[var_type].extend(variables)
# Mostrar resumen
for var_type, variables in self.var_sections.items():
print(f"Variables {var_type}: {len(variables)} encontradas")
def _convert_global_vars_to_scl(self, output):
"""Convertir archivo de variables globales a SCL"""
# Para archivos de variables globales, simplemente copiar las declaraciones
# Agregar secciones de variables encontradas
section_order = ["VAR_GLOBAL", "VAR_GLOBAL PERSISTENT"]
for section_type in section_order:
if section_type in self.var_sections:
output.append(f"{section_type}")
for var_line in self.var_sections[section_type]:
# Asegurar que termine con punto y coma
if not var_line.endswith(";"):
var_line += " ;"
output.append(f" {var_line}")
output.append("END_VAR")
output.append("")
# Si no se encontraron variables, agregar estructura básica
if not self.var_sections:
output.append("VAR_GLOBAL")
output.append(" (* No se detectaron variables globales *)")
output.append(" Dummy : INT ;")
output.append("END_VAR")
return "\n".join(output)
def is_function_only_file(file_path):
"""Verificar si un archivo contiene solo una función (sin código LAD)"""
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
# Buscar patrón de función
func_match = re.search(
r"(FUNCTION|FUNCTION_BLOCK)\s+(\w+)\s*:\s*(\w+)", content
)
if not func_match:
return False
# Verificar que NO tenga código LAD
has_lad = "_LD_BODY" in content
# Verificar que tenga principalmente declaraciones de variables
has_vars = "VAR_INPUT" in content or "VAR_OUTPUT" in content or "VAR" in content
# Es un archivo de función si tiene función, variables, pero no LAD
return func_match and has_vars and not has_lad
except:
return False
def collect_function_interfaces(input_directory, debug_mode=False):
"""Primera pasada: recopilar información de todas las funciones"""
print("=== PRIMERA PASADA: RECOPILANDO INTERFACES DE FUNCIONES ===")
function_registry = FunctionRegistry()
# Obtener todos los archivos .EXP
exp_pattern = os.path.join(input_directory, "*.EXP")
exp_files = glob.glob(exp_pattern)
function_files = []
for exp_file in exp_files:
filename = os.path.basename(exp_file)
# Verificar si es un archivo de función
if is_function_only_file(exp_file):
function_files.append(exp_file)
print(f" Analizando función: {filename}")
# Crear convertidor temporal solo para extraer interfaz
temp_converter = SimpleLadConverter()
func_info = temp_converter.parse_function_interface(exp_file)
if func_info:
function_registry.add_function(func_info)
else:
if debug_mode:
print(f" Saltando (no es función pura): {filename}")
print(f"\nFunciones encontradas: {len(function_registry.functions)}")
print(f"Function Blocks encontrados: {len(function_registry.function_blocks)}")
if debug_mode:
function_registry.print_summary()
return function_registry, function_files
def run_debug_mode(debug_file_arg):
"""Ejecuta el convertidor en modo debug para un solo archivo."""
print(f"=== MODO DEBUG: Procesando archivo específico ===")
print(f"Archivo: {debug_file_arg}")
print("🔧 VERIFICACIÓN: Script modificado y ejecutándose en tiempo real")
# Determinar el path absoluto del archivo de debug.
# Esto permite que el script se ejecute desde cualquier lugar.
debug_file_path = os.path.abspath(debug_file_arg)
if not debug_file_path.endswith(".EXP"):
debug_file_path += ".EXP"
if not os.path.exists(debug_file_path):
print(f"Error: No se encontró el archivo de debug: {debug_file_path}")
# Intenta buscarlo en el directorio relativo ExportTwinCat por si acaso
script_dir = os.path.dirname(os.path.abspath(__file__))
export_dir_path = os.path.join(
script_dir, "..", "ExportTwinCat", os.path.basename(debug_file_path)
)
if os.path.exists(export_dir_path):
debug_file_path = os.path.abspath(export_dir_path)
print(f"Archivo encontrado en path alternativo: {debug_file_path}")
else:
return
# El directorio de entrada y salida es el del archivo de debug
input_directory = os.path.dirname(debug_file_path)
scl_output_dir = input_directory
print(f"Directorio de análisis para interfaces: {input_directory}")
print(f"Directorio de salida para .SCL: {scl_output_dir}")
# PRIMERA PASADA: Recopilar interfaces de TODOS los archivos .EXP en el directorio
function_registry, _ = collect_function_interfaces(input_directory, debug_mode=True)
if function_registry.functions or function_registry.function_blocks:
print("\n🔧 REGISTRY DE FUNCIONES PARA EL DEBUG:")
function_registry.print_summary()
# SEGUNDA PASADA: Procesar solo el archivo de debug
print("\n=== SEGUNDA PASADA (DEBUG): CONVIRTIENDO ARCHIVO ÚNICO ===")
filename = os.path.basename(debug_file_path)
base_name = os.path.splitext(filename)[0]
scl_filename = f"{base_name}.scl"
scl_output_path = os.path.join(scl_output_dir, scl_filename)
print(f"{'='*60}")
print(f"Procesando: {filename}")
print(f"Salida: {scl_output_path}")
try:
converter = SimpleLadConverter(function_registry)
converter.parse_file(debug_file_path)
print(f" ✓ Redes encontradas: {len(converter.networks)}")
print(f" ✓ Secciones de variables: {list(converter.var_sections.keys())}")
print(f" ✓ ACTIONs encontradas: {list(converter.actions.keys())}")
converter.print_debug_info()
converter.optimize_expressions()
converter.group_common_conditions()
print(f" Generando código SCL...")
structured_code = converter.save_to_file(scl_output_path)
# Mostrar código por stdout como fue solicitado
print("\n\n=== CÓDIGO SCL GENERADO (STDOUT) ===")
print(structured_code)
print("====================================")
print(f"\n ✓ Guardado en: {scl_output_path}")
except Exception as e:
print(f" ✗ Error procesando {filename}: {e}")
import traceback
traceback.print_exc()
print(f"\n✓ Proceso de debug finalizado.")
def run_mass_conversion():
"""Ejecuta el convertidor en modo masivo usando la configuración global."""
print("=== Convertidor Masivo LAD a SCL con SymPy (2 Pasadas) ===")
# Cargar configuración
configs = load_configuration()
if not configs:
print("Error: No se pudo cargar la configuración. Abortando.")
return
# Obtener configuraciones
working_directory = configs.get("working_directory", "./")
level1_config = configs.get("level1", {})
level2_config = configs.get("level2", {})
level3_config = configs.get("level3", {})
debug_mode = level1_config.get("debug_mode", True)
show_optimizations = level1_config.get("show_optimizations", True)
scl_output_dir = level2_config.get("scl_output_dir", "scl")
show_generated_code = level2_config.get("show_generated_code", False)
max_display_lines = level2_config.get("max_display_lines", 50)
force_regenerate = level2_config.get("force_regenerate", False)
input_directory = level3_config.get("twincat_exp_directory", working_directory)
sympy_optimization = level3_config.get("sympy_optimization", True)
group_analysis = level3_config.get("group_analysis", True)
if not os.path.exists(working_directory):
print(f"Error: El directorio de trabajo no existe: {working_directory}")
return
if not os.path.exists(input_directory):
print(f"Error: El directorio de entrada no existe: {input_directory}")
return
full_scl_path = os.path.join(working_directory, scl_output_dir)
if not os.path.exists(full_scl_path):
os.makedirs(full_scl_path)
print(f"Directorio creado: {full_scl_path}")
# PRIMERA PASADA: Recopilar interfaces de funciones
function_registry, _ = collect_function_interfaces(input_directory, debug_mode)
# SEGUNDA PASADA: Procesar todos los archivos
exp_pattern = os.path.join(input_directory, "*.EXP")
exp_files = glob.glob(exp_pattern)
if not exp_files:
print(f"No se encontraron archivos .EXP en: {input_directory}")
return
print(f"\nEncontrados {len(exp_files)} archivos .EXP en: {input_directory}")
print(f"Directorio de salida SCL: {full_scl_path}")
print("\n=== SEGUNDA PASADA: CONVERSIÓN CON INTERFACES CONOCIDAS ===")
successful_conversions, failed_conversions = 0, 0
for exp_file in exp_files:
filename = os.path.basename(exp_file)
base_name = os.path.splitext(filename)[0]
scl_filename = f"{base_name}.scl"
scl_output_path = os.path.join(full_scl_path, scl_filename)
if os.path.exists(scl_output_path) and not force_regenerate:
print(
f"SALTANDO: {filename} - Ya existe. (Usar 'force_regenerate: true' para sobreescribir)"
)
successful_conversions += 1
continue
print(f"\n{'='*60}")
print(f"Procesando: {filename} -> {scl_filename}")
try:
converter = SimpleLadConverter(function_registry)
converter.parse_file(exp_file)
if debug_mode:
converter.print_debug_info()
if sympy_optimization and show_optimizations:
converter.optimize_expressions()
if group_analysis and show_optimizations:
converter.group_common_conditions()
structured_code = converter.save_to_file(scl_output_path)
if show_generated_code:
lines = structured_code.split("\n")
display_lines = min(max_display_lines, len(lines))
print(
f" Código SCL generado ({len(lines)} líneas, mostrando {display_lines}):"
)
for i in range(display_lines):
print(f" {i+1:3d}: {lines[i]}")
if len(lines) > display_lines:
print(f" ... ({len(lines) - display_lines} líneas más)")
print(f" ✓ Guardado en: {scl_output_path}")
successful_conversions += 1
except Exception as e:
print(f" ✗ Error procesando {filename}: {e}")
if debug_mode:
import traceback
traceback.print_exc()
failed_conversions += 1
print(f"\n{'='*60}")
print(f"RESUMEN DE CONVERSIÓN MASIVA:")
print(f" ✓ Exitosas: {successful_conversions}")
print(f" ✗ Fallidas: {failed_conversions}")
print(f" 📁 Directorio salida: {full_scl_path}")
def main():
"""Función principal - Despachador para modo debug o masivo."""
try:
import time
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
print("=== SCRIPT VERIFICADO Y EJECUTÁNDOSE CORRECTAMENTE ===")
print(f"🕒 Timestamp: {timestamp}")
if len(sys.argv) > 1:
# Modo debug si hay argumentos en la línea de comandos
run_debug_mode(sys.argv[1])
else:
# Modo masivo por defecto
run_mass_conversion()
except Exception as e:
print(f"Error general en main: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()