ParamManagerScripts/backend/script_groups/XML Parser to SCL/processors/process_contact.py

103 lines
3.9 KiB
Python

# processors/process_contact.py
import sympy
from .processor_utils import (
get_sympy_representation,
format_variable_name,
) # Use new util
from .symbol_manager import (
SymbolManager,
extract_plc_variable_name,
) # Need symbol manager access
# Marker appended to an instruction's "type" by process_contact once its SymPy
# expression has been stored; process_contact also checks for this suffix to
# skip instructions it has already handled.
SCL_SUFFIX = "_sympy_processed" # Indicate processing type
def process_contact(
    instruction, network_id, sympy_map, symbol_manager, data
):  # Pass symbol_manager
    """Build the SymPy expression for a Contact instruction (normal or negated).

    The contact's operand symbol (negated when ``negated_pins["operand"]`` is
    truthy) is AND-ed with the incoming expression on the ``in`` pin, and the
    result is stored in ``sympy_map`` under ``(network_id, instruction_uid, "out")``.

    Args:
        instruction: Instruction dict. Reads ``instruction_uid``, ``type``,
            ``negated_pins`` and ``inputs``; ``scl`` and ``type`` are
            overwritten when the instruction is processed or marked as failed.
        network_id: Identifier of the enclosing network; part of the map key.
        sympy_map: Dict that receives the resulting expression.
        symbol_manager: Object providing ``get_symbol(name)`` for PLC names.
        data: Not used by this processor.

    Returns:
        True when the instruction was modified (expression stored, or the
        instruction was marked with the ``_error`` suffix); False when it was
        already processed or its inputs are not available yet.
    """
    instr_uid = instruction["instruction_uid"]
    instr_type_original = instruction.get("type", "Contact")

    # Nothing to do if this instruction was already processed or already failed.
    if instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original:
        return False

    is_negated = instruction.get("negated_pins", {}).get("operand", False)

    # Incoming SymPy expression (RLO) feeding the contact.
    in_input = instruction["inputs"].get("in")
    sympy_expr_in = get_sympy_representation(
        in_input, network_id, sympy_map, symbol_manager
    )

    # SymPy symbol for the contact's operand.
    operand_info = instruction["inputs"].get("operand")
    operand_plc_name = extract_plc_variable_name(operand_info)
    sympy_symbol_operand = (
        symbol_manager.get_symbol(operand_plc_name) if operand_plc_name else None
    )

    # Fallback when the regular name extraction produced nothing although an
    # operand description is present.
    # NOTE(review): the source indentation was lost; the error check below is
    # scoped to this fallback so that a missing operand still reports
    # "not ready" further down - confirm against the intended layout.
    if operand_plc_name is None and operand_info:
        if isinstance(operand_info, dict):
            raw_name = operand_info.get("name")
            # Only strings can be stripped; a None or non-string name must
            # reach the error branch below instead of raising AttributeError.
            if isinstance(raw_name, str):
                operand_plc_name = raw_name.strip('"')
                if operand_plc_name:
                    sympy_symbol_operand = symbol_manager.get_symbol(
                        operand_plc_name
                    )

        # Still no symbol: flag the instruction as failed so it is not retried.
        if sympy_symbol_operand is None:
            print(
                f"Error: Contact {instr_uid} - no se pudo extraer operando de {operand_info}"
            )
            instruction["scl"] = f"// ERROR: Contact {instr_uid} operando inválido"
            instruction["type"] = instr_type_original + "_error"
            return True

    # Dependency checks.
    if sympy_expr_in is None:
        # A power rail input is treated as constant TRUE.
        if isinstance(in_input, dict) and in_input.get("type") == "powerrail":
            sympy_expr_in = sympy.true
        else:
            return False  # Dependencies not ready

    if sympy_symbol_operand is None:
        return False  # Dependencies not ready

    # Apply the contact's negation, if any.
    current_term = (
        sympy.Not(sympy_symbol_operand) if is_negated else sympy_symbol_operand
    )

    # Combine with the incoming RLO, short-circuiting the constant cases:
    # TRUE AND X -> X, FALSE AND X -> FALSE.
    if sympy_expr_in == sympy.true:
        sympy_expr_out = current_term
    elif sympy_expr_in == sympy.false:
        sympy_expr_out = sympy.false
    else:
        sympy_expr_out = sympy.And(sympy_expr_in, current_term)

    # Publish the result on the 'out' pin; a Contact only modifies the RLO.
    map_key_out = (network_id, instr_uid, "out")
    sympy_map[map_key_out] = sympy_expr_out

    # Mark the instruction as processed; the SCL text is only a debug comment.
    instruction["scl"] = f"// SymPy Contact: {sympy_expr_out}"
    instruction["type"] = instr_type_original + SCL_SUFFIX
    return True
# --- Processor Information Function ---
def get_processor_info():
    """Return the descriptor dict for the Contact processor.

    The dict carries three entries: ``type_name`` ("contact"),
    ``processor_func`` (the ``process_contact`` callable) and ``priority`` (1).
    """
    # process_contact already accepts the trailing 'data' argument, so the
    # callable is handed out as-is.
    descriptor = dict(
        type_name="contact",
        processor_func=process_contact,
        priority=1,
    )
    return descriptor