# -*- coding: utf-8 -*- # processors/processor_utils.py import re import sympy from .symbol_manager import SymbolManager, extract_plc_variable_name SCL_SUFFIX = "_sympy_processed" # <<< AÑADE ESTA LÍNEA def format_variable_name(name): """Limpia el nombre de la variable para SCL.""" if not name: return "_INVALID_NAME_" if name.startswith('"') and name.endswith('"'): return name prefix = "" if name.startswith("#"): prefix = "#" name = name[1:] if name and name[0].isdigit(): name = "_" + name name = re.sub(r"[^a-zA-Z0-9_]", "_", name) return prefix + name def get_sympy_representation(source_info, network_id, sympy_map, symbol_manager): """Gets the SymPy expression object representing the source.""" if not source_info: print("Warning: get_sympy_representation called with None source_info.") return None # Or raise error # Handle lists (OR branches) - Recursively call and combine with sympy.Or if isinstance(source_info, list): sympy_parts = [] all_resolved = True for sub_source in source_info: sub_sympy = get_sympy_representation(sub_source, network_id, sympy_map, symbol_manager) if sub_sympy is None: all_resolved = False break sympy_parts.append(sub_sympy) if not all_resolved: return None if not sympy_parts: return sympy.false # Empty OR is false # Return sympy.Or only if there are multiple parts return sympy.Or(*sympy_parts) if len(sympy_parts) > 1 else sympy_parts[0] # Handle single source dictionary source_type = source_info.get("type") if source_type == "powerrail": return sympy.true elif source_type == "variable": plc_name = extract_plc_variable_name(source_info) if plc_name: return symbol_manager.get_symbol(plc_name) else: print(f"Error: Variable source without name: {source_info}") return None # Error case elif source_type == "constant": # Represent constants directly if possible, otherwise maybe as symbols? # For boolean simplification, only TRUE/FALSE matter significantly. dtype = str(source_info.get("datatype", "")).upper() value = source_info.get("value") if dtype == "BOOL": return sympy.true if str(value).upper() == "TRUE" else sympy.false else: # For simplification, treat non-boolean constants as opaque symbols? # Or just return their string representation if they won't be simplified anyway? # Let's return their string value for now, processors will handle it. # This might need refinement if constants need symbolic handling. return str(value) # Or maybe symbol_manager.get_symbol(str(value))? elif source_type == "connection": map_key = ( network_id, source_info.get("source_instruction_uid"), source_info.get("source_pin"), ) # Return the SymPy object from the map return sympy_map.get(map_key) # Returns None if not found (dependency not ready) elif source_type == "unknown_source": print(f"Warning: Referring to unknown source UID: {source_info.get('uid')}") return None # Cannot resolve else: print(f"Warning: Unknown source type: {source_info}") return None # Cannot resolve def sympy_expr_to_scl(expr, symbol_manager, format_prec=5): """Converts a SymPy expression to an SCL string using the symbol map.""" if expr is None: return "/* ERROR: None expression */" if expr == sympy.true: return "TRUE" if expr == sympy.false: return "FALSE" # Use sympy's string printer with custom settings if needed # For boolean, standard printing might be okay, but need to substitute symbols try: # Get the inverse map (py_id -> plc_name) inverse_map = symbol_manager.get_inverse_map() # Substitute symbols back to their py_id strings first # Need to handle the structure (And, Or, Not) scl_str = sympy.sstr(expr, order=None) # Basic string representation # Now, carefully replace py_id back to PLC names using regex # Sort keys by length descending to replace longer IDs first for py_id in sorted(inverse_map.keys(), key=len, reverse=True): # Use word boundaries to avoid replacing parts of other IDs scl_str = re.sub(r'\b' + re.escape(py_id) + r'\b', inverse_map[py_id], scl_str) # Replace SymPy operators/functions with SCL equivalents scl_str = scl_str.replace('&', ' AND ') scl_str = scl_str.replace('|', ' OR ') scl_str = scl_str.replace('^', ' XOR ') # If XOR is used scl_str = scl_str.replace('~', 'NOT ') # Add spaces around operators if needed after substitution scl_str = re.sub(r'AND', ' AND ', scl_str) scl_str = re.sub(r'OR', ' OR ', scl_str) scl_str = re.sub(r'XOR', ' XOR ', scl_str) scl_str = re.sub(r'NOT', 'NOT ', scl_str) # Space after NOT # Clean up potential double spaces, etc. scl_str = re.sub(r'\s+', ' ', scl_str).strip() # Handle parentheses potentially added by sstr - maybe remove redundant ones? # Be careful not to break operator precedence. return scl_str except Exception as e: print(f"Error converting SymPy expr '{expr}' to SCL: {e}") traceback.print_exc() return f"/* ERROR converting SymPy: {expr} */" def get_scl_representation(source_info, network_id, scl_map, access_map): if not source_info: return None if isinstance(source_info, list): scl_parts = [] all_resolved = True for sub_source in source_info: sub_scl = get_scl_representation( sub_source, network_id, scl_map, access_map ) if sub_scl is None: all_resolved = False break if ( sub_scl in ["TRUE", "FALSE"] or (sub_scl.startswith('"') and sub_scl.endswith('"')) or sub_scl.isdigit() or (sub_scl.startswith("(") and sub_scl.endswith(")")) ): scl_parts.append(sub_scl) else: scl_parts.append(f"({sub_scl})") return ( " OR ".join(scl_parts) if len(scl_parts) > 1 else (scl_parts[0] if scl_parts else "FALSE") if all_resolved else None ) source_type = source_info.get("type") if source_type == "powerrail": return "TRUE" elif source_type == "variable": name = source_info.get("name") # Asegurar que los nombres de variables se formatean correctamente aquí también return ( format_variable_name(name) if name else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_" ) elif source_type == "constant": dtype = str(source_info.get("datatype", "")).upper() value = source_info.get("value") try: if dtype == "BOOL": return str(value).upper() elif dtype in [ "INT", "DINT", "SINT", "USINT", "UINT", "UDINT", "LINT", "ULINT", "WORD", "DWORD", "LWORD", "BYTE", ]: return str(value) elif dtype in ["REAL", "LREAL"]: s_val = str(value) return s_val if "." in s_val or "e" in s_val.lower() else s_val + ".0" elif dtype == "STRING": # Escapar comillas simples dentro del string si es necesario str_val = str(value).replace("'", "''") return f"'{str_val}'" elif dtype == "TYPEDCONSTANT": # Podría necesitar formateo específico basado en el tipo real return str(value) else: # Otros tipos (TIME, DATE, etc.) - devolver como string por ahora str_val = str(value).replace("'", "''") return f"'{str_val}'" except Exception as e: print(f"Advertencia: Error formateando constante {source_info}: {e}") return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_" elif source_type == "connection": map_key = ( network_id, source_info.get("source_instruction_uid"), source_info.get("source_pin"), ) return scl_map.get(map_key) elif source_type == "unknown_source": print( f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}" ) return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_" else: print(f"Advertencia: Tipo de fuente desconocido: {source_info}") return f"_ERR_INVALID_SRC_TYPE_" def format_variable_name(name): """Limpia el nombre de la variable para SCL.""" if not name: return "_INVALID_NAME_" # Si ya está entre comillas dobles, asumimos que es un nombre complejo (ej. "DB"."Variable") # y lo devolvemos tal cual para SCL. if name.startswith('"') and name.endswith('"'): # Podríamos añadir validación extra aquí si fuera necesario return name # Si no tiene comillas, es un nombre simple (ej. Tag_1, #tempVar) # Reemplazar caracteres no válidos (excepto '_') por '_' # Permitir '#' al inicio para variables temporales prefix = "" if name.startswith("#"): prefix = "#" name = name[1:] # Permitir letras, números y guiones bajos. Reemplazar el resto. # Asegurarse de que no empiece con número (después del # si existe) if name and name[0].isdigit(): name = "_" + name # Reemplazar caracteres no válidos name = re.sub(r"[^a-zA-Z0-9_]", "_", name) return prefix + name def generate_temp_var_name(network_id, instr_uid, pin_name): net_id_clean = str(network_id).replace("-", "_") instr_uid_clean = str(instr_uid).replace("-", "_") pin_name_clean = str(pin_name).replace("-", "_").lower() # Usar # para variables temporales SCL estándar return f"#_temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}" def get_target_scl_name(instruction, pin_name, network_id, default_to_temp=True): """Gets the SCL formatted name for a target variable. Handles instruction outputs AND specific inputs like Coil operand. """ instr_uid = instruction["instruction_uid"] # Ahora SCL_SUFFIX está definido en este módulo instr_type_upper = instruction.get("type", "").upper().replace(SCL_SUFFIX.upper(), "").replace("_ERROR", "") # Check original type target_info = None # Special handling for inputs that represent the target variable if instr_type_upper in ["COIL", "SCOIL", "RCOIL"] and pin_name == "operand": target_info = instruction.get("inputs", {}).get("operand") # Add other instructions where input pin == target if necessary # elif instr_type_upper == "XYZ" and pin_name == "some_input_target_pin": # target_info = instruction.get("inputs", {}).get(pin_name) else: # Default: Assume pin_name refers to an output pin output_pin_data = instruction.get("outputs", {}).get(pin_name) # Check if it's a list and has one connection (standard case) if (output_pin_data and isinstance(output_pin_data, list) and len(output_pin_data) == 1): target_info = output_pin_data[0] # Add handling for direct output assignment if your JSON structure supports it target_scl = None if target_info: if target_info.get("type") == "variable": plc_name = target_info.get("name") if plc_name: target_scl = format_variable_name(plc_name) # Use existing util else: print(f"Error: Target variable for {instr_uid}.{pin_name} has no name (UID: {target_info.get('uid')}).") elif target_info.get("type") == "constant": print(f"Advertencia: Attempt to write to constant target {instr_uid}.{pin_name} (UID: {target_info.get('uid')}).") # else: # Handle other target types if needed # print(f"Advertencia: Target {instr_uid}.{pin_name} is not a variable: {target_info.get('type')}.") # else: # No target info found for the specified pin # print(f"DEBUG: No target info found for {instr_uid}.{pin_name}") pass # Handle default_to_temp logic if target_scl: return target_scl elif default_to_temp: # Generate temp only if no explicit target was found AND default is allowed print(f"INFO: Generating temp var for {instr_uid}.{pin_name}") # Be informative return generate_temp_var_name(network_id, instr_uid, pin_name) else: # No target found and default temps not allowed return None