Simatic_XML_Parser_to_SCL/processors/processor_utils.py

311 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# processors/processor_utils.py
import re
import sympy
from .symbol_manager import SymbolManager, extract_plc_variable_name
SCL_SUFFIX = "_sympy_processed" # <<< AÑADE ESTA LÍNEA
def format_variable_name(name):
"""Limpia el nombre de la variable para SCL."""
if not name:
return "_INVALID_NAME_"
if name.startswith('"') and name.endswith('"'):
return name
prefix = ""
if name.startswith("#"):
prefix = "#"
name = name[1:]
if name and name[0].isdigit():
name = "_" + name
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
return prefix + name
def get_sympy_representation(source_info, network_id, sympy_map, symbol_manager):
"""Gets the SymPy expression object representing the source."""
if not source_info:
print("Warning: get_sympy_representation called with None source_info.")
return None # Or raise error
# Handle lists (OR branches) - Recursively call and combine with sympy.Or
if isinstance(source_info, list):
sympy_parts = []
all_resolved = True
for sub_source in source_info:
sub_sympy = get_sympy_representation(sub_source, network_id, sympy_map, symbol_manager)
if sub_sympy is None:
all_resolved = False
break
sympy_parts.append(sub_sympy)
if not all_resolved:
return None
if not sympy_parts:
return sympy.false # Empty OR is false
# Return sympy.Or only if there are multiple parts
return sympy.Or(*sympy_parts) if len(sympy_parts) > 1 else sympy_parts[0]
# Handle single source dictionary
source_type = source_info.get("type")
if source_type == "powerrail":
return sympy.true
elif source_type == "variable":
plc_name = extract_plc_variable_name(source_info)
if plc_name:
return symbol_manager.get_symbol(plc_name)
else:
print(f"Error: Variable source without name: {source_info}")
return None # Error case
elif source_type == "constant":
# Represent constants directly if possible, otherwise maybe as symbols?
# For boolean simplification, only TRUE/FALSE matter significantly.
dtype = str(source_info.get("datatype", "")).upper()
value = source_info.get("value")
if dtype == "BOOL":
return sympy.true if str(value).upper() == "TRUE" else sympy.false
else:
# For simplification, treat non-boolean constants as opaque symbols?
# Or just return their string representation if they won't be simplified anyway?
# Let's return their string value for now, processors will handle it.
# This might need refinement if constants need symbolic handling.
return str(value) # Or maybe symbol_manager.get_symbol(str(value))?
elif source_type == "connection":
map_key = (
network_id,
source_info.get("source_instruction_uid"),
source_info.get("source_pin"),
)
# Return the SymPy object from the map
return sympy_map.get(map_key) # Returns None if not found (dependency not ready)
elif source_type == "unknown_source":
print(f"Warning: Referring to unknown source UID: {source_info.get('uid')}")
return None # Cannot resolve
else:
print(f"Warning: Unknown source type: {source_info}")
return None # Cannot resolve
def sympy_expr_to_scl(expr, symbol_manager, format_prec=5):
"""Converts a SymPy expression to an SCL string using the symbol map."""
if expr is None: return "/* ERROR: None expression */"
if expr == sympy.true: return "TRUE"
if expr == sympy.false: return "FALSE"
# Use sympy's string printer with custom settings if needed
# For boolean, standard printing might be okay, but need to substitute symbols
try:
# Get the inverse map (py_id -> plc_name)
inverse_map = symbol_manager.get_inverse_map()
# Substitute symbols back to their py_id strings first
# Need to handle the structure (And, Or, Not)
scl_str = sympy.sstr(expr, order=None) # Basic string representation
# Now, carefully replace py_id back to PLC names using regex
# Sort keys by length descending to replace longer IDs first
for py_id in sorted(inverse_map.keys(), key=len, reverse=True):
# Use word boundaries to avoid replacing parts of other IDs
scl_str = re.sub(r'\b' + re.escape(py_id) + r'\b', inverse_map[py_id], scl_str)
# Replace SymPy operators/functions with SCL equivalents
scl_str = scl_str.replace('&', ' AND ')
scl_str = scl_str.replace('|', ' OR ')
scl_str = scl_str.replace('^', ' XOR ') # If XOR is used
scl_str = scl_str.replace('~', 'NOT ')
# Add spaces around operators if needed after substitution
scl_str = re.sub(r'AND', ' AND ', scl_str)
scl_str = re.sub(r'OR', ' OR ', scl_str)
scl_str = re.sub(r'XOR', ' XOR ', scl_str)
scl_str = re.sub(r'NOT', 'NOT ', scl_str) # Space after NOT
# Clean up potential double spaces, etc.
scl_str = re.sub(r'\s+', ' ', scl_str).strip()
# Handle parentheses potentially added by sstr - maybe remove redundant ones?
# Be careful not to break operator precedence.
return scl_str
except Exception as e:
print(f"Error converting SymPy expr '{expr}' to SCL: {e}")
traceback.print_exc()
return f"/* ERROR converting SymPy: {expr} */"
def get_scl_representation(source_info, network_id, scl_map, access_map):
if not source_info:
return None
if isinstance(source_info, list):
scl_parts = []
all_resolved = True
for sub_source in source_info:
sub_scl = get_scl_representation(
sub_source, network_id, scl_map, access_map
)
if sub_scl is None:
all_resolved = False
break
if (
sub_scl in ["TRUE", "FALSE"]
or (sub_scl.startswith('"') and sub_scl.endswith('"'))
or sub_scl.isdigit()
or (sub_scl.startswith("(") and sub_scl.endswith(")"))
):
scl_parts.append(sub_scl)
else:
scl_parts.append(f"({sub_scl})")
return (
" OR ".join(scl_parts)
if len(scl_parts) > 1
else (scl_parts[0] if scl_parts else "FALSE") if all_resolved else None
)
source_type = source_info.get("type")
if source_type == "powerrail":
return "TRUE"
elif source_type == "variable":
name = source_info.get("name")
# Asegurar que los nombres de variables se formatean correctamente aquí también
return (
format_variable_name(name)
if name
else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_"
)
elif source_type == "constant":
dtype = str(source_info.get("datatype", "")).upper()
value = source_info.get("value")
try:
if dtype == "BOOL":
return str(value).upper()
elif dtype in [
"INT",
"DINT",
"SINT",
"USINT",
"UINT",
"UDINT",
"LINT",
"ULINT",
"WORD",
"DWORD",
"LWORD",
"BYTE",
]:
return str(value)
elif dtype in ["REAL", "LREAL"]:
s_val = str(value)
return s_val if "." in s_val or "e" in s_val.lower() else s_val + ".0"
elif dtype == "STRING":
# Escapar comillas simples dentro del string si es necesario
str_val = str(value).replace("'", "''")
return f"'{str_val}'"
elif dtype == "TYPEDCONSTANT":
# Podría necesitar formateo específico basado en el tipo real
return str(value)
else:
# Otros tipos (TIME, DATE, etc.) - devolver como string por ahora
str_val = str(value).replace("'", "''")
return f"'{str_val}'"
except Exception as e:
print(f"Advertencia: Error formateando constante {source_info}: {e}")
return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_"
elif source_type == "connection":
map_key = (
network_id,
source_info.get("source_instruction_uid"),
source_info.get("source_pin"),
)
return scl_map.get(map_key)
elif source_type == "unknown_source":
print(
f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}"
)
return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_"
else:
print(f"Advertencia: Tipo de fuente desconocido: {source_info}")
return f"_ERR_INVALID_SRC_TYPE_"
def format_variable_name(name):
"""Limpia el nombre de la variable para SCL."""
if not name:
return "_INVALID_NAME_"
# Si ya está entre comillas dobles, asumimos que es un nombre complejo (ej. "DB"."Variable")
# y lo devolvemos tal cual para SCL.
if name.startswith('"') and name.endswith('"'):
# Podríamos añadir validación extra aquí si fuera necesario
return name
# Si no tiene comillas, es un nombre simple (ej. Tag_1, #tempVar)
# Reemplazar caracteres no válidos (excepto '_') por '_'
# Permitir '#' al inicio para variables temporales
prefix = ""
if name.startswith("#"):
prefix = "#"
name = name[1:]
# Permitir letras, números y guiones bajos. Reemplazar el resto.
# Asegurarse de que no empiece con número (después del # si existe)
if name and name[0].isdigit():
name = "_" + name
# Reemplazar caracteres no válidos
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
return prefix + name
def generate_temp_var_name(network_id, instr_uid, pin_name):
net_id_clean = str(network_id).replace("-", "_")
instr_uid_clean = str(instr_uid).replace("-", "_")
pin_name_clean = str(pin_name).replace("-", "_").lower()
# Usar # para variables temporales SCL estándar
return f"#_temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}"
def get_target_scl_name(instruction, pin_name, network_id, default_to_temp=True):
"""Gets the SCL formatted name for a target variable.
Handles instruction outputs AND specific inputs like Coil operand.
"""
instr_uid = instruction["instruction_uid"]
# Ahora SCL_SUFFIX está definido en este módulo
instr_type_upper = instruction.get("type", "").upper().replace(SCL_SUFFIX.upper(), "").replace("_ERROR", "") # Check original type
target_info = None
# Special handling for inputs that represent the target variable
if instr_type_upper in ["COIL", "SCOIL", "RCOIL"] and pin_name == "operand":
target_info = instruction.get("inputs", {}).get("operand")
# Add other instructions where input pin == target if necessary
# elif instr_type_upper == "XYZ" and pin_name == "some_input_target_pin":
# target_info = instruction.get("inputs", {}).get(pin_name)
else:
# Default: Assume pin_name refers to an output pin
output_pin_data = instruction.get("outputs", {}).get(pin_name)
# Check if it's a list and has one connection (standard case)
if (output_pin_data and isinstance(output_pin_data, list) and len(output_pin_data) == 1):
target_info = output_pin_data[0]
# Add handling for direct output assignment if your JSON structure supports it
target_scl = None
if target_info:
if target_info.get("type") == "variable":
plc_name = target_info.get("name")
if plc_name:
target_scl = format_variable_name(plc_name) # Use existing util
else:
print(f"Error: Target variable for {instr_uid}.{pin_name} has no name (UID: {target_info.get('uid')}).")
elif target_info.get("type") == "constant":
print(f"Advertencia: Attempt to write to constant target {instr_uid}.{pin_name} (UID: {target_info.get('uid')}).")
# else: # Handle other target types if needed
# print(f"Advertencia: Target {instr_uid}.{pin_name} is not a variable: {target_info.get('type')}.")
# else: # No target info found for the specified pin
# print(f"DEBUG: No target info found for {instr_uid}.{pin_name}")
pass
# Handle default_to_temp logic
if target_scl:
return target_scl
elif default_to_temp:
# Generate temp only if no explicit target was found AND default is allowed
print(f"INFO: Generating temp var for {instr_uid}.{pin_name}") # Be informative
return generate_temp_var_name(network_id, instr_uid, pin_name)
else:
# No target found and default temps not allowed
return None