# Simatic_XML_Parser_to_SCL/x2_process.py
#
# 929 lines
# 35 KiB
# Python

# -*- coding: utf-8 -*-
import json
import os
import copy
import traceback
import re
# --- Constants and configuration ---
# Suffix appended to an instruction's "type" once it has been translated to SCL.
SCL_SUFFIX = "_scl"
# Marker written into the "scl" field of instructions absorbed into a grouped IF.
GROUPED_COMMENT = "// Logic included in grouped IF"
# Parsed input JSON; assigned by process_json_to_scl and read by
# check_if_grouped / process_group_ifs to look up a network's logic list.
data = {}
# --- Helper Functions ---
# (get_scl_representation, generate_temp_var_name, get_target_scl_name - sin cambios)
# Constant datatypes whose value is emitted verbatim.
_INTEGER_LIKE_DATATYPES = frozenset(
    {
        "INT",
        "DINT",
        "SINT",
        "USINT",
        "UINT",
        "UDINT",
        "LINT",
        "ULINT",
        "WORD",
        "DWORD",
        "LWORD",
        "BYTE",
    }
)


def get_scl_representation(source_info, network_id, scl_map, access_map):
    """Return the SCL text for an instruction input source.

    Args:
        source_info: A source descriptor dict (dispatched on its ``"type"``
            key), or a list of descriptors that are OR-ed together.
        network_id: Identifier of the network being processed; it is part of
            the ``scl_map`` key used for ``"connection"`` sources.
        scl_map: Maps ``(network_id, instruction_uid, pin)`` to the SCL
            expression already produced for that pin.
        access_map: Accepted for signature parity with the processors; it is
            not read here.

    Returns:
        The SCL expression as a string, or ``None`` when ``source_info`` is
        empty or depends on a connection with no ``scl_map`` entry yet.
        Sources that cannot be translated yield an ``_ERR_..._`` placeholder.
    """
    if not source_info:
        return None

    if isinstance(source_info, list):
        scl_parts = []
        for sub_source in source_info:
            sub_scl = get_scl_representation(
                sub_source, network_id, scl_map, access_map
            )
            if sub_scl is None:
                # A single unresolved branch makes the whole OR unresolved;
                # never hand back a partial expression built from the
                # branches seen so far.
                return None
            is_atomic = (
                sub_scl in ("TRUE", "FALSE")
                or (sub_scl.startswith('"') and sub_scl.endswith('"'))
                or sub_scl.isdigit()
                or (sub_scl.startswith("(") and sub_scl.endswith(")"))
            )
            scl_parts.append(sub_scl if is_atomic else f"({sub_scl})")
        # The list is non-empty here, so join() also covers the single-branch
        # case by returning that branch unchanged.
        return " OR ".join(scl_parts)

    if not isinstance(source_info, dict):
        # Anything that is neither a list nor a descriptor dict cannot be
        # dispatched on "type"; report it like any other invalid source.
        print(f"Advertencia: Tipo de fuente desconocido o inválido: {source_info}")
        return "_ERR_INVALID_SRC_TYPE_"

    source_type = source_info.get("type")
    if source_type == "powerrail":
        return "TRUE"

    if source_type == "variable":
        name = source_info.get("name")
        return name if name else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_"

    if source_type == "constant":
        dtype = str(source_info.get("datatype", "")).upper()
        value = source_info.get("value")
        try:
            if dtype == "BOOL":
                return str(value).upper()
            if dtype in _INTEGER_LIKE_DATATYPES or dtype == "TYPEDCONSTANT":
                return str(value)
            if dtype in ("REAL", "LREAL"):
                s_val = str(value)
                # Make sure the literal carries a decimal point or exponent.
                if "." in s_val or "e" in s_val.lower():
                    return s_val
                return s_val + ".0"
            # STRING and any unrecognised datatype are emitted single-quoted.
            return f"'{str(value)}'"
        except Exception as e:
            print(f"Advertencia: Error formateando constante {source_info}: {e}")
            return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_"

    if source_type == "connection":
        map_key = (
            network_id,
            source_info.get("source_instruction_uid"),
            source_info.get("source_pin"),
        )
        # None means the producing instruction has not been processed yet.
        return scl_map.get(map_key)

    if source_type == "unknown_source":
        print(
            f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}"
        )
        return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_"

    print(f"Advertencia: Tipo de fuente desconocido o inválido: {source_info}")
    return "_ERR_INVALID_SRC_TYPE_"
def generate_temp_var_name(network_id, instr_uid, pin_name):
    """Build a deterministic temporary variable name for an instruction pin.

    Hyphens in every component are replaced by underscores and the pin name
    is lower-cased. A leading underscore is added when the cleaned network id
    starts with a digit.

    Args:
        network_id: Network identifier (converted with ``str``).
        instr_uid: Instruction UID (converted with ``str``).
        pin_name: Output pin name (converted with ``str``).

    Returns:
        A name of the form ``[_]temp_<network>_<uid>_<pin>``.
    """
    net_id_clean = str(network_id).replace("-", "_")
    instr_uid_clean = str(instr_uid).replace("-", "_")
    pin_name_clean = str(pin_name).replace("-", "_").lower()
    # Slicing (not indexing) keeps an empty network id from raising IndexError.
    prefix = "_" if net_id_clean[:1].isdigit() else ""
    return f"{prefix}temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}"
def get_target_scl_name(instruction, output_pin_name, network_id, default_to_temp=True):
    """Resolve the SCL name an instruction output pin writes to.

    The pin is usable only when ``instruction["outputs"][output_pin_name]`` is
    a list holding exactly one destination of type ``"variable"`` with a
    name. In every other case the result is a generated temporary name when
    ``default_to_temp`` is true, otherwise ``None``.

    Args:
        instruction: Instruction dict with ``"instruction_uid"`` and
            ``"outputs"`` keys.
        output_pin_name: Name of the output pin to resolve.
        network_id: Network identifier, used for the temporary name.
        default_to_temp: Whether to fall back to a temporary variable.

    Returns:
        The destination name, a temporary name, or ``None``.
    """
    uid = instruction["instruction_uid"]
    destinations = instruction["outputs"].get(output_pin_name)
    fallback_note = "Usando temp." if default_to_temp else "Ignorando."

    def fallback():
        # Evaluated lazily so the temp name is only built when actually needed.
        if default_to_temp:
            return generate_temp_var_name(network_id, uid, output_pin_name)
        return None

    single_destination = (
        bool(destinations)
        and isinstance(destinations, list)
        and len(destinations) == 1
    )
    if not single_destination:
        # Missing pin, or zero / several destinations: nothing is reported.
        return fallback()

    destination = destinations[0]
    kind = destination.get("type")

    if kind == "variable":
        name = destination.get("name")
        if name:
            return name
        print(
            f"Error: Variable destino para {uid}.{output_pin_name} sin nombre (UID: {destination.get('uid')}). {fallback_note}"
        )
        return fallback()

    if kind == "constant":
        print(
            f"Advertencia: Instr {uid} intenta escribir en constante UID {destination.get('uid')}. {fallback_note}"
        )
        return fallback()

    print(
        f"Advertencia: Destino de {uid}.{output_pin_name} no es var/const: {kind}. {fallback_note}"
    )
    return fallback()
# --- Procesadores de Instrucciones (MODIFICADOS para Agrupación) ---
def check_if_grouped(instruction, network_id):
    """Tell whether this instruction's EN source enables several blocks.

    The EN input must be a ``"connection"`` to another instruction's pin. The
    logic list of ``network_id`` (looked up in the module-level ``data``) is
    then scanned for instructions whose EN input comes from that same pin;
    only Move/Add/Sub/Mul/Div/Mod/Convert instructions are counted.

    Returns:
        ``(is_grouped, (source_uid, source_pin))`` where ``is_grouped`` is
        true when more than one such consumer exists, or ``(False, None)``
        when the EN input is absent, not a connection, or incomplete.
    """
    enable = instruction.get("inputs", {}).get("en")
    if not (isinstance(enable, dict) and enable.get("type") == "connection"):
        return False, None

    origin_uid = enable.get("source_instruction_uid")
    origin_pin = enable.get("source_pin")
    if not (origin_uid and origin_pin):
        return False, None

    functional_types = ("Move", "Add", "Sub", "Mul", "Div", "Mod", "Convert")

    # First network whose id matches; an unknown id leaves the list empty.
    logic = []
    for net in data.get("networks", []):
        if net["id"] == network_id:
            logic = net["logic"]
            break

    def is_counted_consumer(candidate):
        candidate_en = candidate.get("inputs", {}).get("en")
        if not isinstance(candidate_en, dict):
            return False
        if candidate_en.get("type") != "connection":
            return False
        if candidate_en.get("source_instruction_uid") != origin_uid:
            return False
        if candidate_en.get("source_pin") != origin_pin:
            return False
        # Compare on the type with processing suffixes stripped.
        base_type = (
            candidate.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "")
        )
        return base_type in functional_types

    consumers = sum(1 for candidate in logic if is_counted_consumer(candidate))
    return consumers > 1, (origin_uid, origin_pin)
def process_move(instruction, network_id, scl_map, access_map):
    """Translate a Move instruction into an SCL assignment.

    On success the instruction dict is updated in place: ``"scl"`` receives
    the generated statement and ``"type"`` gets ``SCL_SUFFIX`` appended. The
    destination and the enable condition are also published in ``scl_map``
    under the ``"out1"`` and ``"eno"`` pins, mirroring what the Add, Convert
    and Mod processors do, so instructions chained to this Move can resolve.

    Args:
        instruction: Instruction dict (``"instruction_uid"``, ``"type"``,
            ``"inputs"``, ``"outputs"``).
        network_id: Identifier of the network that owns the instruction.
        scl_map: Shared ``(network_id, uid, pin) -> SCL`` map; updated here.
        access_map: Passed through to ``get_scl_representation``.

    Returns:
        True when the instruction was translated; False when it was already
        processed, still has unresolved inputs, or has no usable destination.
    """
    instr_uid = instruction["instruction_uid"]
    instr_type = instruction["type"]
    if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
        return False

    # A missing EN input means the block is unconditionally enabled.
    en_input = instruction["inputs"].get("en")
    en_scl = (
        "TRUE"
        if en_input is None
        else get_scl_representation(en_input, network_id, scl_map, access_map)
    )
    in_scl = get_scl_representation(
        instruction["inputs"].get("in"), network_id, scl_map, access_map
    )
    if en_scl is None or in_scl is None:
        return False  # Dependencies not resolved yet; retry on a later pass.

    # A Move without a real destination variable is not given a temp.
    target_scl = get_target_scl_name(
        instruction, "out1", network_id, default_to_temp=False
    )
    if target_scl is None:
        print(f"Advertencia: MOVE UID: {instr_uid} no tiene destino claro.")
        return False

    scl_core = f"{target_scl} := {in_scl};"

    is_grouped, _ = check_if_grouped(instruction, network_id)
    if is_grouped:
        # The EN source enables several blocks: store only the bare
        # assignment and flag it; the enclosing IF is not emitted here.
        scl_final = scl_core
        instruction["grouped"] = True
    elif en_scl != "TRUE":
        scl_final = f"IF {en_scl} THEN\n {scl_core}\nEND_IF;"
    else:
        scl_final = scl_core

    instruction["scl"] = scl_final
    instruction["type"] = instr_type + SCL_SUFFIX

    # Publish the outputs so downstream instructions (and the PBox CLK
    # inference, which looks up "eno" for Move) can resolve this block.
    scl_map[(network_id, instr_uid, "out1")] = target_scl
    scl_map[(network_id, instr_uid, "eno")] = en_scl
    return True
def process_add(instruction, network_id, scl_map, access_map):
    """Translate an Add instruction into ``target := in1 + in2;``.

    The instruction is updated in place (``"scl"``, ``"type"`` and, when its
    EN source is shared, ``"grouped"``), and the ``"out"`` and ``"eno"`` pins
    are published in ``scl_map``.

    Returns:
        True when the instruction changed state (translated or flagged as an
        error); False when it was already handled or an input is unresolved.
    """
    uid = instruction["instruction_uid"]
    original_type = instruction["type"]
    if original_type.endswith(SCL_SUFFIX) or "_error" in original_type:
        return False

    inputs = instruction["inputs"]
    enable_source = inputs.get("en")
    if enable_source is None:
        enable_expr = "TRUE"  # No EN wired: always enabled.
    else:
        enable_expr = get_scl_representation(
            enable_source, network_id, scl_map, access_map
        )
    first = get_scl_representation(inputs.get("in1"), network_id, scl_map, access_map)
    second = get_scl_representation(inputs.get("in2"), network_id, scl_map, access_map)
    if enable_expr is None or first is None or second is None:
        return False

    target = get_target_scl_name(instruction, "out", network_id, default_to_temp=True)
    if target is None:
        print(f"Error Interno: No se pudo det. destino ADD {uid}")
        instruction["type"] += "_error"
        return True

    def parenthesize(expr):
        # Any operand containing a space is treated as a compound expression.
        return f"({expr})" if " " in expr else expr

    assignment = f"{target} := {parenthesize(first)} + {parenthesize(second)};"

    grouped, _ = check_if_grouped(instruction, network_id)
    if grouped:
        # Shared EN source: keep the bare assignment and flag the instruction.
        instruction["grouped"] = True
        statement = assignment
    elif enable_expr == "TRUE":
        statement = assignment
    else:
        statement = f"IF {enable_expr} THEN\n {assignment}\nEND_IF;"

    instruction["scl"] = statement
    instruction["type"] = original_type + SCL_SUFFIX
    scl_map[(network_id, uid, "out")] = target
    scl_map[(network_id, uid, "eno")] = enable_expr
    return True
# --- (Aplica lógica similar de 'check_if_grouped' a process_convert, process_mod, etc.) ---
# Ejemplo para process_convert:
def process_convert(instruction, network_id, scl_map, access_map):
    """Translate a Convert instruction into a plain assignment.

    The input expression is assigned to the target as-is (no explicit
    conversion function is emitted). The instruction is updated in place and
    its ``"out"`` / ``"eno"`` pins are published in ``scl_map``.

    Returns:
        True when the instruction changed state (translated or flagged as an
        error); False when it was already handled or an input is unresolved.
    """
    uid = instruction["instruction_uid"]
    kind = instruction["type"]
    already_handled = kind.endswith(SCL_SUFFIX) or "_error" in kind
    if already_handled:
        return False

    def resolve(source):
        return get_scl_representation(source, network_id, scl_map, access_map)

    pins = instruction["inputs"]
    en_source = pins.get("en")
    # An absent EN input stands for an always-true enable.
    enable = resolve(en_source) if en_source is not None else "TRUE"
    value = resolve(pins.get("in"))
    if enable is None or value is None:
        return False

    destination = get_target_scl_name(
        instruction, "out", network_id, default_to_temp=True
    )
    if destination is None:
        print(f"Error Interno: No se pudo det. destino CONVERT {uid}")
        instruction["type"] += "_error"
        return True

    core_statement = f"{destination} := {value};"

    shares_enable, _ = check_if_grouped(instruction, network_id)
    if shares_enable:
        instruction["grouped"] = True
    if shares_enable or enable == "TRUE":
        # Grouped or unconditional: the bare assignment is the whole output.
        final_statement = core_statement
    else:
        final_statement = f"IF {enable} THEN\n {core_statement}\nEND_IF;"

    instruction["scl"] = final_statement
    instruction["type"] = kind + SCL_SUFFIX
    scl_map.update(
        {
            (network_id, uid, "out"): destination,
            (network_id, uid, "eno"): enable,
        }
    )
    return True
# Ejemplo para process_mod:
def process_mod(instruction, network_id, scl_map, access_map):
    """Translate a Mod instruction into ``target := in1 MOD in2;``.

    The instruction is updated in place (``"scl"``, ``"type"`` and, when its
    EN source is shared, ``"grouped"``), and the ``"out"`` and ``"eno"`` pins
    are published in ``scl_map``.

    Returns:
        True when the instruction changed state (translated or flagged as an
        error); False when it was already handled or an input is unresolved.
    """
    uid = instruction["instruction_uid"]
    type_before = instruction["type"]
    if type_before.endswith(SCL_SUFFIX) or "_error" in type_before:
        return False

    sources = instruction["inputs"]
    en_source = sources.get("en")
    en_expr = "TRUE"  # Default when no EN input is wired.
    if en_source is not None:
        en_expr = get_scl_representation(en_source, network_id, scl_map, access_map)

    operands = []
    for pin in ("in1", "in2"):
        operands.append(
            get_scl_representation(sources.get(pin), network_id, scl_map, access_map)
        )
    dividend, divisor = operands
    if en_expr is None or dividend is None or divisor is None:
        return False

    out_name = get_target_scl_name(
        instruction, "out", network_id, default_to_temp=True
    )
    if out_name is None:
        print(f"Error Interno: No se pudo det. destino MOD {uid}")
        instruction["type"] += "_error"
        return True

    # Operands containing a space are wrapped to keep them as one term.
    if " " in dividend:
        dividend = f"({dividend})"
    if " " in divisor:
        divisor = f"({divisor})"
    core = f"{out_name} := {dividend} MOD {divisor};"

    in_group, _ = check_if_grouped(instruction, network_id)
    if in_group:
        instruction["grouped"] = True
        emitted = core
    elif en_expr != "TRUE":
        emitted = f"IF {en_expr} THEN\n {core}\nEND_IF;"
    else:
        emitted = core

    instruction["scl"] = emitted
    instruction["type"] = type_before + SCL_SUFFIX
    scl_map[(network_id, uid, "out")] = out_name
    scl_map[(network_id, uid, "eno")] = en_expr
    return True
# --- (process_contact, process_eq, process_coil, process_o, process_pbox - sin cambios respecto a la versión anterior) ---
# ... (Asegúrate de tener las versiones funcionales de estos aquí) ...
def _is_enclosed_in_parens(expr):
    """Return True when the first "(" of *expr* is closed by its last ")".

    ``"(a OR b)"`` is enclosed; ``"(a) OR (b)"`` is not, even though it also
    starts with "(" and ends with ")". Parentheses inside single- or
    double-quoted sections are ignored.
    """
    if not (expr.startswith("(") and expr.endswith(")")):
        return False
    depth = 0
    open_quote = None
    last_index = len(expr) - 1
    for index, char in enumerate(expr):
        if open_quote is not None:
            if char == open_quote:
                open_quote = None
        elif char in "\"'":
            open_quote = char
        elif char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0 and index != last_index:
                # The opening parenthesis closed before the end of the text.
                return False
    return depth == 0


def process_contact(instruction, network_id, scl_map, access_map):
    """Fold a Contact into the running logic expression (RLO).

    The contact's operand is AND-ed with the incoming RLO and the result is
    stored in ``scl_map`` under the contact's ``"out"`` pin. The instruction's
    ``"scl"`` field only receives an explanatory comment.

    Args:
        instruction: Instruction dict for the contact.
        network_id: Identifier of the network that owns the instruction.
        scl_map: Shared ``(network_id, uid, pin) -> SCL`` map; updated here.
        access_map: Passed through to ``get_scl_representation``.

    Returns:
        True when the contact was translated; False when it was already
        processed or one of its inputs is still unresolved.
    """
    instr_uid = instruction["instruction_uid"]
    instr_type = instruction["type"]
    if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
        return False

    # NOTE(review): negation is hard-coded off; nothing in the instruction is
    # consulted to detect a negated contact - confirm against the JSON schema.
    is_negated = False

    # A missing "in" input means the contact hangs directly off the rail.
    in_input = instruction["inputs"].get("in")
    in_rlo_scl = (
        "TRUE"
        if in_input is None
        else get_scl_representation(in_input, network_id, scl_map, access_map)
    )
    operand_scl = get_scl_representation(
        instruction["inputs"].get("operand"), network_id, scl_map, access_map
    )
    if in_rlo_scl is None or operand_scl is None:
        return False

    term = f"NOT {operand_scl}" if is_negated else operand_scl
    is_quoted_name = term.startswith('"') and term.endswith('"')
    if not is_quoted_name and " " in term and not _is_enclosed_in_parens(term):
        term = f"({term})"

    if in_rlo_scl == "TRUE":
        new_rlo_scl = term
    else:
        # AND binds tighter than OR, so a compound RLO must be one
        # parenthesised unit. "(a AND b) OR (c AND d)" starts and ends with
        # parentheses but is NOT one unit; the balanced check catches that.
        is_compound = "AND" in in_rlo_scl or "OR" in in_rlo_scl
        if is_compound and not _is_enclosed_in_parens(in_rlo_scl):
            in_rlo_processed = f"({in_rlo_scl})"
        else:
            in_rlo_processed = in_rlo_scl
        new_rlo_scl = f"{in_rlo_processed} AND {term}"

    scl_map[(network_id, instr_uid, "out")] = new_rlo_scl
    instruction["scl"] = f"// RLO updated by Contact {instr_uid}: {new_rlo_scl}"
    instruction["type"] = instr_type + SCL_SUFFIX
    return True
def process_eq(instruction, network_id, scl_map, access_map):
    """Translate an Eq comparison into ``in1 = in2``.

    The comparison is stored in ``scl_map`` under the ``"out"`` pin and the
    ``"pre"`` input (``"TRUE"`` when absent) under ``"eno"``. The
    instruction's ``"scl"`` field only receives an explanatory comment.

    Returns:
        True when the comparison was translated; False when it was already
        processed or an input is still unresolved.
    """
    uid = instruction["instruction_uid"]
    kind = instruction["type"]
    if kind.endswith(SCL_SUFFIX) or "_error" in kind:
        return False

    pins = instruction["inputs"]
    left = get_scl_representation(pins.get("in1"), network_id, scl_map, access_map)
    right = get_scl_representation(pins.get("in2"), network_id, scl_map, access_map)
    if left is None or right is None:
        return False

    def parenthesize(expr):
        return f"({expr})" if " " in expr else expr

    comparison = f"{parenthesize(left)} = {parenthesize(right)}"
    # Stored before "pre" is resolved, so the entry stays in scl_map even
    # when this call returns False just below.
    scl_map[(network_id, uid, "out")] = comparison

    pre_source = pins.get("pre")
    if pre_source is None:
        pre_expr = "TRUE"
    else:
        pre_expr = get_scl_representation(pre_source, network_id, scl_map, access_map)
        if pre_expr is None:
            return False

    scl_map[(network_id, uid, "eno")] = pre_expr
    instruction["scl"] = f"// Comparison Eq {uid}: {comparison}"
    instruction["type"] = kind + SCL_SUFFIX
    return True
def process_coil(instruction, network_id, scl_map, access_map):
    """Translate a Coil into ``operand := <incoming RLO>;``.

    The operand must be a variable; otherwise the instruction is flagged with
    the ``_error`` type suffix and an error comment.

    Returns:
        True when the instruction changed state (translated or flagged as an
        error); False when it was already handled or an input is unresolved.
    """
    uid = instruction["instruction_uid"]
    kind = instruction["type"]
    if kind.endswith(SCL_SUFFIX) or "_error" in kind:
        return False

    pins = instruction["inputs"]
    rlo = get_scl_representation(pins.get("in"), network_id, scl_map, access_map)
    operand = pins.get("operand")
    operand_expr = get_scl_representation(operand, network_id, scl_map, access_map)
    if rlo is None or operand_expr is None:
        return False

    operand_is_variable = bool(operand) and operand.get("type") == "variable"
    if not operand_is_variable:
        print(f"Error: Operando COIL {uid} no es var")
        instruction["scl"] = f"// ERROR: Coil {uid} operando no es variable"
        instruction["type"] = kind + "_error"
        return True

    # Drop the redundant parentheses around a literal boolean.
    rlo = {"(TRUE)": "TRUE", "(FALSE)": "FALSE"}.get(rlo, rlo)

    instruction["scl"] = f"{operand_expr} := {rlo};"
    instruction["type"] = kind + SCL_SUFFIX
    return True
# Instruction types whose "out" pin carries a logic result usable as CLK.
_PBOX_OUT_SOURCE_TYPES = (
    "Contact",
    "Eq",
    "O",
    "PBox",
    "And",
    "Xor",
    "Ne",
    "Gt",
    "Lt",
    "Ge",
    "Le",
)
# Functional blocks whose "eno" pin is used as CLK instead.
_PBOX_ENO_SOURCE_TYPES = ("Move", "Add", "Convert", "Mod")


def _pbox_base_type(instr):
    """Return the instruction type with the processing suffixes removed."""
    return instr.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "")


def _pbox_feeds_coil(instr_uid, network_logic_list):
    """Return True when a Coil's "in" input is wired to this PBox's "out"."""
    for consumer in network_logic_list:
        signal = consumer.get("inputs", {}).get("in")
        if (
            isinstance(signal, dict)
            and signal.get("type") == "connection"
            and signal.get("source_instruction_uid") == instr_uid
            and signal.get("source_pin") == "out"
            and _pbox_base_type(consumer) == "Coil"
        ):
            return True
    return False


def _infer_pbox_clk(instr_uid, network_id, scl_map, network_logic_list):
    """Return the SCL of the nearest preceding resolved signal, or None."""
    position = next(
        (
            index
            for index, instr in enumerate(network_logic_list)
            if instr["instruction_uid"] == instr_uid
        ),
        -1,
    )
    if position == -1:
        return None
    # Walk backwards from the instruction just before the PBox.
    for prev_instr in reversed(network_logic_list[:position]):
        prev_type = _pbox_base_type(prev_instr)
        if prev_type in _PBOX_OUT_SOURCE_TYPES:
            pin = "out"
        elif prev_type in _PBOX_ENO_SOURCE_TYPES:
            pin = "eno"
        else:
            continue
        clk_scl = scl_map.get((network_id, prev_instr["instruction_uid"], pin))
        if clk_scl is not None:
            return clk_scl
    return None


def process_pbox(instruction, network_id, scl_map, access_map, network_logic_list):
    """Translate a PBox into an edge-detection call or a bit pass-through.

    When the PBox output feeds a Coil it is emitted as
    ``P_TRIG_FUNC(CLK := ..., M := <bit>)``, with CLK inferred from the
    nearest preceding instruction that already has a result in ``scl_map``.
    Otherwise the memory bit itself is passed on. The result is stored in
    ``scl_map`` under the ``"out"`` pin; ``instruction["scl"]`` only receives
    a comment.

    Args:
        instruction: Instruction dict for the PBox.
        network_id: Identifier of the network that owns the instruction.
        scl_map: Shared ``(network_id, uid, pin) -> SCL`` map; updated here.
        access_map: Passed through to ``get_scl_representation``.
        network_logic_list: Ordered instruction list of the same network.

    Returns:
        True when the instruction changed state (translated or flagged as an
        error); False when it was already handled or its bit is unresolved.
    """
    instr_uid = instruction["instruction_uid"]
    instr_type = instruction["type"]
    if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
        return False

    mem_bit_input = instruction["inputs"].get("bit")
    mem_bit_scl = get_scl_representation(mem_bit_input, network_id, scl_map, access_map)
    if mem_bit_scl is None:
        return False
    if not (mem_bit_input and mem_bit_input.get("type") == "variable"):
        print(f"Error: PBOX {instr_uid} bit no es var")
        instruction["scl"] = f"// ERROR: PBox {instr_uid} bit no es var"
        instruction["type"] += "_error"
        return True

    if _pbox_feeds_coil(instr_uid, network_logic_list):
        rlo_scl = _infer_pbox_clk(instr_uid, network_id, scl_map, network_logic_list)
        if rlo_scl is None:
            print(f"Error: No se pudo inferir CLK para PBOX {instr_uid}")
            instruction["scl"] = f"// ERROR: PBox {instr_uid} sin CLK"
            instruction["type"] += "_error"
            return True
        clk_signal_formatted = f"({rlo_scl})" if " " in rlo_scl else rlo_scl
        result_scl = f"P_TRIG_FUNC(CLK := {clk_signal_formatted}, M := {mem_bit_scl})"
        scl_comment = (
            f"// Edge detection PBox {instr_uid} -> {result_scl} (CLK inferred)"
        )
    else:
        # No CLK is needed on this path, so it must not be gated on one:
        # a PBox that does not feed a Coil simply forwards its bit.
        print(f"Advertencia: PBox {instr_uid} no como P_TRIG. Pasando bit.")
        result_scl = mem_bit_scl
        scl_comment = f"// PBox {instr_uid} - Passing bit: {result_scl}"

    scl_map[(network_id, instr_uid, "out")] = result_scl
    instruction["scl"] = scl_comment
    instruction["type"] = instr_type + SCL_SUFFIX
    return True
def process_o(instruction, network_id, scl_map, access_map):
    """Translate an O (OR) instruction by OR-ing all of its ``in*`` inputs.

    Input pins are every key of ``instruction["inputs"]`` starting with
    ``"in"``, taken in sorted order. The combined expression is stored in
    ``scl_map`` under the ``"out"`` pin; ``instruction["scl"]`` only receives
    a comment.

    Returns:
        True when the instruction changed state (translated or flagged as an
        error); False when it was already handled or an input is unresolved.
    """
    uid = instruction["instruction_uid"]
    kind = instruction["type"]
    if kind.endswith(SCL_SUFFIX) or "_error" in kind:
        return False

    pins = sorted(name for name in instruction["inputs"] if name.startswith("in"))
    if not pins:
        print(f"Error: O {uid} sin pines inX")
        instruction["scl"] = f"// ERROR: O {uid} sin pines in"
        instruction["type"] += "_error"
        return True

    terms = []
    for pin in pins:
        expr = get_scl_representation(
            instruction["inputs"][pin], network_id, scl_map, access_map
        )
        if expr is None:
            return False  # Stop at the first unresolved input.
        looks_compound = " " in expr or "AND" in expr
        looks_wrapped = expr.startswith("(") and expr.endswith(")")
        if looks_compound and not looks_wrapped:
            expr = f"({expr})"
        terms.append(expr)

    # At least one term exists here, so join() returns it unchanged when it
    # is the only one.
    combined = " OR ".join(terms)

    scl_map[(network_id, uid, "out")] = combined
    instruction["scl"] = f"// Logic O {uid}: {combined}"
    instruction["type"] = kind + SCL_SUFFIX
    return True
# --- NUEVO: Procesador de Agrupación ---
def process_group_ifs(instruction, network_id, scl_map, access_map):
    """Build one grouped IF block for a condition that enables several blocks.

    Acts only on already-processed condition generators (Contact, O, Eq, Ne,
    Gt, Lt, Ge, Le, PBox). If more than one processed functional block
    (Move/Add/Sub/Mul/Div/Mod/Convert) has its EN wired to this instruction's
    ``"out"`` pin and holds a bare assignment, their assignments are wrapped
    in a single ``IF <condition> THEN ... END_IF;`` stored in this
    instruction's ``"scl"`` field. Each absorbed consumer has its ``"scl"``
    replaced by ``GROUPED_COMMENT`` and is flagged ``"grouped"``.

    Args:
        instruction: Candidate condition-generating instruction.
        network_id: Identifier of the network that owns the instruction.
        scl_map: Shared ``(network_id, uid, pin) -> SCL`` map (read only).
        access_map: Unused; kept for a uniform processor signature.

    Returns:
        True when a grouped IF was created, False otherwise.
    """
    instr_uid = instruction["instruction_uid"]
    instr_type = instruction["type"]
    instr_type_original = instr_type.replace(SCL_SUFFIX, "").replace("_error", "")

    condition_types = ("Contact", "O", "Eq", "Ne", "Gt", "Lt", "Ge", "Le", "PBox")
    if not instr_type.endswith(SCL_SUFFIX) or instr_type_original not in condition_types:
        return False

    # A grouped IF built on an earlier pass must not be rebuilt.
    if instruction.get("scl", "").strip().startswith("IF"):
        return False

    condition_scl = scl_map.get((network_id, instr_uid, "out"))
    if condition_scl is None or condition_scl in ("TRUE", "FALSE"):
        return False  # Trivial conditions are not worth an IF.

    network_logic = next(
        (
            net["logic"]
            for net in data.get("networks", [])
            if net["id"] == network_id
        ),
        [],
    )
    if not network_logic:
        return False

    functional_types = ("Move", "Add", "Sub", "Mul", "Div", "Mod", "Convert")
    members = []
    for consumer_instr in network_logic:
        consumer_scl = consumer_instr.get("scl") or ""
        # Skip only consumers that were already absorbed into a group. The
        # "grouped" flag alone cannot be used: the base processors set it on
        # exactly the bare-assignment consumers this function must collect.
        if consumer_scl.startswith(GROUPED_COMMENT):
            continue

        consumer_en = consumer_instr.get("inputs", {}).get("en")
        if not (
            isinstance(consumer_en, dict)
            and consumer_en.get("type") == "connection"
            and consumer_en.get("source_instruction_uid") == instr_uid
            and consumer_en.get("source_pin") == "out"
        ):
            continue

        consumer_type = consumer_instr.get("type", "")
        consumer_type_original = consumer_type.replace(SCL_SUFFIX, "").replace(
            "_error", ""
        )
        if (
            consumer_type.endswith(SCL_SUFFIX)
            and consumer_type_original in functional_types
            and consumer_scl
            and not consumer_scl.strip().startswith("IF")
        ):
            members.append(consumer_instr)

    if len(members) < 2:
        return False

    print(
        f"INFO: Agrupando {len(members)} instrucciones para condición de {instr_type_original} UID {instr_uid}"
    )
    # Build the IF text before the members' "scl" fields are overwritten.
    scl_grouped = [f"IF {condition_scl} THEN"]
    scl_grouped.extend(f" {member['scl'].strip()}" for member in members)
    scl_grouped.append("END_IF;")
    instruction["scl"] = "\n".join(scl_grouped)

    for member in members:
        member["scl"] = f"{GROUPED_COMMENT} (by UID {instr_uid})"
        member["grouped"] = True
    return True
# --- Bucle Principal de Procesamiento ---
def _collect_accesses(candidates, access_map):
    """Record every variable/constant access of *candidates* by its UID."""
    for candidate in candidates:
        if (
            isinstance(candidate, dict)
            and candidate.get("uid")
            and candidate.get("scope")
            and candidate.get("type") in ("variable", "constant")
        ):
            access_map[candidate["uid"]] = candidate


def _build_network_access_maps(networks):
    """Return ``{network_id: {uid: access}}`` built from all inputs/outputs."""
    network_access_maps = {}
    for network in networks:
        net_id = network["id"]
        current_access_map = {}
        for instr in network.get("logic", []):
            for source in instr.get("inputs", {}).values():
                if isinstance(source, list):
                    _collect_accesses(source, current_access_map)
                elif isinstance(source, dict):
                    _collect_accesses([source], current_access_map)
            for dest_list in instr.get("outputs", {}).values():
                if isinstance(dest_list, list):
                    _collect_accesses(dest_list, current_access_map)
        network_access_maps[net_id] = current_access_map
    return network_access_maps


def _run_base_pass(networks, network_access_maps, scl_map, processor_map):
    """Run the per-type processors once over every pending instruction."""
    made_change = False
    for network in networks:
        network_id = network["id"]
        access_map = network_access_maps.get(network_id, {})
        network_logic = network.get("logic", [])
        for instruction in network_logic:
            instr_type = instruction["type"]
            if (
                instr_type.endswith(SCL_SUFFIX)
                or "_error" in instr_type
                or instruction.get("grouped", False)
            ):
                continue  # Already processed, failed, or grouped.
            func_to_call = processor_map.get(instr_type.lower())
            if func_to_call is None:
                continue
            try:
                if func_to_call is process_pbox:
                    # PBox additionally needs the ordered logic list.
                    changed = func_to_call(
                        instruction, network_id, scl_map, access_map, network_logic
                    )
                else:
                    changed = func_to_call(
                        instruction, network_id, scl_map, access_map
                    )
            except Exception as e:
                # Deliberate best effort: one failing instruction must not
                # abort the whole conversion. It is flagged and skipped.
                print(
                    f"ERROR(Base) al ejecutar {func_to_call.__name__} en UID {instruction.get('instruction_uid')} Red {network_id}: {e}"
                )
                traceback.print_exc()
                instruction["scl"] = f"// ERROR during base processing: {e}"
                instruction["type"] += "_error"
                changed = True  # An error also counts as a state change.
            if changed:
                made_change = True
    return made_change


def _run_group_pass(networks, network_access_maps, scl_map):
    """Run ``process_group_ifs`` once over every processed instruction."""
    made_change = False
    for network in networks:
        network_id = network["id"]
        access_map = network_access_maps.get(network_id, {})
        for instruction in network.get("logic", []):
            if not instruction["type"].endswith(SCL_SUFFIX) or instruction.get(
                "grouped", False
            ):
                continue
            try:
                if process_group_ifs(instruction, network_id, scl_map, access_map):
                    made_change = True
            except Exception as e:
                # A grouping failure does not mark the instruction as failed.
                print(
                    f"ERROR(Group) al ejecutar process_group_ifs en UID {instruction.get('instruction_uid')}: {e}"
                )
                traceback.print_exc()
    return made_change


def process_json_to_scl(json_filepath):
    """Load the simplified JSON, translate it iteratively, and save the result.

    Each pass runs the base processors and then the IF-grouping processor
    over every network; passes repeat until one makes no change or the pass
    limit is reached. The processed structure is written next to the input
    as ``<name>_scl_processed.json``.

    Args:
        json_filepath: Path of the JSON file to process.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    global data

    if not os.path.exists(json_filepath):
        print(f"Error: Archivo JSON no encontrado en {json_filepath}")
        return
    print(f"Cargando JSON desde: {json_filepath}")
    try:
        with open(json_filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"Error al cargar o parsear JSON: {e}")
        return
    if not isinstance(data, dict):
        print("Error al cargar o parsear JSON: la raíz no es un objeto")
        return

    networks = data.get("networks", [])
    network_access_maps = _build_network_access_maps(networks)

    scl_map = {}
    max_passes = 25
    base_processors = [
        process_convert,
        process_mod,
        process_eq,
        process_contact,
        process_o,
        process_pbox,
        process_add,
        process_move,
        process_coil,
    ]
    # Keyed by the lower-cased instruction type: "process_add" -> "add".
    processor_map = {
        func.__name__.split("_")[1].lower(): func for func in base_processors
    }

    print("\n--- Iniciando Bucle de Procesamiento Iterativo ---")
    for passes in range(1, max_passes + 1):
        print(f"\n--- Pase {passes} ---")

        print(f"DEBUG: Iniciando Fase 1 (Procesadores Base) - Pase {passes}")
        made_change_in_base_pass = _run_base_pass(
            networks, network_access_maps, scl_map, processor_map
        )

        print(f"DEBUG: Iniciando Fase 2 (Agrupación IF) - Pase {passes}")
        made_change_in_group_pass = _run_group_pass(
            networks, network_access_maps, scl_map
        )

        if not made_change_in_base_pass and not made_change_in_group_pass:
            print(
                f"\n--- No se hicieron cambios en el pase {passes}. Proceso completado. ---"
            )
            break
        print(
            f"DEBUG: Cambios en Pase {passes}: Base={made_change_in_base_pass}, Grupo={made_change_in_group_pass}. Continuando..."
        )
    else:
        # Reached only when no pass converged, so a run that settles exactly
        # on the last allowed pass is not reported as hitting the limit.
        print(
            f"\n--- Límite de {max_passes} pases alcanzado. Puede haber dependencias circulares o lógica no procesada. ---"
        )

    # Replace only a trailing ".json" extension. A path without it gets the
    # suffix appended, so the output can never overwrite the input file.
    root, ext = os.path.splitext(json_filepath)
    output_base = root if ext.lower() == ".json" else json_filepath
    output_filename = output_base + "_scl_processed.json"
    print(f"\nGuardando JSON procesado en: {output_filename}")
    try:
        with open(output_filename, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        print("Guardado completado.")
    except (OSError, TypeError, ValueError) as e:
        print(f"Error al guardar el JSON procesado: {e}")
# --- Ejecución ---
if __name__ == "__main__":
    import sys

    # An optional first command-line argument overrides the default input
    # file; with no argument the behaviour is unchanged.
    input_json_file = (
        sys.argv[1] if len(sys.argv) > 1 else "BlenderRun_ProdTime_simplified.json"
    )
    process_json_to_scl(input_json_file)