ParamManagerScripts/backend/script_groups/TwinCat/x1_lad_converter.py

1250 lines
51 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Convertidor simplificado de LAD TwinCAT a pseudocódigo estructurado
Versión mejorada con SymPy para optimización de expresiones lógicas
"""
import re
import sympy
import os
import sys
import glob
from sympy import symbols, And, Or, Not, simplify
from sympy.logic.boolalg import to_dnf
# Configurar el path al directorio raíz del proyecto
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
# Importar la función de configuración
from backend.script_utils import load_configuration
class SymbolManager:
"""Gestor de símbolos para SymPy"""
def __init__(self):
self._symbol_cache = {}
def get_symbol(self, name):
"""Obtener o crear símbolo SymPy"""
if name not in self._symbol_cache:
# Limpiar nombre para SymPy (sin espacios, caracteres especiales)
clean_name = re.sub(r'[^a-zA-Z0-9_]', '_', name)
self._symbol_cache[name] = symbols(clean_name)
return self._symbol_cache[name]
class SimpleLadConverter:
"""Convertidor simplificado de LAD a código estructurado"""
def __init__(self):
self.networks = []
self.current_network_id = 0
self.symbol_manager = SymbolManager()
self.sympy_expressions = {} # Mapeo de network_id -> sympy expression
# Nuevas propiedades para estructura SCL completa
self.program_name = ""
self.program_path = ""
self.var_sections = {} # VAR, VAR_INPUT, VAR_OUTPUT, etc.
self.actions = {} # Diccionario de ACTIONs
def parse_file(self, file_path):
"""Parse el archivo LAD completo incluyendo variables y ACTIONs"""
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Extraer información del header
self._parse_header_info(content)
# Extraer declaraciones de variables
self._parse_variable_declarations(content)
# Encontrar sección LAD
lad_start = content.find('_LD_BODY')
if lad_start != -1:
# Extraer contenido LAD hasta ACTION o END_PROGRAM
action_start = content.find('\nACTION', lad_start)
end_program = content.find('\nEND_PROGRAM', lad_start)
lad_end = action_start if action_start != -1 else end_program
if lad_end == -1:
lad_end = len(content)
lad_content = content[lad_start:lad_end]
lines = lad_content.split('\n')
self._parse_networks(lines)
else:
print("No se encontró _LD_BODY")
# Extraer ACTIONs
self._parse_actions(content)
def _parse_header_info(self, content):
"""Extraer información del header del programa"""
# Buscar PATH y nombre del programa
path_match = re.search(r'\(\* @PATH := \'([^\']+)\' \*\)', content)
if path_match:
self.program_path = path_match.group(1)
# Buscar nombre del programa/function_block
program_match = re.search(r'(PROGRAM|FUNCTION_BLOCK)\s+(\w+)', content)
if program_match:
self.program_name = program_match.group(2)
print(f"Programa encontrado: {self.program_name}")
if self.program_path:
print(f"Path: {self.program_path}")
def _parse_variable_declarations(self, content):
"""Extraer todas las declaraciones de variables"""
# Buscar todas las secciones VAR
var_patterns = [
(r'VAR_INPUT(.*?)END_VAR', 'VAR_INPUT'),
(r'VAR_OUTPUT(.*?)END_VAR', 'VAR_OUTPUT'),
(r'VAR_IN_OUT(.*?)END_VAR', 'VAR_IN_OUT'),
(r'VAR\s+(.*?)END_VAR', 'VAR'),
]
for pattern, var_type in var_patterns:
matches = re.findall(pattern, content, re.DOTALL | re.IGNORECASE)
if matches:
variables = []
for match in matches:
# Parsear cada línea de variable
for line in match.split('\n'):
line = line.strip()
if line and not line.startswith('(*') and ':' in line:
# Limpiar comentarios inline
if '(*' in line:
line = line[:line.find('(*')]
variables.append(line.strip())
if variables:
self.var_sections[var_type] = variables
print(f"Variables {var_type}: {len(variables)} encontradas")
def _parse_networks(self, lines):
"""Parse todas las redes"""
i = 0
expected_networks = 0
# Buscar cuántas redes se esperan
for line in lines:
if line.strip().startswith('_NETWORKS :'):
expected_networks = int(line.strip().split(':')[1].strip())
print(f"Se esperan {expected_networks} redes según el archivo")
break
while i < len(lines):
if lines[i].strip() == '_NETWORK':
self.current_network_id += 1
print(f"Procesando red {self.current_network_id}...")
i = self._parse_network(lines, i)
else:
i += 1
if len(self.networks) != expected_networks:
print(f"ADVERTENCIA: Se esperaban {expected_networks} redes pero solo se parsearon {len(self.networks)}")
print("Esto puede indicar redes con _EMPTY o estructuras no reconocidas")
def _parse_network(self, lines, start_idx):
"""Parse una red individual con soporte mejorado para operadores LAD"""
network = {
'id': self.current_network_id,
'comment': '',
'logic': None,
'target': '',
'function_blocks': []
}
i = start_idx + 1
# Parse comentario
if i < len(lines) and lines[i].strip() == '_COMMENT':
i, comment = self._parse_comment(lines, i)
network['comment'] = comment
# Parse contenido de la red
while i < len(lines):
line = lines[i].strip()
if line == '_NETWORK':
break
elif line == '_LD_ASSIGN':
i += 1
# Parsear la lógica LAD después de _LD_ASSIGN
i, logic = self._parse_lad_expression(lines, i)
network['logic'] = logic
elif line.startswith('_OUTPUT'):
# Buscar variable de salida
i += 1
while i < len(lines) and lines[i].strip().startswith('_'):
i += 1
if i < len(lines) and lines[i].strip() and 'ENABLELIST' not in lines[i]:
network['target'] = lines[i].strip()
i += 1
else:
i += 1
self.networks.append(network)
print(f" Red {network['id']} agregada. Total redes: {len(self.networks)}")
if network['logic']:
print(f" Con lógica: {network['logic']['type']} - {network['logic'].get('name', 'Sin nombre')}")
print(f" Target: '{network['target']}'")
return i
def _parse_lad_expression(self, lines, start_idx):
"""Parse una expresión LAD recursivamente"""
i = start_idx
while i < len(lines):
line = lines[i].strip()
if line == '_LD_AND':
return self._parse_and_expression(lines, i + 1)
elif line == '_LD_OR':
return self._parse_or_expression(lines, i + 1)
elif line == '_LD_CONTACT':
return self._parse_contact(lines, i + 1)
elif line.startswith('_FUNCTIONBLOCK'):
return self._parse_function_block(lines, i)
elif line == '_EMPTY':
# Red vacía que puede contener funciones después
print(f" Encontrada _EMPTY dentro de _LD_ASSIGN en línea {i}")
i += 1
# Buscar funciones en las siguientes líneas
i, empty_logic = self._parse_empty_network(lines, i)
return i, empty_logic
elif line.startswith('_OUTPUT') or line == 'ENABLELIST : 0':
break
else:
i += 1
return i, None
def _parse_and_expression(self, lines, start_idx):
"""Parse una expresión AND"""
i = start_idx
operands = []
# Buscar operadores
if i < len(lines) and lines[i].strip().startswith('_LD_OPERATOR'):
# Extraer número de operandos
operator_line = lines[i].strip()
num_operands = int(operator_line.split(':')[-1].strip()) if ':' in operator_line else 2
i += 1
# Parse cada operando
for _ in range(num_operands):
i, operand = self._parse_lad_expression(lines, i)
if operand:
operands.append(operand)
return i, {'type': 'AND', 'operands': operands}
def _parse_or_expression(self, lines, start_idx):
"""Parse una expresión OR"""
i = start_idx
operands = []
# Buscar operadores
if i < len(lines) and lines[i].strip().startswith('_LD_OPERATOR'):
# Extraer número de operandos
operator_line = lines[i].strip()
num_operands = int(operator_line.split(':')[-1].strip()) if ':' in operator_line else 2
i += 1
# Parse cada operando
for _ in range(num_operands):
i, operand = self._parse_lad_expression(lines, i)
if operand:
operands.append(operand)
return i, {'type': 'OR', 'operands': operands}
def _parse_contact(self, lines, start_idx):
"""Parse un contacto LAD"""
i = start_idx
contact_name = ""
negated = False
# Obtener nombre del contacto
if i < len(lines):
contact_name = lines[i].strip()
i += 1
# Verificar si hay expresión
if i < len(lines) and lines[i].strip() == '_EXPRESSION':
i += 1
# Verificar si está negado
if i < len(lines):
if lines[i].strip() == '_NEGATIV':
negated = True
i += 1
elif lines[i].strip() == '_POSITIV':
i += 1
return i, {'type': 'CONTACT', 'name': contact_name, 'negated': negated}
def _parse_function_block(self, lines, start_idx):
"""Parse un bloque de función o llamada a ACTION"""
i = start_idx + 1
fb_name = ""
inputs = []
action_call = None
max_iterations = 50 # Protección contra bucles infinitos
iterations = 0
if i < len(lines):
fb_name = lines[i].strip()
i += 1
# Verificar si es una llamada a ACTION (patrón ??? seguido de namespace.actionname)
if fb_name == "???":
# Buscar el nombre completo de la ACTION más adelante
action_call = self._find_action_call_name(lines, i)
if action_call:
print(f" Detectada llamada a ACTION: {action_call}")
return i, {'type': 'ACTION_CALL', 'name': action_call, 'inputs': []}
# Parse inputs del function block normal
while (i < len(lines) and
not lines[i].strip().startswith('_OUTPUT') and
iterations < max_iterations):
line = lines[i].strip()
iterations += 1
if line.startswith('_OPERAND'):
i += 2 # Saltar _EXPRESSION
if i < len(lines):
inputs.append(lines[i].strip())
i += 1
elif line and not line.startswith('_'):
# Podría ser el nombre de la ACTION
if '.' in line and action_call is None:
action_call = line
print(f" Detectada llamada a ACTION: {action_call}")
return i, {'type': 'ACTION_CALL', 'name': action_call, 'inputs': inputs}
elif line.startswith('ENABLELIST'):
# Fin del bloque
break
else:
i += 1
# Salida de emergencia del bucle
if iterations >= max_iterations:
print(f" ADVERTENCIA: Bucle infinito evitado en function block en línea {start_idx}")
break
return i, {'type': 'FUNCTION_BLOCK', 'name': fb_name, 'inputs': inputs}
def _find_action_call_name(self, lines, start_idx):
"""Buscar el nombre completo de la ACTION después de ???"""
i = start_idx
while i < len(lines) and i < start_idx + 10: # Buscar en las próximas 10 líneas
line = lines[i].strip()
# Buscar patrón namespace.actionname
if '.' in line and not line.startswith('_') and 'ENABLELIST' not in line:
return line
i += 1
return None
def _parse_comment(self, lines, start_idx):
"""Parse comentario"""
i = start_idx + 1
comment_lines = []
while i < len(lines):
line = lines[i].strip()
if line == '_END_COMMENT':
break
if line and not line.startswith('_'):
comment_lines.append(line)
i += 1
return i + 1, ' '.join(comment_lines)
def _parse_empty_network(self, lines, start_idx):
"""Parse una red que empieza con _EMPTY y puede contener funciones"""
i = start_idx
max_iterations = 20
iterations = 0
function_found = None
target_found = None
print(f" Entrando a _parse_empty_network desde línea {start_idx}")
while (i < len(lines) and
iterations < max_iterations and
not lines[i].strip().startswith('_OUTPUT') and
not lines[i].strip() == '_NETWORK'):
line = lines[i].strip()
iterations += 1
print(f" Línea {i}: '{line}'")
if line.startswith('_FUNCTIONBLOCK'):
# Encontrada llamada a función/ACTION
print(f" ENCONTRADO _FUNCTIONBLOCK en línea {i}")
i, logic = self._parse_function_block(lines, i)
function_found = logic
elif line.startswith('_FUNCTION'):
# Otra variante de función
print(f" ENCONTRADO _FUNCTION en línea {i}")
i += 1
# Buscar el nombre de la función
while i < len(lines) and i < start_idx + 10:
func_line = lines[i].strip()
print(f" Buscando nombre función línea {i}: '{func_line}'")
if (func_line and
not func_line.startswith('_') and
'ENABLELIST' not in func_line):
print(f" ENCONTRADO nombre función: {func_line}")
function_found = {'type': 'FUNCTION_CALL', 'name': func_line, 'inputs': []}
break
i += 1
elif line.startswith('ENABLELIST'):
print(f" Encontrado ENABLELIST, continuando búsqueda...")
# No terminar aquí, continuar buscando _ASSIGN
i += 1
elif line.startswith('_ASSIGN'):
print(f" ENCONTRADO _ASSIGN en línea {i}")
# Buscar función dentro de _ASSIGN
i += 1
i, assign_logic = self._parse_assign_section(lines, i)
if assign_logic:
function_found = assign_logic
elif line.startswith('_OUTPUT'):
# Buscar el target después de _OUTPUT
print(f" ENCONTRADO _OUTPUT, buscando target...")
i += 1
while i < len(lines) and lines[i].strip().startswith('_'):
i += 1
if i < len(lines) and lines[i].strip() and 'ENABLELIST' not in lines[i]:
target_found = lines[i].strip()
print(f" ENCONTRADO target: {target_found}")
break
else:
i += 1
# Si encontramos una función, crear red independiente
if function_found and target_found:
print(f" Creando red independiente para función con target: {target_found}")
self._create_function_network(function_found, target_found)
return i, function_found
elif function_found:
print(f" Función encontrada pero sin target específico")
# Crear target por defecto basado en el tipo de función
if function_found['type'] == 'ACTION_CALL':
default_target = 'mDummy' # Variable dummy típica para ACTION calls
elif function_found['type'] == 'FUNCTION_CALL':
# Para funciones como ProductLiterInTank, usar la variable global correspondiente
if function_found['name'] == 'gProductTankLevel':
default_target = 'gTankProdAmount' # Variable que se asigna según el contexto
else:
default_target = 'mDummy'
else:
# Caso por defecto para otros tipos de función (FUNCTION_BLOCK, etc.)
default_target = 'mDummy'
print(f" Usando target por defecto: {default_target}")
self._create_function_network(function_found, default_target)
return i, function_found
print(f" _parse_empty_network terminó sin encontrar función")
return i, None
def _parse_assign_section(self, lines, start_idx):
"""Parse una sección _ASSIGN que puede contener funciones"""
i = start_idx
max_iterations = 15
iterations = 0
print(f" Entrando a _parse_assign_section desde línea {start_idx}")
while (i < len(lines) and
iterations < max_iterations and
not lines[i].strip() == '_NETWORK' and
not lines[i].strip() == 'ENABLELIST_END'):
line = lines[i].strip()
iterations += 1
print(f" Línea {i}: '{line}'")
if line.startswith('_FUNCTIONBLOCK'):
print(f" ENCONTRADO _FUNCTIONBLOCK en _ASSIGN: línea {i}")
i, logic = self._parse_function_block(lines, i)
return i, logic
elif line.startswith('_FUNCTION'):
print(f" ENCONTRADO _FUNCTION en _ASSIGN: línea {i}")
# Buscar el nombre de la función en las siguientes líneas
func_name = None
j = i + 1
while j < len(lines) and j < i + 8:
func_line = lines[j].strip()
print(f" Buscando nombre función línea {j}: '{func_line}'")
if (func_line and
not func_line.startswith('_') and
'ENABLELIST' not in func_line and
':' not in func_line):
func_name = func_line
print(f" ENCONTRADO nombre función: {func_name}")
break
j += 1
if func_name:
return j, {'type': 'FUNCTION_CALL', 'name': func_name, 'inputs': []}
else:
i += 1
elif line == 'ENABLELIST_END':
print(f" Encontrado ENABLELIST_END, terminando")
break
else:
i += 1
print(f" _parse_assign_section terminó sin encontrar función")
return i, None
def _create_function_network(self, function_logic, target_name):
"""Crear una red independiente para una función o ACTION call"""
self.current_network_id += 1
function_network = {
'id': self.current_network_id,
'comment': f'Llamada a función: {function_logic.get("name", "unknown")}',
'logic': function_logic,
'target': target_name,
'function_blocks': []
}
self.networks.append(function_network)
print(f" Red de función {function_network['id']} creada para {function_logic['type']}: {function_logic.get('name', 'Sin nombre')}")
print(f" Target: '{target_name}'")
return function_network
def convert_to_structured(self):
"""Convertir a código SCL completo con variables y ACTIONs"""
output = []
# Header del archivo
output.append("(* Código SCL generado desde LAD TwinCAT *)")
output.append("(* Convertidor mejorado con SymPy - Estructura DNF preferida *)")
if self.program_path:
output.append(f"(* Path original: {self.program_path} *)")
output.append("")
# Declaración del programa
program_name = self.program_name if self.program_name else "ConvertedProgram"
output.append(f"PROGRAM {program_name}")
# Declaraciones de variables
self._add_variable_sections(output)
# Inicio del código principal
output.append("")
output.append("(* === CÓDIGO PRINCIPAL === *)")
output.append("")
# Lógica de las redes LAD
for network in self.networks:
output.append(f" // Red {network['id']}")
if network['comment']:
output.append(f" // {network['comment']}")
if network['logic'] and network['target']:
# Verificar si es una llamada a ACTION sin condiciones
if (network['logic']['type'] == 'ACTION_CALL' and
network['target'] in ['mDummy', 'EN_Out']): # Variables dummy típicas
# Es una llamada a ACTION incondicional
action_call = self._convert_logic_to_string(network['logic'])
output.append(f" {action_call};")
output.append(f" {network['target']} := TRUE; // ACTION ejecutada")
elif (network['logic']['type'] == 'FUNCTION_CALL' and
network['target'] in ['mDummy', 'EN_Out', 'gTankProdAmount']): # Variables típicas
# Es una llamada a FUNCTION incondicional
function_call = self._convert_logic_to_string(network['logic'])
output.append(f" {network['target']} := {function_call};")
else:
# Usar expresión DNF optimizada si está disponible
if network['id'] in self.sympy_expressions:
sympy_expr = self.sympy_expressions[network['id']]
condition_str = self._format_dnf_for_lad(sympy_expr)
output.append(f" // Optimizada con SymPy DNF")
else:
# Fallback al método original
condition_str = self._convert_logic_to_string(network['logic'])
output.append(f" // Sin optimización SymPy")
if condition_str:
# Si hay saltos de línea en la condición (múltiples términos OR)
if '\n' in condition_str:
output.append(f" IF {condition_str} THEN")
else:
output.append(f" IF {condition_str} THEN")
output.append(f" {network['target']} := TRUE;")
output.append(" ELSE")
output.append(f" {network['target']} := FALSE;")
output.append(" END_IF;")
else:
output.append(f" {network['target']} := TRUE; // Logic no reconocida")
output.append("")
# Fin del programa principal
output.append("END_PROGRAM")
# Agregar ACTIONs como subfunciones
self._add_actions_as_procedures(output, program_name)
return '\n'.join(output)
def _add_variable_sections(self, output):
"""Agregar todas las secciones de variables al código"""
# Orden preferido de las secciones
section_order = ['VAR_INPUT', 'VAR_OUTPUT', 'VAR_IN_OUT', 'VAR']
for section_type in section_order:
if section_type in self.var_sections:
output.append(f"{section_type}")
for var_line in self.var_sections[section_type]:
# Asegurar que termine con punto y coma
if not var_line.endswith(';'):
var_line += ' ;'
output.append(f" {var_line}")
output.append("END_VAR")
output.append("")
def _add_actions_as_procedures(self, output, program_name):
"""Agregar ACTIONs como procedimientos independientes"""
if not self.actions:
return
output.append("")
output.append("(* === SUBFUNCIONES (ACTIONs convertidas) === *)")
output.append("")
for action_name, action_code in self.actions.items():
# Nombre completo como se usa en TwinCAT
full_name = f"{program_name}_{action_name}"
output.append(f"PROCEDURE {full_name}")
output.append("(* Convertida desde ACTION *)")
output.append("")
# Agregar el código de la ACTION con indentación
for line in action_code.split('\n'):
if line.strip():
output.append(f" {line.strip()}")
else:
output.append("")
output.append("")
output.append("END_PROCEDURE")
output.append("")
def _convert_logic_to_string(self, logic):
"""Convertir lógica LAD a string estructurado"""
if not logic:
return ""
if logic['type'] == 'CONTACT':
if logic['negated']:
return f"NOT {logic['name']}"
else:
return logic['name']
elif logic['type'] == 'AND':
operand_strings = []
for operand in logic['operands']:
operand_str = self._convert_logic_to_string(operand)
if operand_str:
operand_strings.append(operand_str)
if len(operand_strings) > 1:
return "(" + " AND ".join(operand_strings) + ")"
elif len(operand_strings) == 1:
return operand_strings[0]
else:
return ""
elif logic['type'] == 'OR':
operand_strings = []
for operand in logic['operands']:
operand_str = self._convert_logic_to_string(operand)
if operand_str:
operand_strings.append(operand_str)
if len(operand_strings) > 1:
return "(" + " OR ".join(operand_strings) + ")"
elif len(operand_strings) == 1:
return operand_strings[0]
else:
return ""
elif logic['type'] == 'FUNCTION_BLOCK':
inputs_str = ", ".join(logic['inputs']) if logic['inputs'] else ""
return f"{logic['name']}({inputs_str})"
elif logic['type'] == 'ACTION_CALL':
return f"CALL {logic['name']}()"
elif logic['type'] == 'FUNCTION_CALL':
return f"{logic['name']}()"
return ""
def save_to_file(self, output_path):
"""Guardar código estructurado"""
structured_code = self.convert_to_structured()
with open(output_path, 'w', encoding='utf-8') as f:
f.write(structured_code)
return structured_code
def print_debug_info(self):
"""Mostrar información de debug sobre los networks parseados"""
print(f"\n=== DEBUG INFO - {len(self.networks)} networks encontrados ===")
for network in self.networks:
print(f"\nRed {network['id']}:")
if network['comment']:
print(f" Comentario: {network['comment']}")
print(f" Target: {network['target']}")
if network['logic']:
print(f" Lógica: {self._debug_logic_string(network['logic'])}")
condition_str = self._convert_logic_to_string(network['logic'])
print(f" Condición: {condition_str}")
else:
print(" Sin lógica")
def _debug_logic_string(self, logic, indent=0):
"""Crear string de debug para la lógica"""
if not logic:
return "None"
prefix = " " * indent
if logic['type'] == 'CONTACT':
neg_str = " (NEGADO)" if logic['negated'] else ""
return f"{prefix}CONTACT: {logic['name']}{neg_str}"
elif logic['type'] == 'AND':
result = f"{prefix}AND:\n"
for operand in logic['operands']:
result += self._debug_logic_string(operand, indent + 1) + "\n"
return result.rstrip()
elif logic['type'] == 'OR':
result = f"{prefix}OR:\n"
for operand in logic['operands']:
result += self._debug_logic_string(operand, indent + 1) + "\n"
return result.rstrip()
elif logic['type'] == 'FUNCTION_BLOCK':
return f"{prefix}FUNCTION_BLOCK: {logic['name']} inputs: {logic['inputs']}"
elif logic['type'] == 'ACTION_CALL':
return f"{prefix}ACTION_CALL: {logic['name']}"
elif logic['type'] == 'FUNCTION_CALL':
return f"{prefix}FUNCTION_CALL: {logic['name']}"
return f"{prefix}UNKNOWN: {logic}"
def _logic_to_sympy(self, logic):
"""Convertir lógica LAD a expresión SymPy"""
if not logic:
return None
if logic['type'] == 'CONTACT':
symbol = self.symbol_manager.get_symbol(logic['name'])
if logic['negated']:
return Not(symbol)
else:
return symbol
elif logic['type'] == 'AND':
sympy_operands = []
for operand in logic['operands']:
operand_sympy = self._logic_to_sympy(operand)
if operand_sympy is not None:
sympy_operands.append(operand_sympy)
if len(sympy_operands) > 1:
return And(*sympy_operands)
elif len(sympy_operands) == 1:
return sympy_operands[0]
else:
return None
elif logic['type'] == 'OR':
sympy_operands = []
for operand in logic['operands']:
operand_sympy = self._logic_to_sympy(operand)
if operand_sympy is not None:
sympy_operands.append(operand_sympy)
if len(sympy_operands) > 1:
return Or(*sympy_operands)
elif len(sympy_operands) == 1:
return sympy_operands[0]
else:
return None
elif logic['type'] == 'FUNCTION_BLOCK':
# Para function blocks, creamos un símbolo especial
fb_name = f"{logic['name']}({', '.join(logic['inputs'])})"
return self.symbol_manager.get_symbol(fb_name)
elif logic['type'] == 'ACTION_CALL':
# Para llamadas a ACTION, creamos un símbolo especial
action_name = f"CALL_{logic['name'].replace('.', '_')}"
return self.symbol_manager.get_symbol(action_name)
return None
def _sympy_to_structured_string(self, sympy_expr):
"""Convertir expresión SymPy a string estructurado"""
if sympy_expr is None:
return ""
# Crear un mapeo inverso de símbolos a nombres originales
symbol_to_name = {}
for original_name, symbol in self.symbol_manager._symbol_cache.items():
symbol_to_name[str(symbol)] = original_name
# Convertir la expresión a string
str_expr = str(sympy_expr)
# Reemplazar símbolos por nombres originales
for symbol_str, original_name in symbol_to_name.items():
str_expr = str_expr.replace(symbol_str, original_name)
# Reemplazar operadores SymPy por operadores IEC61131-3
str_expr = str_expr.replace('&', ' AND ')
str_expr = str_expr.replace('|', ' OR ')
str_expr = str_expr.replace('~', 'NOT ')
# Limpiar espacios múltiples
str_expr = re.sub(r'\s+', ' ', str_expr)
return str_expr.strip()
def optimize_expressions(self):
"""Optimizar todas las expresiones usando SymPy - Preferir DNF para LAD"""
print("\n=== Optimizando expresiones con SymPy (forzando DNF para LAD) ===")
for network in self.networks:
if network['logic']:
print(f"\nOptimizando Red {network['id']}:")
# Convertir a SymPy
sympy_expr = self._logic_to_sympy(network['logic'])
if sympy_expr:
print(f" Expresión original: {sympy_expr}")
# Simplificar primero
try:
simplified = simplify(sympy_expr)
print(f" Simplificada: {simplified}")
# Verificar complejidad antes de DNF
num_symbols = len(simplified.free_symbols)
complexity = self._estimate_expression_complexity(simplified)
if num_symbols > 12 or complexity > 800:
print(f" ADVERTENCIA: Expresión muy compleja ({num_symbols} símbolos, complejidad {complexity})")
print(f" Saltando conversión DNF por rendimiento - usando simplificación básica")
final_expr = simplified
else:
# SIEMPRE convertir a DNF para LAD (forma natural: (AND) OR (AND) OR (AND))
dnf_expr = to_dnf(simplified)
print(f" DNF (forma LAD preferida): {dnf_expr}")
# Post-procesar para eliminar contradicciones
final_expr = self._post_process_expression(dnf_expr)
# Verificar si el post-procesamiento cambió algo
if str(final_expr) != str(dnf_expr):
print(f" Post-procesada: {final_expr}")
self.sympy_expressions[network['id']] = final_expr
except Exception as e:
print(f" Error optimizando: {e}")
self.sympy_expressions[network['id']] = sympy_expr
def group_common_conditions(self):
"""Agrupar networks con condiciones similares o complementarias"""
print("\n=== Analizando agrupación de condiciones ===")
# Buscar networks que podrían agruparse
groupable_networks = []
for network in self.networks:
if (network['logic'] and network['target'] and
network['id'] in self.sympy_expressions):
groupable_networks.append(network)
if len(groupable_networks) < 2:
print("No hay suficientes networks para agrupar")
return
# Analizar condiciones comunes
print(f"Analizando {len(groupable_networks)} networks para agrupación:")
for i, net1 in enumerate(groupable_networks):
for j, net2 in enumerate(groupable_networks[i+1:], i+1):
expr1 = self.sympy_expressions[net1['id']]
expr2 = self.sympy_expressions[net2['id']]
# Buscar factores comunes
try:
# Extraer factores comunes si es posible
common_factors = self._find_common_factors(expr1, expr2)
if common_factors:
print(f" Red {net1['id']} y Red {net2['id']} comparten: {common_factors}")
# Verificar si son complementarias (útil para SET/RESET)
if self._are_complementary(expr1, expr2):
print(f" Red {net1['id']} y Red {net2['id']} son complementarias")
except Exception as e:
print(f" Error analizando Red {net1['id']} y Red {net2['id']}: {e}")
def _find_common_factors(self, expr1, expr2):
"""Encontrar factores comunes entre dos expresiones"""
try:
# Convertir a conjuntos de símbolos para análisis básico
symbols1 = expr1.free_symbols
symbols2 = expr2.free_symbols
common_symbols = symbols1.intersection(symbols2)
if len(common_symbols) > 1:
return f"{len(common_symbols)} símbolos comunes"
return None
except:
return None
def _are_complementary(self, expr1, expr2):
"""Verificar si dos expresiones son complementarias"""
try:
# Verificar si expr1 == NOT(expr2) simplificado
complement = Not(expr2)
simplified_complement = simplify(complement)
simplified_expr1 = simplify(expr1)
return simplified_expr1.equals(simplified_complement)
except:
return False
def _estimate_expression_complexity(self, expr):
"""Estimar la complejidad de una expresión para evitar explosión exponencial"""
try:
# Contar operadores AND/OR anidados
complexity = 0
expr_str = str(expr)
# Contar número de operadores
and_count = expr_str.count('&')
or_count = expr_str.count('|')
not_count = expr_str.count('~')
complexity += and_count * 2 # AND
complexity += or_count * 4 # OR (más costoso en DNF)
complexity += not_count * 1 # NOT
# Penalizar anidamiento profundo
depth = expr_str.count('(')
complexity += depth * 6
# Número de símbolos únicos
num_symbols = len(expr.free_symbols)
complexity += num_symbols * 15
# Detectar patrones problemáticos específicos
# CNF con muchos términos (patrón típico que causa explosión)
if and_count > 10 and or_count > 15:
complexity += 2000 # Penalización severa para CNF complejas
# Expresiones con muchos términos negativos (difíciles para DNF)
if not_count > 8:
complexity += 500
# Longitud total como indicador de complejidad
if len(expr_str) > 1000:
complexity += len(expr_str) // 2
return complexity
except:
return 999999 # Asumir muy complejo si hay error
def _post_process_expression(self, expr):
"""Post-procesar expresión para eliminar contradicciones obvias"""
try:
# Detectar contradicciones como X & ~X que deberían ser False
cleaned_expr = expr
# Aplicar simplificaciones adicionales
cleaned_expr = simplify(cleaned_expr)
# Si la expresión contiene contradicciones obvias, intentar limpiar
free_symbols = cleaned_expr.free_symbols
for symbol in free_symbols:
# Verificar si tenemos symbol & ~symbol en alguna parte
contradiction = And(symbol, Not(symbol))
if cleaned_expr.has(contradiction):
print(f" Detectada contradicción eliminable: {symbol} AND NOT {symbol}")
# Reemplazar contradicción por False
cleaned_expr = cleaned_expr.replace(contradiction, sympy.false)
cleaned_expr = simplify(cleaned_expr)
return cleaned_expr
except:
return expr
def _format_dnf_for_lad(self, sympy_expr):
"""Formatear expresión DNF para código LAD más legible"""
if sympy_expr is None:
return ""
# Crear mapeo de símbolos a nombres originales
symbol_to_name = {}
for original_name, symbol in self.symbol_manager._symbol_cache.items():
symbol_to_name[str(symbol)] = original_name
# Convertir a string y analizar la estructura
str_expr = str(sympy_expr)
# Reemplazar símbolos por nombres originales
for symbol_str, original_name in symbol_to_name.items():
str_expr = str_expr.replace(symbol_str, original_name)
# Reemplazar operadores SymPy por IEC61131-3 primero
str_expr = str_expr.replace('&', ' AND ')
str_expr = str_expr.replace('|', ' OR ')
str_expr = str_expr.replace('~', 'NOT ')
# Limpiar espacios múltiples
str_expr = re.sub(r'\s+', ' ', str_expr).strip()
# Si es una expresión OR de términos AND principales, formatear cada término
# Buscar el patrón de OR principales (no anidados en paréntesis)
if ' OR ' in str_expr and not str_expr.startswith('('):
# Dividir por OR de nivel principal
# Esto es más complejo debido a paréntesis anidados
parts = self._split_main_or_terms(str_expr)
if len(parts) > 1:
formatted_terms = []
for part in parts:
part = part.strip()
# Asegurar que cada término tenga paréntesis si es complejo
if ' AND ' in part and not (part.startswith('(') and part.endswith(')')):
part = f"({part})"
formatted_terms.append(part)
# Unir con OR y saltos de línea para mejor legibilidad
return '\n OR '.join(formatted_terms)
return str_expr
def _split_main_or_terms(self, expr):
"""Dividir expresión por OR de nivel principal, respetando paréntesis"""
parts = []
current_part = ""
paren_level = 0
i = 0
while i < len(expr):
char = expr[i]
if char == '(':
paren_level += 1
current_part += char
elif char == ')':
paren_level -= 1
current_part += char
elif paren_level == 0 and expr[i:i+4] == ' OR ':
# OR de nivel principal encontrado
parts.append(current_part.strip())
current_part = ""
i += 3 # Saltar ' OR '
else:
current_part += char
i += 1
# Agregar la última parte
if current_part.strip():
parts.append(current_part.strip())
return parts if len(parts) > 1 else [expr]
def _parse_actions(self, content):
"""Extraer todas las ACTIONs del programa"""
# Buscar patrón ACTION nombre: ... END_ACTION
action_pattern = r'ACTION\s+(\w+)\s*:(.*?)END_ACTION'
action_matches = re.findall(action_pattern, content, re.DOTALL | re.IGNORECASE)
for action_name, action_code in action_matches:
# Limpiar el código de la ACTION
clean_code = action_code.strip()
self.actions[action_name] = clean_code
print(f"ACTION encontrada: {action_name} ({len(clean_code)} caracteres)")
print(f"Total ACTIONs: {len(self.actions)}")
def main():
"""Función principal - Convierte todos los archivos .EXP a .SCL"""
try:
print("=== Convertidor Masivo LAD a SCL con SymPy ===")
# Cargar configuración
configs = load_configuration()
# Verificar que se cargó correctamente
if not configs:
print("Advertencia: No se pudo cargar la configuración, usando valores por defecto")
working_directory = "./"
scl_output_dir = "scl"
debug_mode = True
show_optimizations = True
show_generated_code = False
max_display_lines = 50
force_regenerate = False
else:
# Obtener configuraciones
working_directory = configs.get("working_directory", "./")
level1_config = configs.get("level1", {})
level2_config = configs.get("level2", {})
level3_config = configs.get("level3", {})
# Parámetros de configuración
debug_mode = level1_config.get("debug_mode", True)
show_optimizations = level1_config.get("show_optimizations", True)
scl_output_dir = level2_config.get("scl_output_dir", "scl")
backup_existing = level2_config.get("backup_existing", True)
show_generated_code = level2_config.get("show_generated_code", False)
max_display_lines = level2_config.get("max_display_lines", 50)
sympy_optimization = level3_config.get("sympy_optimization", True)
group_analysis = level3_config.get("group_analysis", True)
force_regenerate = level2_config.get("force_regenerate", False) # Nueva opción
# Verificar directorio de trabajo
if not os.path.exists(working_directory):
print(f"Error: El directorio de trabajo no existe: {working_directory}")
return
# Crear directorio de salida SCL
full_scl_path = os.path.join(working_directory, scl_output_dir)
if not os.path.exists(full_scl_path):
os.makedirs(full_scl_path)
print(f"Directorio creado: {full_scl_path}")
# Buscar todos los archivos .EXP
exp_pattern = os.path.join(working_directory, "*.EXP")
exp_files = glob.glob(exp_pattern)
if not exp_files:
print(f"No se encontraron archivos .EXP en: {working_directory}")
return
print(f"Encontrados {len(exp_files)} archivos .EXP en: {working_directory}")
print(f"Directorio de salida SCL: {full_scl_path}")
print()
# Procesar cada archivo
successful_conversions = 0
failed_conversions = 0
for exp_file in exp_files:
filename = os.path.basename(exp_file)
base_name = os.path.splitext(filename)[0]
scl_filename = f"{base_name}.scl"
scl_output_path = os.path.join(full_scl_path, scl_filename)
# Verificar si ya existe el archivo SCL (exportación progresiva)
if os.path.exists(scl_output_path) and not force_regenerate:
print(f"{'='*60}")
print(f"SALTANDO: {filename} - Ya existe {scl_filename}")
print(f" (usa force_regenerate: true en configuración para forzar regeneración)")
successful_conversions += 1 # Contar como exitoso
continue
print(f"{'='*60}")
print(f"Procesando: {filename}")
print(f"Salida: {scl_filename}")
try:
# Crear nuevo convertidor para cada archivo
converter = SimpleLadConverter()
# Parsear archivo
converter.parse_file(exp_file)
print(f" ✓ Redes encontradas: {len(converter.networks)}")
print(f" ✓ Secciones de variables: {list(converter.var_sections.keys())}")
print(f" ✓ ACTIONs encontradas: {list(converter.actions.keys())}")
# Mostrar información de debug si está habilitado
if debug_mode:
converter.print_debug_info()
# Optimizar expresiones con SymPy si está habilitado
if sympy_optimization and show_optimizations:
converter.optimize_expressions()
# Analizar agrupación de condiciones si está habilitado
if group_analysis and show_optimizations:
converter.group_common_conditions()
# Convertir y guardar
print(f" Generando código SCL...")
structured_code = converter.save_to_file(scl_output_path)
# Mostrar parte del código generado si está habilitado
if show_generated_code:
lines = structured_code.split('\n')
display_lines = min(max_display_lines, len(lines))
print(f" Código SCL generado ({len(lines)} líneas, mostrando {display_lines}):")
for i in range(display_lines):
print(f" {i+1:3d}: {lines[i]}")
if len(lines) > display_lines:
print(f" ... ({len(lines) - display_lines} líneas más)")
print(f" ✓ Guardado en: {scl_output_path}")
successful_conversions += 1
except Exception as e:
print(f" ✗ Error procesando {filename}: {e}")
if debug_mode:
import traceback
traceback.print_exc()
failed_conversions += 1
print()
# Resumen final
print(f"{'='*60}")
print(f"RESUMEN DE CONVERSIÓN:")
print(f" ✓ Exitosas: {successful_conversions}")
print(f" ✗ Fallidas: {failed_conversions}")
print(f" 📁 Directorio salida: {full_scl_path}")
if successful_conversions > 0:
print(f"\n✓ Conversión masiva completada!")
except Exception as e:
print(f"Error general: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()