From 205e1f4c8da9d0e248d5bcbf2c9425e3b76ff33a Mon Sep 17 00:00:00 2001 From: Miguel Date: Thu, 19 Jun 2025 14:45:27 +0200 Subject: [PATCH] Mejorado de la conversion LAD de Twincat --- .../TwinCat/ejemplo_conversion.py | 71 --- .../TwinCat/lad_to_pseudocode_converter.py | 391 ------------ .../lad_to_pseudocode_converter_enhanced.py | 429 ------------- .../TwinCat/simple_lad_converter.py | 343 ++++++---- .../TwinCat/test_enhanced_converter.py | 69 --- backend/script_groups/TwinCat/test_simple.py | 23 - backend/script_groups/TwinCat/x2_process.py | 585 ++++++++++++++++++ 7 files changed, 807 insertions(+), 1104 deletions(-) delete mode 100644 backend/script_groups/TwinCat/ejemplo_conversion.py delete mode 100644 backend/script_groups/TwinCat/lad_to_pseudocode_converter.py delete mode 100644 backend/script_groups/TwinCat/lad_to_pseudocode_converter_enhanced.py delete mode 100644 backend/script_groups/TwinCat/test_enhanced_converter.py delete mode 100644 backend/script_groups/TwinCat/test_simple.py create mode 100644 backend/script_groups/TwinCat/x2_process.py diff --git a/backend/script_groups/TwinCat/ejemplo_conversion.py b/backend/script_groups/TwinCat/ejemplo_conversion.py deleted file mode 100644 index 644a586..0000000 --- a/backend/script_groups/TwinCat/ejemplo_conversion.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Ejemplo de uso del convertidor LAD a pseudocódigo estructurado -""" - -from lad_to_pseudocode_converter import LadToStructuredConverter - -def convertir_input_exp(): - """Ejemplo de conversión del archivo INPUT.EXP""" - - # Crear instancia del convertidor - converter = LadToStructuredConverter() - - try: - # Cargar y parsear el archivo LAD - print("Parseando archivo INPUT.EXP...") - converter.parse_file(".example/INPUT.EXP") - - # Convertir a código estructurado - print("Convirtiendo a pseudocódigo estructurado...") - structured_code = converter.convert_to_structured() - - # Guardar resultado - output_file = "INPUT_pseudocode.txt" - converter.save_to_file(output_file) - - # Mostrar estadísticas - print(f"\n=== Estadísticas de conversión ===") - print(f"Redes procesadas: {len(converter.networks)}") - print(f"Archivo de salida: {output_file}") - - # Mostrar las primeras líneas como ejemplo - print(f"\n=== Primeras líneas del resultado ===") - lines = structured_code.split('\n') - for i, line in enumerate(lines[:30]): - print(f"{i+1:3d}: {line}") - - if len(lines) > 30: - print("...") - print(f"(Total de {len(lines)} líneas)") - - return True - - except Exception as e: - print(f"Error durante la conversión: {e}") - return False - -def mostrar_detalles_red(converter, network_id): - """Muestra los detalles de una red específica""" - if network_id <= len(converter.networks): - network = converter.networks[network_id - 1] - print(f"\n=== Detalles de Red {network_id} ===") - print(f"Comentario: {network.comment}") - print(f"Elementos: {len(network.elements)}") - - for i, element in enumerate(network.elements): - print(f" {i+1}. {element.element_type.value}: {element.name}") - if element.expression: - print(f" Expresión: {element.expression}") - if element.parameters: - print(f" Parámetros: {element.parameters}") - -if __name__ == "__main__": - print("=== Convertidor LAD a Pseudocódigo Estructurado ===") - print("Procesando archivo INPUT.EXP de TwinCAT...") - - if convertir_input_exp(): - print("\n✓ Conversión completada exitosamente!") - else: - print("\n✗ Error en la conversión") \ No newline at end of file diff --git a/backend/script_groups/TwinCat/lad_to_pseudocode_converter.py b/backend/script_groups/TwinCat/lad_to_pseudocode_converter.py deleted file mode 100644 index f8fb791..0000000 --- a/backend/script_groups/TwinCat/lad_to_pseudocode_converter.py +++ /dev/null @@ -1,391 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Convertidor de código LAD (Ladder Diagram) de TwinCAT a pseudocódigo estructurado -Compatible con IEC61131-3 - -Autor: Asistente AI -Fecha: 2024 -""" - -import re -from typing import List, Dict, Any, Optional -from dataclasses import dataclass -from enum import Enum - - -class ElementType(Enum): - CONTACT = "CONTACT" - COIL = "COIL" - FUNCTION_BLOCK = "FUNCTION_BLOCK" - OPERATOR = "OPERATOR" - EXPRESSION = "EXPRESSION" - ASSIGN = "ASSIGN" - - -@dataclass -class LadderElement: - """Representa un elemento de la escalera LAD""" - element_type: ElementType - name: str - expression: str = "" - positive: bool = True - enabled: bool = True - parameters: List[str] = None - outputs: List[str] = None - - def __post_init__(self): - if self.parameters is None: - self.parameters = [] - if self.outputs is None: - self.outputs = [] - - -@dataclass -class LadderNetwork: - """Representa una red completa de LAD""" - network_id: int - comment: str = "" - elements: List[LadderElement] = None - - def __post_init__(self): - if self.elements is None: - self.elements = [] - - -class LadToStructuredConverter: - """Conversor principal de LAD a código estructurado""" - - def __init__(self): - self.networks: List[LadderNetwork] = [] - self.current_network: Optional[LadderNetwork] = None - self.output_lines: List[str] = [] - - def parse_file(self, file_path: str) -> None: - """Parse el archivo LAD completo""" - with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: - content = f.read() - - # Encontrar la sección LAD - lad_start = content.find('_LD_BODY') - if lad_start == -1: - raise ValueError("No se encontró sección _LD_BODY en el archivo") - - lad_content = content[lad_start:] - self._parse_lad_content(lad_content) - - def _parse_lad_content(self, content: str) -> None: - """Parse el contenido LAD línea por línea""" - lines = content.split('\n') - i = 0 - - while i < len(lines): - line = lines[i].strip() - - if line == '_NETWORK': - i = self._parse_network(lines, i) - else: - i += 1 - - def _parse_network(self, lines: List[str], start_idx: int) -> int: - """Parse una red individual""" - network_id = len(self.networks) + 1 - network = LadderNetwork(network_id=network_id) - self.current_network = network - - i = start_idx + 1 - - while i < len(lines) and not lines[i].strip().startswith('_NETWORK'): - line = lines[i].strip() - - if line == '_COMMENT': - i, comment = self._parse_comment(lines, i) - network.comment = comment - elif line == '_LD_ASSIGN': - i = self._parse_assignment(lines, i) - elif line == '_LD_CONTACT': - i = self._parse_contact(lines, i) - elif line == '_LD_AND' or line == '_LD_OR': - i = self._parse_logic_operation(lines, i, line) - elif line.startswith('_FUNCTIONBLOCK'): - i = self._parse_function_block(lines, i) - elif line.startswith('_OPERATOR'): - i = self._parse_operator(lines, i) - else: - i += 1 - - self.networks.append(network) - return i - - def _parse_comment(self, lines: List[str], start_idx: int) -> tuple[int, str]: - """Parse un comentario""" - i = start_idx + 1 - comment_lines = [] - - while i < len(lines): - line = lines[i].strip() - if line == '_END_COMMENT': - break - if line and not line.startswith('_'): - comment_lines.append(line) - i += 1 - - return i + 1, '\n'.join(comment_lines) - - def _parse_assignment(self, lines: List[str], start_idx: int) -> int: - """Parse una asignación""" - i = start_idx + 1 - - # Buscar el elemento a asignar - target_var = "" - source_expr = "" - - while i < len(lines): - line = lines[i].strip() - - if line == '_EMPTY': - i += 1 - continue - elif line == '_EXPRESSION': - i += 1 - continue - elif line == '_POSITIV' or line == '_NEGATIV': - i += 1 - continue - elif line.startswith('_OUTPUT'): - # Siguiente línea debería ser la variable de salida - i += 2 # Saltar _POSITIV/_NEGATIV - i += 1 # Saltar _NO_SET o _SET - if i < len(lines): - target_var = lines[i].strip() - break - elif line == 'ENABLELIST_END': - break - elif not line.startswith('_') and not line.startswith('ENABLELIST'): - if not target_var: - source_expr = line - i += 1 - - if target_var: - element = LadderElement( - element_type=ElementType.ASSIGN, - name=target_var, - expression=source_expr - ) - self.current_network.elements.append(element) - - return i + 1 - - def _parse_contact(self, lines: List[str], start_idx: int) -> int: - """Parse un contacto""" - i = start_idx + 1 - contact_name = "" - is_positive = True - - if i < len(lines): - contact_name = lines[i].strip() - i += 1 - - if i < len(lines): - polarity = lines[i].strip() - is_positive = polarity == '_POSITIV' - i += 1 - - element = LadderElement( - element_type=ElementType.CONTACT, - name=contact_name, - positive=is_positive - ) - self.current_network.elements.append(element) - - return i - - def _parse_logic_operation(self, lines: List[str], start_idx: int, operation: str) -> int: - """Parse operaciones lógicas AND/OR""" - i = start_idx + 1 - - # Extraer el tipo de operación - op_type = operation.replace('_LD_', '') - - element = LadderElement( - element_type=ElementType.OPERATOR, - name=op_type - ) - self.current_network.elements.append(element) - - return i - - def _parse_function_block(self, lines: List[str], start_idx: int) -> int: - """Parse un bloque de función""" - i = start_idx + 1 - fb_name = "" - parameters = [] - - if i < len(lines): - fb_name = lines[i].strip() - i += 1 - - # Buscar parámetros - while i < len(lines): - line = lines[i].strip() - if line.startswith('_OPERAND'): - i += 2 # Saltar _EXPRESSION y _POSITIV - if i < len(lines): - param = lines[i].strip() - parameters.append(param) - elif line == '_OUTPUTS': - break - i += 1 - - element = LadderElement( - element_type=ElementType.FUNCTION_BLOCK, - name=fb_name, - parameters=parameters - ) - self.current_network.elements.append(element) - - return i - - def _parse_operator(self, lines: List[str], start_idx: int) -> int: - """Parse un operador""" - i = start_idx + 1 - operator_name = "" - operands = [] - - while i < len(lines): - line = lines[i].strip() - - if line.startswith('_OPERAND'): - i += 2 # Saltar _EXPRESSION y _POSITIV - if i < len(lines): - operand = lines[i].strip() - operands.append(operand) - elif line.startswith('_EXPRESSION') and i + 1 < len(lines): - next_line = lines[i + 1].strip() - if not next_line.startswith('_'): - operator_name = next_line - i += 1 - elif line == '_OUTPUTS': - break - i += 1 - - element = LadderElement( - element_type=ElementType.OPERATOR, - name=operator_name, - parameters=operands - ) - self.current_network.elements.append(element) - - return i - - def convert_to_structured(self) -> str: - """Convierte las redes LAD a código pseudo estructurado""" - self.output_lines = [] - - self.output_lines.append("// Código pseudo estructurado generado desde LAD") - self.output_lines.append("// Compatible con IEC61131-3") - self.output_lines.append("") - - for network in self.networks: - self._convert_network_to_structured(network) - self.output_lines.append("") - - return '\n'.join(self.output_lines) - - def _convert_network_to_structured(self, network: LadderNetwork) -> None: - """Convierte una red a código estructurado""" - self.output_lines.append(f"// Red {network.network_id}") - - if network.comment: - comment_lines = network.comment.split('\n') - for line in comment_lines: - if line.strip(): - self.output_lines.append(f"// {line.strip()}") - - # Procesar elementos de la red - conditions = [] - assignments = [] - - for element in network.elements: - if element.element_type == ElementType.CONTACT: - condition = element.name - if not element.positive: - condition = f"NOT {condition}" - conditions.append(condition) - - elif element.element_type == ElementType.ASSIGN: - if conditions: - condition_str = " AND ".join(conditions) - self.output_lines.append(f"IF {condition_str} THEN") - self.output_lines.append(f" {element.name} := {element.expression};") - self.output_lines.append("END_IF;") - else: - self.output_lines.append(f"{element.name} := {element.expression};") - - elif element.element_type == ElementType.FUNCTION_BLOCK: - params_str = ", ".join(element.parameters) if element.parameters else "" - if conditions: - condition_str = " AND ".join(conditions) - self.output_lines.append(f"IF {condition_str} THEN") - self.output_lines.append(f" {element.name}({params_str});") - self.output_lines.append("END_IF;") - else: - self.output_lines.append(f"{element.name}({params_str});") - - elif element.element_type == ElementType.OPERATOR: - if element.name == "AND": - # AND lógico - ya manejado en condiciones - pass - elif element.name == "OR": - # OR lógico - cambiar estrategia de condiciones - if len(conditions) > 1: - last_condition = conditions.pop() - conditions[-1] = f"({conditions[-1]} OR {last_condition})" - else: - # Otros operadores matemáticos - if element.parameters and len(element.parameters) >= 2: - expr = f"{element.parameters[0]} {element.name} {element.parameters[1]}" - if conditions: - condition_str = " AND ".join(conditions) - self.output_lines.append(f"IF {condition_str} THEN") - self.output_lines.append(f" // Resultado := {expr};") - self.output_lines.append("END_IF;") - else: - self.output_lines.append(f"// Resultado := {expr};") - - def save_to_file(self, output_path: str) -> None: - """Guarda el código estructurado a un archivo""" - structured_code = self.convert_to_structured() - - with open(output_path, 'w', encoding='utf-8') as f: - f.write(structured_code) - - print(f"Código pseudo estructurado guardado en: {output_path}") - - -def main(): - """Función principal""" - import sys - - if len(sys.argv) != 3: - print("Uso: python lad_to_pseudocode_converter.py ") - sys.exit(1) - - input_file = sys.argv[1] - output_file = sys.argv[2] - - try: - converter = LadToStructuredConverter() - converter.parse_file(input_file) - converter.save_to_file(output_file) - - print(f"Conversión completada exitosamente!") - print(f"Redes procesadas: {len(converter.networks)}") - - except Exception as e: - print(f"Error durante la conversión: {e}") - sys.exit(1) - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/backend/script_groups/TwinCat/lad_to_pseudocode_converter_enhanced.py b/backend/script_groups/TwinCat/lad_to_pseudocode_converter_enhanced.py deleted file mode 100644 index aa1c64a..0000000 --- a/backend/script_groups/TwinCat/lad_to_pseudocode_converter_enhanced.py +++ /dev/null @@ -1,429 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Convertidor mejorado de código LAD (Ladder Diagram) de TwinCAT a pseudocódigo estructurado -Compatible con IEC61131-3 - -Autor: Asistente AI -Fecha: 2024 -""" - -import re -from typing import List, Dict, Any, Optional, Tuple -from dataclasses import dataclass, field -from enum import Enum - - -class LadElementType(Enum): - CONTACT = "CONTACT" - COIL = "COIL" - FUNCTION_BLOCK = "FUNCTION_BLOCK" - OPERATOR = "OPERATOR" - ASSIGNMENT = "ASSIGNMENT" - CONDITION = "CONDITION" - - -@dataclass -class LadVariable: - """Representa una variable en LAD""" - name: str - negated: bool = False - value: str = "" - - -@dataclass -class LadExpression: - """Representa una expresión en LAD""" - operator: str = "" - operands: List[str] = field(default_factory=list) - result_var: str = "" - - -@dataclass -class LadNetwork: - """Representa una red LAD completa""" - id: int - comment: str = "" - contacts: List[LadVariable] = field(default_factory=list) - operators: List[str] = field(default_factory=list) - function_blocks: List[Dict] = field(default_factory=list) - expressions: List[LadExpression] = field(default_factory=list) - outputs: List[LadVariable] = field(default_factory=list) - assignments: List[Dict] = field(default_factory=list) - - -class EnhancedLadConverter: - """Convertidor mejorado de LAD a código estructurado""" - - def __init__(self): - self.networks: List[LadNetwork] = [] - self.output_lines: List[str] = [] - - def parse_file(self, file_path: str) -> None: - """Parse el archivo LAD completo""" - with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: - content = f.read() - - # Encontrar la sección LAD - lad_start = content.find('_LD_BODY') - if lad_start == -1: - raise ValueError("No se encontró sección _LD_BODY en el archivo") - - # Extraer solo la sección LAD hasta END_PROGRAM - lad_end = content.find('END_PROGRAM', lad_start) - if lad_end == -1: - lad_content = content[lad_start:] - else: - lad_content = content[lad_start:lad_end] - - self._parse_networks(lad_content) - - def _parse_networks(self, content: str) -> None: - """Parse todas las redes del contenido LAD""" - lines = content.split('\n') - i = 0 - network_id = 0 - - while i < len(lines): - if lines[i].strip() == '_NETWORK': - network_id += 1 - i = self._parse_single_network(lines, i, network_id) - else: - i += 1 - - def _parse_single_network(self, lines: List[str], start_idx: int, network_id: int) -> int: - """Parse una red individual""" - network = LadNetwork(id=network_id) - i = start_idx + 1 - - # Parse comment - if i < len(lines) and lines[i].strip() == '_COMMENT': - i, comment = self._extract_comment(lines, i) - network.comment = comment - - # Parse network content - while i < len(lines): - line = lines[i].strip() - - if line == '_NETWORK': - # Inicio de nueva red - break - elif line == '_LD_ASSIGN': - i = self._parse_assignment_block(lines, i, network) - elif line == '_LD_CONTACT': - i = self._parse_contact(lines, i, network) - elif line == '_LD_AND' or line == '_LD_OR': - network.operators.append(line.replace('_LD_', '')) - i += 1 - elif line.startswith('_FUNCTIONBLOCK'): - i = self._parse_function_block_detailed(lines, i, network) - else: - i += 1 - - self.networks.append(network) - return i - - def _extract_comment(self, lines: List[str], start_idx: int) -> Tuple[int, str]: - """Extrae comentario de la red""" - i = start_idx + 1 - comment_lines = [] - - while i < len(lines): - line = lines[i].strip() - if line == '_END_COMMENT': - break - if line and not line.startswith('_'): - comment_lines.append(line) - i += 1 - - return i + 1, ' '.join(comment_lines) - - def _parse_contact(self, lines: List[str], start_idx: int, network: LadNetwork) -> int: - """Parse un contacto""" - i = start_idx + 1 - - if i < len(lines): - contact_name = lines[i].strip() - i += 1 - - # Verificar polaridad - negated = False - if i < len(lines) and lines[i].strip() == '_NEGATIV': - negated = True - - contact = LadVariable(name=contact_name, negated=negated) - network.contacts.append(contact) - - return i + 1 - - def _parse_assignment_block(self, lines: List[str], start_idx: int, network: LadNetwork) -> int: - """Parse un bloque de asignación completo""" - i = start_idx + 1 - assignment_data = { - 'conditions': [], - 'target_var': '', - 'source_expr': '', - 'operator': '', - 'operands': [], - 'function_name': '', - 'fb_params': [] - } - - while i < len(lines): - line = lines[i].strip() - - if line == '_NETWORK' or line == 'ENABLELIST_END': - break - elif line == '_LD_CONTACT': - i += 1 - if i < len(lines): - contact_name = lines[i].strip() - i += 1 - negated = False - if i < len(lines) and lines[i].strip() == '_NEGATIV': - negated = True - i += 1 - assignment_data['conditions'].append(LadVariable(contact_name, negated)) - elif line.startswith('_FUNCTIONBLOCK'): - # Parse function block dentro de asignación - i, fb_data = self._parse_functionblock_in_assignment(lines, i) - assignment_data.update(fb_data) - elif line.startswith('_OPERATOR'): - i, op_data = self._parse_operator_in_assignment(lines, i) - assignment_data.update(op_data) - elif line.startswith('_OUTPUT'): - # Siguiente información es sobre la salida - i = self._parse_output_info(lines, i, assignment_data) - elif not line.startswith('_') and line and 'ENABLELIST' not in line: - # Variable o expresión - if not assignment_data['target_var']: - assignment_data['source_expr'] = line - else: - i += 1 - - if assignment_data['target_var'] or assignment_data['function_name']: - network.assignments.append(assignment_data) - - return i - - def _parse_functionblock_in_assignment(self, lines: List[str], start_idx: int) -> Tuple[int, Dict]: - """Parse function block dentro de asignación""" - i = start_idx + 1 - fb_data = {'function_name': '', 'fb_params': []} - - if i < len(lines): - fb_data['function_name'] = lines[i].strip() - i += 1 - - # Buscar parámetros - while i < len(lines) and not lines[i].strip().startswith('_OUTPUT'): - line = lines[i].strip() - if line.startswith('_OPERAND'): - i += 2 # Saltar _EXPRESSION y _POSITIV/_NEGATIV - if i < len(lines): - param = lines[i].strip() - fb_data['fb_params'].append(param) - i += 1 - - return i, fb_data - - def _parse_operator_in_assignment(self, lines: List[str], start_idx: int) -> Tuple[int, Dict]: - """Parse operador dentro de asignación""" - i = start_idx + 1 - op_data = {'operator': '', 'operands': []} - - # Buscar operandos y operador - while i < len(lines) and not lines[i].strip().startswith('_OUTPUT'): - line = lines[i].strip() - - if line.startswith('_OPERAND'): - i += 2 # Saltar _EXPRESSION y _POSITIV/_NEGATIV - if i < len(lines): - operand = lines[i].strip() - op_data['operands'].append(operand) - elif line in ['ADD', 'SUB', 'MUL', 'DIV', 'AND', 'OR', 'LT', 'GT', 'EQ', 'SEL', 'MOVE']: - op_data['operator'] = line - i += 1 - - return i, op_data - - def _parse_output_info(self, lines: List[str], start_idx: int, assignment_data: Dict) -> int: - """Parse información de salida""" - i = start_idx + 1 - - # Saltar información de salida hasta encontrar la variable - while i < len(lines): - line = lines[i].strip() - if not line.startswith('_') and line and 'ENABLELIST' not in line: - assignment_data['target_var'] = line - break - i += 1 - - return i + 1 - - def _parse_function_block_detailed(self, lines: List[str], start_idx: int, network: LadNetwork) -> int: - """Parse function block detallado""" - i = start_idx + 1 - fb_data = {'name': '', 'params': [], 'outputs': []} - - if i < len(lines): - fb_data['name'] = lines[i].strip() - i += 1 - - # Parse parámetros y salidas - while i < len(lines): - line = lines[i].strip() - if line.startswith('_OPERAND'): - i += 2 - if i < len(lines): - param = lines[i].strip() - fb_data['params'].append(param) - elif line.startswith('_OUTPUT'): - i += 3 # Saltar información de salida - if i < len(lines): - output = lines[i].strip() - fb_data['outputs'].append(output) - elif line == '_NETWORK': - break - i += 1 - - network.function_blocks.append(fb_data) - return i - - def convert_to_structured(self) -> str: - """Convierte las redes a código pseudo estructurado""" - self.output_lines = [ - "// Código pseudo estructurado generado desde LAD TwinCAT", - "// Compatible con IEC61131-3", - "PROGRAM Input_Converted", - "" - ] - - for network in self.networks: - self._convert_network_to_structured(network) - - self.output_lines.append("END_PROGRAM") - return '\n'.join(self.output_lines) - - def _convert_network_to_structured(self, network: LadNetwork) -> None: - """Convierte una red a código estructurado""" - self.output_lines.append(f" // Red {network.id}") - - if network.comment: - self.output_lines.append(f" // {network.comment}") - - # Procesar asignaciones de la red - for assignment in network.assignments: - self._convert_assignment_to_structured(assignment) - - # Procesar function blocks independientes - for fb in network.function_blocks: - self._convert_function_block_to_structured(fb) - - self.output_lines.append("") - - def _convert_assignment_to_structured(self, assignment: Dict) -> None: - """Convierte una asignación a código estructurado""" - conditions = assignment.get('conditions', []) - target_var = assignment.get('target_var', '') - source_expr = assignment.get('source_expr', '') - operator = assignment.get('operator', '') - operands = assignment.get('operands', []) - function_name = assignment.get('function_name', '') - fb_params = assignment.get('fb_params', []) - - # Construir condiciones - condition_str = "" - if conditions: - condition_parts = [] - for cond in conditions: - if cond.negated: - condition_parts.append(f"NOT {cond.name}") - else: - condition_parts.append(cond.name) - condition_str = " AND ".join(condition_parts) - - # Construir la expresión de asignación - if function_name: - # Function block call - params_str = ", ".join(fb_params) if fb_params else "" - if target_var: - expr = f"{target_var} := {function_name}({params_str})" - else: - expr = f"{function_name}({params_str})" - elif operator and operands: - # Operación matemática/lógica - if len(operands) == 2: - if operator == "SEL": - expr = f"{target_var} := SEL({operands[0]}, {operands[1]}, condition)" - else: - expr = f"{target_var} := {operands[0]} {operator} {operands[1]}" - elif len(operands) == 1: - if operator == "MOVE": - expr = f"{target_var} := {operands[0]}" - else: - expr = f"{target_var} := {operator}({operands[0]})" - else: - expr = f"{target_var} := {operator}({', '.join(operands)})" - elif source_expr and target_var: - # Asignación simple - expr = f"{target_var} := {source_expr}" - else: - return # Sin información suficiente - - # Generar código estructurado - if condition_str: - self.output_lines.append(f" IF {condition_str} THEN") - self.output_lines.append(f" {expr};") - self.output_lines.append(f" END_IF;") - else: - self.output_lines.append(f" {expr};") - - def _convert_function_block_to_structured(self, fb: Dict) -> None: - """Convierte un function block a código estructurado""" - name = fb.get('name', '') - params = fb.get('params', []) - - if name: - params_str = ", ".join(params) if params else "" - self.output_lines.append(f" {name}({params_str});") - - def save_to_file(self, output_path: str) -> None: - """Guarda el código estructurado a un archivo""" - structured_code = self.convert_to_structured() - - with open(output_path, 'w', encoding='utf-8') as f: - f.write(structured_code) - - print(f"Código pseudo estructurado guardado en: {output_path}") - - -def main(): - """Función principal""" - import sys - - if len(sys.argv) != 3: - print("Uso: python lad_to_pseudocode_converter_enhanced.py ") - return - - input_file = sys.argv[1] - output_file = sys.argv[2] - - try: - converter = EnhancedLadConverter() - converter.parse_file(input_file) - converter.save_to_file(output_file) - - print(f"Conversión completada exitosamente!") - print(f"Redes procesadas: {len(converter.networks)}") - - # Mostrar estadísticas por red - for network in converter.networks[:5]: # Primeras 5 redes - print(f"Red {network.id}: {len(network.assignments)} asignaciones, {len(network.contacts)} contactos") - - except Exception as e: - print(f"Error durante la conversión: {e}") - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/backend/script_groups/TwinCat/simple_lad_converter.py b/backend/script_groups/TwinCat/simple_lad_converter.py index c466cd0..04b9713 100644 --- a/backend/script_groups/TwinCat/simple_lad_converter.py +++ b/backend/script_groups/TwinCat/simple_lad_converter.py @@ -39,13 +39,13 @@ class SimpleLadConverter: i += 1 def _parse_network(self, lines, start_idx): - """Parse una red individual""" + """Parse una red individual con soporte mejorado para operadores LAD""" network = { 'id': self.current_network_id, 'comment': '', - 'contacts': [], - 'assignments': [], - 'outputs': [] + 'logic': None, + 'target': '', + 'function_blocks': [] } i = start_idx + 1 @@ -56,86 +56,139 @@ class SimpleLadConverter: network['comment'] = comment # Parse contenido de la red - current_assignment = { - 'conditions': [], - 'target': '', - 'expression': '', - 'function_block': '', - 'operator': '', - 'operands': [] - } - while i < len(lines): line = lines[i].strip() if line == '_NETWORK': break elif line == '_LD_ASSIGN': - if current_assignment['target'] or current_assignment['function_block']: - network['assignments'].append(current_assignment.copy()) - current_assignment = { - 'conditions': [], - 'target': '', - 'expression': '', - 'function_block': '', - 'operator': '', - 'operands': [] - } i += 1 - elif line == '_LD_CONTACT': - i += 1 - if i < len(lines): - contact_name = lines[i].strip() - i += 1 - negated = False - if i < len(lines) and lines[i].strip() == '_NEGATIV': - negated = True - i += 1 - current_assignment['conditions'].append({ - 'name': contact_name, - 'negated': negated - }) - elif line.startswith('_FUNCTIONBLOCK'): - i += 1 - if i < len(lines): - current_assignment['function_block'] = lines[i].strip() - i += 1 - elif line.startswith('_OPERATOR'): - i += 1 - # Buscar operador y operandos - while i < len(lines) and not lines[i].strip().startswith('_OUTPUT'): - subline = lines[i].strip() - if subline in ['ADD', 'SUB', 'MUL', 'DIV', 'AND', 'OR', 'LT', 'GT', 'EQ', 'SEL', 'MOVE']: - current_assignment['operator'] = subline - elif subline.startswith('_OPERAND'): - i += 2 # Saltar _EXPRESSION y _POSITIV - if i < len(lines): - operand = lines[i].strip() - current_assignment['operands'].append(operand) - i += 1 - continue + # Parsear la lógica LAD después de _LD_ASSIGN + i, logic = self._parse_lad_expression(lines, i) + network['logic'] = logic elif line.startswith('_OUTPUT'): # Buscar variable de salida i += 1 while i < len(lines) and lines[i].strip().startswith('_'): i += 1 - if i < len(lines): - current_assignment['target'] = lines[i].strip() - i += 1 - elif not line.startswith('_') and line and 'ENABLELIST' not in line: - if not current_assignment['expression']: - current_assignment['expression'] = line + if i < len(lines) and lines[i].strip() and 'ENABLELIST' not in lines[i]: + network['target'] = lines[i].strip() i += 1 else: i += 1 - # Agregar última asignación si existe - if current_assignment['target'] or current_assignment['function_block']: - network['assignments'].append(current_assignment) - self.networks.append(network) return i + def _parse_lad_expression(self, lines, start_idx): + """Parse una expresión LAD recursivamente""" + i = start_idx + + while i < len(lines): + line = lines[i].strip() + + if line == '_LD_AND': + return self._parse_and_expression(lines, i + 1) + elif line == '_LD_OR': + return self._parse_or_expression(lines, i + 1) + elif line == '_LD_CONTACT': + return self._parse_contact(lines, i + 1) + elif line.startswith('_FUNCTIONBLOCK'): + return self._parse_function_block(lines, i) + elif line.startswith('_OUTPUT') or line == 'ENABLELIST : 0': + break + else: + i += 1 + + return i, None + + def _parse_and_expression(self, lines, start_idx): + """Parse una expresión AND""" + i = start_idx + operands = [] + + # Buscar operadores + if i < len(lines) and lines[i].strip().startswith('_LD_OPERATOR'): + # Extraer número de operandos + operator_line = lines[i].strip() + num_operands = int(operator_line.split(':')[-1].strip()) if ':' in operator_line else 2 + i += 1 + + # Parse cada operando + for _ in range(num_operands): + i, operand = self._parse_lad_expression(lines, i) + if operand: + operands.append(operand) + + return i, {'type': 'AND', 'operands': operands} + + def _parse_or_expression(self, lines, start_idx): + """Parse una expresión OR""" + i = start_idx + operands = [] + + # Buscar operadores + if i < len(lines) and lines[i].strip().startswith('_LD_OPERATOR'): + # Extraer número de operandos + operator_line = lines[i].strip() + num_operands = int(operator_line.split(':')[-1].strip()) if ':' in operator_line else 2 + i += 1 + + # Parse cada operando + for _ in range(num_operands): + i, operand = self._parse_lad_expression(lines, i) + if operand: + operands.append(operand) + + return i, {'type': 'OR', 'operands': operands} + + def _parse_contact(self, lines, start_idx): + """Parse un contacto LAD""" + i = start_idx + contact_name = "" + negated = False + + # Obtener nombre del contacto + if i < len(lines): + contact_name = lines[i].strip() + i += 1 + + # Verificar si hay expresión + if i < len(lines) and lines[i].strip() == '_EXPRESSION': + i += 1 + # Verificar si está negado + if i < len(lines): + if lines[i].strip() == '_NEGATIV': + negated = True + i += 1 + elif lines[i].strip() == '_POSITIV': + i += 1 + + return i, {'type': 'CONTACT', 'name': contact_name, 'negated': negated} + + def _parse_function_block(self, lines, start_idx): + """Parse un bloque de función""" + i = start_idx + 1 + fb_name = "" + inputs = [] + + if i < len(lines): + fb_name = lines[i].strip() + i += 1 + + # Parse inputs del function block + while i < len(lines) and not lines[i].strip().startswith('_OUTPUT'): + line = lines[i].strip() + if line.startswith('_OPERAND'): + i += 2 # Saltar _EXPRESSION + if i < len(lines): + inputs.append(lines[i].strip()) + i += 1 + else: + i += 1 + + return i, {'type': 'FUNCTION_BLOCK', 'name': fb_name, 'inputs': inputs} + def _parse_comment(self, lines, start_idx): """Parse comentario""" i = start_idx + 1 @@ -156,7 +209,7 @@ class SimpleLadConverter: output = [] output.append("// Código pseudo estructurado generado desde LAD TwinCAT") output.append("// Compatible con IEC61131-3") - output.append("PROGRAM Input_Converted") + output.append("PROGRAM PumpControl_Converted") output.append("") for network in self.networks: @@ -164,60 +217,66 @@ class SimpleLadConverter: if network['comment']: output.append(f" // {network['comment']}") - for assignment in network['assignments']: - structured_line = self._convert_assignment(assignment) - if structured_line: - if assignment['conditions']: - # Construir condición - conditions = [] - for cond in assignment['conditions']: - if cond['negated']: - conditions.append(f"NOT {cond['name']}") - else: - conditions.append(cond['name']) - condition_str = " AND ".join(conditions) - output.append(f" IF {condition_str} THEN") - output.append(f" {structured_line};") - output.append(" END_IF;") - else: - output.append(f" {structured_line};") + if network['logic'] and network['target']: + condition_str = self._convert_logic_to_string(network['logic']) + if condition_str: + output.append(f" IF {condition_str} THEN") + output.append(f" {network['target']} := TRUE;") + output.append(" ELSE") + output.append(f" {network['target']} := FALSE;") + output.append(" END_IF;") + else: + output.append(f" {network['target']} := TRUE; // Logic no reconocida") output.append("") output.append("END_PROGRAM") return '\n'.join(output) - def _convert_assignment(self, assignment): - """Convertir una asignación a código estructurado""" - target = assignment['target'] - expression = assignment['expression'] - function_block = assignment['function_block'] - operator = assignment['operator'] - operands = assignment['operands'] + def _convert_logic_to_string(self, logic): + """Convertir lógica LAD a string estructurado""" + if not logic: + return "" - if function_block: - params = ", ".join(operands) if operands else "" - if target: - return f"{target} := {function_block}({params})" + if logic['type'] == 'CONTACT': + if logic['negated']: + return f"NOT {logic['name']}" else: - return f"{function_block}({params})" + return logic['name'] - elif operator and operands: - if len(operands) >= 2: - if operator == "SEL": - return f"{target} := SEL({operands[0]}, {operands[1]}, condition)" - else: - return f"{target} := {operands[0]} {operator} {operands[1]}" - elif len(operands) == 1: - if operator == "MOVE": - return f"{target} := {operands[0]}" - else: - return f"{target} := {operator}({operands[0]})" + elif logic['type'] == 'AND': + operand_strings = [] + for operand in logic['operands']: + operand_str = self._convert_logic_to_string(operand) + if operand_str: + operand_strings.append(operand_str) + + if len(operand_strings) > 1: + return "(" + " AND ".join(operand_strings) + ")" + elif len(operand_strings) == 1: + return operand_strings[0] + else: + return "" - elif expression and target: - return f"{target} := {expression}" + elif logic['type'] == 'OR': + operand_strings = [] + for operand in logic['operands']: + operand_str = self._convert_logic_to_string(operand) + if operand_str: + operand_strings.append(operand_str) + + if len(operand_strings) > 1: + return "(" + " OR ".join(operand_strings) + ")" + elif len(operand_strings) == 1: + return operand_strings[0] + else: + return "" - return None + elif logic['type'] == 'FUNCTION_BLOCK': + inputs_str = ", ".join(logic['inputs']) if logic['inputs'] else "" + return f"{logic['name']}({inputs_str})" + + return "" def save_to_file(self, output_path): """Guardar código estructurado""" @@ -229,35 +288,77 @@ class SimpleLadConverter: print(f"Código guardado en: {output_path}") return structured_code + def print_debug_info(self): + """Mostrar información de debug sobre los networks parseados""" + print(f"\n=== DEBUG INFO - {len(self.networks)} networks encontrados ===") + + for network in self.networks: + print(f"\nRed {network['id']}:") + if network['comment']: + print(f" Comentario: {network['comment']}") + print(f" Target: {network['target']}") + + if network['logic']: + print(f" Lógica: {self._debug_logic_string(network['logic'])}") + condition_str = self._convert_logic_to_string(network['logic']) + print(f" Condición: {condition_str}") + else: + print(" Sin lógica") + + def _debug_logic_string(self, logic, indent=0): + """Crear string de debug para la lógica""" + if not logic: + return "None" + + prefix = " " * indent + + if logic['type'] == 'CONTACT': + neg_str = " (NEGADO)" if logic['negated'] else "" + return f"{prefix}CONTACT: {logic['name']}{neg_str}" + + elif logic['type'] == 'AND': + result = f"{prefix}AND:\n" + for operand in logic['operands']: + result += self._debug_logic_string(operand, indent + 1) + "\n" + return result.rstrip() + + elif logic['type'] == 'OR': + result = f"{prefix}OR:\n" + for operand in logic['operands']: + result += self._debug_logic_string(operand, indent + 1) + "\n" + return result.rstrip() + + elif logic['type'] == 'FUNCTION_BLOCK': + return f"{prefix}FUNCTION_BLOCK: {logic['name']} inputs: {logic['inputs']}" + + return f"{prefix}UNKNOWN: {logic}" + def main(): """Función principal""" converter = SimpleLadConverter() try: - print("=== Convertidor LAD Simplificado ===") - print("Parseando archivo INPUT.EXP...") + print("=== Convertidor LAD Mejorado ===") + print("Parseando archivo _PUMPCONTROL.EXP...") converter.parse_file(".example/_PUMPCONTROL.EXP") print(f"Redes encontradas: {len(converter.networks)}") - # Mostrar algunas estadísticas - for i, network in enumerate(converter.networks[:5]): - print(f"Red {network['id']}: {len(network['assignments'])} asignaciones") - if network['comment']: - print(f" Comentario: {network['comment']}") + # Mostrar información de debug + converter.print_debug_info() # Convertir y guardar print("\nGenerando código estructurado...") - structured_code = converter.save_to_file("simple_output.txt") + structured_code = converter.save_to_file("pump_control_output.txt") - # Mostrar primeras líneas + # Mostrar el código generado lines = structured_code.split('\n') - print(f"\nPrimeras {min(30, len(lines))} líneas:") - for i, line in enumerate(lines[:30]): + print(f"\nCódigo generado ({len(lines)} líneas):") + for i, line in enumerate(lines): print(f"{i+1:3d}: {line}") - print(f"\n✓ Conversión completada! Total de líneas: {len(lines)}") + print(f"\n✓ Conversión completada!") except Exception as e: print(f"Error: {e}") diff --git a/backend/script_groups/TwinCat/test_enhanced_converter.py b/backend/script_groups/TwinCat/test_enhanced_converter.py deleted file mode 100644 index d9a1acf..0000000 --- a/backend/script_groups/TwinCat/test_enhanced_converter.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Script de prueba para el convertidor LAD mejorado -""" - -from lad_to_pseudocode_converter_enhanced import EnhancedLadConverter - -def test_enhanced_converter(): - """Prueba el convertidor mejorado""" - print("=== Convertidor LAD Mejorado ===") - - try: - # Crear el convertidor - converter = EnhancedLadConverter() - - # Parsear el archivo - print("Parseando INPUT.EXP...") - converter.parse_file(".example/INPUT.EXP") - - print(f"Redes encontradas: {len(converter.networks)}") - - # Mostrar detalles de las primeras redes - for i, network in enumerate(converter.networks[:10]): - print(f"\n--- Red {network.id} ---") - print(f"Comentario: {network.comment}") - print(f"Contactos: {len(network.contacts)}") - print(f"Asignaciones: {len(network.assignments)}") - print(f"Function Blocks: {len(network.function_blocks)}") - - # Mostrar detalles de asignaciones - for j, assignment in enumerate(network.assignments[:3]): - print(f" Asignación {j+1}:") - if assignment['conditions']: - conditions = [f"{'NOT ' if c.negated else ''}{c.name}" for c in assignment['conditions']] - print(f" Condiciones: {' AND '.join(conditions)}") - if assignment['target_var']: - print(f" Variable destino: {assignment['target_var']}") - if assignment['function_name']: - print(f" Function Block: {assignment['function_name']}") - if assignment['operator']: - print(f" Operador: {assignment['operator']}") - if assignment['operands']: - print(f" Operandos: {assignment['operands']}") - - # Convertir a pseudocódigo - print("\n=== Generando pseudocódigo ===") - structured_code = converter.convert_to_structured() - - # Guardar archivo - output_file = "INPUT_enhanced_pseudocode.txt" - converter.save_to_file(output_file) - - # Mostrar las primeras líneas - print("\n=== Primeras líneas del resultado ===") - lines = structured_code.split('\n') - for i, line in enumerate(lines[:50]): - print(f"{i+1:3d}: {line}") - - return True - - except Exception as e: - print(f"Error: {e}") - import traceback - traceback.print_exc() - return False - -if __name__ == "__main__": - test_enhanced_converter() \ No newline at end of file diff --git a/backend/script_groups/TwinCat/test_simple.py b/backend/script_groups/TwinCat/test_simple.py deleted file mode 100644 index 79f9cb1..0000000 --- a/backend/script_groups/TwinCat/test_simple.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python3 - -from simple_lad_converter import SimpleLadConverter - -converter = SimpleLadConverter() -print("Iniciando conversión...") - -try: - converter.parse_file(".example/INPUT.EXP") - print(f"Redes encontradas: {len(converter.networks)}") - - structured_code = converter.save_to_file("output_simple.txt") - print("Conversión completada!") - - # Mostrar primeras líneas - lines = structured_code.split('\n') - for i, line in enumerate(lines[:20]): - print(f"{i+1:2d}: {line}") - -except Exception as e: - print(f"Error: {e}") - import traceback - traceback.print_exc() \ No newline at end of file diff --git a/backend/script_groups/TwinCat/x2_process.py b/backend/script_groups/TwinCat/x2_process.py new file mode 100644 index 0000000..e840d01 --- /dev/null +++ b/backend/script_groups/TwinCat/x2_process.py @@ -0,0 +1,585 @@ +""" +LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL + +Este script convierte un archivo JSON simplificado (resultado de un análisis de un XML de Siemens) a un +JSON enriquecido con lógica SCL. Se enfoca en la lógica de programación y la agrupación de instrucciones IF. + +""" +# -*- coding: utf-8 -*- +import json +import argparse +import os +import copy +import traceback +import re +import importlib +import sys +import sympy +script_root = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +) +sys.path.append(script_root) +from backend.script_utils import load_configuration + +# Import necessary components from processors directory +from processors.processor_utils import format_variable_name, sympy_expr_to_scl +from processors.symbol_manager import SymbolManager + +# --- Constantes y Configuración --- +SCL_SUFFIX = "_sympy_processed" +GROUPED_COMMENT = "// Logic included in grouped IF" +SIMPLIFIED_IF_COMMENT = "// Simplified IF condition by script" + +# Global data dictionary +data = {} + + +# --- (process_group_ifs y load_processors SIN CAMBIOS) --- +def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data): + instr_uid = instruction["instruction_uid"] + instr_type_original = ( + instruction.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "") + ) + made_change = False + if ( + not instruction.get("type", "").endswith(SCL_SUFFIX) + or "_error" in instruction.get("type", "") + or instruction.get("grouped", False) + or instr_type_original + not in [ + "Contact", + "O", + "Eq", + "Ne", + "Gt", + "Lt", + "Ge", + "Le", + "PBox", + "NBox", + "And", + "Xor", + "Not", + ] + ): + return False + current_scl = instruction.get("scl", "") + if ( + current_scl.strip().startswith("IF") + and "END_IF;" in current_scl + and GROUPED_COMMENT not in current_scl + ): + return False + map_key_out = (network_id, instr_uid, "out") + sympy_condition_expr = sympy_map.get(map_key_out) + if sympy_condition_expr is None or sympy_condition_expr in [ + sympy.true, + sympy.false, + ]: + return False + + grouped_instructions_cores = [] + consumer_instr_list = [] + network_logic = next( + (net["logic"] for net in data["networks"] if net["id"] == network_id), [] + ) + if not network_logic: + return False + groupable_types = [ + "Move", + "Add", + "Sub", + "Mul", + "Div", + "Mod", + "Convert", + "Call_FC", + "Call_FB", + "SCoil", + "RCoil", + "BLKMOV", + "TON", + "TOF", + "TP", + "Se", + "Sd", + "CTU", + "CTD", + "CTUD", + ] + for consumer_instr in network_logic: + consumer_uid = consumer_instr["instruction_uid"] + if consumer_instr.get("grouped", False) or consumer_uid == instr_uid: + continue + consumer_en = consumer_instr.get("inputs", {}).get("en") + consumer_type = consumer_instr.get("type", "") + consumer_type_original = consumer_type.replace(SCL_SUFFIX, "").replace( + "_error", "" + ) + is_enabled_by_us = False + if ( + isinstance(consumer_en, dict) + and consumer_en.get("type") == "connection" + and consumer_en.get("source_instruction_uid") == instr_uid + and consumer_en.get("source_pin") == "out" + ): + is_enabled_by_us = True + if ( + is_enabled_by_us + and consumer_type.endswith(SCL_SUFFIX) + and consumer_type_original in groupable_types + ): + consumer_scl = consumer_instr.get("scl", "") + core_scl = None + if consumer_scl: + if consumer_scl.strip().startswith("IF"): + match = re.search( + r"IF\s+.*?THEN\s*(.*?)\s*END_IF;", + consumer_scl, + re.DOTALL | re.IGNORECASE, + ) + core_scl = match.group(1).strip() if match else None + elif not consumer_scl.strip().startswith("//"): + core_scl = consumer_scl.strip() + if core_scl: + grouped_instructions_cores.append(core_scl) + consumer_instr_list.append(consumer_instr) + if len(grouped_instructions_cores) > 1: + print( + f"INFO: Agrupando {len(grouped_instructions_cores)} instr. bajo condición de {instr_type_original} UID {instr_uid}" + ) + try: + simplified_expr = sympy.logic.boolalg.to_dnf( + sympy_condition_expr, simplify=True + ) + except Exception as e: + print(f"Error simplifying condition for grouping UID {instr_uid}: {e}") + simplified_expr = sympy_condition_expr + condition_scl_simplified = sympy_expr_to_scl(simplified_expr, symbol_manager) + scl_grouped_lines = [f"IF {condition_scl_simplified} THEN"] + for core_line in grouped_instructions_cores: + indented_core = "\n".join( + [f" {line.strip()}" for line in core_line.splitlines()] + ) + scl_grouped_lines.append(indented_core) + scl_grouped_lines.append("END_IF;") + final_grouped_scl = "\n".join(scl_grouped_lines) + instruction["scl"] = final_grouped_scl + for consumer_instr in consumer_instr_list: + consumer_instr["scl"] = f"{GROUPED_COMMENT} (by UID {instr_uid})" + consumer_instr["grouped"] = True + made_change = True + return made_change + + +def load_processors(processors_dir="processors"): + processor_map = {} + processor_list_unsorted = [] + default_priority = 10 + if not os.path.isdir(processors_dir): + print(f"Error: Directorio de procesadores no encontrado: '{processors_dir}'") + return processor_map, [] + print(f"Cargando procesadores desde: '{processors_dir}'") + processors_package = os.path.basename(processors_dir) + for filename in os.listdir(processors_dir): + if filename.startswith("process_") and filename.endswith(".py"): + module_name_rel = filename[:-3] + full_module_name = f"{processors_package}.{module_name_rel}" + try: + module = importlib.import_module(full_module_name) + if hasattr(module, "get_processor_info") and callable( + module.get_processor_info + ): + processor_info = module.get_processor_info() + info_list = [] + if isinstance(processor_info, dict): + info_list = [processor_info] + elif isinstance(processor_info, list): + info_list = processor_info + else: + print( + f" Advertencia: get_processor_info en {full_module_name} devolvió tipo inesperado." + ) + continue + for info in info_list: + if ( + isinstance(info, dict) + and "type_name" in info + and "processor_func" in info + ): + type_name = info["type_name"].lower() + processor_func = info["processor_func"] + priority = info.get("priority", default_priority) + if callable(processor_func): + if type_name in processor_map: + print( + f" Advertencia: '{type_name}' en {full_module_name} sobrescribe definición anterior." + ) + processor_map[type_name] = processor_func + processor_list_unsorted.append( + { + "priority": priority, + "type_name": type_name, + "func": processor_func, + } + ) + else: + print( + f" Advertencia: 'processor_func' para '{type_name}' en {full_module_name} no es callable." + ) + else: + print( + f" Advertencia: Entrada inválida en {full_module_name}: {info}" + ) + else: + print( + f" Advertencia: Módulo {module_name_rel}.py no tiene 'get_processor_info'." + ) + except ImportError as e: + print(f"Error importando {full_module_name}: {e}") + except Exception as e: + print(f"Error procesando {full_module_name}: {e}") + traceback.print_exc() + processor_list_sorted = sorted(processor_list_unsorted, key=lambda x: x["priority"]) + return processor_map, processor_list_sorted + + +# --- Bucle Principal de Procesamiento (MODIFICADO para copiar metadatos) --- +def process_json_to_scl(json_filepath, output_json_filepath): + """ + Lee JSON simplificado, copia metadatos, aplica procesadores dinámicos, + y guarda JSON procesado en la ruta especificada. + """ + global data + + if not os.path.exists(json_filepath): + print( + f"Error Crítico (x2): JSON de entrada no encontrado: {json_filepath}", + file=sys.stderr, + ) + return False + print(f"Cargando JSON desde: {json_filepath}") + try: + with open(json_filepath, "r", encoding="utf-8") as f: + data = json.load(f) # Carga el JSON de entrada + except Exception as e: + print(f"Error Crítico al cargar JSON de entrada: {e}", file=sys.stderr) + traceback.print_exc(file=sys.stderr) + return False + + # <-- NUEVO: Extraer metadatos del JSON de entrada (si existen) --> + source_xml_mod_time = data.get("source_xml_mod_time") + source_xml_size = data.get("source_xml_size") + # <-- FIN NUEVO --> + + block_type = data.get("block_type", "Unknown") + print(f"Procesando bloque tipo: {block_type}") + + if block_type in ["GlobalDB", "PlcUDT", "PlcTagTable", "InstanceDB"]: # <-- MODIFIED: Add InstanceDB + print(f"INFO: El bloque es {block_type}. Saltando procesamiento lógico de x2.") + print( + f"Guardando JSON de {block_type} (con metadatos) en: {output_json_filepath}" + ) + try: + # <-- MODIFICADO: Asegurar que los metadatos se guarden aunque se salte --> + data_to_save = copy.deepcopy(data) # Copiar datos originales + if source_xml_mod_time is not None: + data_to_save["source_xml_mod_time"] = source_xml_mod_time + if source_xml_size is not None: + data_to_save["source_xml_size"] = source_xml_size + # <-- FIN MODIFICADO --> + with open(output_json_filepath, "w", encoding="utf-8") as f_out: + json.dump(data_to_save, f_out, indent=4, ensure_ascii=False) + print(f"Guardado de {block_type} completado.") + return True + except Exception as e: + print(f"Error Crítico al guardar JSON de {block_type}: {e}") + traceback.print_exc(file=sys.stderr) + return False + + print(f"INFO: El bloque es {block_type}. Iniciando procesamiento lógico...") + + script_dir = os.path.dirname(__file__) + processors_dir_path = os.path.join(script_dir, "processors") + processor_map, sorted_processors = load_processors(processors_dir_path) + if not processor_map: + print("Error crítico: No se cargaron procesadores. Abortando.", file=sys.stderr) + return False + + # (Mapas de acceso y bucle iterativo SIN CAMBIOS relevantes, solo pasan 'data' que ya tiene metadatos) + network_access_maps = {} + for network in data.get("networks", []): + net_id = network["id"] + current_access_map = {} + for instr in network.get("logic", []): + for _, source in instr.get("inputs", {}).items(): + sources_to_check = ( + source + if isinstance(source, list) + else ([source] if isinstance(source, dict) else []) + ) + for src in sources_to_check: + if ( + isinstance(src, dict) + and src.get("uid") + and src.get("type") in ["variable", "constant"] + ): + current_access_map[src["uid"]] = src + for _, dest_list in instr.get("outputs", {}).items(): + if isinstance(dest_list, list): + for dest in dest_list: + if ( + isinstance(dest, dict) + and dest.get("uid") + and dest.get("type") in ["variable", "constant"] + ): + current_access_map[dest["uid"]] = dest + network_access_maps[net_id] = current_access_map + + symbol_manager = SymbolManager() + sympy_map = {} + max_passes = 30 + passes = 0 + processing_complete = False + print(f"\n--- Iniciando Bucle de Procesamiento Iterativo ({block_type}) ---") + while passes < max_passes and not processing_complete: + passes += 1 + made_change_in_base_pass = False + made_change_in_group_pass = False + print(f"\n--- Pase {passes} ---") + num_sympy_processed_this_pass = 0 + num_grouped_this_pass = 0 + print(f" Fase 1 (SymPy Base - Orden por Prioridad):") + num_sympy_processed_this_pass = 0 + for processor_info in sorted_processors: + current_type_name = processor_info["type_name"] + func_to_call = processor_info["func"] + for network in data.get("networks", []): + network_id = network["id"] + network_lang = network.get("language", "LAD") + if network_lang == "STL": + continue + access_map = network_access_maps.get(network_id, {}) + network_logic = network.get("logic", []) + for instruction in network_logic: + instr_uid = instruction.get("instruction_uid") + instr_type_current = instruction.get("type", "Unknown") + if ( + instr_type_current.endswith(SCL_SUFFIX) + or "_error" in instr_type_current + or instruction.get("grouped", False) + or instr_type_current + in [ + "RAW_STL_CHUNK", + "RAW_SCL_CHUNK", + "UNSUPPORTED_LANG", + "UNSUPPORTED_CONTENT", + "PARSING_ERROR", + ] + ): + continue + lookup_key = instr_type_current.lower() + effective_type_name = lookup_key + if instr_type_current == "Call": + call_block_type = instruction.get("block_type", "").upper() + if call_block_type == "FC": + effective_type_name = "call_fc" + elif call_block_type == "FB": + effective_type_name = "call_fb" + if effective_type_name == current_type_name: + try: + changed = func_to_call( + instruction, network_id, sympy_map, symbol_manager, data + ) # data se pasa aquí + if changed: + made_change_in_base_pass = True + num_sympy_processed_this_pass += 1 + except Exception as e: + print( + f"ERROR(SymPy Base) al procesar {instr_type_current} UID {instr_uid}: {e}" + ) + traceback.print_exc() + instruction["scl"] = ( + f"// ERROR en SymPy procesador base: {e}" + ) + instruction["type"] = instr_type_current + "_error" + made_change_in_base_pass = True + print( + f" -> {num_sympy_processed_this_pass} instrucciones (no STL) procesadas con SymPy." + ) + + if made_change_in_base_pass or passes == 1: + print(f" Fase 2 (Agrupación IF con Simplificación):") + num_grouped_this_pass = 0 + for network in data.get("networks", []): + network_id = network["id"] + network_lang = network.get("language", "LAD") + if network_lang == "STL": + continue + network_logic = network.get("logic", []) + uids_in_network = sorted( + [ + instr.get("instruction_uid", "Z") + for instr in network_logic + if instr.get("instruction_uid") + ] + ) + for uid_to_process in uids_in_network: + instruction = next( + ( + instr + for instr in network_logic + if instr.get("instruction_uid") == uid_to_process + ), + None, + ) + if not instruction: + continue + if instruction.get("grouped") or "_error" in instruction.get( + "type", "" + ): + continue + if instruction.get("type", "").endswith(SCL_SUFFIX): + try: + group_changed = process_group_ifs( + instruction, network_id, sympy_map, symbol_manager, data + ) # data se pasa aquí + if group_changed: + made_change_in_group_pass = True + num_grouped_this_pass += 1 + except Exception as e: + print( + f"ERROR(GroupLoop) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}" + ) + traceback.print_exc() + print( + f" -> {num_grouped_this_pass} agrupaciones realizadas (en redes no STL)." + ) + + if not made_change_in_base_pass and not made_change_in_group_pass: + print( + f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---" + ) + processing_complete = True + else: + print( + f"--- Fin Pase {passes}: {num_sympy_processed_this_pass} proc SymPy, {num_grouped_this_pass} agrup. Continuando..." + ) + if passes == max_passes and not processing_complete: + print(f"\n--- ADVERTENCIA: Límite de {max_passes} pases alcanzado...") + + # --- Verificación Final y Guardado JSON --- + print(f"\n--- Verificación Final de Instrucciones No Procesadas ({block_type}) ---") + unprocessed_count = 0 + unprocessed_details = [] + ignored_types = [ + "raw_scl_chunk", + "unsupported_lang", + "raw_stl_chunk", + "unsupported_content", + "parsing_error", + ] + for network in data.get("networks", []): + network_id = network.get("id", "Unknown ID") + network_title = network.get("title", f"Network {network_id}") + network_lang = network.get("language", "LAD") + if network_lang == "STL": + continue + for instruction in network.get("logic", []): + instr_uid = instruction.get("instruction_uid", "Unknown UID") + instr_type = instruction.get("type", "Unknown Type") + is_grouped = instruction.get("grouped", False) + if ( + not instr_type.endswith(SCL_SUFFIX) + and "_error" not in instr_type + and not is_grouped + and instr_type.lower() not in ignored_types + ): + unprocessed_count += 1 + unprocessed_details.append( + f" - Red '{network_title}' (ID: {network_id}, Lang: {network_lang}), Instrucción UID: {instr_uid}, Tipo: '{instr_type}'" + ) + if unprocessed_count > 0: + print( + f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones (no STL) que parecen no haber sido procesadas:" + ) + [print(detail) for detail in unprocessed_details] + else: + print( + "INFO: Todas las instrucciones relevantes (no STL) parecen haber sido procesadas o agrupadas." + ) + + # <-- MODIFICADO: Asegurar que los metadatos se añadan al 'data' final antes de guardar --> + if source_xml_mod_time is not None: + data["source_xml_mod_time"] = source_xml_mod_time + if source_xml_size is not None: + data["source_xml_size"] = source_xml_size + # <-- FIN MODIFICADO --> + + print(f"\nGuardando JSON procesado ({block_type}) en: {output_json_filepath}") + try: + with open(output_json_filepath, "w", encoding="utf-8") as f: + json.dump( + data, f, indent=4, ensure_ascii=False + ) # Guardar el diccionario 'data' modificado + print("Guardado completado.") + return True + except Exception as e: + print(f"Error Crítico al guardar JSON procesado: {e}", file=sys.stderr) + traceback.print_exc(file=sys.stderr) + return False + + +# --- Ejecución (MODIFICADO) --- +if __name__ == "__main__": + # Lógica para ejecución standalone + try: + import tkinter as tk + from tkinter import filedialog + except ImportError: + print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr) + tk = None + + input_json_file = "" + if tk: + root = tk.Tk() + root.withdraw() + print("Por favor, selecciona el archivo JSON de entrada (generado por x1)...") + input_json_file = filedialog.askopenfilename( + title="Selecciona el archivo JSON de entrada (.json)", + filetypes=[("JSON files", "*.json"), ("All files", "*.*")] + ) + root.destroy() + + if not input_json_file: + print("No se seleccionó ningún archivo. Saliendo.", file=sys.stderr) + else: + print(f"Archivo JSON de entrada seleccionado: {input_json_file}") + + # Calcular ruta de salida JSON procesado + json_filename_base = os.path.splitext(os.path.basename(input_json_file))[0] + # Asumimos que el _processed.json va al mismo directorio 'parsing' + parsing_dir = os.path.dirname(input_json_file) + output_json_file = os.path.join(parsing_dir, f"{json_filename_base}_processed.json") + + # Asegurarse de que el directorio de salida exista (aunque debería si el input existe) + os.makedirs(parsing_dir, exist_ok=True) + + print( + f"(x2 - Standalone) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'" + ) + + try: + success = process_json_to_scl(input_json_file, output_json_file) + if success: + print("\nProcesamiento completado exitosamente.") + else: + print(f"\nError durante el procesamiento de '{os.path.relpath(input_json_file)}'.", file=sys.stderr) + # sys.exit(1) # No usar sys.exit + except Exception as e: + print( + f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}", + file=sys.stderr, + ) + traceback.print_exc(file=sys.stderr) + # sys.exit(1) # No usar sys.exit