Simatic_XML_Parser_to_SCL/x2_process.py

859 lines
32 KiB
Python

# -*- coding: utf-8 -*-
import json
import os
import copy
import traceback
import re
# --- Constantes y Configuración ---
SCL_SUFFIX = "_scl"
GROUPED_COMMENT = "// Logic included in grouped IF"
# Global data variable
data = {}
# --- Helper Functions ---
# (get_scl_representation, generate_temp_var_name, get_target_scl_name - sin cambios)
def get_scl_representation(source_info, network_id, scl_map, access_map):
if not source_info:
return None
if isinstance(source_info, list):
scl_parts = []
all_resolved = True
for sub_source in source_info:
sub_scl = get_scl_representation(
sub_source, network_id, scl_map, access_map
)
if sub_scl is None:
all_resolved = False
break
if (
sub_scl in ["TRUE", "FALSE"]
or (sub_scl.startswith('"') and sub_scl.endswith('"'))
or sub_scl.isdigit()
or (sub_scl.startswith("(") and sub_scl.endswith(")"))
):
scl_parts.append(sub_scl)
else:
scl_parts.append(f"({sub_scl})")
return (
" OR ".join(scl_parts)
if len(scl_parts) > 1
else (scl_parts[0] if scl_parts else "FALSE") if all_resolved else None
)
source_type = source_info.get("type")
if source_type == "powerrail":
return "TRUE"
elif source_type == "variable":
name = source_info.get("name")
return name if name else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_"
elif source_type == "constant":
dtype = str(source_info.get("datatype", "")).upper()
value = source_info.get("value")
try:
if dtype == "BOOL":
return str(value).upper()
elif dtype in [
"INT",
"DINT",
"SINT",
"USINT",
"UINT",
"UDINT",
"LINT",
"ULINT",
"WORD",
"DWORD",
"LWORD",
"BYTE",
]:
return str(value)
elif dtype in ["REAL", "LREAL"]:
s_val = str(value)
return s_val if "." in s_val or "e" in s_val.lower() else s_val + ".0"
elif dtype == "STRING":
return f"'{str(value)}'"
elif dtype == "TYPEDCONSTANT":
return str(value)
else:
return f"'{str(value)}'"
except Exception as e:
print(f"Advertencia: Error formateando constante {source_info}: {e}")
return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_"
elif source_type == "connection":
map_key = (
network_id,
source_info.get("source_instruction_uid"),
source_info.get("source_pin"),
)
return scl_map.get(map_key)
elif source_type == "unknown_source":
print(
f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}"
)
return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_"
else:
print(f"Advertencia: Tipo de fuente desconocido: {source_info}")
return f"_ERR_INVALID_SRC_TYPE_"
def generate_temp_var_name(network_id, instr_uid, pin_name):
net_id_clean = str(network_id).replace("-", "_")
instr_uid_clean = str(instr_uid).replace("-", "_")
pin_name_clean = str(pin_name).replace("-", "_").lower()
prefix = "_" if str(net_id_clean)[0].isdigit() else ""
return f"{prefix}temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}"
def get_target_scl_name(instruction, output_pin_name, network_id, default_to_temp=True):
instr_uid = instruction["instruction_uid"]
output_pin_data = instruction["outputs"].get(output_pin_name)
target_scl = None
if (
output_pin_data
and isinstance(output_pin_data, list)
and len(output_pin_data) == 1
):
dest_access = output_pin_data[0]
if dest_access.get("type") == "variable":
target_scl = dest_access.get("name")
if not target_scl:
print(
f"Error: Var destino {instr_uid}.{output_pin_name} sin nombre (UID: {dest_access.get('uid')}). {'Usando temp.' if default_to_temp else 'Ignorando.'}"
)
target_scl = (
generate_temp_var_name(network_id, instr_uid, output_pin_name)
if default_to_temp
else None
)
elif dest_access.get("type") == "constant":
print(
f"Advertencia: Instr {instr_uid} escribe en const UID {dest_access.get('uid')}. {'Usando temp.' if default_to_temp else 'Ignorando.'}"
)
target_scl = (
generate_temp_var_name(network_id, instr_uid, output_pin_name)
if default_to_temp
else None
)
else:
print(
f"Advertencia: Destino {instr_uid}.{output_pin_name} no es var/const: {dest_access.get('type')}. {'Usando temp.' if default_to_temp else 'Ignorando.'}"
)
target_scl = (
generate_temp_var_name(network_id, instr_uid, output_pin_name)
if default_to_temp
else None
)
elif default_to_temp:
target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name)
return target_scl
# --- Procesadores de Instrucciones (SIMPLIFICADOS - Siempre generan IF si EN no es TRUE) ---
def process_contact(instruction, network_id, scl_map, access_map):
# (Sin cambios respecto a la versión funcional anterior, solo actualiza scl_map)
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
is_negated = False # TODO
in_input = instruction["inputs"].get("in")
in_rlo_scl = (
"TRUE"
if in_input is None
else get_scl_representation(in_input, network_id, scl_map, access_map)
)
operand_scl = get_scl_representation(
instruction["inputs"].get("operand"), network_id, scl_map, access_map
)
if in_rlo_scl is None or operand_scl is None:
return False
term = f"NOT {operand_scl}" if is_negated else operand_scl
if not (term.startswith('"') and term.endswith('"')):
if " " in term and not (term.startswith("(") and term.endswith(")")):
term = f"({term})"
new_rlo_scl = (
term
if in_rlo_scl == "TRUE"
else (
f"({in_rlo_scl}) AND {term}"
if ("AND" in in_rlo_scl or "OR" in in_rlo_scl)
and not (in_rlo_scl.startswith("(") and in_rlo_scl.endswith(")"))
else f"{in_rlo_scl} AND {term}"
)
)
map_key = (network_id, instr_uid, "out")
scl_map[map_key] = new_rlo_scl
instruction["scl"] = f"// RLO: {new_rlo_scl}" # SCL es solo comentario aquí
instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_eq(instruction, network_id, scl_map, access_map):
# (Sin cambios respecto a la versión funcional anterior, solo actualiza scl_map)
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
in1_scl = get_scl_representation(
instruction["inputs"].get("in1"), network_id, scl_map, access_map
)
in2_scl = get_scl_representation(
instruction["inputs"].get("in2"), network_id, scl_map, access_map
)
if in1_scl is None or in2_scl is None:
return False
op1 = f"({in1_scl})" if " " in in1_scl else in1_scl
op2 = f"({in2_scl})" if " " in in2_scl else in2_scl
comparison_scl = f"{op1} = {op2}"
map_key_out = (network_id, instr_uid, "out")
scl_map[map_key_out] = comparison_scl
pre_input = instruction["inputs"].get("pre")
pre_scl = (
"TRUE"
if pre_input is None
else get_scl_representation(pre_input, network_id, scl_map, access_map)
)
if pre_scl is None:
return False
map_key_eno = (network_id, instr_uid, "eno")
scl_map[map_key_eno] = pre_scl
instruction["scl"] = f"// Cond: {comparison_scl}"
instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_coil(instruction, network_id, scl_map, access_map):
# (Sin cambios respecto a la versión funcional anterior)
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
in_rlo_scl = get_scl_representation(
instruction["inputs"].get("in"), network_id, scl_map, access_map
)
operand_info = instruction["inputs"].get("operand")
operand_scl = get_scl_representation(operand_info, network_id, scl_map, access_map)
if in_rlo_scl is None or operand_scl is None:
return False
if not (operand_info and operand_info.get("type") == "variable"):
print(f"Error: Operando COIL {instr_uid} no es var")
instruction["scl"] = f"// ERROR: Coil {instr_uid} operando no es variable"
instruction["type"] = instr_type + "_error"
return True
if in_rlo_scl == "(TRUE)":
in_rlo_scl = "TRUE"
elif in_rlo_scl == "(FALSE)":
in_rlo_scl = "FALSE"
scl_final = f"{operand_scl} := {in_rlo_scl};"
instruction["scl"] = scl_final
instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_convert(instruction, network_id, scl_map, access_map):
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
en_input = instruction["inputs"].get("en")
# Ahora EN debe existir si es necesario, no asumimos TRUE si falta
en_scl = (
get_scl_representation(en_input, network_id, scl_map, access_map)
if en_input
else "TRUE"
) # Asumir TRUE solo si no hay 'en' key
in_scl = get_scl_representation(
instruction["inputs"].get("in"), network_id, scl_map, access_map
)
if en_scl is None or in_scl is None:
return False # Esperar si dependencias no listas
target_scl = get_target_scl_name(
instruction, "out", network_id, default_to_temp=True
)
if target_scl is None:
print(f"Error: Sin destino CONVERT {instr_uid}")
instruction["type"] += "_error"
return True
conversion_expr = in_scl # TODO: Logica de conversion explicita
scl_core = f"{target_scl} := {conversion_expr};"
scl_final = (
f"IF {en_scl} THEN\n {scl_core}\nEND_IF;" if en_scl != "TRUE" else scl_core
)
instruction["scl"] = scl_final
instruction["type"] = instr_type + SCL_SUFFIX
map_key_out = (network_id, instr_uid, "out")
scl_map[map_key_out] = target_scl
map_key_eno = (network_id, instr_uid, "eno")
scl_map[map_key_eno] = en_scl
return True
def process_mod(instruction, network_id, scl_map, access_map):
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
en_input = instruction["inputs"].get("en")
en_scl = (
get_scl_representation(en_input, network_id, scl_map, access_map)
if en_input
else "TRUE"
)
in1_scl = get_scl_representation(
instruction["inputs"].get("in1"), network_id, scl_map, access_map
)
in2_scl = get_scl_representation(
instruction["inputs"].get("in2"), network_id, scl_map, access_map
)
if en_scl is None or in1_scl is None or in2_scl is None:
return False
target_scl = get_target_scl_name(
instruction, "out", network_id, default_to_temp=True
)
if target_scl is None:
print(f"Error: Sin destino MOD {instr_uid}")
instruction["type"] += "_error"
return True
op1 = f"({in1_scl})" if " " in in1_scl else in1_scl
op2 = f"({in2_scl})" if " " in in2_scl else in2_scl
scl_core = f"{target_scl} := {op1} MOD {op2};"
scl_final = (
f"IF {en_scl} THEN\n {scl_core}\nEND_IF;" if en_scl != "TRUE" else scl_core
)
instruction["scl"] = scl_final
instruction["type"] = instr_type + SCL_SUFFIX
map_key_out = (network_id, instr_uid, "out")
scl_map[map_key_out] = target_scl
map_key_eno = (network_id, instr_uid, "eno")
scl_map[map_key_eno] = en_scl
return True
def process_add(instruction, network_id, scl_map, access_map):
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
en_input = instruction["inputs"].get("en")
en_scl = (
get_scl_representation(en_input, network_id, scl_map, access_map)
if en_input
else "TRUE"
)
in1_scl = get_scl_representation(
instruction["inputs"].get("in1"), network_id, scl_map, access_map
)
in2_scl = get_scl_representation(
instruction["inputs"].get("in2"), network_id, scl_map, access_map
)
if en_scl is None or in1_scl is None or in2_scl is None:
return False
target_scl = get_target_scl_name(
instruction, "out", network_id, default_to_temp=True
)
if target_scl is None:
print(f"Error: Sin destino ADD {instr_uid}")
instruction["type"] += "_error"
return True
op1 = f"({in1_scl})" if " " in in1_scl else in1_scl
op2 = f"({in2_scl})" if " " in in2_scl else in2_scl
scl_core = f"{target_scl} := {op1} + {op2};"
scl_final = (
f"IF {en_scl} THEN\n {scl_core}\nEND_IF;" if en_scl != "TRUE" else scl_core
)
instruction["scl"] = scl_final
instruction["type"] = instr_type + SCL_SUFFIX
map_key_out = (network_id, instr_uid, "out")
scl_map[map_key_out] = target_scl
map_key_eno = (network_id, instr_uid, "eno")
scl_map[map_key_eno] = en_scl
return True
def process_move(instruction, network_id, scl_map, access_map):
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
en_input = instruction["inputs"].get("en")
en_scl = (
get_scl_representation(en_input, network_id, scl_map, access_map)
if en_input
else "TRUE"
)
in_scl = get_scl_representation(
instruction["inputs"].get("in"), network_id, scl_map, access_map
)
if en_scl is None or in_scl is None:
return False
target_scl = get_target_scl_name(
instruction, "out1", network_id, default_to_temp=False
) # No usar temp por defecto
if target_scl is None:
print(f"Advertencia: MOVE {instr_uid} sin destino claro.")
return False # No procesar
scl_core = f"{target_scl} := {in_scl};"
scl_final = (
f"IF {en_scl} THEN\n {scl_core}\nEND_IF;" if en_scl != "TRUE" else scl_core
)
instruction["scl"] = scl_final
instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_pbox(instruction, network_id, scl_map, access_map, network_logic_list):
# (Sin cambios respecto a la versión funcional anterior)
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
mem_bit_input = instruction["inputs"].get("bit")
mem_bit_scl = get_scl_representation(mem_bit_input, network_id, scl_map, access_map)
if mem_bit_scl is None:
return False
if not (mem_bit_input and mem_bit_input.get("type") == "variable"):
print(f"Error: PBOX {instr_uid} bit no es var")
instruction["scl"] = f"// ERROR: PBox {instr_uid} bit no es var"
instruction["type"] += "_error"
return True
is_likely_p_trig = False
consuming_coil_uid = None
for potential_consumer in network_logic_list:
coil_input_signal = potential_consumer.get("inputs", {}).get("in")
if (
isinstance(coil_input_signal, dict)
and coil_input_signal.get("type") == "connection"
and coil_input_signal.get("source_instruction_uid") == instr_uid
and coil_input_signal.get("source_pin") == "out"
):
if (
potential_consumer.get("type", "")
.replace("_scl", "")
.replace("_error", "")
== "Coil"
):
is_likely_p_trig = True
break
rlo_scl = None
if is_likely_p_trig:
clk_source_found = False
current_instr_index = -1
for i, instr in enumerate(network_logic_list):
if instr["instruction_uid"] == instr_uid:
current_instr_index = i
break
if current_instr_index != -1:
for i in range(current_instr_index - 1, -1, -1):
prev_instr = network_logic_list[i]
prev_instr_uid = prev_instr["instruction_uid"]
prev_instr_type = (
prev_instr.get("type", "")
.replace(SCL_SUFFIX, "")
.replace("_error", "")
)
if prev_instr_type in [
"Contact",
"Eq",
"O",
"PBox",
"And",
"Xor",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
]:
map_key_prev_out = (network_id, prev_instr_uid, "out")
potential_clk_scl = scl_map.get(map_key_prev_out)
if potential_clk_scl is not None:
rlo_scl = potential_clk_scl
clk_source_found = True
break
elif prev_instr_type in ["Move", "Add", "Convert", "Mod"]:
map_key_prev_eno = (network_id, prev_instr_uid, "eno")
potential_clk_scl = scl_map.get(map_key_prev_eno)
if potential_clk_scl is not None:
rlo_scl = potential_clk_scl
clk_source_found = True
break
if not clk_source_found:
print(f"Error: No se pudo inferir CLK PBOX {instr_uid}")
instruction["scl"] = f"// ERROR: PBox {instr_uid} sin CLK"
instruction["type"] += "_error"
return True
if rlo_scl is None:
return False
scl_comment = ""
if is_likely_p_trig:
clk_signal_formatted = f"({rlo_scl})" if " " in rlo_scl else rlo_scl
result_scl = f"P_TRIG_FUNC(CLK := {clk_signal_formatted}, M := {mem_bit_scl})"
scl_comment = f"// Edge PBox {instr_uid} -> {result_scl}"
else:
print(f"Advertencia: PBox {instr_uid} no como P_TRIG. Pasando bit.")
result_scl = mem_bit_scl
scl_comment = f"// PBox {instr_uid} - Passing bit: {result_scl}"
map_key_out = (network_id, instr_uid, "out")
scl_map[map_key_out] = result_scl
instruction["scl"] = scl_comment
instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_o(instruction, network_id, scl_map, access_map):
# (Sin cambios respecto a la versión funcional anterior)
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type:
return False
input_pins = [pin for pin in instruction["inputs"] if pin.startswith("in")]
if not input_pins:
print(f"Error: O {instr_uid} sin pines inX")
instruction["scl"] = f"// ERROR: O {instr_uid} sin pines in"
instruction["type"] += "_error"
return True
scl_parts = []
all_resolved = True
for pin in sorted(input_pins):
in_scl = get_scl_representation(
instruction["inputs"][pin], network_id, scl_map, access_map
)
if in_scl is None:
all_resolved = False
break
term = (
f"({in_scl})"
if (" " in in_scl or "AND" in in_scl)
and not (in_scl.startswith("(") and in_scl.endswith(")"))
else in_scl
)
scl_parts.append(term)
if not all_resolved:
return False
result_scl = (
" OR ".join(scl_parts)
if len(scl_parts) > 1
else (scl_parts[0] if scl_parts else "FALSE")
)
map_key_out = (network_id, instr_uid, "out")
scl_map[map_key_out] = result_scl
instruction["scl"] = f"// Logic O {instr_uid}: {result_scl}"
instruction["type"] = instr_type + SCL_SUFFIX
return True
# --- NUEVO: Procesador de Agrupación (Refinado) ---
def process_group_ifs(instruction, network_id, scl_map, access_map):
"""
Busca instrucciones que generan condiciones (Contact, O, Eq, PBox) ya procesadas
y, si habilitan un grupo (>1) de bloques funcionales, construye el bloque IF agrupado.
Modifica el campo 'scl' de la instrucción generadora de condición.
"""
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"]
instr_type_original = instr_type.replace("_scl", "").replace("_error", "")
made_change = False
# Solo actuar sobre generadores de condición ya procesados (_scl)
if not instr_type.endswith("_scl") or instr_type_original not in [
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"PBox",
]:
return False
# Evitar reagrupar si ya se hizo
if instruction.get("scl", "").strip().startswith("IF"):
# print(f"DEBUG Group: {instr_uid} ya tiene IF, saltando agrupación.")
return False
# Obtener la condición generada por esta instrucción (debería estar en scl_map)
map_key_out = (network_id, instr_uid, "out")
condition_scl = scl_map.get(map_key_out)
# No agrupar para condiciones triviales o no encontradas
if condition_scl is None or condition_scl in ["TRUE", "FALSE"]:
return False
# Encontrar todos los bloques funcionales habilitados DIRECTAMENTE por esta condición
grouped_instructions_cores = [] # Lista de SCL 'core'
consumer_instr_list = [] # Lista de instrucciones consumidoras
network_logic = next(
(net["logic"] for net in data["networks"] if net["id"] == network_id), []
)
if not network_logic:
return False
for consumer_instr in network_logic:
consumer_uid = consumer_instr["instruction_uid"]
# Saltar si ya está agrupado por otra condición
if consumer_instr.get("grouped", False):
continue
consumer_en = consumer_instr.get("inputs", {}).get("en")
consumer_type = consumer_instr.get("type", "") # Tipo actual (_scl o no)
consumer_type_original = consumer_type.replace("_scl", "").replace("_error", "")
# ¿Está conectado a nuestra salida 'out'?
if (
isinstance(consumer_en, dict)
and consumer_en.get("type") == "connection"
and consumer_en.get("source_instruction_uid") == instr_uid
and consumer_en.get("source_pin") == "out"
):
# ¿Es un bloque funcional procesado?
if consumer_type.endswith("_scl") and consumer_type_original in [
"Move",
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
]:
consumer_scl = consumer_instr.get("scl", "")
# Extraer el SCL core (la parte DENTRO del IF o la línea única si no había IF)
core_scl = None
if consumer_scl.strip().startswith("IF"):
match = re.search(r"THEN\s*(.+)\s*END_IF;", consumer_scl, re.DOTALL)
if match:
core_scl = match.group(1).strip()
elif consumer_scl and not consumer_scl.strip().startswith(
"//"
): # Si no es IF y no es solo comentario
core_scl = consumer_scl.strip()
if core_scl:
grouped_instructions_cores.append(core_scl)
consumer_instr_list.append(
consumer_instr
) # Guardar referencia a la instrucción
# else: # Comentado para reducir ruido
# print(f"DEBUG Group: Consumidor {consumer_uid} no tenía SCL core extraíble.")
# Si encontramos más de un consumidor
if len(grouped_instructions_cores) > 1:
print(
f"INFO: Agrupando {len(grouped_instructions_cores)} instrucciones bajo condición de {instr_type_original} UID {instr_uid}"
)
scl_grouped = [f"IF {condition_scl} THEN"]
for core_line in grouped_instructions_cores:
# Añadir indentación adecuada si el core tiene múltiples líneas
indented_core = "\n".join(
[f" {line.strip()}" for line in core_line.splitlines()]
)
scl_grouped.append(indented_core)
scl_grouped.append("END_IF;")
final_grouped_scl = "\n".join(scl_grouped)
# Sobrescribir 'scl' de la instrucción generadora
instruction["scl"] = final_grouped_scl
# Marcar los consumidores
for consumer_instr in consumer_instr_list:
consumer_instr["scl"] = f"{GROUPED_COMMENT} (by UID {instr_uid})"
consumer_instr["grouped"] = True
made_change = True
return made_change
# --- Bucle Principal de Procesamiento ---
def process_json_to_scl(json_filepath):
"""Lee el JSON, aplica los procesadores iterativamente y guarda el resultado."""
if not os.path.exists(json_filepath):
print(f"Error: JSON no encontrado: {json_filepath}")
return
print(f"Cargando JSON desde: {json_filepath}")
try:
with open(json_filepath, "r", encoding="utf-8") as f:
global data
data = json.load(f)
except Exception as e:
print(f"Error al cargar JSON: {e}")
return
network_access_maps = {}
# print("Creando mapas de acceso por red...") # Comentado para brevedad
for network in data.get("networks", []):
net_id = network["id"]
current_access_map = {}
for instr in network.get("logic", []):
for _, source in instr.get("inputs", {}).items():
sources_to_check = (
source
if isinstance(source, list)
else ([source] if isinstance(source, dict) else [])
)
for src in sources_to_check:
if (
isinstance(src, dict)
and src.get("uid")
and src.get("scope")
and src.get("type") in ["variable", "constant"]
):
current_access_map[src["uid"]] = src
for _, dest_list in instr.get("outputs", {}).items():
if isinstance(dest_list, list):
for dest in dest_list:
if (
isinstance(dest, dict)
and dest.get("uid")
and dest.get("scope")
and dest.get("type") in ["variable", "constant"]
):
current_access_map[dest["uid"]] = dest
network_access_maps[net_id] = current_access_map
scl_map = {}
max_passes = 25
passes = 0
# Lista y mapa de procesadores base
base_processors = [
process_convert,
process_mod,
process_eq,
process_contact,
process_o,
process_pbox,
process_add,
process_move,
process_coil,
]
processor_map = {
func.__name__.split("_")[1].lower(): func for func in base_processors
}
print("\n--- Iniciando Bucle de Procesamiento Iterativo ---")
while passes < max_passes:
passes += 1
made_change_in_base_pass = False
made_change_in_group_pass = False
print(f"\n--- Pase {passes} ---")
# --- FASE 1: Procesadores Base ---
# print(f"DEBUG: Iniciando Fase 1 (Procesadores Base) - Pase {passes}") # Debug
for network in data.get("networks", []):
network_id = network["id"]
access_map = network_access_maps.get(network_id, {})
network_logic = network.get("logic", [])
for instruction in network_logic:
instr_type_original = instruction["type"]
if (
instr_type_original.endswith(SCL_SUFFIX)
or "_error" in instr_type_original
or instruction.get("grouped", False)
):
continue
instr_type_lower_lookup = instr_type_original.lower()
func_to_call = processor_map.get(instr_type_lower_lookup)
if func_to_call:
try:
changed = False
if func_to_call == process_pbox:
changed = func_to_call(
instruction,
network_id,
scl_map,
access_map,
network_logic,
)
else:
changed = func_to_call(
instruction, network_id, scl_map, access_map
)
if changed:
made_change_in_base_pass = True
except Exception as e:
print(
f"ERROR(Base) {func_to_call.__name__} UID {instruction.get('instruction_uid')}: {e}"
)
traceback.print_exc()
instruction["scl"] = f"// ERROR base: {e}"
instruction["type"] += "_error"
made_change_in_base_pass = True
# --- FASE 2: Procesador de Agrupación ---
# print(f"DEBUG: Iniciando Fase 2 (Agrupación IF) - Pase {passes}") # Debug
for network in data.get("networks", []):
network_id = network["id"]
access_map = network_access_maps.get(network_id, {})
network_logic = network.get("logic", [])
for instruction in network_logic:
if instruction["type"].endswith("_scl") and not instruction.get(
"grouped", False
): # Solo en generadores procesados y no agrupados
try:
group_changed = process_group_ifs(
instruction, network_id, scl_map, access_map
)
if group_changed:
made_change_in_group_pass = True
except Exception as e:
print(
f"ERROR(Group) process_group_ifs UID {instruction.get('instruction_uid')}: {e}"
)
traceback.print_exc()
# Decidir si continuar
if not made_change_in_base_pass and not made_change_in_group_pass:
print(
f"\n--- No se hicieron cambios en el pase {passes}. Proceso completado. ---"
)
break
# else: print(f"DEBUG: Cambios Pase {passes}: Base={made_change_in_base_pass}, Grupo={made_change_in_group_pass}. Continuando...") # Debug
if passes == max_passes:
print(f"\n--- Límite de {max_passes} pases alcanzado. ---")
# --- Guardar JSON Final ---
output_filename = json_filepath.replace(".json", "_scl_processed.json")
print(f"\nGuardando JSON procesado en: {output_filename}")
try:
with open(output_filename, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
print("Guardado completado.")
except Exception as e:
print(f"Error al guardar JSON: {e}")
# --- Ejecución ---
if __name__ == "__main__":
input_json_file = "BlenderRun_ProdTime_simplified.json" # Asegúrate que este es el generado por x1_to_json.py MODIFICADO
process_json_to_scl(input_json_file)