Mejorado de la conversion LAD de Twincat

This commit is contained in:
Miguel 2025-06-19 14:45:27 +02:00
parent c597eaa28f
commit 205e1f4c8d
7 changed files with 807 additions and 1104 deletions

View File

@ -1,71 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Ejemplo de uso del convertidor LAD a pseudocódigo estructurado
"""
from lad_to_pseudocode_converter import LadToStructuredConverter
def convertir_input_exp():
"""Ejemplo de conversión del archivo INPUT.EXP"""
# Crear instancia del convertidor
converter = LadToStructuredConverter()
try:
# Cargar y parsear el archivo LAD
print("Parseando archivo INPUT.EXP...")
converter.parse_file(".example/INPUT.EXP")
# Convertir a código estructurado
print("Convirtiendo a pseudocódigo estructurado...")
structured_code = converter.convert_to_structured()
# Guardar resultado
output_file = "INPUT_pseudocode.txt"
converter.save_to_file(output_file)
# Mostrar estadísticas
print(f"\n=== Estadísticas de conversión ===")
print(f"Redes procesadas: {len(converter.networks)}")
print(f"Archivo de salida: {output_file}")
# Mostrar las primeras líneas como ejemplo
print(f"\n=== Primeras líneas del resultado ===")
lines = structured_code.split('\n')
for i, line in enumerate(lines[:30]):
print(f"{i+1:3d}: {line}")
if len(lines) > 30:
print("...")
print(f"(Total de {len(lines)} líneas)")
return True
except Exception as e:
print(f"Error durante la conversión: {e}")
return False
def mostrar_detalles_red(converter, network_id):
"""Muestra los detalles de una red específica"""
if network_id <= len(converter.networks):
network = converter.networks[network_id - 1]
print(f"\n=== Detalles de Red {network_id} ===")
print(f"Comentario: {network.comment}")
print(f"Elementos: {len(network.elements)}")
for i, element in enumerate(network.elements):
print(f" {i+1}. {element.element_type.value}: {element.name}")
if element.expression:
print(f" Expresión: {element.expression}")
if element.parameters:
print(f" Parámetros: {element.parameters}")
if __name__ == "__main__":
print("=== Convertidor LAD a Pseudocódigo Estructurado ===")
print("Procesando archivo INPUT.EXP de TwinCAT...")
if convertir_input_exp():
print("\n✓ Conversión completada exitosamente!")
else:
print("\n✗ Error en la conversión")

View File

@ -1,391 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Convertidor de código LAD (Ladder Diagram) de TwinCAT a pseudocódigo estructurado
Compatible con IEC61131-3
Autor: Asistente AI
Fecha: 2024
"""
import re
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class ElementType(Enum):
CONTACT = "CONTACT"
COIL = "COIL"
FUNCTION_BLOCK = "FUNCTION_BLOCK"
OPERATOR = "OPERATOR"
EXPRESSION = "EXPRESSION"
ASSIGN = "ASSIGN"
@dataclass
class LadderElement:
"""Representa un elemento de la escalera LAD"""
element_type: ElementType
name: str
expression: str = ""
positive: bool = True
enabled: bool = True
parameters: List[str] = None
outputs: List[str] = None
def __post_init__(self):
if self.parameters is None:
self.parameters = []
if self.outputs is None:
self.outputs = []
@dataclass
class LadderNetwork:
"""Representa una red completa de LAD"""
network_id: int
comment: str = ""
elements: List[LadderElement] = None
def __post_init__(self):
if self.elements is None:
self.elements = []
class LadToStructuredConverter:
"""Conversor principal de LAD a código estructurado"""
def __init__(self):
self.networks: List[LadderNetwork] = []
self.current_network: Optional[LadderNetwork] = None
self.output_lines: List[str] = []
def parse_file(self, file_path: str) -> None:
"""Parse el archivo LAD completo"""
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Encontrar la sección LAD
lad_start = content.find('_LD_BODY')
if lad_start == -1:
raise ValueError("No se encontró sección _LD_BODY en el archivo")
lad_content = content[lad_start:]
self._parse_lad_content(lad_content)
def _parse_lad_content(self, content: str) -> None:
"""Parse el contenido LAD línea por línea"""
lines = content.split('\n')
i = 0
while i < len(lines):
line = lines[i].strip()
if line == '_NETWORK':
i = self._parse_network(lines, i)
else:
i += 1
def _parse_network(self, lines: List[str], start_idx: int) -> int:
"""Parse una red individual"""
network_id = len(self.networks) + 1
network = LadderNetwork(network_id=network_id)
self.current_network = network
i = start_idx + 1
while i < len(lines) and not lines[i].strip().startswith('_NETWORK'):
line = lines[i].strip()
if line == '_COMMENT':
i, comment = self._parse_comment(lines, i)
network.comment = comment
elif line == '_LD_ASSIGN':
i = self._parse_assignment(lines, i)
elif line == '_LD_CONTACT':
i = self._parse_contact(lines, i)
elif line == '_LD_AND' or line == '_LD_OR':
i = self._parse_logic_operation(lines, i, line)
elif line.startswith('_FUNCTIONBLOCK'):
i = self._parse_function_block(lines, i)
elif line.startswith('_OPERATOR'):
i = self._parse_operator(lines, i)
else:
i += 1
self.networks.append(network)
return i
def _parse_comment(self, lines: List[str], start_idx: int) -> tuple[int, str]:
"""Parse un comentario"""
i = start_idx + 1
comment_lines = []
while i < len(lines):
line = lines[i].strip()
if line == '_END_COMMENT':
break
if line and not line.startswith('_'):
comment_lines.append(line)
i += 1
return i + 1, '\n'.join(comment_lines)
def _parse_assignment(self, lines: List[str], start_idx: int) -> int:
"""Parse una asignación"""
i = start_idx + 1
# Buscar el elemento a asignar
target_var = ""
source_expr = ""
while i < len(lines):
line = lines[i].strip()
if line == '_EMPTY':
i += 1
continue
elif line == '_EXPRESSION':
i += 1
continue
elif line == '_POSITIV' or line == '_NEGATIV':
i += 1
continue
elif line.startswith('_OUTPUT'):
# Siguiente línea debería ser la variable de salida
i += 2 # Saltar _POSITIV/_NEGATIV
i += 1 # Saltar _NO_SET o _SET
if i < len(lines):
target_var = lines[i].strip()
break
elif line == 'ENABLELIST_END':
break
elif not line.startswith('_') and not line.startswith('ENABLELIST'):
if not target_var:
source_expr = line
i += 1
if target_var:
element = LadderElement(
element_type=ElementType.ASSIGN,
name=target_var,
expression=source_expr
)
self.current_network.elements.append(element)
return i + 1
def _parse_contact(self, lines: List[str], start_idx: int) -> int:
"""Parse un contacto"""
i = start_idx + 1
contact_name = ""
is_positive = True
if i < len(lines):
contact_name = lines[i].strip()
i += 1
if i < len(lines):
polarity = lines[i].strip()
is_positive = polarity == '_POSITIV'
i += 1
element = LadderElement(
element_type=ElementType.CONTACT,
name=contact_name,
positive=is_positive
)
self.current_network.elements.append(element)
return i
def _parse_logic_operation(self, lines: List[str], start_idx: int, operation: str) -> int:
"""Parse operaciones lógicas AND/OR"""
i = start_idx + 1
# Extraer el tipo de operación
op_type = operation.replace('_LD_', '')
element = LadderElement(
element_type=ElementType.OPERATOR,
name=op_type
)
self.current_network.elements.append(element)
return i
def _parse_function_block(self, lines: List[str], start_idx: int) -> int:
"""Parse un bloque de función"""
i = start_idx + 1
fb_name = ""
parameters = []
if i < len(lines):
fb_name = lines[i].strip()
i += 1
# Buscar parámetros
while i < len(lines):
line = lines[i].strip()
if line.startswith('_OPERAND'):
i += 2 # Saltar _EXPRESSION y _POSITIV
if i < len(lines):
param = lines[i].strip()
parameters.append(param)
elif line == '_OUTPUTS':
break
i += 1
element = LadderElement(
element_type=ElementType.FUNCTION_BLOCK,
name=fb_name,
parameters=parameters
)
self.current_network.elements.append(element)
return i
def _parse_operator(self, lines: List[str], start_idx: int) -> int:
"""Parse un operador"""
i = start_idx + 1
operator_name = ""
operands = []
while i < len(lines):
line = lines[i].strip()
if line.startswith('_OPERAND'):
i += 2 # Saltar _EXPRESSION y _POSITIV
if i < len(lines):
operand = lines[i].strip()
operands.append(operand)
elif line.startswith('_EXPRESSION') and i + 1 < len(lines):
next_line = lines[i + 1].strip()
if not next_line.startswith('_'):
operator_name = next_line
i += 1
elif line == '_OUTPUTS':
break
i += 1
element = LadderElement(
element_type=ElementType.OPERATOR,
name=operator_name,
parameters=operands
)
self.current_network.elements.append(element)
return i
def convert_to_structured(self) -> str:
"""Convierte las redes LAD a código pseudo estructurado"""
self.output_lines = []
self.output_lines.append("// Código pseudo estructurado generado desde LAD")
self.output_lines.append("// Compatible con IEC61131-3")
self.output_lines.append("")
for network in self.networks:
self._convert_network_to_structured(network)
self.output_lines.append("")
return '\n'.join(self.output_lines)
def _convert_network_to_structured(self, network: LadderNetwork) -> None:
"""Convierte una red a código estructurado"""
self.output_lines.append(f"// Red {network.network_id}")
if network.comment:
comment_lines = network.comment.split('\n')
for line in comment_lines:
if line.strip():
self.output_lines.append(f"// {line.strip()}")
# Procesar elementos de la red
conditions = []
assignments = []
for element in network.elements:
if element.element_type == ElementType.CONTACT:
condition = element.name
if not element.positive:
condition = f"NOT {condition}"
conditions.append(condition)
elif element.element_type == ElementType.ASSIGN:
if conditions:
condition_str = " AND ".join(conditions)
self.output_lines.append(f"IF {condition_str} THEN")
self.output_lines.append(f" {element.name} := {element.expression};")
self.output_lines.append("END_IF;")
else:
self.output_lines.append(f"{element.name} := {element.expression};")
elif element.element_type == ElementType.FUNCTION_BLOCK:
params_str = ", ".join(element.parameters) if element.parameters else ""
if conditions:
condition_str = " AND ".join(conditions)
self.output_lines.append(f"IF {condition_str} THEN")
self.output_lines.append(f" {element.name}({params_str});")
self.output_lines.append("END_IF;")
else:
self.output_lines.append(f"{element.name}({params_str});")
elif element.element_type == ElementType.OPERATOR:
if element.name == "AND":
# AND lógico - ya manejado en condiciones
pass
elif element.name == "OR":
# OR lógico - cambiar estrategia de condiciones
if len(conditions) > 1:
last_condition = conditions.pop()
conditions[-1] = f"({conditions[-1]} OR {last_condition})"
else:
# Otros operadores matemáticos
if element.parameters and len(element.parameters) >= 2:
expr = f"{element.parameters[0]} {element.name} {element.parameters[1]}"
if conditions:
condition_str = " AND ".join(conditions)
self.output_lines.append(f"IF {condition_str} THEN")
self.output_lines.append(f" // Resultado := {expr};")
self.output_lines.append("END_IF;")
else:
self.output_lines.append(f"// Resultado := {expr};")
def save_to_file(self, output_path: str) -> None:
"""Guarda el código estructurado a un archivo"""
structured_code = self.convert_to_structured()
with open(output_path, 'w', encoding='utf-8') as f:
f.write(structured_code)
print(f"Código pseudo estructurado guardado en: {output_path}")
def main():
"""Función principal"""
import sys
if len(sys.argv) != 3:
print("Uso: python lad_to_pseudocode_converter.py <archivo_entrada.EXP> <archivo_salida.txt>")
sys.exit(1)
input_file = sys.argv[1]
output_file = sys.argv[2]
try:
converter = LadToStructuredConverter()
converter.parse_file(input_file)
converter.save_to_file(output_file)
print(f"Conversión completada exitosamente!")
print(f"Redes procesadas: {len(converter.networks)}")
except Exception as e:
print(f"Error durante la conversión: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -1,429 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Convertidor mejorado de código LAD (Ladder Diagram) de TwinCAT a pseudocódigo estructurado
Compatible con IEC61131-3
Autor: Asistente AI
Fecha: 2024
"""
import re
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
class LadElementType(Enum):
CONTACT = "CONTACT"
COIL = "COIL"
FUNCTION_BLOCK = "FUNCTION_BLOCK"
OPERATOR = "OPERATOR"
ASSIGNMENT = "ASSIGNMENT"
CONDITION = "CONDITION"
@dataclass
class LadVariable:
"""Representa una variable en LAD"""
name: str
negated: bool = False
value: str = ""
@dataclass
class LadExpression:
"""Representa una expresión en LAD"""
operator: str = ""
operands: List[str] = field(default_factory=list)
result_var: str = ""
@dataclass
class LadNetwork:
"""Representa una red LAD completa"""
id: int
comment: str = ""
contacts: List[LadVariable] = field(default_factory=list)
operators: List[str] = field(default_factory=list)
function_blocks: List[Dict] = field(default_factory=list)
expressions: List[LadExpression] = field(default_factory=list)
outputs: List[LadVariable] = field(default_factory=list)
assignments: List[Dict] = field(default_factory=list)
class EnhancedLadConverter:
"""Convertidor mejorado de LAD a código estructurado"""
def __init__(self):
self.networks: List[LadNetwork] = []
self.output_lines: List[str] = []
def parse_file(self, file_path: str) -> None:
"""Parse el archivo LAD completo"""
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Encontrar la sección LAD
lad_start = content.find('_LD_BODY')
if lad_start == -1:
raise ValueError("No se encontró sección _LD_BODY en el archivo")
# Extraer solo la sección LAD hasta END_PROGRAM
lad_end = content.find('END_PROGRAM', lad_start)
if lad_end == -1:
lad_content = content[lad_start:]
else:
lad_content = content[lad_start:lad_end]
self._parse_networks(lad_content)
def _parse_networks(self, content: str) -> None:
"""Parse todas las redes del contenido LAD"""
lines = content.split('\n')
i = 0
network_id = 0
while i < len(lines):
if lines[i].strip() == '_NETWORK':
network_id += 1
i = self._parse_single_network(lines, i, network_id)
else:
i += 1
def _parse_single_network(self, lines: List[str], start_idx: int, network_id: int) -> int:
"""Parse una red individual"""
network = LadNetwork(id=network_id)
i = start_idx + 1
# Parse comment
if i < len(lines) and lines[i].strip() == '_COMMENT':
i, comment = self._extract_comment(lines, i)
network.comment = comment
# Parse network content
while i < len(lines):
line = lines[i].strip()
if line == '_NETWORK':
# Inicio de nueva red
break
elif line == '_LD_ASSIGN':
i = self._parse_assignment_block(lines, i, network)
elif line == '_LD_CONTACT':
i = self._parse_contact(lines, i, network)
elif line == '_LD_AND' or line == '_LD_OR':
network.operators.append(line.replace('_LD_', ''))
i += 1
elif line.startswith('_FUNCTIONBLOCK'):
i = self._parse_function_block_detailed(lines, i, network)
else:
i += 1
self.networks.append(network)
return i
def _extract_comment(self, lines: List[str], start_idx: int) -> Tuple[int, str]:
"""Extrae comentario de la red"""
i = start_idx + 1
comment_lines = []
while i < len(lines):
line = lines[i].strip()
if line == '_END_COMMENT':
break
if line and not line.startswith('_'):
comment_lines.append(line)
i += 1
return i + 1, ' '.join(comment_lines)
def _parse_contact(self, lines: List[str], start_idx: int, network: LadNetwork) -> int:
"""Parse un contacto"""
i = start_idx + 1
if i < len(lines):
contact_name = lines[i].strip()
i += 1
# Verificar polaridad
negated = False
if i < len(lines) and lines[i].strip() == '_NEGATIV':
negated = True
contact = LadVariable(name=contact_name, negated=negated)
network.contacts.append(contact)
return i + 1
def _parse_assignment_block(self, lines: List[str], start_idx: int, network: LadNetwork) -> int:
"""Parse un bloque de asignación completo"""
i = start_idx + 1
assignment_data = {
'conditions': [],
'target_var': '',
'source_expr': '',
'operator': '',
'operands': [],
'function_name': '',
'fb_params': []
}
while i < len(lines):
line = lines[i].strip()
if line == '_NETWORK' or line == 'ENABLELIST_END':
break
elif line == '_LD_CONTACT':
i += 1
if i < len(lines):
contact_name = lines[i].strip()
i += 1
negated = False
if i < len(lines) and lines[i].strip() == '_NEGATIV':
negated = True
i += 1
assignment_data['conditions'].append(LadVariable(contact_name, negated))
elif line.startswith('_FUNCTIONBLOCK'):
# Parse function block dentro de asignación
i, fb_data = self._parse_functionblock_in_assignment(lines, i)
assignment_data.update(fb_data)
elif line.startswith('_OPERATOR'):
i, op_data = self._parse_operator_in_assignment(lines, i)
assignment_data.update(op_data)
elif line.startswith('_OUTPUT'):
# Siguiente información es sobre la salida
i = self._parse_output_info(lines, i, assignment_data)
elif not line.startswith('_') and line and 'ENABLELIST' not in line:
# Variable o expresión
if not assignment_data['target_var']:
assignment_data['source_expr'] = line
else:
i += 1
if assignment_data['target_var'] or assignment_data['function_name']:
network.assignments.append(assignment_data)
return i
def _parse_functionblock_in_assignment(self, lines: List[str], start_idx: int) -> Tuple[int, Dict]:
"""Parse function block dentro de asignación"""
i = start_idx + 1
fb_data = {'function_name': '', 'fb_params': []}
if i < len(lines):
fb_data['function_name'] = lines[i].strip()
i += 1
# Buscar parámetros
while i < len(lines) and not lines[i].strip().startswith('_OUTPUT'):
line = lines[i].strip()
if line.startswith('_OPERAND'):
i += 2 # Saltar _EXPRESSION y _POSITIV/_NEGATIV
if i < len(lines):
param = lines[i].strip()
fb_data['fb_params'].append(param)
i += 1
return i, fb_data
def _parse_operator_in_assignment(self, lines: List[str], start_idx: int) -> Tuple[int, Dict]:
"""Parse operador dentro de asignación"""
i = start_idx + 1
op_data = {'operator': '', 'operands': []}
# Buscar operandos y operador
while i < len(lines) and not lines[i].strip().startswith('_OUTPUT'):
line = lines[i].strip()
if line.startswith('_OPERAND'):
i += 2 # Saltar _EXPRESSION y _POSITIV/_NEGATIV
if i < len(lines):
operand = lines[i].strip()
op_data['operands'].append(operand)
elif line in ['ADD', 'SUB', 'MUL', 'DIV', 'AND', 'OR', 'LT', 'GT', 'EQ', 'SEL', 'MOVE']:
op_data['operator'] = line
i += 1
return i, op_data
def _parse_output_info(self, lines: List[str], start_idx: int, assignment_data: Dict) -> int:
"""Parse información de salida"""
i = start_idx + 1
# Saltar información de salida hasta encontrar la variable
while i < len(lines):
line = lines[i].strip()
if not line.startswith('_') and line and 'ENABLELIST' not in line:
assignment_data['target_var'] = line
break
i += 1
return i + 1
def _parse_function_block_detailed(self, lines: List[str], start_idx: int, network: LadNetwork) -> int:
"""Parse function block detallado"""
i = start_idx + 1
fb_data = {'name': '', 'params': [], 'outputs': []}
if i < len(lines):
fb_data['name'] = lines[i].strip()
i += 1
# Parse parámetros y salidas
while i < len(lines):
line = lines[i].strip()
if line.startswith('_OPERAND'):
i += 2
if i < len(lines):
param = lines[i].strip()
fb_data['params'].append(param)
elif line.startswith('_OUTPUT'):
i += 3 # Saltar información de salida
if i < len(lines):
output = lines[i].strip()
fb_data['outputs'].append(output)
elif line == '_NETWORK':
break
i += 1
network.function_blocks.append(fb_data)
return i
def convert_to_structured(self) -> str:
"""Convierte las redes a código pseudo estructurado"""
self.output_lines = [
"// Código pseudo estructurado generado desde LAD TwinCAT",
"// Compatible con IEC61131-3",
"PROGRAM Input_Converted",
""
]
for network in self.networks:
self._convert_network_to_structured(network)
self.output_lines.append("END_PROGRAM")
return '\n'.join(self.output_lines)
def _convert_network_to_structured(self, network: LadNetwork) -> None:
"""Convierte una red a código estructurado"""
self.output_lines.append(f" // Red {network.id}")
if network.comment:
self.output_lines.append(f" // {network.comment}")
# Procesar asignaciones de la red
for assignment in network.assignments:
self._convert_assignment_to_structured(assignment)
# Procesar function blocks independientes
for fb in network.function_blocks:
self._convert_function_block_to_structured(fb)
self.output_lines.append("")
def _convert_assignment_to_structured(self, assignment: Dict) -> None:
"""Convierte una asignación a código estructurado"""
conditions = assignment.get('conditions', [])
target_var = assignment.get('target_var', '')
source_expr = assignment.get('source_expr', '')
operator = assignment.get('operator', '')
operands = assignment.get('operands', [])
function_name = assignment.get('function_name', '')
fb_params = assignment.get('fb_params', [])
# Construir condiciones
condition_str = ""
if conditions:
condition_parts = []
for cond in conditions:
if cond.negated:
condition_parts.append(f"NOT {cond.name}")
else:
condition_parts.append(cond.name)
condition_str = " AND ".join(condition_parts)
# Construir la expresión de asignación
if function_name:
# Function block call
params_str = ", ".join(fb_params) if fb_params else ""
if target_var:
expr = f"{target_var} := {function_name}({params_str})"
else:
expr = f"{function_name}({params_str})"
elif operator and operands:
# Operación matemática/lógica
if len(operands) == 2:
if operator == "SEL":
expr = f"{target_var} := SEL({operands[0]}, {operands[1]}, condition)"
else:
expr = f"{target_var} := {operands[0]} {operator} {operands[1]}"
elif len(operands) == 1:
if operator == "MOVE":
expr = f"{target_var} := {operands[0]}"
else:
expr = f"{target_var} := {operator}({operands[0]})"
else:
expr = f"{target_var} := {operator}({', '.join(operands)})"
elif source_expr and target_var:
# Asignación simple
expr = f"{target_var} := {source_expr}"
else:
return # Sin información suficiente
# Generar código estructurado
if condition_str:
self.output_lines.append(f" IF {condition_str} THEN")
self.output_lines.append(f" {expr};")
self.output_lines.append(f" END_IF;")
else:
self.output_lines.append(f" {expr};")
def _convert_function_block_to_structured(self, fb: Dict) -> None:
"""Convierte un function block a código estructurado"""
name = fb.get('name', '')
params = fb.get('params', [])
if name:
params_str = ", ".join(params) if params else ""
self.output_lines.append(f" {name}({params_str});")
def save_to_file(self, output_path: str) -> None:
"""Guarda el código estructurado a un archivo"""
structured_code = self.convert_to_structured()
with open(output_path, 'w', encoding='utf-8') as f:
f.write(structured_code)
print(f"Código pseudo estructurado guardado en: {output_path}")
def main():
"""Función principal"""
import sys
if len(sys.argv) != 3:
print("Uso: python lad_to_pseudocode_converter_enhanced.py <archivo_entrada.EXP> <archivo_salida.txt>")
return
input_file = sys.argv[1]
output_file = sys.argv[2]
try:
converter = EnhancedLadConverter()
converter.parse_file(input_file)
converter.save_to_file(output_file)
print(f"Conversión completada exitosamente!")
print(f"Redes procesadas: {len(converter.networks)}")
# Mostrar estadísticas por red
for network in converter.networks[:5]: # Primeras 5 redes
print(f"Red {network.id}: {len(network.assignments)} asignaciones, {len(network.contacts)} contactos")
except Exception as e:
print(f"Error durante la conversión: {e}")
if __name__ == "__main__":
main()

View File

@ -39,13 +39,13 @@ class SimpleLadConverter:
i += 1
def _parse_network(self, lines, start_idx):
"""Parse una red individual"""
"""Parse una red individual con soporte mejorado para operadores LAD"""
network = {
'id': self.current_network_id,
'comment': '',
'contacts': [],
'assignments': [],
'outputs': []
'logic': None,
'target': '',
'function_blocks': []
}
i = start_idx + 1
@ -56,86 +56,139 @@ class SimpleLadConverter:
network['comment'] = comment
# Parse contenido de la red
current_assignment = {
'conditions': [],
'target': '',
'expression': '',
'function_block': '',
'operator': '',
'operands': []
}
while i < len(lines):
line = lines[i].strip()
if line == '_NETWORK':
break
elif line == '_LD_ASSIGN':
if current_assignment['target'] or current_assignment['function_block']:
network['assignments'].append(current_assignment.copy())
current_assignment = {
'conditions': [],
'target': '',
'expression': '',
'function_block': '',
'operator': '',
'operands': []
}
i += 1
elif line == '_LD_CONTACT':
i += 1
if i < len(lines):
contact_name = lines[i].strip()
i += 1
negated = False
if i < len(lines) and lines[i].strip() == '_NEGATIV':
negated = True
i += 1
current_assignment['conditions'].append({
'name': contact_name,
'negated': negated
})
elif line.startswith('_FUNCTIONBLOCK'):
i += 1
if i < len(lines):
current_assignment['function_block'] = lines[i].strip()
i += 1
elif line.startswith('_OPERATOR'):
i += 1
# Buscar operador y operandos
while i < len(lines) and not lines[i].strip().startswith('_OUTPUT'):
subline = lines[i].strip()
if subline in ['ADD', 'SUB', 'MUL', 'DIV', 'AND', 'OR', 'LT', 'GT', 'EQ', 'SEL', 'MOVE']:
current_assignment['operator'] = subline
elif subline.startswith('_OPERAND'):
i += 2 # Saltar _EXPRESSION y _POSITIV
if i < len(lines):
operand = lines[i].strip()
current_assignment['operands'].append(operand)
i += 1
continue
# Parsear la lógica LAD después de _LD_ASSIGN
i, logic = self._parse_lad_expression(lines, i)
network['logic'] = logic
elif line.startswith('_OUTPUT'):
# Buscar variable de salida
i += 1
while i < len(lines) and lines[i].strip().startswith('_'):
i += 1
if i < len(lines):
current_assignment['target'] = lines[i].strip()
i += 1
elif not line.startswith('_') and line and 'ENABLELIST' not in line:
if not current_assignment['expression']:
current_assignment['expression'] = line
if i < len(lines) and lines[i].strip() and 'ENABLELIST' not in lines[i]:
network['target'] = lines[i].strip()
i += 1
else:
i += 1
# Agregar última asignación si existe
if current_assignment['target'] or current_assignment['function_block']:
network['assignments'].append(current_assignment)
self.networks.append(network)
return i
def _parse_lad_expression(self, lines, start_idx):
"""Parse una expresión LAD recursivamente"""
i = start_idx
while i < len(lines):
line = lines[i].strip()
if line == '_LD_AND':
return self._parse_and_expression(lines, i + 1)
elif line == '_LD_OR':
return self._parse_or_expression(lines, i + 1)
elif line == '_LD_CONTACT':
return self._parse_contact(lines, i + 1)
elif line.startswith('_FUNCTIONBLOCK'):
return self._parse_function_block(lines, i)
elif line.startswith('_OUTPUT') or line == 'ENABLELIST : 0':
break
else:
i += 1
return i, None
def _parse_and_expression(self, lines, start_idx):
"""Parse una expresión AND"""
i = start_idx
operands = []
# Buscar operadores
if i < len(lines) and lines[i].strip().startswith('_LD_OPERATOR'):
# Extraer número de operandos
operator_line = lines[i].strip()
num_operands = int(operator_line.split(':')[-1].strip()) if ':' in operator_line else 2
i += 1
# Parse cada operando
for _ in range(num_operands):
i, operand = self._parse_lad_expression(lines, i)
if operand:
operands.append(operand)
return i, {'type': 'AND', 'operands': operands}
def _parse_or_expression(self, lines, start_idx):
"""Parse una expresión OR"""
i = start_idx
operands = []
# Buscar operadores
if i < len(lines) and lines[i].strip().startswith('_LD_OPERATOR'):
# Extraer número de operandos
operator_line = lines[i].strip()
num_operands = int(operator_line.split(':')[-1].strip()) if ':' in operator_line else 2
i += 1
# Parse cada operando
for _ in range(num_operands):
i, operand = self._parse_lad_expression(lines, i)
if operand:
operands.append(operand)
return i, {'type': 'OR', 'operands': operands}
def _parse_contact(self, lines, start_idx):
"""Parse un contacto LAD"""
i = start_idx
contact_name = ""
negated = False
# Obtener nombre del contacto
if i < len(lines):
contact_name = lines[i].strip()
i += 1
# Verificar si hay expresión
if i < len(lines) and lines[i].strip() == '_EXPRESSION':
i += 1
# Verificar si está negado
if i < len(lines):
if lines[i].strip() == '_NEGATIV':
negated = True
i += 1
elif lines[i].strip() == '_POSITIV':
i += 1
return i, {'type': 'CONTACT', 'name': contact_name, 'negated': negated}
def _parse_function_block(self, lines, start_idx):
"""Parse un bloque de función"""
i = start_idx + 1
fb_name = ""
inputs = []
if i < len(lines):
fb_name = lines[i].strip()
i += 1
# Parse inputs del function block
while i < len(lines) and not lines[i].strip().startswith('_OUTPUT'):
line = lines[i].strip()
if line.startswith('_OPERAND'):
i += 2 # Saltar _EXPRESSION
if i < len(lines):
inputs.append(lines[i].strip())
i += 1
else:
i += 1
return i, {'type': 'FUNCTION_BLOCK', 'name': fb_name, 'inputs': inputs}
def _parse_comment(self, lines, start_idx):
"""Parse comentario"""
i = start_idx + 1
@ -156,7 +209,7 @@ class SimpleLadConverter:
output = []
output.append("// Código pseudo estructurado generado desde LAD TwinCAT")
output.append("// Compatible con IEC61131-3")
output.append("PROGRAM Input_Converted")
output.append("PROGRAM PumpControl_Converted")
output.append("")
for network in self.networks:
@ -164,60 +217,66 @@ class SimpleLadConverter:
if network['comment']:
output.append(f" // {network['comment']}")
for assignment in network['assignments']:
structured_line = self._convert_assignment(assignment)
if structured_line:
if assignment['conditions']:
# Construir condición
conditions = []
for cond in assignment['conditions']:
if cond['negated']:
conditions.append(f"NOT {cond['name']}")
else:
conditions.append(cond['name'])
condition_str = " AND ".join(conditions)
output.append(f" IF {condition_str} THEN")
output.append(f" {structured_line};")
output.append(" END_IF;")
else:
output.append(f" {structured_line};")
if network['logic'] and network['target']:
condition_str = self._convert_logic_to_string(network['logic'])
if condition_str:
output.append(f" IF {condition_str} THEN")
output.append(f" {network['target']} := TRUE;")
output.append(" ELSE")
output.append(f" {network['target']} := FALSE;")
output.append(" END_IF;")
else:
output.append(f" {network['target']} := TRUE; // Logic no reconocida")
output.append("")
output.append("END_PROGRAM")
return '\n'.join(output)
def _convert_assignment(self, assignment):
"""Convertir una asignación a código estructurado"""
target = assignment['target']
expression = assignment['expression']
function_block = assignment['function_block']
operator = assignment['operator']
operands = assignment['operands']
def _convert_logic_to_string(self, logic):
"""Convertir lógica LAD a string estructurado"""
if not logic:
return ""
if function_block:
params = ", ".join(operands) if operands else ""
if target:
return f"{target} := {function_block}({params})"
if logic['type'] == 'CONTACT':
if logic['negated']:
return f"NOT {logic['name']}"
else:
return f"{function_block}({params})"
return logic['name']
elif operator and operands:
if len(operands) >= 2:
if operator == "SEL":
return f"{target} := SEL({operands[0]}, {operands[1]}, condition)"
else:
return f"{target} := {operands[0]} {operator} {operands[1]}"
elif len(operands) == 1:
if operator == "MOVE":
return f"{target} := {operands[0]}"
else:
return f"{target} := {operator}({operands[0]})"
elif logic['type'] == 'AND':
operand_strings = []
for operand in logic['operands']:
operand_str = self._convert_logic_to_string(operand)
if operand_str:
operand_strings.append(operand_str)
if len(operand_strings) > 1:
return "(" + " AND ".join(operand_strings) + ")"
elif len(operand_strings) == 1:
return operand_strings[0]
else:
return ""
elif expression and target:
return f"{target} := {expression}"
elif logic['type'] == 'OR':
operand_strings = []
for operand in logic['operands']:
operand_str = self._convert_logic_to_string(operand)
if operand_str:
operand_strings.append(operand_str)
if len(operand_strings) > 1:
return "(" + " OR ".join(operand_strings) + ")"
elif len(operand_strings) == 1:
return operand_strings[0]
else:
return ""
return None
elif logic['type'] == 'FUNCTION_BLOCK':
inputs_str = ", ".join(logic['inputs']) if logic['inputs'] else ""
return f"{logic['name']}({inputs_str})"
return ""
def save_to_file(self, output_path):
"""Guardar código estructurado"""
@ -229,35 +288,77 @@ class SimpleLadConverter:
print(f"Código guardado en: {output_path}")
return structured_code
def print_debug_info(self):
"""Mostrar información de debug sobre los networks parseados"""
print(f"\n=== DEBUG INFO - {len(self.networks)} networks encontrados ===")
for network in self.networks:
print(f"\nRed {network['id']}:")
if network['comment']:
print(f" Comentario: {network['comment']}")
print(f" Target: {network['target']}")
if network['logic']:
print(f" Lógica: {self._debug_logic_string(network['logic'])}")
condition_str = self._convert_logic_to_string(network['logic'])
print(f" Condición: {condition_str}")
else:
print(" Sin lógica")
def _debug_logic_string(self, logic, indent=0):
"""Crear string de debug para la lógica"""
if not logic:
return "None"
prefix = " " * indent
if logic['type'] == 'CONTACT':
neg_str = " (NEGADO)" if logic['negated'] else ""
return f"{prefix}CONTACT: {logic['name']}{neg_str}"
elif logic['type'] == 'AND':
result = f"{prefix}AND:\n"
for operand in logic['operands']:
result += self._debug_logic_string(operand, indent + 1) + "\n"
return result.rstrip()
elif logic['type'] == 'OR':
result = f"{prefix}OR:\n"
for operand in logic['operands']:
result += self._debug_logic_string(operand, indent + 1) + "\n"
return result.rstrip()
elif logic['type'] == 'FUNCTION_BLOCK':
return f"{prefix}FUNCTION_BLOCK: {logic['name']} inputs: {logic['inputs']}"
return f"{prefix}UNKNOWN: {logic}"
def main():
"""Función principal"""
converter = SimpleLadConverter()
try:
print("=== Convertidor LAD Simplificado ===")
print("Parseando archivo INPUT.EXP...")
print("=== Convertidor LAD Mejorado ===")
print("Parseando archivo _PUMPCONTROL.EXP...")
converter.parse_file(".example/_PUMPCONTROL.EXP")
print(f"Redes encontradas: {len(converter.networks)}")
# Mostrar algunas estadísticas
for i, network in enumerate(converter.networks[:5]):
print(f"Red {network['id']}: {len(network['assignments'])} asignaciones")
if network['comment']:
print(f" Comentario: {network['comment']}")
# Mostrar información de debug
converter.print_debug_info()
# Convertir y guardar
print("\nGenerando código estructurado...")
structured_code = converter.save_to_file("simple_output.txt")
structured_code = converter.save_to_file("pump_control_output.txt")
# Mostrar primeras líneas
# Mostrar el código generado
lines = structured_code.split('\n')
print(f"\nPrimeras {min(30, len(lines))} líneas:")
for i, line in enumerate(lines[:30]):
print(f"\nCódigo generado ({len(lines)} líneas):")
for i, line in enumerate(lines):
print(f"{i+1:3d}: {line}")
print(f"\n✓ Conversión completada! Total de líneas: {len(lines)}")
print(f"\n✓ Conversión completada!")
except Exception as e:
print(f"Error: {e}")

View File

@ -1,69 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de prueba para el convertidor LAD mejorado
"""
from lad_to_pseudocode_converter_enhanced import EnhancedLadConverter
def test_enhanced_converter():
"""Prueba el convertidor mejorado"""
print("=== Convertidor LAD Mejorado ===")
try:
# Crear el convertidor
converter = EnhancedLadConverter()
# Parsear el archivo
print("Parseando INPUT.EXP...")
converter.parse_file(".example/INPUT.EXP")
print(f"Redes encontradas: {len(converter.networks)}")
# Mostrar detalles de las primeras redes
for i, network in enumerate(converter.networks[:10]):
print(f"\n--- Red {network.id} ---")
print(f"Comentario: {network.comment}")
print(f"Contactos: {len(network.contacts)}")
print(f"Asignaciones: {len(network.assignments)}")
print(f"Function Blocks: {len(network.function_blocks)}")
# Mostrar detalles de asignaciones
for j, assignment in enumerate(network.assignments[:3]):
print(f" Asignación {j+1}:")
if assignment['conditions']:
conditions = [f"{'NOT ' if c.negated else ''}{c.name}" for c in assignment['conditions']]
print(f" Condiciones: {' AND '.join(conditions)}")
if assignment['target_var']:
print(f" Variable destino: {assignment['target_var']}")
if assignment['function_name']:
print(f" Function Block: {assignment['function_name']}")
if assignment['operator']:
print(f" Operador: {assignment['operator']}")
if assignment['operands']:
print(f" Operandos: {assignment['operands']}")
# Convertir a pseudocódigo
print("\n=== Generando pseudocódigo ===")
structured_code = converter.convert_to_structured()
# Guardar archivo
output_file = "INPUT_enhanced_pseudocode.txt"
converter.save_to_file(output_file)
# Mostrar las primeras líneas
print("\n=== Primeras líneas del resultado ===")
lines = structured_code.split('\n')
for i, line in enumerate(lines[:50]):
print(f"{i+1:3d}: {line}")
return True
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
test_enhanced_converter()

View File

@ -1,23 +0,0 @@
#!/usr/bin/env python3
from simple_lad_converter import SimpleLadConverter
converter = SimpleLadConverter()
print("Iniciando conversión...")
try:
converter.parse_file(".example/INPUT.EXP")
print(f"Redes encontradas: {len(converter.networks)}")
structured_code = converter.save_to_file("output_simple.txt")
print("Conversión completada!")
# Mostrar primeras líneas
lines = structured_code.split('\n')
for i, line in enumerate(lines[:20]):
print(f"{i+1:2d}: {line}")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()

View File

@ -0,0 +1,585 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script convierte un archivo JSON simplificado (resultado de un análisis de un XML de Siemens) a un
JSON enriquecido con lógica SCL. Se enfoca en la lógica de programación y la agrupación de instrucciones IF.
"""
# -*- coding: utf-8 -*-
import json
import argparse
import os
import copy
import traceback
import re
import importlib
import sys
import sympy
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# Import necessary components from processors directory
from processors.processor_utils import format_variable_name, sympy_expr_to_scl
from processors.symbol_manager import SymbolManager
# --- Constantes y Configuración ---
SCL_SUFFIX = "_sympy_processed"
GROUPED_COMMENT = "// Logic included in grouped IF"
SIMPLIFIED_IF_COMMENT = "// Simplified IF condition by script"
# Global data dictionary
data = {}
# --- (process_group_ifs y load_processors SIN CAMBIOS) ---
def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data):
instr_uid = instruction["instruction_uid"]
instr_type_original = (
instruction.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "")
)
made_change = False
if (
not instruction.get("type", "").endswith(SCL_SUFFIX)
or "_error" in instruction.get("type", "")
or instruction.get("grouped", False)
or instr_type_original
not in [
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"PBox",
"NBox",
"And",
"Xor",
"Not",
]
):
return False
current_scl = instruction.get("scl", "")
if (
current_scl.strip().startswith("IF")
and "END_IF;" in current_scl
and GROUPED_COMMENT not in current_scl
):
return False
map_key_out = (network_id, instr_uid, "out")
sympy_condition_expr = sympy_map.get(map_key_out)
if sympy_condition_expr is None or sympy_condition_expr in [
sympy.true,
sympy.false,
]:
return False
grouped_instructions_cores = []
consumer_instr_list = []
network_logic = next(
(net["logic"] for net in data["networks"] if net["id"] == network_id), []
)
if not network_logic:
return False
groupable_types = [
"Move",
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
"Call_FC",
"Call_FB",
"SCoil",
"RCoil",
"BLKMOV",
"TON",
"TOF",
"TP",
"Se",
"Sd",
"CTU",
"CTD",
"CTUD",
]
for consumer_instr in network_logic:
consumer_uid = consumer_instr["instruction_uid"]
if consumer_instr.get("grouped", False) or consumer_uid == instr_uid:
continue
consumer_en = consumer_instr.get("inputs", {}).get("en")
consumer_type = consumer_instr.get("type", "")
consumer_type_original = consumer_type.replace(SCL_SUFFIX, "").replace(
"_error", ""
)
is_enabled_by_us = False
if (
isinstance(consumer_en, dict)
and consumer_en.get("type") == "connection"
and consumer_en.get("source_instruction_uid") == instr_uid
and consumer_en.get("source_pin") == "out"
):
is_enabled_by_us = True
if (
is_enabled_by_us
and consumer_type.endswith(SCL_SUFFIX)
and consumer_type_original in groupable_types
):
consumer_scl = consumer_instr.get("scl", "")
core_scl = None
if consumer_scl:
if consumer_scl.strip().startswith("IF"):
match = re.search(
r"IF\s+.*?THEN\s*(.*?)\s*END_IF;",
consumer_scl,
re.DOTALL | re.IGNORECASE,
)
core_scl = match.group(1).strip() if match else None
elif not consumer_scl.strip().startswith("//"):
core_scl = consumer_scl.strip()
if core_scl:
grouped_instructions_cores.append(core_scl)
consumer_instr_list.append(consumer_instr)
if len(grouped_instructions_cores) > 1:
print(
f"INFO: Agrupando {len(grouped_instructions_cores)} instr. bajo condición de {instr_type_original} UID {instr_uid}"
)
try:
simplified_expr = sympy.logic.boolalg.to_dnf(
sympy_condition_expr, simplify=True
)
except Exception as e:
print(f"Error simplifying condition for grouping UID {instr_uid}: {e}")
simplified_expr = sympy_condition_expr
condition_scl_simplified = sympy_expr_to_scl(simplified_expr, symbol_manager)
scl_grouped_lines = [f"IF {condition_scl_simplified} THEN"]
for core_line in grouped_instructions_cores:
indented_core = "\n".join(
[f" {line.strip()}" for line in core_line.splitlines()]
)
scl_grouped_lines.append(indented_core)
scl_grouped_lines.append("END_IF;")
final_grouped_scl = "\n".join(scl_grouped_lines)
instruction["scl"] = final_grouped_scl
for consumer_instr in consumer_instr_list:
consumer_instr["scl"] = f"{GROUPED_COMMENT} (by UID {instr_uid})"
consumer_instr["grouped"] = True
made_change = True
return made_change
def load_processors(processors_dir="processors"):
processor_map = {}
processor_list_unsorted = []
default_priority = 10
if not os.path.isdir(processors_dir):
print(f"Error: Directorio de procesadores no encontrado: '{processors_dir}'")
return processor_map, []
print(f"Cargando procesadores desde: '{processors_dir}'")
processors_package = os.path.basename(processors_dir)
for filename in os.listdir(processors_dir):
if filename.startswith("process_") and filename.endswith(".py"):
module_name_rel = filename[:-3]
full_module_name = f"{processors_package}.{module_name_rel}"
try:
module = importlib.import_module(full_module_name)
if hasattr(module, "get_processor_info") and callable(
module.get_processor_info
):
processor_info = module.get_processor_info()
info_list = []
if isinstance(processor_info, dict):
info_list = [processor_info]
elif isinstance(processor_info, list):
info_list = processor_info
else:
print(
f" Advertencia: get_processor_info en {full_module_name} devolvió tipo inesperado."
)
continue
for info in info_list:
if (
isinstance(info, dict)
and "type_name" in info
and "processor_func" in info
):
type_name = info["type_name"].lower()
processor_func = info["processor_func"]
priority = info.get("priority", default_priority)
if callable(processor_func):
if type_name in processor_map:
print(
f" Advertencia: '{type_name}' en {full_module_name} sobrescribe definición anterior."
)
processor_map[type_name] = processor_func
processor_list_unsorted.append(
{
"priority": priority,
"type_name": type_name,
"func": processor_func,
}
)
else:
print(
f" Advertencia: 'processor_func' para '{type_name}' en {full_module_name} no es callable."
)
else:
print(
f" Advertencia: Entrada inválida en {full_module_name}: {info}"
)
else:
print(
f" Advertencia: Módulo {module_name_rel}.py no tiene 'get_processor_info'."
)
except ImportError as e:
print(f"Error importando {full_module_name}: {e}")
except Exception as e:
print(f"Error procesando {full_module_name}: {e}")
traceback.print_exc()
processor_list_sorted = sorted(processor_list_unsorted, key=lambda x: x["priority"])
return processor_map, processor_list_sorted
# --- Bucle Principal de Procesamiento (MODIFICADO para copiar metadatos) ---
def process_json_to_scl(json_filepath, output_json_filepath):
"""
Lee JSON simplificado, copia metadatos, aplica procesadores dinámicos,
y guarda JSON procesado en la ruta especificada.
"""
global data
if not os.path.exists(json_filepath):
print(
f"Error Crítico (x2): JSON de entrada no encontrado: {json_filepath}",
file=sys.stderr,
)
return False
print(f"Cargando JSON desde: {json_filepath}")
try:
with open(json_filepath, "r", encoding="utf-8") as f:
data = json.load(f) # Carga el JSON de entrada
except Exception as e:
print(f"Error Crítico al cargar JSON de entrada: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
return False
# <-- NUEVO: Extraer metadatos del JSON de entrada (si existen) -->
source_xml_mod_time = data.get("source_xml_mod_time")
source_xml_size = data.get("source_xml_size")
# <-- FIN NUEVO -->
block_type = data.get("block_type", "Unknown")
print(f"Procesando bloque tipo: {block_type}")
if block_type in ["GlobalDB", "PlcUDT", "PlcTagTable", "InstanceDB"]: # <-- MODIFIED: Add InstanceDB
print(f"INFO: El bloque es {block_type}. Saltando procesamiento lógico de x2.")
print(
f"Guardando JSON de {block_type} (con metadatos) en: {output_json_filepath}"
)
try:
# <-- MODIFICADO: Asegurar que los metadatos se guarden aunque se salte -->
data_to_save = copy.deepcopy(data) # Copiar datos originales
if source_xml_mod_time is not None:
data_to_save["source_xml_mod_time"] = source_xml_mod_time
if source_xml_size is not None:
data_to_save["source_xml_size"] = source_xml_size
# <-- FIN MODIFICADO -->
with open(output_json_filepath, "w", encoding="utf-8") as f_out:
json.dump(data_to_save, f_out, indent=4, ensure_ascii=False)
print(f"Guardado de {block_type} completado.")
return True
except Exception as e:
print(f"Error Crítico al guardar JSON de {block_type}: {e}")
traceback.print_exc(file=sys.stderr)
return False
print(f"INFO: El bloque es {block_type}. Iniciando procesamiento lógico...")
script_dir = os.path.dirname(__file__)
processors_dir_path = os.path.join(script_dir, "processors")
processor_map, sorted_processors = load_processors(processors_dir_path)
if not processor_map:
print("Error crítico: No se cargaron procesadores. Abortando.", file=sys.stderr)
return False
# (Mapas de acceso y bucle iterativo SIN CAMBIOS relevantes, solo pasan 'data' que ya tiene metadatos)
network_access_maps = {}
for network in data.get("networks", []):
net_id = network["id"]
current_access_map = {}
for instr in network.get("logic", []):
for _, source in instr.get("inputs", {}).items():
sources_to_check = (
source
if isinstance(source, list)
else ([source] if isinstance(source, dict) else [])
)
for src in sources_to_check:
if (
isinstance(src, dict)
and src.get("uid")
and src.get("type") in ["variable", "constant"]
):
current_access_map[src["uid"]] = src
for _, dest_list in instr.get("outputs", {}).items():
if isinstance(dest_list, list):
for dest in dest_list:
if (
isinstance(dest, dict)
and dest.get("uid")
and dest.get("type") in ["variable", "constant"]
):
current_access_map[dest["uid"]] = dest
network_access_maps[net_id] = current_access_map
symbol_manager = SymbolManager()
sympy_map = {}
max_passes = 30
passes = 0
processing_complete = False
print(f"\n--- Iniciando Bucle de Procesamiento Iterativo ({block_type}) ---")
while passes < max_passes and not processing_complete:
passes += 1
made_change_in_base_pass = False
made_change_in_group_pass = False
print(f"\n--- Pase {passes} ---")
num_sympy_processed_this_pass = 0
num_grouped_this_pass = 0
print(f" Fase 1 (SymPy Base - Orden por Prioridad):")
num_sympy_processed_this_pass = 0
for processor_info in sorted_processors:
current_type_name = processor_info["type_name"]
func_to_call = processor_info["func"]
for network in data.get("networks", []):
network_id = network["id"]
network_lang = network.get("language", "LAD")
if network_lang == "STL":
continue
access_map = network_access_maps.get(network_id, {})
network_logic = network.get("logic", [])
for instruction in network_logic:
instr_uid = instruction.get("instruction_uid")
instr_type_current = instruction.get("type", "Unknown")
if (
instr_type_current.endswith(SCL_SUFFIX)
or "_error" in instr_type_current
or instruction.get("grouped", False)
or instr_type_current
in [
"RAW_STL_CHUNK",
"RAW_SCL_CHUNK",
"UNSUPPORTED_LANG",
"UNSUPPORTED_CONTENT",
"PARSING_ERROR",
]
):
continue
lookup_key = instr_type_current.lower()
effective_type_name = lookup_key
if instr_type_current == "Call":
call_block_type = instruction.get("block_type", "").upper()
if call_block_type == "FC":
effective_type_name = "call_fc"
elif call_block_type == "FB":
effective_type_name = "call_fb"
if effective_type_name == current_type_name:
try:
changed = func_to_call(
instruction, network_id, sympy_map, symbol_manager, data
) # data se pasa aquí
if changed:
made_change_in_base_pass = True
num_sympy_processed_this_pass += 1
except Exception as e:
print(
f"ERROR(SymPy Base) al procesar {instr_type_current} UID {instr_uid}: {e}"
)
traceback.print_exc()
instruction["scl"] = (
f"// ERROR en SymPy procesador base: {e}"
)
instruction["type"] = instr_type_current + "_error"
made_change_in_base_pass = True
print(
f" -> {num_sympy_processed_this_pass} instrucciones (no STL) procesadas con SymPy."
)
if made_change_in_base_pass or passes == 1:
print(f" Fase 2 (Agrupación IF con Simplificación):")
num_grouped_this_pass = 0
for network in data.get("networks", []):
network_id = network["id"]
network_lang = network.get("language", "LAD")
if network_lang == "STL":
continue
network_logic = network.get("logic", [])
uids_in_network = sorted(
[
instr.get("instruction_uid", "Z")
for instr in network_logic
if instr.get("instruction_uid")
]
)
for uid_to_process in uids_in_network:
instruction = next(
(
instr
for instr in network_logic
if instr.get("instruction_uid") == uid_to_process
),
None,
)
if not instruction:
continue
if instruction.get("grouped") or "_error" in instruction.get(
"type", ""
):
continue
if instruction.get("type", "").endswith(SCL_SUFFIX):
try:
group_changed = process_group_ifs(
instruction, network_id, sympy_map, symbol_manager, data
) # data se pasa aquí
if group_changed:
made_change_in_group_pass = True
num_grouped_this_pass += 1
except Exception as e:
print(
f"ERROR(GroupLoop) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}"
)
traceback.print_exc()
print(
f" -> {num_grouped_this_pass} agrupaciones realizadas (en redes no STL)."
)
if not made_change_in_base_pass and not made_change_in_group_pass:
print(
f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---"
)
processing_complete = True
else:
print(
f"--- Fin Pase {passes}: {num_sympy_processed_this_pass} proc SymPy, {num_grouped_this_pass} agrup. Continuando..."
)
if passes == max_passes and not processing_complete:
print(f"\n--- ADVERTENCIA: Límite de {max_passes} pases alcanzado...")
# --- Verificación Final y Guardado JSON ---
print(f"\n--- Verificación Final de Instrucciones No Procesadas ({block_type}) ---")
unprocessed_count = 0
unprocessed_details = []
ignored_types = [
"raw_scl_chunk",
"unsupported_lang",
"raw_stl_chunk",
"unsupported_content",
"parsing_error",
]
for network in data.get("networks", []):
network_id = network.get("id", "Unknown ID")
network_title = network.get("title", f"Network {network_id}")
network_lang = network.get("language", "LAD")
if network_lang == "STL":
continue
for instruction in network.get("logic", []):
instr_uid = instruction.get("instruction_uid", "Unknown UID")
instr_type = instruction.get("type", "Unknown Type")
is_grouped = instruction.get("grouped", False)
if (
not instr_type.endswith(SCL_SUFFIX)
and "_error" not in instr_type
and not is_grouped
and instr_type.lower() not in ignored_types
):
unprocessed_count += 1
unprocessed_details.append(
f" - Red '{network_title}' (ID: {network_id}, Lang: {network_lang}), Instrucción UID: {instr_uid}, Tipo: '{instr_type}'"
)
if unprocessed_count > 0:
print(
f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones (no STL) que parecen no haber sido procesadas:"
)
[print(detail) for detail in unprocessed_details]
else:
print(
"INFO: Todas las instrucciones relevantes (no STL) parecen haber sido procesadas o agrupadas."
)
# <-- MODIFICADO: Asegurar que los metadatos se añadan al 'data' final antes de guardar -->
if source_xml_mod_time is not None:
data["source_xml_mod_time"] = source_xml_mod_time
if source_xml_size is not None:
data["source_xml_size"] = source_xml_size
# <-- FIN MODIFICADO -->
print(f"\nGuardando JSON procesado ({block_type}) en: {output_json_filepath}")
try:
with open(output_json_filepath, "w", encoding="utf-8") as f:
json.dump(
data, f, indent=4, ensure_ascii=False
) # Guardar el diccionario 'data' modificado
print("Guardado completado.")
return True
except Exception as e:
print(f"Error Crítico al guardar JSON procesado: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
return False
# --- Ejecución (MODIFICADO) ---
if __name__ == "__main__":
# Lógica para ejecución standalone
try:
import tkinter as tk
from tkinter import filedialog
except ImportError:
print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr)
tk = None
input_json_file = ""
if tk:
root = tk.Tk()
root.withdraw()
print("Por favor, selecciona el archivo JSON de entrada (generado por x1)...")
input_json_file = filedialog.askopenfilename(
title="Selecciona el archivo JSON de entrada (.json)",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
)
root.destroy()
if not input_json_file:
print("No se seleccionó ningún archivo. Saliendo.", file=sys.stderr)
else:
print(f"Archivo JSON de entrada seleccionado: {input_json_file}")
# Calcular ruta de salida JSON procesado
json_filename_base = os.path.splitext(os.path.basename(input_json_file))[0]
# Asumimos que el _processed.json va al mismo directorio 'parsing'
parsing_dir = os.path.dirname(input_json_file)
output_json_file = os.path.join(parsing_dir, f"{json_filename_base}_processed.json")
# Asegurarse de que el directorio de salida exista (aunque debería si el input existe)
os.makedirs(parsing_dir, exist_ok=True)
print(
f"(x2 - Standalone) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'"
)
try:
success = process_json_to_scl(input_json_file, output_json_file)
if success:
print("\nProcesamiento completado exitosamente.")
else:
print(f"\nError durante el procesamiento de '{os.path.relpath(input_json_file)}'.", file=sys.stderr)
# sys.exit(1) # No usar sys.exit
except Exception as e:
print(
f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}",
file=sys.stderr,
)
traceback.print_exc(file=sys.stderr)
# sys.exit(1) # No usar sys.exit