Simatic_XML_Parser_to_SCL/process.py

492 lines
23 KiB
Python

# -*- coding: utf-8 -*-
import json
import os
import copy
import traceback
# --- Constantes y Configuración ---
SCL_SUFFIX = "_scl"
# --- Helper Functions ---
def get_scl_representation(source_info, network_id, scl_map, access_map):
"""
Busca la representación SCL de una entrada.
source_info: Puede ser {'type': 'powerrail'}, un Access dict, o un Connection dict, o una lista (OR).
"""
if not source_info:
return None
# Si es una lista (rama OR), procesarla recursivamente
if isinstance(source_info, list):
scl_parts = []
all_resolved = True
for sub_source in source_info:
sub_scl = get_scl_representation(sub_source, network_id, scl_map, access_map)
if sub_scl is None:
all_resolved = False
# print(f"DEBUG: Dependencia no resuelta DENTRO de rama OR: {sub_source}")
break
scl_parts.append(f"({sub_scl})")
if all_resolved:
or_expr = " OR ".join(scl_parts)
# print(f"DEBUG: Rama OR resuelta a: {or_expr}")
return or_expr
else:
return None
# Si no es lista, procesar como fuente única
source_type = source_info.get('type')
if source_type == 'powerrail':
return "TRUE" # Condición inicial
elif source_type == 'variable':
return source_info.get('name', f"_ERR_VAR_{source_info.get('uid')}_")
elif source_type == 'constant':
dtype = str(source_info.get('datatype', '')).upper()
value = source_info.get('value')
try: # Añadir try-except para robustez en formato
if dtype == 'BOOL': return str(value).upper()
elif dtype in ['INT', 'DINT', 'SINT', 'USINT', 'UINT', 'UDINT', 'LINT', 'ULINT', 'WORD', 'DWORD', 'LWORD', 'BYTE']: return str(value)
elif dtype in ['REAL', 'LREAL']:
s_val = str(value)
return s_val if '.' in s_val else s_val + ".0"
elif dtype == 'STRING': return f"'{str(value)}'"
elif dtype == 'TYPEDCONSTANT': return str(value) # Ej: DINT#60
else: return f"'{str(value)}'" # Otros tipos como string
except Exception as e:
print(f"Advertencia: Error formateando constante {source_info}: {e}")
return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_"
elif source_type == 'connection':
map_key = (network_id, source_info.get('source_instruction_uid'), source_info.get('source_pin'))
if map_key in scl_map:
# print(f"DEBUG: Valor encontrado en scl_map para {map_key}: {scl_map[map_key]}")
return scl_map[map_key]
else:
# print(f"DEBUG: Valor NO encontrado en scl_map para {map_key}")
return None # Dependencia no resuelta
else:
print(f"Advertencia: Tipo de fuente desconocido o inválido: {source_info}")
return f"_ERR_UNKNOWN_SOURCE_"
def generate_temp_var_name(network_id, instr_uid, pin_name):
"""Genera un nombre único para una variable temporal SCL."""
net_id_clean = str(network_id).replace('-', '_')
instr_uid_clean = str(instr_uid).replace('-', '_')
pin_name_clean = str(pin_name).replace('-', '_').lower() # lower para consistencia
return f"_temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}"
def get_target_scl_name(instruction, output_pin_name, network_id, default_to_temp=True):
"""Determina el nombre SCL del destino (variable o temporal)."""
instr_uid = instruction['instruction_uid']
output_pin_data = instruction['outputs'].get(output_pin_name)
target_scl = None
if output_pin_data and isinstance(output_pin_data, list) and len(output_pin_data) == 1:
dest_access = output_pin_data[0]
if dest_access.get('type') == 'variable':
target_scl = dest_access.get('name')
# print(f"DEBUG: Target para {instr_uid}.{output_pin_name} es variable directa: {target_scl}")
elif dest_access.get('type') == 'constant':
print(f"Advertencia: Instrucción {instr_uid} intenta escribir en constante UID {dest_access.get('uid')}. Usando temporal.")
if default_to_temp: target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name)
else:
print(f"Advertencia: Destino de {instr_uid}.{output_pin_name} no es variable: {dest_access.get('type')}. Usando temporal si aplica.")
if default_to_temp: target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name)
elif default_to_temp:
# Si no hay salida, o va a múltiples sitios, o no es Access tipo variable, usar temporal
# print(f"DEBUG: Usando temporal para {instr_uid}.{output_pin_name}")
target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name)
# print(f"DEBUG: Target final para {instr_uid}.{output_pin_name}: {target_scl}")
return target_scl
# --- Procesadores de Instrucciones ---
def process_contact(instruction, network_id, scl_map, access_map):
"""Traduce Contact a una expresión booleana SCL y actualiza scl_map."""
instr_uid = instruction['instruction_uid']
instr_type = instruction['type']
if instr_type.endswith(SCL_SUFFIX): return False
# Asume Contact normal, necesitaría info adicional para negado
# (Podría estar en 'Name' o TemplateValue si el JSON lo capturara)
is_negated = False # TODO: Determinar si es negado (ej. check instruction['Name'] == 'ContactN')
print(f"DEBUG: Intentando procesar CONTACT{' (N)' if is_negated else ''} - UID: {instr_uid} en Red: {network_id}")
in_rlo_scl = get_scl_representation(instruction['inputs'].get('in'), network_id, scl_map, access_map)
operand_scl = get_scl_representation(instruction['inputs'].get('operand'), network_id, scl_map, access_map)
if in_rlo_scl is None or operand_scl is None:
print(f"DEBUG: Dependencia no resuelta para CONTACT UID: {instr_uid} (in={in_rlo_scl}, op={operand_scl})")
return False
# Construir nueva expresión RLO
term = f"(NOT {operand_scl})" if is_negated else operand_scl
new_rlo_scl = ""
if in_rlo_scl == "TRUE": # Inicio de línea o RLO anterior era TRUE
new_rlo_scl = term
else: # Combinar con RLO anterior
# Quitar paréntesis externos si el RLO anterior ya los tiene
if in_rlo_scl.startswith('(') and in_rlo_scl.endswith(')'):
in_rlo_processed = in_rlo_scl
else:
in_rlo_processed = f"({in_rlo_scl})" # Asegurar paréntesis
new_rlo_scl = f"{in_rlo_processed} AND {term}"
# Actualizar scl_map con el nuevo RLO resultante de este contacto
map_key = (network_id, instr_uid, 'out')
scl_map[map_key] = new_rlo_scl
# print(f"DEBUG: CONTACT UID: {instr_uid} - Nuevo RLO en scl_map[{map_key}] = {new_rlo_scl}")
# Los contactos no generan SCL directo, solo actualizan el flujo lógico (RLO)
instruction['scl'] = f"// RLO updated by Contact {instr_uid}: {new_rlo_scl}" # Comentario opcional
instruction['type'] = instr_type + SCL_SUFFIX
return True
def process_eq(instruction, network_id, scl_map, access_map):
"""Traduce Eq (comparación) a una expresión booleana SCL y actualiza scl_map."""
instr_uid = instruction['instruction_uid']
instr_type = instruction['type']
if instr_type.endswith(SCL_SUFFIX): return False
print(f"DEBUG: Intentando procesar EQ - UID: {instr_uid} en Red: {network_id}")
in1_scl = get_scl_representation(instruction['inputs'].get('in1'), network_id, scl_map, access_map)
in2_scl = get_scl_representation(instruction['inputs'].get('in2'), network_id, scl_map, access_map)
# 'pre' generalmente no se traduce directamente a SCL para comparaciones simples
# pre_scl = get_scl_representation(instruction['inputs'].get('pre'), network_id, scl_map, access_map)
if in1_scl is None or in2_scl is None: # Ignoramos 'pre' por ahora
print(f"DEBUG: Dependencia no resuelta para EQ UID: {instr_uid} (in1={in1_scl}, in2={in2_scl})")
return False
# Generar expresión de comparación
comparison_scl = f"({in1_scl} = {in2_scl})"
# Actualizar scl_map con el resultado booleano
map_key = (network_id, instr_uid, 'out')
scl_map[map_key] = comparison_scl
# print(f"DEBUG: EQ UID: {instr_uid} - Resultado en scl_map[{map_key}] = {comparison_scl}")
# Eq no genera SCL directo, solo el resultado booleano
instruction['scl'] = f"// Comparison Eq {instr_uid}: {comparison_scl}" # Comentario opcional
instruction['type'] = instr_type + SCL_SUFFIX
return True
def process_coil(instruction, network_id, scl_map, access_map):
"""Traduce Coil a una asignación SCL."""
instr_uid = instruction['instruction_uid']
instr_type = instruction['type']
if instr_type.endswith(SCL_SUFFIX): return False
print(f"DEBUG: Intentando procesar COIL - UID: {instr_uid} en Red: {network_id}")
in_rlo_scl = get_scl_representation(instruction['inputs'].get('in'), network_id, scl_map, access_map)
operand_scl = get_scl_representation(instruction['inputs'].get('operand'), network_id, scl_map, access_map)
if in_rlo_scl is None or operand_scl is None:
print(f"DEBUG: Dependencia no resuelta para COIL UID: {instr_uid} (in={in_rlo_scl}, op={operand_scl})")
return False
# Verificar que el operando sea una variable
operand_info = instruction['inputs'].get('operand')
if not (operand_info and operand_info.get('type') == 'variable'):
print(f"Error: Operando de COIL UID {instr_uid} no es una variable: {operand_info}")
# No se puede asignar a una constante o algo desconocido
instruction['scl'] = f"// ERROR: Coil {instr_uid} operando no es variable"
instruction['type'] = instr_type + "_error" # Marcar como error
return True # Marcado como procesado (con error)
# Generar la asignación SCL
scl_final = f"{operand_scl} := {in_rlo_scl};"
instruction['scl'] = scl_final
instruction['type'] = instr_type + SCL_SUFFIX
# Coil consume el RLO, no añade nada al scl_map para la salida 'out' (que no tiene)
print(f"INFO: COIL UID: {instr_uid} procesado. SCL: {scl_final}")
return True
def process_convert(instruction, network_id, scl_map, access_map):
"""Traduce Convert a SCL, usando temporal si es necesario."""
instr_uid = instruction['instruction_uid']
instr_type = instruction['type']
if instr_type.endswith(SCL_SUFFIX): return False
print(f"DEBUG: Intentando procesar CONVERT - UID: {instr_uid} en Red: {network_id}")
en_scl = get_scl_representation(instruction['inputs'].get('en'), network_id, scl_map, access_map)
in_scl = get_scl_representation(instruction['inputs'].get('in'), network_id, scl_map, access_map)
if en_scl is None or in_scl is None:
print(f"DEBUG: Dependencia no resuelta para CONVERT UID: {instr_uid} (en={en_scl}, in={in_scl})")
return False
# Determinar destino (variable o temporal) - Usa pin 'out'
target_scl = get_target_scl_name(instruction, 'out', network_id, default_to_temp=True)
if target_scl is None: # Should not happen with default_to_temp=True
print(f"Error Interno: No se pudo determinar destino para CONVERT UID {instr_uid}")
return False
# Determinar función de conversión (simplificado, requiere tipos exactos)
# TODO: Necesitaría info de TemplateValue para tipos Src/Dest
# Por ahora, asumimos conversión implícita o directa si no podemos determinar
conversion_expr = in_scl # Asume asignación directa por defecto
# Ejemplo (si tuviéramos tipos):
# src_type = instruction['template_values'].get('SrcType')
# dest_type = instruction['template_values'].get('DestType')
# if src_type == 'Int' and dest_type == 'DInt': conversion_expr = f"INT_TO_DINT({in_scl})"
# elif src_type == 'DInt' and dest_type == 'Real': conversion_expr = f"DINT_TO_REAL({in_scl})"
# else: conversion_expr = in_scl # Si no hay conversión específica conocida
# Generar SCL
scl_core = f"{target_scl} := {conversion_expr};"
scl_final = scl_core if en_scl == "TRUE" else f"IF {en_scl} THEN\n {scl_core}\nEND_IF;"
instruction['scl'] = scl_final
instruction['type'] = instr_type + SCL_SUFFIX
# Actualizar scl_map con el resultado (nombre de la variable destino o temporal)
map_key = (network_id, instr_uid, 'out')
scl_map[map_key] = target_scl
# print(f"DEBUG: CONVERT UID: {instr_uid} - Resultado en scl_map[{map_key}] = {target_scl}")
print(f"INFO: CONVERT UID: {instr_uid} procesado. SCL: {scl_final.splitlines()[0]}...")
return True
def process_mod(instruction, network_id, scl_map, access_map):
"""Traduce Mod (módulo) a SCL, usando temporal si es necesario."""
instr_uid = instruction['instruction_uid']
instr_type = instruction['type']
if instr_type.endswith(SCL_SUFFIX): return False
print(f"DEBUG: Intentando procesar MOD - UID: {instr_uid} en Red: {network_id}")
en_scl = get_scl_representation(instruction['inputs'].get('en'), network_id, scl_map, access_map)
in1_scl = get_scl_representation(instruction['inputs'].get('in1'), network_id, scl_map, access_map)
in2_scl = get_scl_representation(instruction['inputs'].get('in2'), network_id, scl_map, access_map)
if en_scl is None or in1_scl is None or in2_scl is None:
print(f"DEBUG: Dependencia no resuelta para MOD UID: {instr_uid} (en={en_scl}, in1={in1_scl}, in2={in2_scl})")
return False
# Determinar destino (variable o temporal) - Usa pin 'out'
target_scl = get_target_scl_name(instruction, 'out', network_id, default_to_temp=True)
if target_scl is None:
print(f"Error Interno: No se pudo determinar destino para MOD UID {instr_uid}")
return False
# Generar SCL
# Asegurar paréntesis por si las entradas son expresiones complejas
scl_core = f"{target_scl} := ({in1_scl}) MOD ({in2_scl});"
scl_final = scl_core if en_scl == "TRUE" else f"IF {en_scl} THEN\n {scl_core}\nEND_IF;"
instruction['scl'] = scl_final
instruction['type'] = instr_type + SCL_SUFFIX
# Actualizar scl_map con el resultado (nombre de la variable destino o temporal)
map_key = (network_id, instr_uid, 'out')
scl_map[map_key] = target_scl
# print(f"DEBUG: MOD UID: {instr_uid} - Resultado en scl_map[{map_key}] = {target_scl}")
print(f"INFO: MOD UID: {instr_uid} procesado. SCL: {scl_final.splitlines()[0]}...")
return True
# --- (process_add, process_move sin cambios significativos respecto a la versión anterior) ---
def process_add(instruction, network_id, scl_map, access_map):
instr_uid = instruction['instruction_uid']
instr_type = instruction['type']
# No procesar si ya tiene sufijo _scl o _error
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
print(f"DEBUG: Intentando procesar ADD - UID: {instr_uid} en Red: {network_id}")
en_scl = get_scl_representation(instruction['inputs'].get('en'), network_id, scl_map, access_map)
in1_scl = get_scl_representation(instruction['inputs'].get('in1'), network_id, scl_map, access_map)
in2_scl = get_scl_representation(instruction['inputs'].get('in2'), network_id, scl_map, access_map)
# TODO: Manejar más entradas (in3, in4...)
if en_scl is None or in1_scl is None or in2_scl is None:
print(f"DEBUG: Dependencia no resuelta para ADD UID: {instr_uid} (en={en_scl}, in1={in1_scl}, in2={in2_scl})")
return False # <<<--- Sale correctamente si hay dependencias
# Si llegamos aquí, TODAS las dependencias están resueltas
print(f"DEBUG: Dependencias RESUELTAS para ADD UID: {instr_uid}")
target_scl = get_target_scl_name(instruction, 'out', network_id, default_to_temp=True)
if target_scl is None:
print(f"Error Interno: No se pudo determinar destino para ADD UID {instr_uid}")
instruction['scl'] = f"// ERROR: No se pudo determinar destino para Add {instr_uid}"
instruction['type'] += "_error"
return True # Se procesó (con error)
# Generar SCL Core
scl_core = f"{target_scl} := ({in1_scl}) + ({in2_scl});" # Paréntesis por seguridad
# Añadir IF si es necesario
scl_final = scl_core if en_scl == "TRUE" else f"IF {en_scl} THEN\n {scl_core}\nEND_IF;"
# <<<--- ¡ACTUALIZAR EL JSON AQUÍ! --- >>>
instruction['scl'] = scl_final
instruction['type'] = instr_type + SCL_SUFFIX # Marcar como procesado
# Actualizar el mapa SCL con el resultado de esta instrucción
map_key = (network_id, instr_uid, 'out')
scl_map[map_key] = target_scl # El valor es el nombre de la variable (temporal o destino)
print(f"INFO: ADD UID: {instr_uid} procesado. SCL: {scl_final.splitlines()[0]}...")
return True # <<<--- ¡DEVOLVER TRUE PORQUE HUBO CAMBIO! --- >>>
def process_move(instruction, network_id, scl_map, access_map):
instr_uid = instruction['instruction_uid']
instr_type = instruction['type']
# No procesar si ya tiene sufijo _scl o _error
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
print(f"DEBUG: Intentando procesar MOVE - UID: {instr_uid} en Red: {network_id}")
en_scl = get_scl_representation(instruction['inputs'].get('en'), network_id, scl_map, access_map)
in_scl = get_scl_representation(instruction['inputs'].get('in'), network_id, scl_map, access_map)
if en_scl is None or in_scl is None:
print(f"DEBUG: Dependencia no resuelta para MOVE UID: {instr_uid} (en={en_scl}, in={in_scl})")
return False # <<<--- Sale correctamente si hay dependencias
# Si llegamos aquí, TODAS las dependencias están resueltas
print(f"DEBUG: Dependencias RESUELTAS para MOVE UID: {instr_uid}")
# MOVE asigna, así que el destino NO debe ser temporal por defecto.
target_scl = get_target_scl_name(instruction, 'out1', network_id, default_to_temp=False)
if target_scl is None:
print(f"Advertencia: MOVE UID: {instr_uid} no tiene un destino variable único claro en out1. No se procesa.")
# No marcamos como error necesariamente, simplemente no se pudo procesar.
return False # <<<--- Devuelve False si no se puede procesar
# Generar SCL Core
scl_core = f"{target_scl} := {in_scl};"
# Añadir IF si es necesario
scl_final = scl_core if en_scl == "TRUE" else f"IF {en_scl} THEN\n {scl_core}\nEND_IF;"
# <<<--- ¡ACTUALIZAR EL JSON AQUÍ! --- >>>
instruction['scl'] = scl_final
instruction['type'] = instr_type + SCL_SUFFIX # Marcar como procesado
# No añadir a scl_map ya que MOVE asigna directamente.
print(f"INFO: MOVE UID: {instr_uid} procesado. SCL: {scl_final.splitlines()[0]}...")
return True # <<<--- ¡DEVOLVER TRUE PORQUE HUBO CAMBIO! --- >>>
# --- Bucle Principal de Procesamiento ---
def process_json_to_scl(json_filepath):
if not os.path.exists(json_filepath):
print(f"Error: Archivo JSON no encontrado en {json_filepath}")
return
print(f"Cargando JSON desde: {json_filepath}")
try:
with open(json_filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
except Exception as e:
print(f"Error al cargar o parsear JSON: {e}")
return
# Reconstruir access_map dinámicamente (MEJOR SI VIENE DEL JSON)
network_access_maps = {}
print("Creando mapas de acceso por red...")
for network in data.get('networks', []):
net_id = network['id']
current_access_map = {}
for instr in network.get('logic', []):
# Chequear inputs
for pin, source in instr.get('inputs', {}).items():
sources_to_check = source if isinstance(source, list) else ([source] if isinstance(source, dict) else [])
for src in sources_to_check:
if isinstance(src, dict) and src.get('uid') and src.get('scope') and src.get('type') in ['variable', 'constant']:
current_access_map[src['uid']] = src
# Chequear outputs
for pin, dest_list in instr.get('outputs', {}).items():
if isinstance(dest_list, list):
for dest in dest_list:
if isinstance(dest, dict) and dest.get('uid') and dest.get('scope') and dest.get('type') in ['variable', 'constant']:
current_access_map[dest['uid']] = dest
network_access_maps[net_id] = current_access_map
# print(f"Red {net_id}: {len(current_access_map)} accesos encontrados (aprox).")
scl_map = {}
max_passes = 20
passes = 0
# Lista de procesadores con nueva prioridad
processors = [
process_convert, # Genera valores, posiblemente temporales
process_mod, # Genera valores, posiblemente temporales
process_eq, # Genera condiciones booleanas
# Añadir otros comparadores (Ne, Gt, Lt...) aquí
process_contact, # Combina condiciones booleanas (actualiza RLO)
process_move, # Asigna valores (usa RLO de Contact/Eq)
process_add, # Calcula (usa RLO de Contact/Eq)
process_coil, # Asigna RLO final
# Añadir process_pbox, process_O, etc.
]
print("\n--- Iniciando Bucle de Procesamiento Iterativo ---")
while passes < max_passes:
passes += 1
made_change_in_pass = False
print(f"\n--- Pase {passes} ---")
for network in data.get('networks', []):
network_id = network['id']
access_map = network_access_maps.get(network_id, {})
for instruction in network.get('logic', []):
# Saltar si ya está procesado
if instruction['type'].endswith(SCL_SUFFIX) or "_error" in instruction['type']:
continue
# Intentar aplicar cada procesador
for processor_func in processors:
if instruction['type'] == processor_func.__name__.split('_')[1].capitalize(): # Compara tipo con nombre de función
try:
changed = processor_func(instruction, network_id, scl_map, access_map)
if changed:
made_change_in_pass = True
break # Pasar a la siguiente instrucción
except Exception as e:
print(f"ERROR al ejecutar {processor_func.__name__} en UID {instruction.get('instruction_uid')} Red {network_id}: {e}")
traceback.print_exc()
instruction['scl'] = f"// ERROR during processing: {e}"
instruction['type'] += "_error" # Marcar como error
made_change_in_pass = True # Hubo un cambio (a estado de error)
break # No intentar otros procesadores si hubo error
if not made_change_in_pass:
print(f"\n--- No se hicieron cambios en el pase {passes}. Proceso completado. ---")
break
elif passes == max_passes:
print(f"\n--- Límite de {max_passes} pases alcanzado. Puede haber dependencias circulares o lógica no procesada. ---")
output_filename = json_filepath.replace('.json', '_scl_processed.json')
print(f"\nGuardando JSON procesado en: {output_filename}")
try:
with open(output_filename, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
print("Guardado completado.")
except Exception as e:
print(f"Error al guardar el JSON procesado: {e}")
# --- Ejecución ---
if __name__ == "__main__":
input_json_file = 'BlenderRun_ProdTime_simplified.json'
process_json_to_scl(input_json_file)