# processors/process_contact.py
import sympy

from .processor_utils import (
    get_sympy_representation,
    format_variable_name,
)  # Use new util
from .symbol_manager import (
    SymbolManager,
    extract_plc_variable_name,
)  # Need symbol manager access

# Suffix appended to an instruction's "type" once this processor has handled it.
SCL_SUFFIX = "_sympy_processed"


def _resolve_operand_symbol(operand_info, symbol_manager):
    """Return the SymPy symbol for a contact operand, or None if unresolvable.

    The name is first obtained through ``extract_plc_variable_name``. Only when
    that helper returns ``None`` is the raw ``"name"`` entry of ``operand_info``
    tried as a fallback (with surrounding double quotes stripped).

    Args:
        operand_info: The ``"operand"`` entry of the instruction inputs
            (presumably a dict describing the variable; may be ``None``).
        symbol_manager: Object exposing ``get_symbol(name)``.

    Returns:
        Whatever ``symbol_manager.get_symbol`` returns for the resolved name,
        or ``None`` when no usable name could be extracted.
    """
    operand_plc_name = extract_plc_variable_name(operand_info)
    if operand_plc_name:
        return symbol_manager.get_symbol(operand_plc_name)

    # Fallback only applies when the parser gave up entirely (None), not when
    # it returned some other falsy value such as an empty string.
    if operand_plc_name is None and isinstance(operand_info, dict):
        raw_name = operand_info.get("name")
        # Guard against a present-but-non-string "name" (e.g. None), which
        # would otherwise raise AttributeError on .strip().
        if isinstance(raw_name, str):
            fallback_name = raw_name.strip('"')
            if fallback_name:
                return symbol_manager.get_symbol(fallback_name)

    return None


def process_contact(instruction, network_id, sympy_map, symbol_manager, data):
    """Build the SymPy expression for a Contact instruction (normal or negated).

    The contact's operand (negated when ``negated_pins["operand"]`` is truthy)
    is AND-ed with the incoming expression on the ``"in"`` pin, and the result
    is stored in ``sympy_map`` under ``(network_id, instruction_uid, "out")``.

    Args:
        instruction: Instruction dict. Reads ``"instruction_uid"``, ``"type"``,
            ``"negated_pins"`` and ``"inputs"``; writes ``"scl"`` and ``"type"``.
        network_id: Identifier of the network, used as part of the map key.
        sympy_map: Dict receiving the resulting expression.
        symbol_manager: Object exposing ``get_symbol(name)``.
        data: Unused here; kept so the signature matches what the caller passes.

    Returns:
        ``True`` when the instruction was modified (successfully processed or
        marked with the ``"_error"`` suffix); ``False`` when it was skipped
        because it was already processed/errored or its input is not ready.
    """
    instr_uid = instruction["instruction_uid"]
    instr_type_original = instruction.get("type", "Contact")

    # Skip instructions already handled (or already flagged as errors).
    if instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original:
        return False

    is_negated = instruction.get("negated_pins", {}).get("operand", False)

    # Incoming SymPy expression (RLO) feeding this contact.
    in_input = instruction["inputs"].get("in")
    sympy_expr_in = get_sympy_representation(
        in_input, network_id, sympy_map, symbol_manager
    )

    # SymPy symbol for the contact's operand.
    operand_info = instruction["inputs"].get("operand")
    sympy_symbol_operand = _resolve_operand_symbol(operand_info, symbol_manager)

    # An unresolvable operand can never become ready, so flag it as an error
    # rather than leaving the instruction pending forever.
    if sympy_symbol_operand is None:
        print(
            f"Error: Contact {instr_uid} - no se pudo extraer operando de {operand_info}"
        )
        instruction["scl"] = f"// ERROR: Contact {instr_uid} operando inválido"
        instruction["type"] = instr_type_original + "_error"
        return True

    if sympy_expr_in is None:
        # A power rail input is treated as constant TRUE; anything else means
        # the upstream expression has not been computed yet.
        if isinstance(in_input, dict) and in_input.get("type") == "powerrail":
            sympy_expr_in = sympy.true
        else:
            return False  # Dependencies not ready

    # Apply negation using SymPy.
    current_term = (
        sympy.Not(sympy_symbol_operand) if is_negated else sympy_symbol_operand
    )

    # Combine with the previous RLO, short-circuiting the trivial cases:
    # TRUE AND X -> X, FALSE AND X -> FALSE.
    if sympy_expr_in == sympy.true:
        sympy_expr_out = current_term
    elif sympy_expr_in == sympy.false:
        sympy_expr_out = sympy.false
    else:
        sympy_expr_out = sympy.And(sympy_expr_in, current_term)

    # Store the resulting SymPy expression object in the map.
    map_key_out = (network_id, instr_uid, "out")
    sympy_map[map_key_out] = sympy_expr_out

    # Mark the instruction as processed; the SCL field is only a debug comment.
    instruction["scl"] = f"// SymPy Contact: {sympy_expr_out}"
    instruction["type"] = instr_type_original + SCL_SUFFIX

    # No ENO entry is written for a contact; only the 'out' expression is stored.
    return True


# --- Processor Information Function ---
def get_processor_info():
    """Return the registration info for the Contact processor.

    Returns:
        A dict with the processor's ``"type_name"``, its ``"processor_func"``
        callable and its ``"priority"``.
    """
    return {"type_name": "contact", "processor_func": process_contact, "priority": 1}