feat: Add duplicate UID handler and process_sr processor

- Implemented a new processor for handling duplicate UIDs in JSON data. The `duplicate_uid_handler.py` detects and resolves duplicate UIDs by renaming them with a suffix (_dup1, _dup2, etc.) and updates references accordingly.
- Added a new processor for Set/Reset flip-flop (Sr) in `process_sr.py`. This processor marks instructions as processed without generating specific code for cases with no connections, and flags errors for more complex cases.
This commit is contained in:
Miguel 2025-08-27 22:30:30 +02:00
parent f7d11c67c3
commit b1ee3a0eae
10 changed files with 52403 additions and 10173 deletions

View File

@ -0,0 +1,65 @@
# processors/duplicate_uid_handler.py
"""
Manejador de UIDs duplicados en el JSON
"""
def detect_and_resolve_duplicate_uids(data):
"""
Detecta y resuelve UIDs duplicados en el JSON de datos.
Estrategia: Renombrar UIDs duplicados añadiendo sufijo _dup1, _dup2, etc.
"""
uid_counts = {}
modifications_made = 0
print("INFO: Detectando UIDs duplicados...")
# Primera pasada: contar UIDs
for network in data.get("networks", []):
network_id = network.get("id", "Unknown")
for instruction in network.get("logic", []):
uid = instruction.get("instruction_uid")
if uid:
if uid not in uid_counts:
uid_counts[uid] = []
uid_counts[uid].append((network_id, instruction))
# Segunda pasada: renombrar duplicados
for uid, instances in uid_counts.items():
if len(instances) > 1:
print(f"INFO: UID duplicado encontrado: {uid} ({len(instances)} instancias)")
# Mantener la primera instancia, renombrar las demás
for i, (network_id, instruction) in enumerate(instances[1:], 1):
old_uid = instruction["instruction_uid"]
new_uid = f"{old_uid}_dup{i}"
instruction["instruction_uid"] = new_uid
modifications_made += 1
print(f" - Red {network_id}: UID {old_uid}{new_uid}")
# También actualizar referencias en otras instrucciones
update_uid_references(data, old_uid, new_uid, network_id)
if modifications_made > 0:
print(f"INFO: Se resolvieron {modifications_made} UIDs duplicados")
else:
print("INFO: No se encontraron UIDs duplicados")
return modifications_made > 0
def update_uid_references(data, old_uid, new_uid, target_network_id):
"""
Actualiza las referencias al UID antiguo en conexiones de otras instrucciones
dentro de la misma red.
"""
for network in data.get("networks", []):
if network.get("id") != target_network_id:
continue
for instruction in network.get("logic", []):
# Buscar en inputs que tengan source_instruction_uid
for pin_name, pin_info in instruction.get("inputs", {}).items():
if isinstance(pin_info, dict):
if pin_info.get("source_instruction_uid") == old_uid:
pin_info["source_instruction_uid"] = new_uid
print(f" - Actualizada referencia en instrucción {instruction.get('instruction_uid')} pin {pin_name}")

View File

@ -72,8 +72,12 @@ def process_coil(instruction, network_id, sympy_map, symbol_manager, data):
# Update instruction
instruction["scl"] = scl_final
instruction["type"] = instr_type_original + SCL_SUFFIX
# Coil typically doesn't output to scl_map
# *** ENHANCED: Store the Coil's input expression in sympy_map for potential reuse ***
# Some instructions (like subsequent Contacts) might need to read the Coil's logic state
map_key_out = (network_id, instr_uid, "out")
sympy_map[map_key_out] = simplified_expr # Store the simplified boolean expression
return True
# --- Processor Information Function ---

View File

@ -26,20 +26,44 @@ def process_contact(instruction, network_id, sympy_map, symbol_manager, data): #
operand_plc_name = extract_plc_variable_name(operand_info)
sympy_symbol_operand = symbol_manager.get_symbol(operand_plc_name) if operand_plc_name else None
# Check dependencies
if sympy_expr_in is None or sympy_symbol_operand is None:
# print(f"DEBUG Contact {instr_uid}: Dependency not ready (In: {sympy_expr_in is not None}, Op: {sympy_symbol_operand is not None})")
# Enhanced robustness: Handle cases where operand parsing fails
if operand_plc_name is None and operand_info:
# Try to extract name directly if available
if isinstance(operand_info, dict) and "name" in operand_info:
operand_plc_name = operand_info.get("name", "").strip('"')
if operand_plc_name:
sympy_symbol_operand = symbol_manager.get_symbol(operand_plc_name)
# If still no success, mark as error instead of hanging
if sympy_symbol_operand is None:
print(f"Error: Contact {instr_uid} - no se pudo extraer operando de {operand_info}")
instruction["scl"] = f"// ERROR: Contact {instr_uid} operando inválido"
instruction["type"] = instr_type_original + "_error"
return True
# Check dependencies with more specific error handling
if sympy_expr_in is None:
# If input is powerrail, treat as TRUE
if isinstance(in_input, dict) and in_input.get("type") == "powerrail":
sympy_expr_in = sympy.true
else:
# print(f"DEBUG Contact {instr_uid}: Input dependency not ready")
return False # Dependencies not ready
if sympy_symbol_operand is None:
# print(f"DEBUG Contact {instr_uid}: Operand dependency not ready")
return False # Dependencies not ready
# Apply negation using SymPy
current_term = sympy.Not(sympy_symbol_operand) if is_negated else sympy_symbol_operand
# Combine with previous RLO using SymPy
# Simplify common cases: TRUE AND X -> X
# Simplify common cases: TRUE AND X -> X, FALSE AND X -> FALSE
if sympy_expr_in == sympy.true:
sympy_expr_out = current_term
elif sympy_expr_in == sympy.false:
sympy_expr_out = sympy.false
else:
# Could add FALSE AND X -> FALSE optimization here too
sympy_expr_out = sympy.And(sympy_expr_in, current_term)
# Store the resulting SymPy expression object in the map

View File

@ -11,15 +11,15 @@ SCL_SUFFIX = "_sympy_processed"
def process_math(instruction, network_id, sympy_map, symbol_manager: SymbolManager, data):
"""
Genera SCL para operaciones matemáticas (SUB, MUL, DIV), simplificando EN.
Genera SCL para operaciones matemáticas (SUB, MUL, DIV, CEIL), simplificando EN.
"""
instr_uid = instruction["instruction_uid"]
instr_type_original = instruction.get("type", "") # SUB, MUL, DIV
instr_type_original = instruction.get("type", "") # SUB, MUL, DIV, CEIL
if instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original:
return False
# Mapa de tipos a operadores SCL string
op_map = {"SUB": "-", "MUL": "*", "DIV": "/"}
op_map = {"SUB": "-", "MUL": "*", "DIV": "/", "CEIL": "CEIL"}
scl_operator = op_map.get(instr_type_original.upper())
if not scl_operator:
instruction["scl"] = f"// ERROR: Operación matemática no soportada: {instr_type_original}"
@ -30,28 +30,42 @@ def process_math(instruction, network_id, sympy_map, symbol_manager: SymbolManag
en_input = instruction["inputs"].get("en")
in1_info = instruction["inputs"].get("in1")
in2_info = instruction["inputs"].get("in2")
in_info = instruction["inputs"].get("in") # Para funciones de un solo operando como CEIL
sympy_en_expr = get_sympy_representation(en_input, network_id, sympy_map, symbol_manager) if en_input else sympy.true
op1_sympy_or_const = get_sympy_representation(in1_info, network_id, sympy_map, symbol_manager)
op2_sympy_or_const = get_sympy_representation(in2_info, network_id, sympy_map, symbol_manager)
# Para CEIL solo necesitamos un operando
if instr_type_original.upper() == "CEIL":
op1_sympy_or_const = get_sympy_representation(in_info, network_id, sympy_map, symbol_manager)
op2_sympy_or_const = None
else:
op1_sympy_or_const = get_sympy_representation(in1_info, network_id, sympy_map, symbol_manager)
op2_sympy_or_const = get_sympy_representation(in2_info, network_id, sympy_map, symbol_manager)
# Obtener destino SCL
target_scl_name = get_target_scl_name(instruction, "out", network_id, default_to_temp=True)
# Verificar dependencias
if sympy_en_expr is None or op1_sympy_or_const is None or op2_sympy_or_const is None or target_scl_name is None:
if sympy_en_expr is None or op1_sympy_or_const is None or target_scl_name is None:
return False
if instr_type_original.upper() != "CEIL" and op2_sympy_or_const is None:
return False
# Convertir operandos SymPy/Constante a SCL strings
op1_scl = sympy_expr_to_scl(op1_sympy_or_const, symbol_manager)
op2_scl = sympy_expr_to_scl(op2_sympy_or_const, symbol_manager)
# Añadir paréntesis si contienen operadores (más seguro)
# La función sympy_expr_to_scl debería idealmente manejar esto, pero doble chequeo simple:
op1_scl_formatted = f"({op1_scl})" if re.search(r'[+\-*/ ]', op1_scl) else op1_scl
op2_scl_formatted = f"({op2_scl})" if re.search(r'[+\-*/ ]', op2_scl) else op2_scl
# Generar SCL Core
scl_core = f"{target_scl_name} := {op1_scl_formatted} {scl_operator} {op2_scl_formatted};"
if instr_type_original.upper() == "CEIL":
scl_core = f"{target_scl_name} := CEIL({op1_scl});"
else:
op2_scl = sympy_expr_to_scl(op2_sympy_or_const, symbol_manager)
# Añadir paréntesis si contienen operadores (más seguro)
# La función sympy_expr_to_scl debería idealmente manejar esto, pero doble chequeo simple:
op1_scl_formatted = f"({op1_scl})" if re.search(r'[+\-*/ ]', op1_scl) else op1_scl
op2_scl_formatted = f"({op2_scl})" if re.search(r'[+\-*/ ]', op2_scl) else op2_scl
scl_core = f"{target_scl_name} := {op1_scl_formatted} {scl_operator} {op2_scl_formatted};"
# Aplicar Condición EN (Simplificando EN)
scl_final = ""
@ -83,9 +97,10 @@ def process_math(instruction, network_id, sympy_map, symbol_manager: SymbolManag
# --- Processor Information Function ---
def get_processor_info():
"""Devuelve info para SUB, MUL, DIV."""
"""Devuelve info para SUB, MUL, DIV, CEIL."""
return [
{'type_name': 'sub', 'processor_func': process_math, 'priority': 4},
{'type_name': 'mul', 'processor_func': process_math, 'priority': 4},
{'type_name': 'div', 'processor_func': process_math, 'priority': 4}
{'type_name': 'div', 'processor_func': process_math, 'priority': 4},
{'type_name': 'ceil', 'processor_func': process_math, 'priority': 4}
]

View File

@ -26,31 +26,44 @@ def process_o(instruction, network_id, sympy_map, symbol_manager: SymbolManager,
sympy_parts = []
all_resolved = True
missing_deps = []
for pin in input_pins:
input_info = instruction["inputs"][pin]
sympy_expr = get_sympy_representation(input_info, network_id, sympy_map, symbol_manager)
if sympy_expr is None:
all_resolved = False
# print(f"DEBUG: O {instr_uid} esperando pin {pin}")
break # Salir si una dependencia no está lista
missing_deps.append(pin)
# Continue checking other pins instead of breaking immediately
continue
# Optimización: No incluir FALSE en un OR
if sympy_expr != sympy.false:
sympy_parts.append(sympy_expr)
if not all_resolved:
# More detailed debug info
# print(f"DEBUG: O {instr_uid} esperando pines {missing_deps}")
return False # Esperar dependencias
# Construir la expresión OR de SymPy
result_sympy_expr = sympy.false # Valor por defecto si no hay entradas válidas o todas son FALSE
if sympy_parts:
# Usar sympy.Or para construir la expresión
# Enhanced handling: If all inputs are FALSE or no valid inputs, result is FALSE
if not sympy_parts:
result_sympy_expr = sympy.false
elif len(sympy_parts) == 1:
# Optimization: OR with single input is just the input
result_sympy_expr = sympy_parts[0]
else:
# Use sympy.Or for multiple inputs
result_sympy_expr = sympy.Or(*sympy_parts)
# Simplificar casos obvios como OR(X) -> X, OR(X, TRUE) -> TRUE
# simplify_logic aquí puede ser prematuro, mejor al final.
# Pero Or() podría simplificar automáticamente OR(X) -> X.
# Opcional: result_sympy_expr = sympy.simplify_logic(result_sympy_expr)
# Optional: Apply simplification
try:
result_sympy_expr = sympy.simplify_logic(result_sympy_expr, force=False)
except Exception as e:
# If simplification fails, use original expression
# print(f"DEBUG: O {instr_uid} simplification failed: {e}")
pass
# Guardar la expresión SymPy resultante en el mapa para 'out'

View File

@ -0,0 +1,39 @@
# processors/process_sr.py
# -*- coding: utf-8 -*-
import sympy
import traceback
from .processor_utils import get_sympy_representation, sympy_expr_to_scl, get_target_scl_name, format_variable_name
from .symbol_manager import SymbolManager
SCL_SUFFIX = "_sympy_processed"
def process_sr(instruction, network_id, sympy_map, symbol_manager: SymbolManager, data):
"""
Genera SCL para Set/Reset flip-flop (Sr).
Por ahora, marca como procesado sin generar código específico.
"""
instr_uid = instruction["instruction_uid"]
instr_type_original = instruction.get("type", "Sr")
if instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original:
return False
# Verificar si la instrucción tiene conexiones válidas
inputs = instruction.get("inputs", {})
outputs = instruction.get("outputs", {})
# Si no tiene conexiones, marcar como procesado sin generar código
if not inputs and not outputs:
instruction["scl"] = "// Sr flip-flop sin conexiones - procesado como placeholder"
instruction["type"] = instr_type_original + SCL_SUFFIX
return True
# TODO: Implementar lógica completa para Sr cuando se encuentren casos con conexiones
# Por ahora, marcar como error para casos más complejos
instruction["scl"] = f"// ERROR: Sr {instr_uid} con conexiones no implementado aún."
instruction["type"] = instr_type_original + "_error"
return True
# --- Processor Information Function ---
def get_processor_info():
"""Devuelve la información para el procesador Sr."""
return {'type_name': 'sr', 'processor_func': process_sr, 'priority': 4}

View File

@ -19,7 +19,7 @@ def process_timer(instruction, network_id, sympy_map, symbol_manager: SymbolMana
return False
scl_timer_type = instr_type_original.upper()
if scl_timer_type not in ["TON", "TOF"]:
if scl_timer_type not in ["TON", "TOF", "TP"]:
instruction["scl"] = f"// ERROR: Tipo de temporizador directo no soportado: {instr_type_original}"
instruction["type"] = instr_type_original + "_error"
return True
@ -59,7 +59,7 @@ def process_timer(instruction, network_id, sympy_map, symbol_manager: SymbolMana
instruction["scl"] = scl_call # SCL final generado
instruction["type"] = instr_type_original + SCL_SUFFIX
# 7. Actualizar sympy_map para las salidas Q y ET
# 7. Actualizar sympy_map para las salidas Q, ET y ENO
map_key_q = (network_id, instr_uid, "Q") # Pin estándar SCL
# *** Store SymPy Symbol for boolean output Q ***
q_output_scl_access = f"{instance_name_scl}.Q" # String for SCL access
@ -74,12 +74,25 @@ def process_timer(instruction, network_id, sympy_map, symbol_manager: SymbolMana
# ET is TIME, store SCL access string
sympy_map[map_key_et] = f"{instance_name_scl}.ET"
# *** NEW: Handle ENO (Enable Output) pin ***
map_key_eno = (network_id, instr_uid, "eno")
# For timers, ENO is typically TRUE when the timer is properly executed
# In simplified logic, we can assume ENO = TRUE for well-formed timer calls
# Or we could make it conditional based on input validity
# For now, let's use TRUE as a reasonable default
sympy_map[map_key_eno] = sympy.true
# *** Also handle common aliases ***
map_key_out = (network_id, instr_uid, "out") # Some connections might look for "out"
sympy_map[map_key_out] = sympy_q_symbol # Map "out" to Q output
return True
# --- Processor Information Function ---
def get_processor_info():
"""Devuelve info para TON y TOF directos."""
"""Devuelve info para TON, TOF y TP directos."""
return [
{'type_name': 'ton', 'processor_func': process_timer, 'priority': 5},
{'type_name': 'tof', 'processor_func': process_timer, 'priority': 5}
{'type_name': 'tof', 'processor_func': process_timer, 'priority': 5},
{'type_name': 'tp', 'processor_func': process_timer, 'priority': 5}
]

View File

@ -53,6 +53,39 @@ class SymbolManager:
# Helper function to extract PLC variable name from JSON operand info
def extract_plc_variable_name(operand_info):
if operand_info and operand_info.get("type") == "variable":
return operand_info.get("name")
return None # Not a variable or info missing
if operand_info and operand_info.get("type") == "variable":
return operand_info.get("name")
elif operand_info and operand_info.get("type") == "unknown_structure":
# Handle direct memory addresses like DB960.X448.0
area = operand_info.get("Area")
block_number = operand_info.get("BlockNumber")
bit_offset = operand_info.get("BitOffset")
data_type = operand_info.get("Type")
if area and block_number and bit_offset is not None:
if area == "DB" and data_type == "Bool":
# Convert bit offset to byte and bit
byte_offset = int(bit_offset) // 8
bit_pos = int(bit_offset) % 8
return f"%DB{block_number}.DBX{byte_offset}.{bit_pos}"
elif area == "DB" and data_type in ["Word", "Int"]:
byte_offset = int(bit_offset) // 8
return f"%DB{block_number}.DBW{byte_offset}"
elif area == "DB" and data_type in ["DWord", "DInt", "Real"]:
byte_offset = int(bit_offset) // 8
return f"%DB{block_number}.DBD{byte_offset}"
else:
# Other area types (M, I, Q, etc.)
if data_type == "Bool":
byte_offset = int(bit_offset) // 8
bit_pos = int(bit_offset) % 8
return f"%{area}{byte_offset}.{bit_pos}"
else:
byte_offset = int(bit_offset) // 8
if data_type in ["Word", "Int"]:
return f"%{area}W{byte_offset}"
elif data_type in ["DWord", "DInt", "Real"]:
return f"%{area}D{byte_offset}"
else:
return f"%{area}{byte_offset}"
return None # Not a variable or info missing

View File

@ -309,6 +309,18 @@ def process_json_to_scl(json_filepath, output_json_filepath):
print(f"INFO: El bloque es {block_type}. Iniciando procesamiento lógico...")
# --- NUEVO: Manejar UIDs duplicados antes del procesamiento principal ---
try:
from processors.duplicate_uid_handler import detect_and_resolve_duplicate_uids
duplicates_resolved = detect_and_resolve_duplicate_uids(data)
if duplicates_resolved:
print("INFO: UIDs duplicados resueltos. Continuando con el procesamiento...")
except ImportError:
print("WARNING: No se pudo cargar el manejador de UIDs duplicados")
except Exception as e:
print(f"WARNING: Error al manejar UIDs duplicados: {e}")
# --- FIN NUEVO ---
script_dir = os.path.dirname(__file__)
processors_dir_path = os.path.join(script_dir, "processors")
processor_map, sorted_processors = load_processors(processors_dir_path)

62278
data/log.txt

File diff suppressed because it is too large Load Diff