Simatic_XML_Parser_to_SCL/x2_process.py

643 lines
26 KiB
Python

# -*- coding: utf-8 -*-
import json
import argparse
import os
import copy
import traceback
import re
import importlib
import sys
import sympy # Import sympy
# Import necessary components from processors directory
from processors.processor_utils import (
format_variable_name, # Keep if used outside processors
sympy_expr_to_scl, # Needed for IF grouping and maybe others
# get_target_scl_name might be used here? Unlikely.
)
from processors.symbol_manager import SymbolManager # Import the manager
# --- Constantes y Configuración ---
SCL_SUFFIX = "_sympy_processed"
GROUPED_COMMENT = "// Logic included in grouped IF"
SIMPLIFIED_IF_COMMENT = "// Simplified IF condition by script"
# Global data dictionary
data = {}
# --- (process_group_ifs y load_processors SIN CAMBIOS) ---
def process_group_ifs(instruction, network_id, sympy_map, symbol_manager, data):
"""
Busca condiciones (ya procesadas -> tienen expr SymPy en sympy_map)
y, si habilitan un grupo (>1) de bloques funcionales (con SCL ya generado),
construye el bloque IF agrupado CON LA CONDICIÓN SIMPLIFICADA.
Modifica el campo 'scl' de la instrucción generadora de condición.
(Esta es la implementación de la función como la tenías en el archivo original)
"""
instr_uid = instruction["instruction_uid"]
instr_type_original = (
instruction.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "")
)
made_change = False
# Check if this instruction *could* generate a condition suitable for grouping
# It must have been processed by the new SymPy method
if (
not instruction.get("type", "").endswith(
SCL_SUFFIX
) # Check if processed by new method
or "_error" in instruction.get("type", "")
or instruction.get("grouped", False)
or instr_type_original
not in [ # Original types that produce boolean results
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"PBox",
"NBox",
"And",
"Xor",
"Not", # Add others like comparison
]
):
return False
# Avoid reagruping if SCL already contains a complex IF (less likely now)
current_scl = instruction.get("scl", "")
if (
current_scl.strip().startswith("IF")
and "END_IF;" in current_scl
and GROUPED_COMMENT not in current_scl
):
return False
# *** Get the SymPy expression for the condition ***
map_key_out = (network_id, instr_uid, "out")
sympy_condition_expr = sympy_map.get(map_key_out)
# No SymPy expression found or trivial conditions
if sympy_condition_expr is None or sympy_condition_expr in [
sympy.true,
sympy.false,
]:
return False
# --- Find consumer instructions (logic similar to before) ---
grouped_instructions_cores = []
consumer_instr_list = []
network_logic = next(
(net["logic"] for net in data["networks"] if net["id"] == network_id), []
)
if not network_logic:
return False
groupable_types = [ # Types whose *final SCL* we want to group
"Move",
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
"Call_FC",
"Call_FB", # Assuming these generate final SCL in their processors now
# SCoil/RCoil might also be groupable if their SCL is final assignment
"SCoil",
"RCoil",
"BLKMOV", # Added BLKMOV
"TON",
"TOF",
"TP",
"Se",
"Sd", # Added timers
"CTU",
"CTD",
"CTUD", # Added counters
]
for consumer_instr in network_logic:
consumer_uid = consumer_instr["instruction_uid"]
if consumer_instr.get("grouped", False) or consumer_uid == instr_uid:
continue
consumer_en = consumer_instr.get("inputs", {}).get("en")
consumer_type = consumer_instr.get("type", "") # Current type suffix matters
consumer_type_original = consumer_type.replace(SCL_SUFFIX, "").replace(
"_error", ""
)
is_enabled_by_us = False
if (
isinstance(consumer_en, dict)
and consumer_en.get("type") == "connection"
and consumer_en.get("source_instruction_uid") == instr_uid
and consumer_en.get("source_pin") == "out"
):
is_enabled_by_us = True
# Check if consumer is groupable AND has its final SCL generated
if (
is_enabled_by_us
and consumer_type.endswith(SCL_SUFFIX) # Check if processed
and consumer_type_original in groupable_types
):
consumer_scl = consumer_instr.get("scl", "")
# Extract core SCL
core_scl = None
if consumer_scl:
# If consumer SCL itself is an IF generated by EN, take the body
if consumer_scl.strip().startswith("IF"):
match = re.search(
r"IF\s+.*?THEN\s*(.*?)\s*END_IF;", # More robust regex
consumer_scl,
re.DOTALL | re.IGNORECASE,
)
core_scl = match.group(1).strip() if match else None
# If body contains another IF, maybe don't group? (optional complexity)
# if core_scl and core_scl.strip().startswith("IF"): core_scl = None
elif not consumer_scl.strip().startswith(
"//"
): # Otherwise, take the whole line if not comment
core_scl = consumer_scl.strip()
if core_scl:
grouped_instructions_cores.append(core_scl)
consumer_instr_list.append(consumer_instr)
# --- If groupable consumers found ---
if len(grouped_instructions_cores) > 1:
print(
f"INFO: Agrupando {len(grouped_instructions_cores)} instr. bajo condición de {instr_type_original} UID {instr_uid}"
)
# *** Simplify the SymPy condition ***
try:
# simplified_expr = sympy.simplify_logic(sympy_condition_expr, force=True)
simplified_expr = sympy.logic.boolalg.to_dnf(
sympy_condition_expr, simplify=True
)
except Exception as e:
print(f"Error simplifying condition for grouping UID {instr_uid}: {e}")
simplified_expr = sympy_condition_expr # Fallback
# *** Convert simplified condition to SCL string ***
condition_scl_simplified = sympy_expr_to_scl(simplified_expr, symbol_manager)
# *** Build the grouped IF SCL ***
scl_grouped_lines = [f"IF {condition_scl_simplified} THEN"]
for core_line in grouped_instructions_cores:
indented_core = "\n".join(
[f" {line.strip()}" for line in core_line.splitlines()]
)
scl_grouped_lines.append(indented_core)
scl_grouped_lines.append("END_IF;")
final_grouped_scl = "\n".join(scl_grouped_lines)
# Update the generator instruction's SCL
instruction["scl"] = final_grouped_scl
# Mark consumers as grouped
for consumer_instr in consumer_instr_list:
consumer_instr["scl"] = f"{GROUPED_COMMENT} (by UID {instr_uid})"
consumer_instr["grouped"] = True
made_change = True
return made_change
def load_processors(processors_dir="processors"):
"""
Escanea el directorio, importa módulos, construye el mapa y una lista
ordenada por prioridad.
"""
processor_map = {}
processor_list_unsorted = [] # Lista para guardar (priority, type_name, func)
default_priority = 10 # Prioridad si no se define en get_processor_info
if not os.path.isdir(processors_dir):
print(f"Error: Directorio de procesadores no encontrado: '{processors_dir}'")
return processor_map, [] # Devuelve mapa vacío y lista vacía
print(f"Cargando procesadores desde: '{processors_dir}'")
processors_package = os.path.basename(processors_dir)
for filename in os.listdir(processors_dir):
if filename.startswith("process_") and filename.endswith(".py"):
module_name_rel = filename[:-3]
full_module_name = f"{processors_package}.{module_name_rel}"
try:
module = importlib.import_module(full_module_name)
if hasattr(module, "get_processor_info") and callable(
module.get_processor_info
):
processor_info = module.get_processor_info()
info_list = []
if isinstance(processor_info, dict):
info_list = [processor_info]
elif isinstance(processor_info, list):
info_list = processor_info
else:
print(
f" Advertencia: get_processor_info en {full_module_name} devolvió tipo inesperado. Se ignora."
)
continue
for info in info_list:
if (
isinstance(info, dict)
and "type_name" in info
and "processor_func" in info
):
type_name = info["type_name"].lower()
processor_func = info["processor_func"]
# Obtener prioridad, usar default si no existe
priority = info.get("priority", default_priority)
if callable(processor_func):
if type_name in processor_map:
print(
f" Advertencia: '{type_name}' en {full_module_name} sobrescribe definición anterior."
)
processor_map[type_name] = processor_func
# Añadir a la lista para ordenar
processor_list_unsorted.append(
{
"priority": priority,
"type_name": type_name,
"func": processor_func,
}
)
print(
f" - Cargado '{type_name}' (Prio: {priority}) desde {module_name_rel}.py"
)
else:
print(
f" Advertencia: 'processor_func' para '{type_name}' en {full_module_name} no es callable."
)
else:
print(
f" Advertencia: Entrada inválida en {full_module_name}: {info}"
)
else:
print(
f" Advertencia: Módulo {module_name_rel}.py no tiene 'get_processor_info'."
)
except ImportError as e:
print(f"Error importando {full_module_name}: {e}")
except Exception as e:
print(f"Error procesando {full_module_name}: {e}")
traceback.print_exc()
# Ordenar la lista por prioridad (menor primero)
processor_list_sorted = sorted(processor_list_unsorted, key=lambda x: x["priority"])
print(f"\nTotal de tipos de procesadores cargados: {len(processor_map)}")
print(
f"Orden de procesamiento por prioridad: {[item['type_name'] for item in processor_list_sorted]}"
)
# Devolver el mapa (para lookup rápido si es necesario) y la lista ordenada
return processor_map, processor_list_sorted
# --- Bucle Principal de Procesamiento (MODIFICADO) ---
def process_json_to_scl(json_filepath, output_json_filepath):
"""
Lee JSON simplificado, aplica procesadores dinámicos (ignorando STL, UDT, TagTable, DB),
y guarda JSON procesado en la ruta especificada.
"""
global data
if not os.path.exists(json_filepath):
print(f"Error: JSON no encontrado: {json_filepath}")
return False
print(f"Cargando JSON desde: {json_filepath}")
try:
with open(json_filepath, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
print(f"Error al cargar JSON: {e}")
traceback.print_exc()
return False
# --- MODIFICADO: Obtener tipo de bloque (FC, FB, GlobalDB, OB, PlcUDT, PlcTagTable) ---
block_type = data.get("block_type", "Unknown")
print(f"Procesando bloque tipo: {block_type}")
# --- MODIFICADO: SALTAR PROCESAMIENTO PARA DB, UDT, TAG TABLE ---
if block_type in [
"GlobalDB",
"PlcUDT",
"PlcTagTable",
]: # <-- Comprobar tipos a saltar
print(f"INFO: El bloque es {block_type}. Saltando procesamiento lógico de x2.")
print(
f"Guardando JSON de {block_type} (sin cambios lógicos) en: {output_json_filepath}"
)
try:
with open(output_json_filepath, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
print(f"Guardado de {block_type} completado.")
return True
except Exception as e:
print(f"Error Crítico al guardar JSON de {block_type}: {e}")
traceback.print_exc()
return False
# --- SI NO ES DB/UDT/TAG TABLE (FC, FB, OB), CONTINUAR CON EL PROCESAMIENTO LÓGICO ---
print(f"INFO: El bloque es {block_type}. Iniciando procesamiento lógico...")
# (Carga de procesadores y mapas de acceso SIN CAMBIOS)
script_dir = os.path.dirname(__file__)
processors_dir_path = os.path.join(script_dir, "processors")
processor_map, sorted_processors = load_processors(processors_dir_path)
if not processor_map:
print("Error crítico: No se cargaron procesadores. Abortando.")
return False
network_access_maps = {}
for network in data.get("networks", []):
net_id = network["id"]
current_access_map = {}
for instr in network.get("logic", []):
for _, source in instr.get("inputs", {}).items():
sources_to_check = (
source
if isinstance(source, list)
else ([source] if isinstance(source, dict) else [])
)
for src in sources_to_check:
if (
isinstance(src, dict)
and src.get("uid")
and src.get("type") in ["variable", "constant"]
):
current_access_map[src["uid"]] = src
for _, dest_list in instr.get("outputs", {}).items():
if isinstance(dest_list, list):
for dest in dest_list:
if (
isinstance(dest, dict)
and dest.get("uid")
and dest.get("type") in ["variable", "constant"]
):
current_access_map[dest["uid"]] = dest
network_access_maps[net_id] = current_access_map
# (Inicialización de SymbolManager y bucle iterativo SIN CAMBIOS)
symbol_manager = SymbolManager()
sympy_map = {}
max_passes = 30
passes = 0
processing_complete = False
print(f"\n--- Iniciando Bucle de Procesamiento Iterativo ({block_type}) ---")
while passes < max_passes and not processing_complete:
passes += 1
made_change_in_base_pass = False
made_change_in_group_pass = False
print(f"\n--- Pase {passes} ---")
num_sympy_processed_this_pass = 0
num_grouped_this_pass = 0
# FASE 1: Procesadores Base (Ignorando STL)
print(f" Fase 1 (SymPy Base - Orden por Prioridad):")
num_sympy_processed_this_pass = 0
for processor_info in sorted_processors:
current_type_name = processor_info["type_name"]
func_to_call = processor_info["func"]
for network in data.get("networks", []):
network_id = network["id"]
network_lang = network.get("language", "LAD")
if network_lang == "STL":
continue
access_map = network_access_maps.get(network_id, {})
network_logic = network.get("logic", [])
for instruction in network_logic:
instr_uid = instruction.get("instruction_uid")
instr_type_current = instruction.get("type", "Unknown")
if (
instr_type_current.endswith(SCL_SUFFIX)
or "_error" in instr_type_current
or instruction.get("grouped", False)
or instr_type_current
in [
"RAW_STL_CHUNK",
"RAW_SCL_CHUNK",
"UNSUPPORTED_LANG",
"UNSUPPORTED_CONTENT",
"PARSING_ERROR",
]
):
continue
lookup_key = instr_type_current.lower()
effective_type_name = lookup_key
if instr_type_current == "Call":
call_block_type = instruction.get("block_type", "").upper()
if call_block_type == "FC":
effective_type_name = "call_fc"
elif call_block_type == "FB":
effective_type_name = "call_fb"
if effective_type_name == current_type_name:
try:
changed = func_to_call(
instruction, network_id, sympy_map, symbol_manager, data
)
if changed:
made_change_in_base_pass = True
num_sympy_processed_this_pass += 1
except Exception as e:
print(
f"ERROR(SymPy Base) al procesar {instr_type_current} UID {instr_uid}: {e}"
)
traceback.print_exc()
instruction["scl"] = (
f"// ERROR en SymPy procesador base: {e}"
)
instruction["type"] = instr_type_current + "_error"
made_change_in_base_pass = True
print(
f" -> {num_sympy_processed_this_pass} instrucciones (no STL) procesadas con SymPy."
)
# FASE 2: Agrupación IF (Ignorando STL)
if made_change_in_base_pass or passes == 1:
print(f" Fase 2 (Agrupación IF con Simplificación):")
num_grouped_this_pass = 0
for network in data.get("networks", []):
network_id = network["id"]
network_lang = network.get("language", "LAD")
if network_lang == "STL":
continue
network_logic = network.get("logic", [])
uids_in_network = sorted(
[
instr.get("instruction_uid", "Z")
for instr in network_logic
if instr.get("instruction_uid")
]
)
for uid_to_process in uids_in_network:
instruction = next(
(
instr
for instr in network_logic
if instr.get("instruction_uid") == uid_to_process
),
None,
)
if not instruction:
continue
if instruction.get("grouped") or "_error" in instruction.get(
"type", ""
):
continue
if instruction.get("type", "").endswith(SCL_SUFFIX):
try:
group_changed = process_group_ifs(
instruction, network_id, sympy_map, symbol_manager, data
)
if group_changed:
made_change_in_group_pass = True
num_grouped_this_pass += 1
except Exception as e:
print(
f"ERROR(GroupLoop) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}"
)
traceback.print_exc()
print(
f" -> {num_grouped_this_pass} agrupaciones realizadas (en redes no STL)."
)
# Comprobar si se completó
if not made_change_in_base_pass and not made_change_in_group_pass:
print(
f"\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---"
)
processing_complete = True
else:
print(
f"--- Fin Pase {passes}: {num_sympy_processed_this_pass} proc SymPy, {num_grouped_this_pass} agrup. Continuando..."
)
if passes == max_passes and not processing_complete:
print(f"\n--- ADVERTENCIA: Límite de {max_passes} pases alcanzado...")
# --- FIN BUCLE ITERATIVO ---
# (Verificación Final y Guardado JSON SIN CAMBIOS)
print(f"\n--- Verificación Final de Instrucciones No Procesadas ({block_type}) ---")
unprocessed_count = 0
unprocessed_details = []
ignored_types = [
"raw_scl_chunk",
"unsupported_lang",
"raw_stl_chunk",
"unsupported_content",
"parsing_error",
]
for network in data.get("networks", []):
network_id = network.get("id", "Unknown ID")
network_title = network.get("title", f"Network {network_id}")
network_lang = network.get("language", "LAD")
if network_lang == "STL":
continue
for instruction in network.get("logic", []):
instr_uid = instruction.get("instruction_uid", "Unknown UID")
instr_type = instruction.get("type", "Unknown Type")
is_grouped = instruction.get("grouped", False)
if (
not instr_type.endswith(SCL_SUFFIX)
and "_error" not in instr_type
and not is_grouped
and instr_type.lower() not in ignored_types
):
unprocessed_count += 1
unprocessed_details.append(
f" - Red '{network_title}' (ID: {network_id}, Lang: {network_lang}), Instrucción UID: {instr_uid}, Tipo: '{instr_type}'"
)
if unprocessed_count > 0:
print(
f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones (no STL) que parecen no haber sido procesadas:"
)
for detail in unprocessed_details:
print(detail)
else:
print(
"INFO: Todas las instrucciones relevantes (no STL) parecen haber sido procesadas o agrupadas."
)
print(f"\nGuardando JSON procesado ({block_type}) en: {output_json_filepath}")
try:
with open(output_json_filepath, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
print("Guardado completado.")
return True
except Exception as e:
print(f"Error Crítico al guardar JSON procesado: {e}")
traceback.print_exc()
return False
# --- Ejecución (MODIFICADO) ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Process simplified JSON to embed SCL logic. Expects original XML filepath as argument."
)
parser.add_argument(
"source_xml_filepath",
help="Path to the original source XML file (passed from x0_main.py).",
)
args = parser.parse_args()
source_xml_file = args.source_xml_filepath
if not os.path.exists(source_xml_file):
print(
f"Advertencia (x2): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON correspondiente."
)
xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0]
base_dir = os.path.dirname(source_xml_file)
parsing_dir = os.path.join(base_dir, "parsing")
input_json_file = os.path.join(parsing_dir, f"{xml_filename_base}.json")
output_json_file = os.path.join(parsing_dir, f"{xml_filename_base}_processed.json")
os.makedirs(parsing_dir, exist_ok=True)
print(
f"(x2) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'"
)
if not os.path.exists(input_json_file):
print(
f"Error Fatal (x2): El archivo de entrada JSON no existe: '{input_json_file}'"
)
print(
f"Asegúrate de que 'x1_to_json.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'."
)
sys.exit(1)
else:
try:
success = process_json_to_scl(input_json_file, output_json_file)
if success:
sys.exit(0)
else:
sys.exit(1)
except Exception as e:
print(
f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}"
)
traceback.print_exc()
sys.exit(1)