Simatic_XML_Parser_to_SCL/x2_process.py

476 lines
23 KiB
Python

# -*- coding: utf-8 -*-
import json
import argparse
import os
import copy
import traceback
import re
import importlib
import sys
# --- Constantes y Configuración ---
SCL_SUFFIX = "_scl"
GROUPED_COMMENT = "// Logic included in grouped IF"
# Global data variable
data = {}
def process_group_ifs(instruction, network_id, scl_map, access_map, data):
"""
Busca instrucciones que generan condiciones (Contact, O, Eq, PBox, etc.) ya procesadas
y, si habilitan un grupo (>1) de bloques funcionales (Move, Add, Call, etc.),
construye el bloque IF agrupado.
Modifica el campo 'scl' de la instrucción generadora de condición.
"""
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
instr_type_original = instr_type.replace("_scl", "").replace("_error", "")
made_change = False
# Solo actuar sobre generadores de condición ya procesados (_scl)
# y que no sean ellos mismos errores o ya agrupados por otro IF
if (
not instr_type.endswith("_scl")
or "_error" in instr_type
or instruction.get("grouped", False)
or instr_type_original
not in [
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"PBox",
"And",
"Xor",
]
): # Añadir más si es necesario
return False
# Evitar reagrupar si ya se hizo (comprobando si SCL ya es un IF complejo)
current_scl = instruction.get("scl", "")
if current_scl.strip().startswith("IF") and "END_IF;" in current_scl:
# print(f"DEBUG Group: {instr_uid} ya tiene IF complejo, saltando agrupación.")
return False
# Ignorar comentarios simples que empiezan por IF
if current_scl.strip().startswith("//") and "IF" in current_scl:
return False
# Obtener la condición generada por esta instrucción (debería estar en scl_map['out'])
map_key_out = (network_id, instr_uid, "out")
condition_scl = scl_map.get(map_key_out)
# No agrupar para condiciones triviales, no encontradas o ya agrupadas
if condition_scl is None or condition_scl in ["TRUE", "FALSE"]:
return False
# Encontrar todos los bloques funcionales habilitados DIRECTAMENTE por esta condición
grouped_instructions_cores = [] # Lista de SCL 'core' de los consumidores
consumer_instr_list = [] # Lista de instrucciones consumidoras
network_logic = next(
(net["logic"] for net in data["networks"] if net["id"] == network_id), []
)
if not network_logic:
return False
# Identificar los tipos de instrucciones que queremos agrupar (bloques funcionales)
groupable_types = [
"Move",
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
"Call_FC",
"Call_FB",
] # Añadir más si es necesario
for consumer_instr in network_logic:
consumer_uid = consumer_instr["instruction_uid"]
# Saltar si ya está agrupado por otra condición o es él mismo
if consumer_instr.get("grouped", False) or consumer_uid == instr_uid:
continue
consumer_en = consumer_instr.get("inputs", {}).get("en")
consumer_type = consumer_instr.get("type", "") # Tipo actual (_scl o no)
consumer_type_original = consumer_type.replace("_scl", "").replace("_error", "")
# ¿Está la entrada 'en' del consumidor conectada a nuestra salida 'out'?
is_enabled_by_us = False
if (
isinstance(consumer_en, dict)
and consumer_en.get("type") == "connection"
and consumer_en.get("source_instruction_uid") == instr_uid
and consumer_en.get("source_pin")
== "out" # Condición viene de 'out' del generador
):
is_enabled_by_us = True
# ¿Es un tipo de instrucción agrupable y ya procesado (tiene SCL)?
if (
is_enabled_by_us
and consumer_type.endswith("_scl")
and consumer_type_original in groupable_types
):
consumer_scl = consumer_instr.get("scl", "")
# Extraer el SCL core (la parte DENTRO del IF o la línea única si no había IF)
core_scl = None
if consumer_scl.strip().startswith("IF"):
# Extraer todo entre THEN y END_IF;
match = re.search(
r"IF\s+.*\s+THEN\s*(.*?)\s*END_IF;",
consumer_scl,
re.DOTALL | re.IGNORECASE,
)
if match:
core_scl = match.group(1).strip()
elif consumer_scl and not consumer_scl.strip().startswith("//"):
# Si no es IF y no es solo comentario, es el core
core_scl = consumer_scl.strip()
if core_scl:
grouped_instructions_cores.append(core_scl)
consumer_instr_list.append(consumer_instr) # Guardar referencia
# else: # Debug
# print(f"DEBUG Group: Consumidor {consumer_uid} ({consumer_type}) habilitado por {instr_uid} no tenía SCL core extraíble. SCL: '{consumer_scl}'")
# Si encontramos más de un consumidor agrupable
if len(grouped_instructions_cores) > 1:
print(
f"INFO: Agrupando {len(grouped_instructions_cores)} instrucciones bajo condición de {instr_type_original} UID {instr_uid} (Cond: {condition_scl})"
)
# Construir el bloque IF agrupado
scl_grouped = [f"IF {condition_scl} THEN"]
for core_line in grouped_instructions_cores:
# Añadir indentación adecuada (2 espacios)
indented_core = "\n".join(
[f" {line.strip()}" for line in core_line.splitlines()]
)
scl_grouped.append(indented_core)
scl_grouped.append("END_IF;")
final_grouped_scl = "\n".join(scl_grouped)
# Sobrescribir 'scl' de la instrucción generadora de condición
instruction["scl"] = final_grouped_scl
# Marcar los consumidores como agrupados y limpiar su SCL original
for consumer_instr in consumer_instr_list:
consumer_instr["scl"] = f"{GROUPED_COMMENT} (by UID {instr_uid})"
consumer_instr["grouped"] = True # Marcar como agrupado
made_change = True
# else: # Debug
# if len(grouped_instructions_cores) == 1:
# print(f"DEBUG Group: Solo 1 consumidor ({consumer_instr_list[0]['instruction_uid']}) para {instr_uid}. No se agrupa.")
# elif len(grouped_instructions_cores) == 0 and condition_scl not in ['TRUE', 'FALSE']:
# # Solo mostrar si la condición no era trivial
# pass # print(f"DEBUG Group: Ningún consumidor agrupable encontrado para {instr_uid} (Cond: {condition_scl}).")
return made_change
def load_processors(processors_dir="processors"):
"""
Escanea el directorio, importa módulos, construye el mapa y una lista
ordenada por prioridad.
"""
processor_map = {}
processor_list_unsorted = [] # Lista para guardar (priority, type_name, func)
default_priority = 10 # Prioridad si no se define en get_processor_info
if not os.path.isdir(processors_dir):
print(f"Error: Directorio de procesadores no encontrado: '{processors_dir}'")
return processor_map, [] # Devuelve mapa vacío y lista vacía
print(f"Cargando procesadores desde: '{processors_dir}'")
processors_package = os.path.basename(processors_dir)
for filename in os.listdir(processors_dir):
if filename.startswith("process_") and filename.endswith(".py"):
module_name_rel = filename[:-3]
full_module_name = f"{processors_package}.{module_name_rel}"
try:
module = importlib.import_module(full_module_name)
if hasattr(module, 'get_processor_info') and callable(module.get_processor_info):
processor_info = module.get_processor_info()
info_list = []
if isinstance(processor_info, dict):
info_list = [processor_info]
elif isinstance(processor_info, list):
info_list = processor_info
else:
print(f" Advertencia: get_processor_info en {full_module_name} devolvió tipo inesperado. Se ignora.")
continue
for info in info_list:
if isinstance(info, dict) and 'type_name' in info and 'processor_func' in info:
type_name = info['type_name'].lower()
processor_func = info['processor_func']
# Obtener prioridad, usar default si no existe
priority = info.get('priority', default_priority)
if callable(processor_func):
if type_name in processor_map:
print(f" Advertencia: '{type_name}' en {full_module_name} sobrescribe definición anterior.")
processor_map[type_name] = processor_func
# Añadir a la lista para ordenar
processor_list_unsorted.append({'priority': priority, 'type_name': type_name, 'func': processor_func})
print(f" - Cargado '{type_name}' (Prio: {priority}) desde {module_name_rel}.py")
else:
print(f" Advertencia: 'processor_func' para '{type_name}' en {full_module_name} no es callable.")
else:
print(f" Advertencia: Entrada inválida en {full_module_name}: {info}")
else:
print(f" Advertencia: Módulo {module_name_rel}.py no tiene 'get_processor_info'.")
except ImportError as e:
print(f"Error importando {full_module_name}: {e}")
except Exception as e:
print(f"Error procesando {full_module_name}: {e}")
traceback.print_exc()
# Ordenar la lista por prioridad (menor primero)
processor_list_sorted = sorted(processor_list_unsorted, key=lambda x: x['priority'])
print(f"\nTotal de tipos de procesadores cargados: {len(processor_map)}")
print(f"Orden de procesamiento por prioridad: {[item['type_name'] for item in processor_list_sorted]}")
# Devolver el mapa (para lookup rápido si es necesario) y la lista ordenada
return processor_map, processor_list_sorted
# --- Bucle Principal de Procesamiento (Modificado) ---
def process_json_to_scl(json_filepath):
"""
Lee el JSON simplificado, aplica los procesadores dinámicamente cargados
siguiendo un orden de prioridad, y guarda el JSON procesado.
"""
global data # Necesario si process_group_ifs (definido fuera) accede a data globalmente.
# Si process_group_ifs está definida DENTRO de process_json_to_scl,
# no necesitarías global, ya que accedería a la 'data' local.
# Lo más limpio es definir process_group_ifs fuera y pasarle 'data'
# como argumento (como ya se hace). Así que 'global data' aquí es probablemente innecesario.
# Eliminémoslo por ahora y aseguremos que data se pasa a process_group_ifs.
if not os.path.exists(json_filepath):
print(f"Error: JSON no encontrado: {json_filepath}")
return
print(f"Cargando JSON desde: {json_filepath}")
try:
# Cargar datos en una variable local de esta función
with open(json_filepath, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
print(f"Error al cargar JSON: {e}")
traceback.print_exc()
return
# --- Carga dinámica de procesadores (Obtiene mapa y lista ordenada) ---
script_dir = os.path.dirname(__file__)
processors_dir_path = os.path.join(script_dir, 'processors')
processor_map, sorted_processors = load_processors(processors_dir_path)
if not processor_map: # O verificar sorted_processors
print("Error crítico: No se cargaron procesadores. Abortando.")
return
# --- Crear mapas de acceso por red ---
network_access_maps = {}
for network in data.get("networks", []):
net_id = network["id"]
current_access_map = {}
# Extraer todos los 'Access' usados en esta red
for instr in network.get("logic", []):
# Revisar Inputs
for _, source in instr.get("inputs", {}).items():
sources_to_check = (source if isinstance(source, list) else ([source] if isinstance(source, dict) else []))
for src in sources_to_check:
if (isinstance(src, dict) and src.get("uid") and src.get("type") in ["variable", "constant"]):
current_access_map[src["uid"]] = src
# Revisar Outputs
for _, dest_list in instr.get("outputs", {}).items():
if isinstance(dest_list, list):
for dest in dest_list:
if (isinstance(dest, dict) and dest.get("uid") and dest.get("type") in ["variable", "constant"]):
current_access_map[dest["uid"]] = dest
network_access_maps[net_id] = current_access_map
# --- Inicializar mapa SCL y bucle ---
scl_map = {} # Mapa para resultados SCL intermedios
max_passes = 30
passes = 0
processing_complete = False
print("\n--- Iniciando Bucle de Procesamiento Iterativo (con prioridad) ---")
while passes < max_passes and not processing_complete:
passes += 1
made_change_in_base_pass = False
made_change_in_group_pass = False
print(f"\n--- Pase {passes} ---")
num_processed_this_pass = 0
num_grouped_this_pass = 0
# --- FASE 1: Procesadores Base (Itera según la lista ordenada por prioridad) ---
print(f" Fase 1 (Base - Orden por Prioridad):")
for processor_info in sorted_processors: # Iterar sobre la lista ordenada por prioridad
current_type_name = processor_info['type_name']
func_to_call = processor_info['func']
# Descomentar para depuración muy detallada:
# print(f" Intentando procesar tipo: {current_type_name} (Prio: {processor_info['priority']})")
# Buscar instrucciones de este tipo en todas las redes
for network in data.get("networks", []):
network_id = network["id"]
access_map = network_access_maps.get(network_id, {})
network_logic = network.get("logic", [])
for instruction in network_logic:
instr_uid = instruction.get("instruction_uid")
instr_type_original = instruction.get("type", "Unknown")
# Saltar si ya está procesado, es un error o ya fue agrupado por otro
if (instr_type_original.endswith(SCL_SUFFIX)
or "_error" in instr_type_original
or instruction.get("grouped", False)):
continue
# Determinar el tipo efectivo de la instrucción para la comparación
# (Manejo especial para 'Call')
lookup_key = instr_type_original.lower()
effective_type_name = lookup_key
if instr_type_original == "Call":
block_type = instruction.get("block_type", "").upper()
if block_type == "FC": effective_type_name = "call_fc"
elif block_type == "FB": effective_type_name = "call_fb"
# Nota: Si es un tipo de bloque desconocido, effective_type_name será 'call'
# Si el tipo efectivo de la instrucción coincide con el tipo que estamos procesando en este ciclo...
if effective_type_name == current_type_name:
try:
# Llamar a la función del procesador, pasando 'data'
changed = func_to_call(instruction, network_id, scl_map, access_map, data) # Pasar data
if changed:
made_change_in_base_pass = True
num_processed_this_pass += 1
# La función llamada debe añadir _scl o _error al tipo de la instrucción
# Descomentar para depuración:
# print(f" Procesado: {instr_type_original} UID {instr_uid}")
except Exception as e:
print(f"ERROR(Base) al procesar {instr_type_original} UID {instr_uid} con {func_to_call.__name__}: {e}")
traceback.print_exc()
# Marcar como error para no reintentar
instruction["scl"] = f"// ERROR en procesador base: {e}"
instruction["type"] = instr_type_original + "_error"
made_change_in_base_pass = True # Considerar error como cambio
# --- FASE 2: Procesador de Agrupación (Se ejecuta después de toda la Fase 1) ---
# Ejecutar solo si hubo cambios en la fase base (o en el primer pase)
if made_change_in_base_pass or passes == 1:
print(f" Fase 2 (Agrupación IF):")
for network in data.get("networks", []):
network_id = network["id"]
access_map = network_access_maps.get(network_id, {})
network_logic = network.get("logic", [])
# Iterar sobre instrucciones ya procesadas que podrían generar condiciones
for instruction in network_logic:
# Intentar agrupar solo si está procesado (_scl) y no previamente agrupado
if instruction["type"].endswith("_scl") and not instruction.get("grouped", False):
try:
# Llamar a la función de agrupación, pasando 'data'
group_changed = process_group_ifs(instruction, network_id, scl_map, access_map, data)
if group_changed:
made_change_in_group_pass = True
num_grouped_this_pass += 1
except Exception as e:
print(f"ERROR(Group) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}")
traceback.print_exc()
# No marcamos la instrucción origen como error, solo falló la agrupación
# --- Comprobar si se completó el procesamiento ---
if not made_change_in_base_pass and not made_change_in_group_pass:
print(f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---")
processing_complete = True
else:
print(f"--- Fin Pase {passes}: {num_processed_this_pass} procesados, {num_grouped_this_pass} agrupados. Continuando...")
# --- Comprobar límite de pases ---
if passes == max_passes and not processing_complete:
print(f"\n--- ADVERTENCIA: Límite de {max_passes} pases alcanzado. Puede haber dependencias no resueltas. ---")
# --- FIN BUCLE ITERATIVO ---
# --- Verificación Final de Instrucciones No Procesadas ---
print("\n--- Verificación Final de Instrucciones No Procesadas ---")
unprocessed_count = 0
unprocessed_details = []
ignored_types = ['raw_scl_chunk', 'unsupported_lang'] # Tipos que esperamos no procesar
for network in data.get("networks", []):
network_id = network.get("id", "Unknown ID")
network_title = network.get("title", f"Network {network_id}")
for instruction in network.get("logic", []):
instr_uid = instruction.get("instruction_uid", "Unknown UID")
instr_type = instruction.get("type", "Unknown Type")
is_grouped = instruction.get("grouped", False)
# Comprobar si NO está procesada, NO es error, NO está agrupada Y NO es un tipo ignorado
if (not instr_type.endswith(SCL_SUFFIX) and
"_error" not in instr_type and
not is_grouped and
instr_type.lower() not in ignored_types):
unprocessed_count += 1
unprocessed_details.append(
f" - Red '{network_title}' (ID: {network_id}), "
f"Instrucción UID: {instr_uid}, Tipo Original: '{instr_type}'"
)
if unprocessed_count > 0:
print(f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones que no fueron procesadas (y no son tipos ignorados):")
if unprocessed_details:
for detail in unprocessed_details:
print(detail)
print(">>> Estos tipos podrían necesitar un procesador en el directorio 'processors' o tener dependencias irresolubles.")
# else: # No debería pasar si unprocessed_count > 0 y la lógica es correcta
# print("...")
else:
print("INFO: Todas las instrucciones relevantes fueron procesadas a SCL, marcadas como error o agrupadas exitosamente.")
# --- Guardar JSON Final ---
output_filename = json_filepath.replace("_simplified.json", "_simplified_processed.json")
print(f"\nGuardando JSON procesado en: {output_filename}")
try:
with open(output_filename, "w", encoding="utf-8") as f:
# Usamos la 'data' local que hemos estado modificando
json.dump(data, f, indent=4, ensure_ascii=False)
print("Guardado completado.")
except Exception as e:
print(f"Error Crítico al guardar JSON procesado: {e}")
traceback.print_exc()
# --- Ejecución (igual que antes) ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process simplified JSON to embed SCL logic.")
parser.add_argument(
"source_xml_filepath",
nargs="?",
default="TestLAD.xml",
help="Path to the original source XML file (used to derive JSON input name, default: TestLAD.xml)"
)
args = parser.parse_args()
xml_filename_base = os.path.splitext(os.path.basename(args.source_xml_filepath))[0]
# Usar directorio del script actual si el XML no tiene ruta, o la ruta del XML si la tiene
xml_dir = os.path.dirname(args.source_xml_filepath)
input_dir = xml_dir if xml_dir else os.path.dirname(__file__) # Directorio de entrada/salida
input_json_file = os.path.join(input_dir, f"{xml_filename_base}_simplified.json")
if not os.path.exists(input_json_file):
print(f"Error Fatal: El archivo de entrada JSON simplificado no existe: '{input_json_file}'")
print(f"Asegúrate de haber ejecutado 'x1_to_json.py' primero sobre '{args.source_xml_filepath}'.")
sys.exit(1)
else:
process_json_to_scl(input_json_file)