311 lines
13 KiB
Python
311 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
# processors/processor_utils.py
|
|
import re
|
|
import sympy
|
|
from .symbol_manager import SymbolManager, extract_plc_variable_name
|
|
|
|
SCL_SUFFIX = "_sympy_processed" # <<< AÑADE ESTA LÍNEA
|
|
|
|
def format_variable_name(name):
|
|
"""Limpia el nombre de la variable para SCL."""
|
|
if not name:
|
|
return "_INVALID_NAME_"
|
|
if name.startswith('"') and name.endswith('"'):
|
|
return name
|
|
prefix = ""
|
|
if name.startswith("#"):
|
|
prefix = "#"
|
|
name = name[1:]
|
|
if name and name[0].isdigit():
|
|
name = "_" + name
|
|
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
|
|
return prefix + name
|
|
|
|
def get_sympy_representation(source_info, network_id, sympy_map, symbol_manager):
|
|
"""Gets the SymPy expression object representing the source."""
|
|
if not source_info:
|
|
print("Warning: get_sympy_representation called with None source_info.")
|
|
return None # Or raise error
|
|
|
|
# Handle lists (OR branches) - Recursively call and combine with sympy.Or
|
|
if isinstance(source_info, list):
|
|
sympy_parts = []
|
|
all_resolved = True
|
|
for sub_source in source_info:
|
|
sub_sympy = get_sympy_representation(sub_source, network_id, sympy_map, symbol_manager)
|
|
if sub_sympy is None:
|
|
all_resolved = False
|
|
break
|
|
sympy_parts.append(sub_sympy)
|
|
|
|
if not all_resolved:
|
|
return None
|
|
if not sympy_parts:
|
|
return sympy.false # Empty OR is false
|
|
# Return sympy.Or only if there are multiple parts
|
|
return sympy.Or(*sympy_parts) if len(sympy_parts) > 1 else sympy_parts[0]
|
|
|
|
# Handle single source dictionary
|
|
source_type = source_info.get("type")
|
|
|
|
if source_type == "powerrail":
|
|
return sympy.true
|
|
elif source_type == "variable":
|
|
plc_name = extract_plc_variable_name(source_info)
|
|
if plc_name:
|
|
return symbol_manager.get_symbol(plc_name)
|
|
else:
|
|
print(f"Error: Variable source without name: {source_info}")
|
|
return None # Error case
|
|
elif source_type == "constant":
|
|
# Represent constants directly if possible, otherwise maybe as symbols?
|
|
# For boolean simplification, only TRUE/FALSE matter significantly.
|
|
dtype = str(source_info.get("datatype", "")).upper()
|
|
value = source_info.get("value")
|
|
if dtype == "BOOL":
|
|
return sympy.true if str(value).upper() == "TRUE" else sympy.false
|
|
else:
|
|
# For simplification, treat non-boolean constants as opaque symbols?
|
|
# Or just return their string representation if they won't be simplified anyway?
|
|
# Let's return their string value for now, processors will handle it.
|
|
# This might need refinement if constants need symbolic handling.
|
|
return str(value) # Or maybe symbol_manager.get_symbol(str(value))?
|
|
|
|
elif source_type == "connection":
|
|
map_key = (
|
|
network_id,
|
|
source_info.get("source_instruction_uid"),
|
|
source_info.get("source_pin"),
|
|
)
|
|
# Return the SymPy object from the map
|
|
return sympy_map.get(map_key) # Returns None if not found (dependency not ready)
|
|
elif source_type == "unknown_source":
|
|
print(f"Warning: Referring to unknown source UID: {source_info.get('uid')}")
|
|
return None # Cannot resolve
|
|
else:
|
|
print(f"Warning: Unknown source type: {source_info}")
|
|
return None # Cannot resolve
|
|
|
|
def sympy_expr_to_scl(expr, symbol_manager, format_prec=5):
|
|
"""Converts a SymPy expression to an SCL string using the symbol map."""
|
|
if expr is None: return "/* ERROR: None expression */"
|
|
if expr == sympy.true: return "TRUE"
|
|
if expr == sympy.false: return "FALSE"
|
|
|
|
# Use sympy's string printer with custom settings if needed
|
|
# For boolean, standard printing might be okay, but need to substitute symbols
|
|
try:
|
|
# Get the inverse map (py_id -> plc_name)
|
|
inverse_map = symbol_manager.get_inverse_map()
|
|
|
|
# Substitute symbols back to their py_id strings first
|
|
# Need to handle the structure (And, Or, Not)
|
|
scl_str = sympy.sstr(expr, order=None) # Basic string representation
|
|
|
|
# Now, carefully replace py_id back to PLC names using regex
|
|
# Sort keys by length descending to replace longer IDs first
|
|
for py_id in sorted(inverse_map.keys(), key=len, reverse=True):
|
|
# Use word boundaries to avoid replacing parts of other IDs
|
|
scl_str = re.sub(r'\b' + re.escape(py_id) + r'\b', inverse_map[py_id], scl_str)
|
|
|
|
# Replace SymPy operators/functions with SCL equivalents
|
|
scl_str = scl_str.replace('&', ' AND ')
|
|
scl_str = scl_str.replace('|', ' OR ')
|
|
scl_str = scl_str.replace('^', ' XOR ') # If XOR is used
|
|
scl_str = scl_str.replace('~', 'NOT ')
|
|
# Add spaces around operators if needed after substitution
|
|
scl_str = re.sub(r'AND', ' AND ', scl_str)
|
|
scl_str = re.sub(r'OR', ' OR ', scl_str)
|
|
scl_str = re.sub(r'XOR', ' XOR ', scl_str)
|
|
scl_str = re.sub(r'NOT', 'NOT ', scl_str) # Space after NOT
|
|
|
|
# Clean up potential double spaces, etc.
|
|
scl_str = re.sub(r'\s+', ' ', scl_str).strip()
|
|
# Handle parentheses potentially added by sstr - maybe remove redundant ones?
|
|
# Be careful not to break operator precedence.
|
|
|
|
return scl_str
|
|
|
|
except Exception as e:
|
|
print(f"Error converting SymPy expr '{expr}' to SCL: {e}")
|
|
traceback.print_exc()
|
|
return f"/* ERROR converting SymPy: {expr} */"
|
|
|
|
def get_scl_representation(source_info, network_id, scl_map, access_map):
|
|
if not source_info:
|
|
return None
|
|
if isinstance(source_info, list):
|
|
scl_parts = []
|
|
all_resolved = True
|
|
for sub_source in source_info:
|
|
sub_scl = get_scl_representation(
|
|
sub_source, network_id, scl_map, access_map
|
|
)
|
|
if sub_scl is None:
|
|
all_resolved = False
|
|
break
|
|
if (
|
|
sub_scl in ["TRUE", "FALSE"]
|
|
or (sub_scl.startswith('"') and sub_scl.endswith('"'))
|
|
or sub_scl.isdigit()
|
|
or (sub_scl.startswith("(") and sub_scl.endswith(")"))
|
|
):
|
|
scl_parts.append(sub_scl)
|
|
else:
|
|
scl_parts.append(f"({sub_scl})")
|
|
return (
|
|
" OR ".join(scl_parts)
|
|
if len(scl_parts) > 1
|
|
else (scl_parts[0] if scl_parts else "FALSE") if all_resolved else None
|
|
)
|
|
source_type = source_info.get("type")
|
|
if source_type == "powerrail":
|
|
return "TRUE"
|
|
elif source_type == "variable":
|
|
name = source_info.get("name")
|
|
# Asegurar que los nombres de variables se formatean correctamente aquí también
|
|
return (
|
|
format_variable_name(name)
|
|
if name
|
|
else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_"
|
|
)
|
|
elif source_type == "constant":
|
|
dtype = str(source_info.get("datatype", "")).upper()
|
|
value = source_info.get("value")
|
|
try:
|
|
if dtype == "BOOL":
|
|
return str(value).upper()
|
|
elif dtype in [
|
|
"INT",
|
|
"DINT",
|
|
"SINT",
|
|
"USINT",
|
|
"UINT",
|
|
"UDINT",
|
|
"LINT",
|
|
"ULINT",
|
|
"WORD",
|
|
"DWORD",
|
|
"LWORD",
|
|
"BYTE",
|
|
]:
|
|
return str(value)
|
|
elif dtype in ["REAL", "LREAL"]:
|
|
s_val = str(value)
|
|
return s_val if "." in s_val or "e" in s_val.lower() else s_val + ".0"
|
|
elif dtype == "STRING":
|
|
# Escapar comillas simples dentro del string si es necesario
|
|
str_val = str(value).replace("'", "''")
|
|
return f"'{str_val}'"
|
|
elif dtype == "TYPEDCONSTANT":
|
|
# Podría necesitar formateo específico basado en el tipo real
|
|
return str(value)
|
|
else:
|
|
# Otros tipos (TIME, DATE, etc.) - devolver como string por ahora
|
|
str_val = str(value).replace("'", "''")
|
|
return f"'{str_val}'"
|
|
except Exception as e:
|
|
print(f"Advertencia: Error formateando constante {source_info}: {e}")
|
|
return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_"
|
|
elif source_type == "connection":
|
|
map_key = (
|
|
network_id,
|
|
source_info.get("source_instruction_uid"),
|
|
source_info.get("source_pin"),
|
|
)
|
|
return scl_map.get(map_key)
|
|
elif source_type == "unknown_source":
|
|
print(
|
|
f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}"
|
|
)
|
|
return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_"
|
|
else:
|
|
print(f"Advertencia: Tipo de fuente desconocido: {source_info}")
|
|
return f"_ERR_INVALID_SRC_TYPE_"
|
|
|
|
def format_variable_name(name):
|
|
"""Limpia el nombre de la variable para SCL."""
|
|
if not name:
|
|
return "_INVALID_NAME_"
|
|
|
|
# Si ya está entre comillas dobles, asumimos que es un nombre complejo (ej. "DB"."Variable")
|
|
# y lo devolvemos tal cual para SCL.
|
|
if name.startswith('"') and name.endswith('"'):
|
|
# Podríamos añadir validación extra aquí si fuera necesario
|
|
return name
|
|
|
|
# Si no tiene comillas, es un nombre simple (ej. Tag_1, #tempVar)
|
|
# Reemplazar caracteres no válidos (excepto '_') por '_'
|
|
# Permitir '#' al inicio para variables temporales
|
|
prefix = ""
|
|
if name.startswith("#"):
|
|
prefix = "#"
|
|
name = name[1:]
|
|
|
|
# Permitir letras, números y guiones bajos. Reemplazar el resto.
|
|
# Asegurarse de que no empiece con número (después del # si existe)
|
|
if name and name[0].isdigit():
|
|
name = "_" + name
|
|
# Reemplazar caracteres no válidos
|
|
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
|
|
|
|
return prefix + name
|
|
|
|
def generate_temp_var_name(network_id, instr_uid, pin_name):
|
|
net_id_clean = str(network_id).replace("-", "_")
|
|
instr_uid_clean = str(instr_uid).replace("-", "_")
|
|
pin_name_clean = str(pin_name).replace("-", "_").lower()
|
|
# Usar # para variables temporales SCL estándar
|
|
return f"#_temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}"
|
|
|
|
def get_target_scl_name(instruction, pin_name, network_id, default_to_temp=True):
|
|
"""Gets the SCL formatted name for a target variable.
|
|
Handles instruction outputs AND specific inputs like Coil operand.
|
|
"""
|
|
instr_uid = instruction["instruction_uid"]
|
|
# Ahora SCL_SUFFIX está definido en este módulo
|
|
instr_type_upper = instruction.get("type", "").upper().replace(SCL_SUFFIX.upper(), "").replace("_ERROR", "") # Check original type
|
|
target_info = None
|
|
|
|
# Special handling for inputs that represent the target variable
|
|
if instr_type_upper in ["COIL", "SCOIL", "RCOIL"] and pin_name == "operand":
|
|
target_info = instruction.get("inputs", {}).get("operand")
|
|
# Add other instructions where input pin == target if necessary
|
|
# elif instr_type_upper == "XYZ" and pin_name == "some_input_target_pin":
|
|
# target_info = instruction.get("inputs", {}).get(pin_name)
|
|
else:
|
|
# Default: Assume pin_name refers to an output pin
|
|
output_pin_data = instruction.get("outputs", {}).get(pin_name)
|
|
# Check if it's a list and has one connection (standard case)
|
|
if (output_pin_data and isinstance(output_pin_data, list) and len(output_pin_data) == 1):
|
|
target_info = output_pin_data[0]
|
|
# Add handling for direct output assignment if your JSON structure supports it
|
|
|
|
target_scl = None
|
|
if target_info:
|
|
if target_info.get("type") == "variable":
|
|
plc_name = target_info.get("name")
|
|
if plc_name:
|
|
target_scl = format_variable_name(plc_name) # Use existing util
|
|
else:
|
|
print(f"Error: Target variable for {instr_uid}.{pin_name} has no name (UID: {target_info.get('uid')}).")
|
|
elif target_info.get("type") == "constant":
|
|
print(f"Advertencia: Attempt to write to constant target {instr_uid}.{pin_name} (UID: {target_info.get('uid')}).")
|
|
# else: # Handle other target types if needed
|
|
# print(f"Advertencia: Target {instr_uid}.{pin_name} is not a variable: {target_info.get('type')}.")
|
|
# else: # No target info found for the specified pin
|
|
# print(f"DEBUG: No target info found for {instr_uid}.{pin_name}")
|
|
pass
|
|
|
|
|
|
# Handle default_to_temp logic
|
|
if target_scl:
|
|
return target_scl
|
|
elif default_to_temp:
|
|
# Generate temp only if no explicit target was found AND default is allowed
|
|
print(f"INFO: Generating temp var for {instr_uid}.{pin_name}") # Be informative
|
|
return generate_temp_var_name(network_id, instr_uid, pin_name)
|
|
else:
|
|
# No target found and default temps not allowed
|
|
return None
|