Funcion basica de Pbox y Nbox

This commit is contained in:
Miguel 2025-04-19 00:45:45 +02:00
parent b2d3f0f5bf
commit 4bf7619cc1
10 changed files with 2283 additions and 1839 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -71,9 +71,7 @@ BEGIN
// RLO: (NOT "HMI_Variables_Status"."System"."Blender_Prod_CIP") // RLO: (NOT "HMI_Variables_Status"."System"."Blender_Prod_CIP")
"gBlenderCIPMode" := (NOT "HMI_Variables_Status"."System"."Blender_Prod_CIP"); "gBlenderCIPMode" := (NOT "HMI_Variables_Status"."System"."Blender_Prod_CIP");
IF (NOT "HMI_Variables_Status"."System"."Blender_Prod_CIP") THEN "HMI_Variables_Status"."Procedures"."BlenderStateNum" := 19;
"HMI_Variables_Status"."Procedures"."BlenderStateNum" := 19;
END_IF;
// Network 10: Error Faults // Network 10: Error Faults
@ -154,7 +152,7 @@ BEGIN
// Network 21: Input Data // Network 21: Input Data
// ERROR: FB Call Input sin instancia "Input_Data"();
// Network 22: Sel Brix Source Check // Network 22: Sel Brix Source Check
@ -226,7 +224,7 @@ BEGIN
// Network 34: First Production // Network 34: First Production
// ERROR: FB Call ProcedureFirstProduction sin instancia "FirstProduction_Data"();
// Network 35: CIP MAIN Calling // Network 35: CIP MAIN Calling
@ -255,7 +253,9 @@ BEGIN
// Network 41: Blend Procedure Data // Network 41: Blend Procedure Data
// RLO: (NOT "mDelayPowerOnTmr") // RLO: (NOT "mDelayPowerOnTmr")
// ERROR: FB Call Procedure sin instancia IF (NOT "mDelayPowerOnTmr") THEN
"Blender_Procedure Data"();
END_IF;
// Network 42: Pneumatic Valve Control // Network 42: Pneumatic Valve Control
@ -376,7 +376,9 @@ BEGIN
// Network 67: to HMI - Recipe Management // Network 67: to HMI - Recipe Management
// RLO: "AUX TRUE" // RLO: "AUX TRUE"
// ERROR: FB Call RecipeManagement___Prod sin instancia IF "AUX TRUE" THEN
"RecipeManagement_Data"();
END_IF;
// Network 68: Recipe Calculation // Network 68: Recipe Calculation

View File

@ -77,30 +77,44 @@
</Symbol> </Symbol>
</Access> </Access>
<Access Scope="GlobalVariable" UId="22"> <Access Scope="GlobalVariable" UId="22">
<Symbol>
<Component Name="M19001" />
</Symbol>
</Access>
<Access Scope="GlobalVariable" UId="23">
<Symbol> <Symbol>
<Component Name="Clock_5Hz" /> <Component Name="Clock_5Hz" />
</Symbol> </Symbol>
</Access> </Access>
<Part Name="Contact" UId="23" /> <Part Name="Contact" UId="24" />
<Part Name="Coil" UId="24" /> <Part Name="PBox" UId="25" />
<Part Name="Coil" UId="26" />
</Parts> </Parts>
<Wires> <Wires>
<Wire UId="25">
<Powerrail />
<NameCon UId="23" Name="in" />
</Wire>
<Wire UId="26">
<IdentCon UId="21" />
<NameCon UId="23" Name="operand" />
</Wire>
<Wire UId="27"> <Wire UId="27">
<NameCon UId="23" Name="out" /> <Powerrail />
<NameCon UId="24" Name="in" /> <NameCon UId="24" Name="in" />
</Wire> </Wire>
<Wire UId="28"> <Wire UId="28">
<IdentCon UId="22" /> <IdentCon UId="21" />
<NameCon UId="24" Name="operand" /> <NameCon UId="24" Name="operand" />
</Wire> </Wire>
<Wire UId="29">
<NameCon UId="24" Name="out" />
<NameCon UId="25" Name="in" />
</Wire>
<Wire UId="30">
<IdentCon UId="22" />
<NameCon UId="25" Name="bit" />
</Wire>
<Wire UId="31">
<NameCon UId="25" Name="out" />
<NameCon UId="26" Name="in" />
</Wire>
<Wire UId="32">
<IdentCon UId="23" />
<NameCon UId="26" Name="operand" />
</Wire>
</Wires> </Wires>
</FlgNet></NetworkSource> </FlgNet></NetworkSource>
<ProgrammingLanguage>LAD</ProgrammingLanguage> <ProgrammingLanguage>LAD</ProgrammingLanguage>
@ -210,32 +224,46 @@
</Symbol> </Symbol>
</Access> </Access>
<Access Scope="GlobalVariable" UId="22"> <Access Scope="GlobalVariable" UId="22">
<Symbol>
<Component Name="M19001" />
</Symbol>
</Access>
<Access Scope="GlobalVariable" UId="23">
<Symbol> <Symbol>
<Component Name="Clock_5Hz" /> <Component Name="Clock_5Hz" />
</Symbol> </Symbol>
</Access> </Access>
<Part Name="Contact" UId="23"> <Part Name="Contact" UId="24">
<Negated Name="operand" /> <Negated Name="operand" />
</Part> </Part>
<Part Name="Coil" UId="24" /> <Part Name="NBox" UId="25" />
<Part Name="Coil" UId="26" />
</Parts> </Parts>
<Wires> <Wires>
<Wire UId="25">
<Powerrail />
<NameCon UId="23" Name="in" />
</Wire>
<Wire UId="26">
<IdentCon UId="21" />
<NameCon UId="23" Name="operand" />
</Wire>
<Wire UId="27"> <Wire UId="27">
<NameCon UId="23" Name="out" /> <Powerrail />
<NameCon UId="24" Name="in" /> <NameCon UId="24" Name="in" />
</Wire> </Wire>
<Wire UId="28"> <Wire UId="28">
<IdentCon UId="22" /> <IdentCon UId="21" />
<NameCon UId="24" Name="operand" /> <NameCon UId="24" Name="operand" />
</Wire> </Wire>
<Wire UId="29">
<NameCon UId="24" Name="out" />
<NameCon UId="25" Name="in" />
</Wire>
<Wire UId="30">
<IdentCon UId="22" />
<NameCon UId="25" Name="bit" />
</Wire>
<Wire UId="31">
<NameCon UId="25" Name="out" />
<NameCon UId="26" Name="in" />
</Wire>
<Wire UId="32">
<IdentCon UId="23" />
<NameCon UId="26" Name="operand" />
</Wire>
</Wires> </Wires>
</FlgNet></NetworkSource> </FlgNet></NetworkSource>
<ProgrammingLanguage>LAD</ProgrammingLanguage> <ProgrammingLanguage>LAD</ProgrammingLanguage>

View File

@ -18,42 +18,64 @@
"comment": "", "comment": "",
"logic": [ "logic": [
{ {
"instruction_uid": "23", "instruction_uid": "24",
"uid": "23", "uid": "24",
"type": "Contact", "type": "Contact",
"template_values": {}, "template_values": {},
"negated_pins": {}, "negated_pins": {},
"inputs": { "inputs": {
"in": {
"type": "powerrail"
},
"operand": { "operand": {
"uid": "21", "uid": "21",
"scope": "GlobalVariable", "scope": "GlobalVariable",
"type": "variable", "type": "variable",
"name": "\"Clock_10Hz\"" "name": "\"Clock_10Hz\""
},
"in": {
"type": "powerrail"
} }
}, },
"outputs": {} "outputs": {}
}, },
{ {
"instruction_uid": "24", "instruction_uid": "25",
"uid": "24", "uid": "25",
"type": "PBox",
"template_values": {},
"negated_pins": {},
"inputs": {
"bit": {
"uid": "22",
"scope": "GlobalVariable",
"type": "variable",
"name": "\"M19001\""
},
"in": {
"type": "connection",
"source_instruction_type": "Contact",
"source_instruction_uid": "24",
"source_pin": "out"
}
},
"outputs": {}
},
{
"instruction_uid": "26",
"uid": "26",
"type": "Coil", "type": "Coil",
"template_values": {}, "template_values": {},
"negated_pins": {}, "negated_pins": {},
"inputs": { "inputs": {
"in": {
"type": "connection",
"source_instruction_type": "Contact",
"source_instruction_uid": "23",
"source_pin": "out"
},
"operand": { "operand": {
"uid": "22", "uid": "23",
"scope": "GlobalVariable", "scope": "GlobalVariable",
"type": "variable", "type": "variable",
"name": "\"Clock_5Hz\"" "name": "\"Clock_5Hz\""
},
"in": {
"type": "connection",
"source_instruction_type": "PBox",
"source_instruction_uid": "25",
"source_pin": "out"
} }
}, },
"outputs": {} "outputs": {}
@ -66,56 +88,71 @@
"comment": "", "comment": "",
"logic": [ "logic": [
{ {
"instruction_uid": "23", "instruction_uid": "24",
"uid": "23", "uid": "24",
"type": "Contact", "type": "Contact",
"template_values": {}, "template_values": {},
"negated_pins": { "negated_pins": {
"operand": true "operand": true
}, },
"inputs": { "inputs": {
"in": {
"type": "powerrail"
},
"operand": { "operand": {
"uid": "21", "uid": "21",
"scope": "GlobalVariable", "scope": "GlobalVariable",
"type": "variable", "type": "variable",
"name": "\"Clock_10Hz\"" "name": "\"Clock_10Hz\""
},
"in": {
"type": "powerrail"
} }
}, },
"outputs": {} "outputs": {}
}, },
{ {
"instruction_uid": "24", "instruction_uid": "25",
"uid": "24", "uid": "25",
"type": "NBox",
"template_values": {},
"negated_pins": {},
"inputs": {
"bit": {
"uid": "22",
"scope": "GlobalVariable",
"type": "variable",
"name": "\"M19001\""
},
"in": {
"type": "connection",
"source_instruction_type": "Contact",
"source_instruction_uid": "24",
"source_pin": "out"
}
},
"outputs": {}
},
{
"instruction_uid": "26",
"uid": "26",
"type": "Coil", "type": "Coil",
"template_values": {}, "template_values": {},
"negated_pins": {}, "negated_pins": {},
"inputs": { "inputs": {
"in": {
"type": "connection",
"source_instruction_type": "Contact",
"source_instruction_uid": "23",
"source_pin": "out"
},
"operand": { "operand": {
"uid": "22", "uid": "23",
"scope": "GlobalVariable", "scope": "GlobalVariable",
"type": "variable", "type": "variable",
"name": "\"Clock_5Hz\"" "name": "\"Clock_5Hz\""
},
"in": {
"type": "connection",
"source_instruction_type": "NBox",
"source_instruction_uid": "25",
"source_pin": "out"
} }
}, },
"outputs": {} "outputs": {}
} }
] ]
},
{
"id": "2B",
"title": "",
"comment": "",
"logic": [],
"error": "FlgNet not found"
} }
] ]
} }

View File

@ -18,47 +18,70 @@
"comment": "", "comment": "",
"logic": [ "logic": [
{ {
"instruction_uid": "23", "instruction_uid": "24",
"uid": "23", "uid": "24",
"type": "Contact_scl", "type": "Contact_scl",
"template_values": {}, "template_values": {},
"negated_pins": {}, "negated_pins": {},
"inputs": { "inputs": {
"in": {
"type": "powerrail"
},
"operand": { "operand": {
"uid": "21", "uid": "21",
"scope": "GlobalVariable", "scope": "GlobalVariable",
"type": "variable", "type": "variable",
"name": "\"Clock_10Hz\"" "name": "\"Clock_10Hz\""
},
"in": {
"type": "powerrail"
} }
}, },
"outputs": {}, "outputs": {},
"scl": "// RLO: \"Clock_10Hz\"" "scl": "// RLO: \"Clock_10Hz\""
}, },
{ {
"instruction_uid": "24", "instruction_uid": "25",
"uid": "24", "uid": "25",
"type": "PBox_scl",
"template_values": {},
"negated_pins": {},
"inputs": {
"bit": {
"uid": "22",
"scope": "GlobalVariable",
"type": "variable",
"name": "\"M19001\""
},
"in": {
"type": "connection",
"source_instruction_type": "Contact",
"source_instruction_uid": "24",
"source_pin": "out"
}
},
"outputs": {},
"scl": "\"stat_M19001\" := \"Clock_10Hz\"; // P_TRIG: \"Clock_10Hz\" AND NOT \"stat_M19001\""
},
{
"instruction_uid": "26",
"uid": "26",
"type": "Coil_scl", "type": "Coil_scl",
"template_values": {}, "template_values": {},
"negated_pins": {}, "negated_pins": {},
"inputs": { "inputs": {
"in": {
"type": "connection",
"source_instruction_type": "Contact",
"source_instruction_uid": "23",
"source_pin": "out"
},
"operand": { "operand": {
"uid": "22", "uid": "23",
"scope": "GlobalVariable", "scope": "GlobalVariable",
"type": "variable", "type": "variable",
"name": "\"Clock_5Hz\"" "name": "\"Clock_5Hz\""
},
"in": {
"type": "connection",
"source_instruction_type": "PBox",
"source_instruction_uid": "25",
"source_pin": "out"
} }
}, },
"outputs": {}, "outputs": {},
"scl": "\"Clock_5Hz\" := \"Clock_10Hz\";" "scl": "\"Clock_5Hz\" := \"Clock_10Hz\" AND NOT \"stat_M19001\";"
} }
] ]
}, },
@ -68,58 +91,74 @@
"comment": "", "comment": "",
"logic": [ "logic": [
{ {
"instruction_uid": "23", "instruction_uid": "24",
"uid": "23", "uid": "24",
"type": "Contact_scl", "type": "Contact_scl",
"template_values": {}, "template_values": {},
"negated_pins": { "negated_pins": {
"operand": true "operand": true
}, },
"inputs": { "inputs": {
"in": {
"type": "powerrail"
},
"operand": { "operand": {
"uid": "21", "uid": "21",
"scope": "GlobalVariable", "scope": "GlobalVariable",
"type": "variable", "type": "variable",
"name": "\"Clock_10Hz\"" "name": "\"Clock_10Hz\""
},
"in": {
"type": "powerrail"
} }
}, },
"outputs": {}, "outputs": {},
"scl": "// RLO: (NOT \"Clock_10Hz\")" "scl": "// RLO: (NOT \"Clock_10Hz\")"
}, },
{ {
"instruction_uid": "24", "instruction_uid": "25",
"uid": "24", "uid": "25",
"type": "NBox_scl",
"template_values": {},
"negated_pins": {},
"inputs": {
"bit": {
"uid": "22",
"scope": "GlobalVariable",
"type": "variable",
"name": "\"M19001\""
},
"in": {
"type": "connection",
"source_instruction_type": "Contact",
"source_instruction_uid": "24",
"source_pin": "out"
}
},
"outputs": {},
"scl": "\"stat_M19001\" := (NOT \"Clock_10Hz\"); // N_TRIG: NOT (NOT \"Clock_10Hz\") AND \"stat_M19001\""
},
{
"instruction_uid": "26",
"uid": "26",
"type": "Coil_scl", "type": "Coil_scl",
"template_values": {}, "template_values": {},
"negated_pins": {}, "negated_pins": {},
"inputs": { "inputs": {
"in": {
"type": "connection",
"source_instruction_type": "Contact",
"source_instruction_uid": "23",
"source_pin": "out"
},
"operand": { "operand": {
"uid": "22", "uid": "23",
"scope": "GlobalVariable", "scope": "GlobalVariable",
"type": "variable", "type": "variable",
"name": "\"Clock_5Hz\"" "name": "\"Clock_5Hz\""
},
"in": {
"type": "connection",
"source_instruction_type": "NBox",
"source_instruction_uid": "25",
"source_pin": "out"
} }
}, },
"outputs": {}, "outputs": {},
"scl": "\"Clock_5Hz\" := (NOT \"Clock_10Hz\");" "scl": "\"Clock_5Hz\" := NOT (NOT \"Clock_10Hz\") AND \"stat_M19001\";"
} }
] ]
},
{
"id": "2B",
"title": "",
"comment": "",
"logic": [],
"error": "FlgNet not found"
} }
] ]
} }

View File

@ -15,6 +15,10 @@ END_VAR
VAR_IN_OUT VAR_IN_OUT
END_VAR END_VAR
VAR_STAT
"stat_M19001" : Bool; // Memory for edge detection
END_VAR
VAR_TEMP VAR_TEMP
END_VAR END_VAR
@ -23,13 +27,13 @@ BEGIN
// Network 1: Clock Bit // Network 1: Clock Bit
// RLO: "Clock_10Hz" // RLO: "Clock_10Hz"
"Clock_5Hz" := "Clock_10Hz"; "stat_M19001" := "Clock_10Hz"; // P_TRIG: "Clock_10Hz" AND NOT "stat_M19001"
"Clock_5Hz" := "Clock_10Hz" AND NOT "stat_M19001";
// Network 2: Clock Bit // Network 2: Clock Bit
// RLO: (NOT "Clock_10Hz") // RLO: (NOT "Clock_10Hz")
"Clock_5Hz" := (NOT "Clock_10Hz"); "stat_M19001" := (NOT "Clock_10Hz"); // N_TRIG: NOT (NOT "Clock_10Hz") AND "stat_M19001"
"Clock_5Hz" := NOT (NOT "Clock_10Hz") AND "stat_M19001";
// Network 3:
END_FUNCTION_BLOCK END_FUNCTION_BLOCK

View File

@ -13,9 +13,8 @@ ns = {
# --- Helper Functions --- # --- Helper Functions ---
# (get_multilingual_text, get_symbol_name, parse_access, parse_part - sin cambios)
# ... (código de estas funciones aquí) ...
def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"): def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"):
# (Sin cambios respecto a la versión anterior)
if element is None: if element is None:
return "" return ""
try: try:
@ -42,8 +41,8 @@ def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"):
print(f"Advertencia: Error extrayendo MultilingualText: {e}") print(f"Advertencia: Error extrayendo MultilingualText: {e}")
return "" return ""
def get_symbol_name(symbol_element): def get_symbol_name(symbol_element):
# (Sin cambios respecto a la versión anterior)
if symbol_element is None: if symbol_element is None:
return None return None
try: try:
@ -53,668 +52,312 @@ def get_symbol_name(symbol_element):
print(f"Advertencia: Excepción en get_symbol_name: {e}") print(f"Advertencia: Excepción en get_symbol_name: {e}")
return None return None
def parse_access(access_element): def parse_access(access_element):
if access_element is None: # (Sin cambios respecto a la versión anterior)
return None if access_element is None: return None
uid = access_element.get("UId") uid = access_element.get("UId"); scope = access_element.get("Scope")
scope = access_element.get("Scope")
info = {"uid": uid, "scope": scope, "type": "unknown"} info = {"uid": uid, "scope": scope, "type": "unknown"}
symbol = access_element.xpath("./*[local-name()='Symbol']") symbol = access_element.xpath("./*[local-name()='Symbol']")
constant = access_element.xpath("./*[local-name()='Constant']") constant = access_element.xpath("./*[local-name()='Constant']")
if symbol: if symbol:
info["type"] = "variable" info["type"] = "variable"
info["name"] = get_symbol_name(symbol[0]) info["name"] = get_symbol_name(symbol[0])
if info["name"] is None: if info["name"] is None: info["type"] = "error_parsing_symbol"; print(f"Error: No se pudo parsear nombre símbolo Access UID={uid}"); return info
info["type"] = "error_parsing_symbol"
print(f"Error: No se pudo parsear nombre símbolo Access UID={uid}")
return info
elif constant: elif constant:
info["type"] = "constant" info["type"] = "constant"
const_type_elem = constant[0].xpath("./*[local-name()='ConstantType']") const_type_elem = constant[0].xpath("./*[local-name()='ConstantType']")
const_val_elem = constant[0].xpath("./*[local-name()='ConstantValue']") const_val_elem = constant[0].xpath("./*[local-name()='ConstantValue']")
info["datatype"] = ( info["datatype"] = const_type_elem[0].text if const_type_elem and const_type_elem[0].text is not None else "Unknown"
const_type_elem[0].text value_str = const_val_elem[0].text if const_val_elem and const_val_elem[0].text is not None else None
if const_type_elem and const_type_elem[0].text is not None if value_str is None: info["type"] = "error_parsing_constant"; info["value"] = None; print(f"Error: Constante sin valor Access UID={uid}"); return info
else "Unknown"
)
value_str = (
const_val_elem[0].text
if const_val_elem and const_val_elem[0].text is not None
else None
)
if value_str is None:
info["type"] = "error_parsing_constant"
info["value"] = None
print(f"Error: Constante sin valor Access UID={uid}")
return info
if info["datatype"] == "Unknown": if info["datatype"] == "Unknown":
val_lower = value_str.lower() val_lower = value_str.lower()
if val_lower in ["true", "false"]: if val_lower in ["true", "false"]: info["datatype"] = "Bool"
info["datatype"] = "Bool" elif value_str.isdigit() or (value_str.startswith('-') and value_str[1:].isdigit()): info["datatype"] = "Int"
elif value_str.isdigit() or ( elif '.' in value_str:
value_str.startswith("-") and value_str[1:].isdigit() try: float(value_str); info["datatype"] = "Real"
): except ValueError: pass
info["datatype"] = "Int" elif '#' in value_str: info["datatype"] = "TypedConstant"
elif "." in value_str:
try:
float(value_str)
info["datatype"] = "Real"
except ValueError:
pass
elif "#" in value_str:
info["datatype"] = "TypedConstant"
info["value"] = value_str info["value"] = value_str
dtype_lower = info["datatype"].lower() dtype_lower = info["datatype"].lower()
val_str_processed = value_str.split("#")[-1] if "#" in value_str else value_str val_str_processed = value_str.split('#')[-1] if '#' in value_str else value_str
try: try:
if dtype_lower in [ if dtype_lower in ['int', 'dint', 'udint', 'sint', 'usint', 'lint', 'ulint', 'word', 'dword', 'lword', 'byte']: info["value"] = int(val_str_processed)
"int", elif dtype_lower == 'bool': info["value"] = (val_str_processed.lower() == 'true' or val_str_processed == '1')
"dint", elif dtype_lower in ['real', 'lreal']: info["value"] = float(val_str_processed)
"udint", elif dtype_lower == 'typedconstant': info["value"] = value_str
"sint", except (ValueError, TypeError) as e: print(f"Advertencia: No se pudo convertir valor '{val_str_processed}' a {dtype_lower} UID={uid}. Error: {e}"); info["value"] = value_str
"usint", else: info["type"] = "unknown_structure"; print(f"Advertencia: Access UID={uid} no es Symbol ni Constant."); return info
"lint", if info['type'] == 'variable' and info.get('name') is None: print(f"Error Interno: parse_access var sin nombre UID {uid}."); info['type'] = "error_no_name"; return info
"ulint",
"word",
"dword",
"lword",
"byte",
]:
info["value"] = int(val_str_processed)
elif dtype_lower == "bool":
info["value"] = (
val_str_processed.lower() == "true" or val_str_processed == "1"
)
elif dtype_lower in ["real", "lreal"]:
info["value"] = float(val_str_processed)
elif dtype_lower == "typedconstant":
info["value"] = value_str
except (ValueError, TypeError) as e:
print(
f"Advertencia: No se pudo convertir valor '{val_str_processed}' a {dtype_lower} UID={uid}. Error: {e}"
)
info["value"] = value_str
else:
info["type"] = "unknown_structure"
print(f"Advertencia: Access UID={uid} no es Symbol ni Constant.")
return info
if info["type"] == "variable" and info.get("name") is None:
print(f"Error Interno: parse_access var sin nombre UID {uid}.")
info["type"] = "error_no_name"
return info
return info return info
def parse_part(part_element): def parse_part(part_element):
""" # (Sin cambios respecto a la versión anterior)
Parsea un elemento Part (instrucción) y extrae UID, nombre,
valores de plantilla y pines negados.
"""
if part_element is None: return None if part_element is None: return None
uid = part_element.get('UId'); name = part_element.get('Name') uid = part_element.get('UId'); name = part_element.get('Name')
if not uid or not name: print(f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}"); return None if not uid or not name: print(f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}"); return None
template_values = {} template_values = {}
try: try:
for tv in part_element.xpath("./*[local-name()='TemplateValue']"): for tv in part_element.xpath("./*[local-name()='TemplateValue']"):
tv_name = tv.get('Name'); tv_type = tv.get('Type') tv_name = tv.get('Name'); tv_type = tv.get('Type')
if tv_name and tv_type: template_values[tv_name] = tv_type if tv_name and tv_type: template_values[tv_name] = tv_type
except Exception as e: print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}") except Exception as e: print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}")
negated_pins = {}
# --- INICIO NUEVA LÓGICA PARA NEGACIÓN ---
negated_pins = {} # Diccionario para pines negados: {pin_name: True}
try: try:
for negated_elem in part_element.xpath("./*[local-name()='Negated']"): for negated_elem in part_element.xpath("./*[local-name()='Negated']"):
negated_pin_name = negated_elem.get('Name') negated_pin_name = negated_elem.get('Name')
if negated_pin_name: if negated_pin_name: negated_pins[negated_pin_name] = True
negated_pins[negated_pin_name] = True
# print(f"DEBUG: Pin negado detectado: {name} UID {uid}, Pin: {negated_pin_name}") # Debug
except Exception as e: print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}") except Exception as e: print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}")
return { return {'uid': uid, 'type': name, 'template_values': template_values, 'negated_pins': negated_pins}
'uid': uid,
'type': name, # Usar 'type' para consistencia
'template_values': template_values,
'negated_pins': negated_pins # Añadir diccionario de pines negados
}
# --- NUEVA FUNCIÓN parse_call ---
def parse_call(call_element): def parse_call(call_element):
""" # (Mantiene la corrección para DB de instancia)
Parsea un elemento Call (llamada a FC/FB) y extrae su información. if call_element is None: return None
"""
if call_element is None:
return None
uid = call_element.get("UId") uid = call_element.get("UId")
if not uid: if not uid: print(f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}"); return None
print(
f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}"
)
return None
call_info_elem = call_element.xpath("./*[local-name()='CallInfo']") call_info_elem = call_element.xpath("./*[local-name()='CallInfo']")
if not call_info_elem: if not call_info_elem: print(f"Error: Call UID {uid} sin elemento CallInfo."); return None
print(f"Error: Call UID {uid} sin elemento CallInfo.")
return None
call_info = call_info_elem[0] call_info = call_info_elem[0]
block_name = call_info.get("Name") block_name = call_info.get("Name")
block_type = call_info.get("BlockType") # FC, FB, etc. block_type = call_info.get("BlockType")
instance_name = None instance_name = None; instance_scope = None
instance_scope = None if not block_name or not block_type: print(f"Error: CallInfo para UID {uid} sin Name o BlockType."); return None
if block_type == "FB":
# Si es una llamada a FB, puede tener información de instancia instance_elem_list = call_info.xpath("./*[local-name()='Instance']")
instance_elem = call_info.xpath("./*[local-name()='Instance']") if instance_elem_list:
if instance_elem: instance_elem = instance_elem_list[0]
instance = instance_elem[0] instance_scope = instance_elem.get("Scope")
instance_scope = instance.get("Scope") # Ej: GlobalVariable, InstanceDB component_elem_list = instance_elem.xpath("./*[local-name()='Component']") # Busca Component directo
# El nombre de la instancia DB suele estar en Component dentro de Symbol if component_elem_list:
symbol_elem = instance.xpath("./*[local-name()='Symbol']") component_elem = component_elem_list[0]
if symbol_elem: db_name_raw = component_elem.get('Name')
instance_name = get_symbol_name(symbol_elem[0]) if db_name_raw: instance_name = f'"{db_name_raw}"' # Añade comillas
else: print(f"Advertencia: <Component> dentro de <Instance> para FB Call UID {uid} no tiene atributo 'Name'.")
if not block_name or not block_type: else: print(f"Advertencia: No se encontró <Component> dentro de <Instance> para FB Call UID {uid}. No se pudo obtener el nombre del DB.")
print(f"Error: CallInfo para UID {uid} sin Name o BlockType.") else: print(f"Advertencia: FB Call '{block_name}' UID {uid} no tiene elemento <Instance>.")
return None call_data = {"uid": uid, "type": "Call", "block_name": block_name, "block_type": block_type}
if instance_name: call_data["instance_db"] = instance_name
call_data = { if instance_scope: call_data["instance_scope"] = instance_scope
"uid": uid,
"type": "Call", # Tipo genérico para nuestra estructura JSON
"block_name": block_name,
"block_type": block_type,
}
if instance_name:
call_data["instance_db"] = instance_name
if instance_scope:
call_data["instance_scope"] = instance_scope
return call_data return call_data
# --- FIN NUEVA FUNCIÓN --- # --- Función parse_network con XPath corregido para Title/Comment ---
# --- Función parse_network MODIFICADA ---
def parse_network(network_element): def parse_network(network_element):
""" """
Parsea una red, extrae lógica (incluyendo Calls) y añade conexiones EN implícitas. Parsea una red, extrae lógica y añade conexiones EN implícitas.
""" """
if network_element is None: if network_element is None:
return { return {"id": "ERROR", "title": "Invalid Network Element", "comment": "", "logic": [], "error": "Input element was None"}
"id": "ERROR",
"title": "Invalid Network Element",
"comment": "",
"logic": [],
"error": "Input element was None",
}
network_id = network_element.get("ID") network_id = network_element.get("ID")
# --- CORRECCIÓN XPath para Title y Comment ---
# Usar local-name() como en la versión original que funcionaba para esto
title_element = network_element.xpath( title_element = network_element.xpath(
".//*[local-name()='MultilingualText'][@CompositionName='Title']" ".//*[local-name()='MultilingualText'][@CompositionName='Title']" # Busca en cualquier nivel descendiente
) )
network_title = ( network_title = get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}"
get_multilingual_text(title_element[0])
if title_element comment_element = network_element.xpath(
else f"Network {network_id}" "./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']" # Busca directamente bajo ObjectList
)
comment_title_element = network_element.xpath(
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
)
network_comment = (
get_multilingual_text(comment_title_element[0]) if comment_title_element else ""
) )
network_comment = get_multilingual_text(comment_element[0]) if comment_element else ""
# --- FIN CORRECCIÓN XPath ---
flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns) flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
if not flgnet_list: if not flgnet_list:
return { return {"id": network_id, "title": network_title, "comment": network_comment, "logic": [], "error": "FlgNet not found"}
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": [],
"error": "FlgNet not found",
}
flgnet = flgnet_list[0] flgnet = flgnet_list[0]
# 1. Parsear Access, Parts y Calls # 1. Parsear Access, Parts y Calls (usando parse_call corregido)
access_map = { access_map = {acc_info["uid"]: acc_info for acc in flgnet.xpath(".//flg:Access", namespaces=ns) if (acc_info := parse_access(acc)) and acc_info['type'] != 'unknown'}
acc_info["uid"]: acc_info
for acc in flgnet.xpath(".//flg:Access", namespaces=ns)
if (acc_info := parse_access(acc))
}
# --- MODIFICADO: Unir Parts y Calls en un solo mapa ---
parts_and_calls_map = {} parts_and_calls_map = {}
instruction_elements = flgnet.xpath( instruction_elements = flgnet.xpath(".//flg:Part | .//flg:Call", namespaces=ns)
".//flg:Part | .//flg:Call", namespaces=ns
) # Buscar ambos tipos
for element in instruction_elements: for element in instruction_elements:
parsed_info = None parsed_info = None
if element.tag.endswith("Part"): if element.tag == etree.QName(ns["flg"], "Part"): parsed_info = parse_part(element)
parsed_info = parse_part(element) elif element.tag == etree.QName(ns["flg"], "Call"): parsed_info = parse_call(element) # Usa la versión con fix DB
elif element.tag.endswith("Call"): if parsed_info and "uid" in parsed_info: parts_and_calls_map[parsed_info["uid"]] = parsed_info
parsed_info = parse_call(element) else: print(f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}")
if parsed_info and "uid" in parsed_info: # 2. Parsear Wires (sin cambios)
parts_and_calls_map[parsed_info["uid"]] = parsed_info wire_connections = defaultdict(list); source_connections = defaultdict(list); eno_outputs = defaultdict(list)
else:
print(
f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}"
)
# --- FIN MODIFICADO ---
# 2. Parsear Wires y construir mapas de conexiones
wire_connections = defaultdict(list)
source_connections = defaultdict(list)
eno_outputs = defaultdict(list)
flg_ns_uri = ns["flg"] flg_ns_uri = ns["flg"]
for wire in flgnet.xpath(".//flg:Wire", namespaces=ns): for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
source_uid, source_pin, dest_uid, dest_pin = None, None, None, None source_uid, source_pin, dest_uid, dest_pin = None, None, None, None
children = wire.getchildren() children = wire.getchildren();
if len(children) < 2: if len(children) < 2: continue
continue
source_elem, dest_elem = children[0], children[1] source_elem, dest_elem = children[0], children[1]
if source_elem.tag == etree.QName(flg_ns_uri, "Powerrail"): if source_elem.tag == etree.QName(flg_ns_uri, "Powerrail"): source_uid, source_pin = "POWERRAIL", "out"
source_uid, source_pin = "POWERRAIL", "out" elif source_elem.tag == etree.QName(flg_ns_uri, "IdentCon"): source_uid, source_pin = source_elem.get("UId"), "value"
elif source_elem.tag == etree.QName(flg_ns_uri, "IdentCon"): elif source_elem.tag == etree.QName(flg_ns_uri, "NameCon"): source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name")
source_uid, source_pin = source_elem.get("UId"), "value" if dest_elem.tag == etree.QName(flg_ns_uri, "IdentCon"): dest_uid, dest_pin = dest_elem.get("UId"), "value"
elif source_elem.tag == etree.QName(flg_ns_uri, "NameCon"): elif dest_elem.tag == etree.QName(flg_ns_uri, "NameCon"): dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name")
source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name") if source_uid is not None and source_pin is not None and dest_uid is not None and dest_pin is not None:
if dest_elem.tag == etree.QName(flg_ns_uri, "IdentCon"): dest_key = (dest_uid, dest_pin); source_info = (source_uid, source_pin)
dest_uid, dest_pin = dest_elem.get("UId"), "value" if source_info not in wire_connections[dest_key]: wire_connections[dest_key].append(source_info)
elif dest_elem.tag == etree.QName(flg_ns_uri, "NameCon"): source_key = (source_uid, source_pin); dest_info = (dest_uid, dest_pin)
dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name") if dest_info not in source_connections[source_key]: source_connections[source_key].append(dest_info)
if dest_uid and dest_pin and source_uid is not None: if source_pin == "eno" and source_uid in parts_and_calls_map:
dest_key = (dest_uid, dest_pin) if dest_info not in eno_outputs[source_uid]: eno_outputs[source_uid].append(dest_info)
source_key = (source_uid, source_pin)
source_info = (source_uid, source_pin)
dest_info = (dest_uid, dest_pin)
if source_info not in wire_connections[dest_key]:
wire_connections[dest_key].append(source_info)
if dest_info not in source_connections[source_key]:
source_connections[source_key].append(dest_info)
if (
source_pin == "eno" and source_uid in parts_and_calls_map
): # Usar mapa combinado
if dest_info not in eno_outputs[source_uid]:
eno_outputs[source_uid].append(dest_info)
# 3. Construir la representación lógica INICIAL # 3. Construcción Lógica Inicial (sin cambios)
all_logic_steps = {} all_logic_steps = {}
functional_block_types = [ functional_block_types = ['Move', 'Add', 'Sub', 'Mul', 'Div', 'Mod', 'Convert', 'Call', 'Se', 'Sd', 'BLKMOV']
"Move", rlo_generators = ['Contact', 'O', 'Eq', 'Ne', 'Gt', 'Lt', 'Ge', 'Le', 'And', 'Xor', 'PBox', 'NBox']
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
"Call",
] # Añadir Call
rlo_generators = [
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"And",
"Xor",
"PBox",
]
# --- MODIFICADO: Iterar sobre el mapa combinado ---
for instruction_uid, instruction_info in parts_and_calls_map.items(): for instruction_uid, instruction_info in parts_and_calls_map.items():
# Usar directamente la info parseada (part_info o call_info) instruction_repr = {"instruction_uid": instruction_uid, **instruction_info}; instruction_repr["inputs"] = {}; instruction_repr["outputs"] = {}
instruction_repr = { possible_input_pins = set(['en', 'in', 'in1', 'in2', 'in3', 'in4', 's', 'r', 'clk', 'cu', 'cd', 'ld', 'pv', 'tv', 'bit', 'operand', 'pre', 'SRCBLK'])
"instruction_uid": instruction_uid, for dest_pin_name in possible_input_pins:
**instruction_info,
} # Copiar datos base
instruction_repr["inputs"] = {}
instruction_repr["outputs"] = {}
# --- FIN MODIFICADO ---
# Rellenar inputs explícitos
# Añadir más pines si las llamadas a FB los usan (ej: parámetros FC/FB)
possible_pins = set(
["en", "in", "in1", "in2", "in3", "in4", "bit", "operand", "pre", "clk"]
)
# Añadir pines específicos de la llamada si es un FB? Más complejo.
for dest_pin_name in possible_pins:
dest_key = (instruction_uid, dest_pin_name) dest_key = (instruction_uid, dest_pin_name)
if dest_key in wire_connections: if dest_key in wire_connections:
sources_list = wire_connections[dest_key] sources_list = wire_connections[dest_key]; input_sources_repr = []
input_sources_repr = []
for source_uid, source_pin in sources_list: for source_uid, source_pin in sources_list:
if source_uid == "POWERRAIL": if source_uid == "POWERRAIL": input_sources_repr.append({"type": "powerrail"})
input_sources_repr.append({"type": "powerrail"}) elif source_uid in access_map: input_sources_repr.append(access_map[source_uid])
elif source_uid in access_map: elif source_uid in parts_and_calls_map: input_sources_repr.append({"type": "connection", "source_instruction_type": parts_and_calls_map[source_uid]["type"], "source_instruction_uid": source_uid, "source_pin": source_pin})
input_sources_repr.append(access_map[source_uid]) else: input_sources_repr.append({"type": "unknown_source", "uid": source_uid})
elif source_uid in parts_and_calls_map: # Usar mapa combinado if len(input_sources_repr) == 1: instruction_repr["inputs"][dest_pin_name] = input_sources_repr[0]
input_sources_repr.append( elif len(input_sources_repr) > 1: instruction_repr["inputs"][dest_pin_name] = input_sources_repr
{ possible_output_pins = set(['out', 'out1', 'Q', 'eno', 'RET_VAL', 'DSTBLK', 'q', 'rt', 'rtbcd', 'cv', 'cvbcd'])
"type": "connection", for source_pin_name in possible_output_pins:
# Usar el tipo del mapa combinado
"source_instruction_type": parts_and_calls_map[
source_uid
]["type"],
"source_instruction_uid": source_uid,
"source_pin": source_pin,
}
)
else:
input_sources_repr.append(
{"type": "unknown_source", "uid": source_uid}
)
if len(input_sources_repr) == 1:
instruction_repr["inputs"][dest_pin_name] = input_sources_repr[0]
elif len(input_sources_repr) > 1:
instruction_repr["inputs"][dest_pin_name] = input_sources_repr
# Rellenar outputs explícitos (hacia Access)
for source_pin_name in [
"out",
"out1",
"Q",
"eno",
]: # Añadir salidas comunes de FC/FB si es necesario
source_key = (instruction_uid, source_pin_name) source_key = (instruction_uid, source_pin_name)
if source_key in source_connections: if source_key in source_connections:
for dest_uid, dest_pin in source_connections[source_key]: for dest_uid, dest_pin in source_connections[source_key]:
if dest_uid in access_map: if dest_uid in access_map:
if source_pin_name not in instruction_repr["outputs"]: if source_pin_name not in instruction_repr["outputs"]: instruction_repr["outputs"][source_pin_name] = []
instruction_repr["outputs"][source_pin_name] = [] if access_map[dest_uid] not in instruction_repr["outputs"][source_pin_name]: instruction_repr["outputs"][source_pin_name].append(access_map[dest_uid])
if (
access_map[dest_uid]
not in instruction_repr["outputs"][source_pin_name]
):
instruction_repr["outputs"][source_pin_name].append(
access_map[dest_uid]
)
all_logic_steps[instruction_uid] = instruction_repr all_logic_steps[instruction_uid] = instruction_repr
# --- 4. INFERENCIA Y PROPAGACIÓN DE CONEXIONES 'EN' IMPLÍCITAS --- # 4. Inferencia EN (sin cambios)
# (Esta lógica probablemente necesite ajustes para considerar 'Call' como bloque funcional) processed_blocks_en_inference = set(); something_changed = True; inference_passes = 0; max_inference_passes = len(all_logic_steps) + 5
# print(f"DEBUG: Iniciando inferencia EN para Red {network_id}...") try: sorted_uids_for_en = sorted(all_logic_steps.keys(), key=lambda x: int(x) if x.isdigit() else float('inf'))
processed_blocks_en_inference = set() except ValueError: sorted_uids_for_en = sorted(all_logic_steps.keys())
something_changed = True ordered_logic_list_for_en = [all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps]
inference_passes = 0
max_inference_passes = len(all_logic_steps) + 5
while something_changed and inference_passes < max_inference_passes: while something_changed and inference_passes < max_inference_passes:
something_changed = False something_changed = False; inference_passes += 1
inference_passes += 1 for i, instruction in enumerate(ordered_logic_list_for_en):
try: part_uid = instruction["instruction_uid"]; part_type_original = instruction["type"].replace('_scl', '').replace('_error', '')
sorted_uids_for_pass = sorted( if (part_type_original in functional_block_types and "en" not in instruction["inputs"] and part_uid not in processed_blocks_en_inference):
all_logic_steps.keys(),
key=lambda x: int(x) if x.isdigit() else float("inf"),
)
except ValueError:
sorted_uids_for_pass = sorted(all_logic_steps.keys())
current_logic_list = [
all_logic_steps[uid]
for uid in sorted_uids_for_pass
if uid in all_logic_steps
]
for i, instruction in enumerate(
current_logic_list
): # Usar enumerate para obtener índice
part_uid = instruction["instruction_uid"]
part_type = instruction["type"] # Ahora puede ser 'Call'
# Si es un bloque funcional (incluyendo Call) sin 'en' explícito
if (
part_type in functional_block_types
and "en" not in instruction["inputs"]
and part_uid not in processed_blocks_en_inference
):
inferred_en_source = None inferred_en_source = None
my_index = i # Ya tenemos el índice if i > 0:
for j in range(i - 1, -1, -1):
if my_index > 0: prev_instr = ordered_logic_list_for_en[j]; prev_uid = prev_instr["instruction_uid"]; prev_type_original = prev_instr["type"].replace('_scl', '').replace('_error', '')
for j in range(my_index - 1, -1, -1): # Buscar hacia atrás if prev_type_original in rlo_generators: inferred_en_source = {"type": "connection", "source_instruction_uid": prev_uid, "source_instruction_type": prev_type_original, "source_pin": "out"}; break
prev_instr = current_logic_list[j] elif prev_type_original in functional_block_types:
prev_uid = prev_instr["instruction_uid"]
prev_type = prev_instr["type"]
if prev_type in rlo_generators:
inferred_en_source = {
"type": "connection",
"source_instruction_uid": prev_uid,
"source_instruction_type": prev_type,
"source_pin": "out",
}
break
elif prev_type in functional_block_types:
source_key_eno = (prev_uid, "eno") source_key_eno = (prev_uid, "eno")
if source_key_eno in source_connections: if source_key_eno in source_connections: inferred_en_source = {"type": "connection", "source_instruction_uid": prev_uid, "source_instruction_type": prev_type_original, "source_pin": "eno"}; break
inferred_en_source = { else: continue
"type": "connection", elif prev_type_original in ['Coil', 'SCoil', 'RCoil', 'SetCoil', 'ResetCoil', 'SdCoil']: break
"source_instruction_uid": prev_uid, if inferred_en_source: all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source; processed_blocks_en_inference.add(part_uid); something_changed = True
"source_instruction_type": prev_type,
"source_pin": "eno",
}
break
if inferred_en_source: # 5. Añadir lógica ENO interesante (sin cambios)
# Asegurarse de que 'instruction' se refiera al diccionario original
all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source
processed_blocks_en_inference.add(part_uid)
something_changed = True
# --- 5. Añadir lógica ENO interesante ---
# (Necesita usar parts_and_calls_map ahora)
for source_instr_uid, eno_destinations in eno_outputs.items(): for source_instr_uid, eno_destinations in eno_outputs.items():
if source_instr_uid not in all_logic_steps: if source_instr_uid not in all_logic_steps: continue
continue
interesting_eno_logic = [] interesting_eno_logic = []
for dest_uid, dest_pin in eno_destinations: for dest_uid, dest_pin in eno_destinations:
is_direct_en_connection = ( is_direct_en_connection = False
dest_uid in parts_and_calls_map and dest_pin == "en" if dest_uid in parts_and_calls_map and dest_pin == 'en':
) # Check en mapa combinado try:
source_idx = sorted_uids_for_en.index(source_instr_uid); dest_idx = sorted_uids_for_en.index(dest_uid)
if dest_idx == source_idx + 1 and parts_and_calls_map[dest_uid]['type'] in functional_block_types: is_direct_en_connection = True
except ValueError: pass
if not is_direct_en_connection: if not is_direct_en_connection:
target_info = {"target_pin": dest_pin} target_info = {"target_pin": dest_pin}
if dest_uid in parts_and_calls_map: if dest_uid in parts_and_calls_map: target_info.update({"target_type": "instruction", "target_uid": dest_uid, "target_name": parts_and_calls_map[dest_uid].get("name", parts_and_calls_map[dest_uid].get("type"))})
target_info.update( elif dest_uid in access_map: target_info.update({"target_type": "operand", "target_details": access_map[dest_uid]})
{ else: target_info.update({"target_type": "unknown", "target_uid": dest_uid})
"target_type": "instruction",
"target_uid": dest_uid,
"target_name": parts_and_calls_map[dest_uid].get(
"name", parts_and_calls_map[dest_uid].get("type")
),
}
) # Usar 'name' si existe (Part) o 'type' (Call)
elif dest_uid in access_map:
target_info.update(
{
"target_type": "operand",
"target_details": access_map[dest_uid],
}
)
else:
target_info.update(
{"target_type": "unknown", "target_uid": dest_uid}
)
interesting_eno_logic.append(target_info) interesting_eno_logic.append(target_info)
if interesting_eno_logic: if interesting_eno_logic: all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic
all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic
# --- 6. Ordenar Lógica Final y Devolver --- # 6. Ordenar y Devolver (sin cambios)
try: network_logic_final = [all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps]
sorted_uids = sorted( return {"id": network_id, "title": network_title, "comment": network_comment, "logic": network_logic_final}
all_logic_steps.keys(),
key=lambda x: int(x) if x.isdigit() else float("inf"),
)
except ValueError:
print(f"Advertencia: UIDs no numéricos red {network_id}. Orden alfabético.")
sorted_uids = sorted(all_logic_steps.keys())
network_logic = [
all_logic_steps[uid] for uid in sorted_uids if uid in all_logic_steps
]
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": network_logic,
}
# --- Función Principal convert_xml_to_json (sin cambios) --- # --- Función Principal convert_xml_to_json (sin cambios en su flujo general) ---
def convert_xml_to_json(xml_filepath, json_filepath): def convert_xml_to_json(xml_filepath, json_filepath):
print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...") print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
if not os.path.exists(xml_filepath): if not os.path.exists(xml_filepath): print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'"); return
print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
return
try: try:
print("Paso 1: Parseando archivo XML...") print("Paso 1: Parseando archivo XML...")
parser = etree.XMLParser(remove_blank_text=True) parser = etree.XMLParser(remove_blank_text=True)
tree = etree.parse(xml_filepath, parser) tree = etree.parse(xml_filepath, parser)
root = tree.getroot() root = tree.getroot()
print("Paso 1: Parseo XML completado.") print("Paso 1: Parseo XML completado.")
print("Paso 2: Buscando el bloque SW.Blocks.FC...") print("Paso 2: Buscando el bloque SW.Blocks.FC...") # Asume FC primero
fc_block_list = root.xpath("//*[local-name()='SW.Blocks.FC']") block_list = root.xpath("//*[local-name()='SW.Blocks.FC']")
if not fc_block_list: block_type_found = "FC"
print("Error Crítico: No se encontró <SW.Blocks.FC>.") if not block_list:
return block_list = root.xpath("//*[local-name()='SW.Blocks.FB']") # Busca FB si no hay FC
fc_block = fc_block_list[0] block_type_found = "FB"
print(f"Paso 2: Bloque SW.Blocks.FC encontrado (ID={fc_block.get('ID')}).") if not block_list: print("Error Crítico: No se encontró <SW.Blocks.FC> ni <SW.Blocks.FB>."); return
else: print("Advertencia: Se encontró <SW.Blocks.FB> en lugar de <SW.Blocks.FC>.")
the_block = block_list[0]
print(f"Paso 2: Bloque SW.Blocks.{block_type_found} encontrado (ID={the_block.get('ID')}).")
print("Paso 3: Extrayendo atributos del bloque...") print("Paso 3: Extrayendo atributos del bloque...")
attribute_list_node = fc_block.xpath("./*[local-name()='AttributeList']") attribute_list_node = the_block.xpath("./*[local-name()='AttributeList']")
block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown" block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown"
if attribute_list_node: if attribute_list_node:
attr_list = attribute_list_node[0] attr_list = attribute_list_node[0]
name_node = attr_list.xpath("./*[local-name()='Name']/text()") name_node = attr_list.xpath("./*[local-name()='Name']/text()")
block_name_val = name_node[0].strip() if name_node else block_name_val block_name_val = name_node[0].strip() if name_node else block_name_val
num_node = attr_list.xpath("./*[local-name()='Number']/text()") num_node = attr_list.xpath("./*[local-name()='Number']/text()")
block_number_val = ( try: block_number_val = int(num_node[0]) if num_node else None
int(num_node[0]) except ValueError: block_number_val = None
if num_node and num_node[0].isdigit() lang_node = attr_list.xpath("./*[local-name()='ProgrammingLanguage']/text()")
else block_number_val
)
lang_node = attr_list.xpath(
"./*[local-name()='ProgrammingLanguage']/text()"
)
block_lang_val = lang_node[0].strip() if lang_node else block_lang_val block_lang_val = lang_node[0].strip() if lang_node else block_lang_val
print( print(f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'")
f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'" else: print(f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}.")
)
else:
print("Advertencia: No se encontró AttributeList para el bloque FC.")
block_comment_val = "" block_comment_val = ""
comment_node_list = fc_block.xpath( comment_node_list = the_block.xpath("./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']")
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']" if comment_node_list: block_comment_val = get_multilingual_text(comment_node_list[0]); print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")
) result = {"block_name": block_name_val, "block_number": block_number_val, "language": block_lang_val, "block_comment": block_comment_val, "interface": {}, "networks": []}
if comment_node_list:
block_comment_val = get_multilingual_text(comment_node_list[0])
print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")
result = {
"block_name": block_name_val,
"block_number": block_number_val,
"language": block_lang_val,
"block_comment": block_comment_val,
"interface": {},
"networks": [],
}
print("Paso 4: Extrayendo la interfaz del bloque...") print("Paso 4: Extrayendo la interfaz del bloque...")
if attribute_list_node: if attribute_list_node:
interface_node_list = attribute_list_node[0].xpath( interface_node_list = attribute_list_node[0].xpath(".//*[local-name()='Interface']")
"./*[local-name()='Interface']"
)
if interface_node_list: if interface_node_list:
interface_node = interface_node_list[0] interface_node = interface_node_list[0]
print("Paso 4: Nodo Interface encontrado.") print("Paso 4: Nodo Interface encontrado.")
for section in interface_node.xpath(".//iface:Section", namespaces=ns): for section in interface_node.xpath(".//iface:Section", namespaces=ns):
section_name = section.get("Name") section_name = section.get("Name");
if not section_name: continue
members = [] members = []
for member in section.xpath("./iface:Member", namespaces=ns): for member in section.xpath("./iface:Member", namespaces=ns):
member_name = member.get("Name") member_name = member.get("Name"); member_dtype = member.get("Datatype")
member_dtype = member.get("Datatype") if member_name and member_dtype: members.append({"name": member_name, "datatype": member_dtype})
if member_name and member_dtype: if members: result["interface"][section_name] = members
members.append( if not result["interface"]: print("Advertencia: Interface sin secciones iface:Section válidas.")
{"name": member_name, "datatype": member_dtype} else: print("Advertencia: No se encontró <Interface> dentro de <AttributeList>.")
) if not result["interface"]: print("Advertencia: No se pudo extraer información de la interfaz.")
if members:
result["interface"][section_name] = members
if not result["interface"]:
print("Advertencia: Interface sin secciones iface:Section válidas.")
else:
print(
"Advertencia: No se encontró <Interface> DENTRO de <AttributeList>."
)
if not result["interface"]:
print("Advertencia: No se pudo extraer información de la interfaz.")
print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...") print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...")
networks_processed_count = 0 networks_processed_count = 0
object_list_node = fc_block.xpath("./*[local-name()='ObjectList']") object_list_node = the_block.xpath("./*[local-name()='ObjectList']")
if object_list_node: if object_list_node:
compile_units = object_list_node[0].xpath( compile_units = object_list_node[0].xpath("./*[local-name()='SW.Blocks.CompileUnit']")
"./*[local-name()='SW.Blocks.CompileUnit']" print(f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit.")
)
print(
f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit."
)
for network_elem in compile_units: for network_elem in compile_units:
networks_processed_count += 1 networks_processed_count += 1
# print(f"DEBUG: Procesando red #{networks_processed_count} (ID={network_elem.get('ID')})...") parsed_network = parse_network(network_elem) # Llamada a la función de parseo de red
parsed_network = parse_network( if parsed_network and parsed_network.get("error") is None: result["networks"].append(parsed_network)
network_elem elif parsed_network: print(f"Error: Falló parseo red ID={parsed_network.get('id')}: {parsed_network.get('error')}")
) # Llamada a la función modificada else: print(f"Error Fatal: parse_network devolvió None para CompileUnit ID={network_elem.get('ID')}.")
if parsed_network and parsed_network.get("error") is None: if networks_processed_count == 0: print("Advertencia: ObjectList sin SW.Blocks.CompileUnit.")
result["networks"].append(parsed_network) else: print("Advertencia: No se encontró ObjectList para el bloque.")
elif parsed_network:
print(
f"Error: Falló parseo red ID={parsed_network.get('id')}: {parsed_network.get('error')}"
)
result["networks"].append(parsed_network)
else:
print(
f"Error: parse_network devolvió None para CompileUnit (ID={network_elem.get('ID')})."
)
if networks_processed_count == 0:
print("Advertencia: ObjectList sin SW.Blocks.CompileUnit.")
else:
print("Advertencia: No se encontró ObjectList.")
print("Paso 6: Escribiendo el resultado en el archivo JSON...") print("Paso 6: Escribiendo el resultado en el archivo JSON...")
if not result["interface"]: if not result["interface"]: print("ADVERTENCIA FINAL: 'interface' está vacía.")
print("ADVERTENCIA FINAL: 'interface' está vacía.") if not result["networks"]: print("ADVERTENCIA FINAL: 'networks' está vacía.")
if not result["networks"]:
print("ADVERTENCIA FINAL: 'networks' está vacía.")
# else: # Chequeo ENO logic
# eno_logic_found = any(instr.get('eno_logic') for net in result.get('networks', []) if net.get('error') is None for instr in net.get('logic', []))
# if eno_logic_found: print("INFO FINAL: Lógica ENO interesante detectada.")
# else: print("INFO FINAL: No se detectó lógica ENO interesante.")
try: try:
with open(json_filepath, "w", encoding="utf-8") as f: with open(json_filepath, "w", encoding="utf-8") as f: json.dump(result, f, indent=4, ensure_ascii=False)
json.dump(result, f, indent=4, ensure_ascii=False) print("Paso 6: Escritura completada."); print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'")
print(f"Paso 6: Escritura completada.") except IOError as e: print(f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}")
print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'") except TypeError as e: print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")
except IOError as e: except etree.XMLSyntaxError as e: print(f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}")
print( except Exception as e: print(f"Error Crítico: Error inesperado durante la conversión: {e}"); print("--- Traceback ---"); traceback.print_exc(); print("--- Fin Traceback ---")
f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"
)
except TypeError as e:
print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")
except etree.XMLSyntaxError as e:
print(f"Error Crítico: Sintaxis XML en '{xml_filepath}'. Detalles: {e}")
except Exception as e:
print(f"Error Crítico: Error inesperado: {e}")
print("--- Traceback ---")
traceback.print_exc()
print("--- Fin Traceback ---")
# --- Punto de Entrada Principal --- # --- Punto de Entrada Principal ---
if __name__ == "__main__": if __name__ == "__main__":
xml_file = "TestLAD.xml" # CAMBIAR AL NUEVO ARCHIVO XML xml_filename_base = "TestLAD"
json_file = xml_file.replace( xml_file = f"{xml_filename_base}.xml"
".xml", "_simplified.json" json_file = f"{xml_filename_base}_simplified.json"
) # Nombre de salida dinámico
convert_xml_to_json(xml_file, json_file) convert_xml_to_json(xml_file, json_file)

View File

@ -608,223 +608,93 @@ def process_move(instruction, network_id, scl_map, access_map):
return True return True
def process_pbox(instruction, network_id, scl_map, access_map, network_logic_list): def process_edge_detector(instruction, network_id, scl_map, access_map):
"""Genera SCL para PBox (P_TRIG) o NBox (N_TRIG)."""
instr_uid = instruction["instruction_uid"] instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"] instr_type_original = instruction["type"] # PBox o NBox
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: if instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original:
return False return False
# Obtener CLK (señal de entrada) y MemBit (bit de memoria)
clk_input = instruction["inputs"].get("in")
mem_bit_input = instruction["inputs"].get("bit") mem_bit_input = instruction["inputs"].get("bit")
mem_bit_scl = get_scl_representation(mem_bit_input, network_id, scl_map, access_map)
if mem_bit_scl is None: clk_scl = get_scl_representation(clk_input, network_id, scl_map, access_map)
return False # Dependencia no lista mem_bit_scl_original = get_scl_representation(
mem_bit_input, network_id, scl_map, access_map
)
# Validar y formatear el bit de memoria if clk_scl is None or mem_bit_scl_original is None:
# print(f"DEBUG Edge: Esperando dependencias para {instr_type_original} UID {instr_uid}")
return False # Dependencias no listas
# Validar que el bit de memoria sea una variable
if not (mem_bit_input and mem_bit_input.get("type") == "variable"): if not (mem_bit_input and mem_bit_input.get("type") == "variable"):
print(f"Error: PBOX {instr_uid} 'bit' no es variable o falta información.") print(
instruction["scl"] = ( f"Error: {instr_type_original} {instr_uid} 'bit' no es variable o falta información."
f"// ERROR: PBox {instr_uid} 'bit' no es variable o falta info"
) )
instruction["type"] += "_error" instruction["scl"] = (
f"// ERROR: {instr_type_original} {instr_uid} 'bit' no es variable."
)
instruction["type"] = instr_type_original + "_error"
return True # Procesado con error return True # Procesado con error
mem_bit_scl_formatted = format_variable_name(mem_bit_scl) # --- Renombrar bit de memoria para VAR_STAT ---
# Quitar comillas existentes, añadir prefijo "stat_" y volver a añadir comillas
mem_bit_name_clean = mem_bit_scl_original.strip('"')
stat_mem_bit_scl = (
f'"stat_{mem_bit_name_clean}"' # Nombre SCL para la variable estática
)
# --- Lógica para inferir CLK (similar a la versión anterior) --- # Asegurar paréntesis alrededor de CLK si es complejo
# Determinar si es P_TRIG (conectado a una bobina) # Formatear CLK
is_likely_p_trig = False clk_scl_formatted = clk_scl
consuming_coil_uid = None # Añadir paréntesis si es necesario (expresión compleja o asignación previa)
for potential_consumer in network_logic_list: # No añadir paréntesis a TRUE/FALSE literales
coil_input_signal = potential_consumer.get("inputs", {}).get("in") if (
if ( clk_scl not in ["TRUE", "FALSE"]
isinstance(coil_input_signal, dict) and (" " in clk_scl or "AND" in clk_scl or "OR" in clk_scl or ":=" in clk_scl)
and coil_input_signal.get("type") == "connection" and not (clk_scl.startswith("(") and clk_scl.endswith(")"))
and coil_input_signal.get("source_instruction_uid") == instr_uid ):
and coil_input_signal.get("source_pin") clk_scl_formatted = f"({clk_scl})"
== "out" # PBox output alimenta la bobina
):
consumer_type_original = (
potential_consumer.get("type", "")
.replace("_scl", "")
.replace("_error", "")
)
if consumer_type_original == "Coil":
is_likely_p_trig = True
consuming_coil_uid = potential_consumer.get("instruction_uid")
break # Encontrado consumidor de bobina
rlo_scl = None # Este será el CLK inferido # --- Generar Lógica SCL ---
if is_likely_p_trig: result_scl = "FALSE" # SCL para la salida del flanco (pin 'out')
# Buscar hacia atrás la fuente del RLO que alimenta este PBox scl_comment = "" # Comentario informativo
clk_source_found = False
current_instr_index = -1
for i, instr in enumerate(network_logic_list):
if instr["instruction_uid"] == instr_uid:
current_instr_index = i
break
if current_instr_index != -1: if instr_type_original == "PBox": # Flanco Positivo (P_TRIG)
# Buscar la entrada 'in' del PBox si existe explícitamente result_scl = f"{clk_scl_formatted} AND NOT {stat_mem_bit_scl}"
pbox_in_signal = instruction.get("inputs", {}).get("in") scl_comment = f"// P_TRIG: {result_scl}"
if pbox_in_signal: elif instr_type_original == "NBox": # Flanco Negativo (N_TRIG)
rlo_scl = get_scl_representation( result_scl = f"NOT {clk_scl_formatted} AND {stat_mem_bit_scl}"
pbox_in_signal, network_id, scl_map, access_map scl_comment = f"// N_TRIG: {result_scl}"
) else:
if rlo_scl is not None: # No debería ocurrir si el mapeo de procesadores es correcto
clk_source_found = True print(
# print(f"DEBUG: PBox {instr_uid} CLK encontrado por pin 'in': {rlo_scl}") f"Error interno: process_edge_detector llamado para tipo inesperado {instr_type_original}"
# Si no hay pin 'in' explícito, buscar hacia atrás (lógica LAD clásica)
if not clk_source_found:
# print(f"DEBUG: PBox {instr_uid} buscando CLK hacia atrás...")
for i in range(current_instr_index - 1, -1, -1):
prev_instr = network_logic_list[i]
prev_instr_uid = prev_instr["instruction_uid"]
prev_instr_type_original = (
prev_instr.get("type", "")
.replace(SCL_SUFFIX, "")
.replace("_error", "")
)
# ¿Es una instrucción que genera RLO booleano?
if prev_instr_type_original in [
"Contact",
"Eq",
"O",
"PBox",
"And",
"Xor",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
]:
map_key_prev_out = (network_id, prev_instr_uid, "out")
potential_clk_scl = scl_map.get(map_key_prev_out)
if potential_clk_scl is not None:
rlo_scl = potential_clk_scl
clk_source_found = True
# print(f"DEBUG: PBox {instr_uid} CLK encontrado de {prev_instr_type_original} {prev_instr_uid} (out): {rlo_scl}")
break
# ¿Es un bloque funcional cuya salida ENO podría ser el RLO?
elif prev_instr_type_original in [
"Move",
"Add",
"Convert",
"Mod",
"Call",
]: # Añadir otros bloques funcionales
map_key_prev_eno = (network_id, prev_instr_uid, "eno")
potential_clk_scl = scl_map.get(map_key_prev_eno)
if potential_clk_scl is not None:
rlo_scl = potential_clk_scl
clk_source_found = True
# print(f"DEBUG: PBox {instr_uid} CLK encontrado de {prev_instr_type_original} {prev_instr_uid} (eno): {rlo_scl}")
break
# Si encontramos una bobina, el RLO se detiene ahí
elif prev_instr_type_original == "Coil":
# print(f"DEBUG: PBox {instr_uid} búsqueda CLK detenida por Coil {prev_instr_uid}")
break
if not clk_source_found:
print(f"Error: No se pudo inferir CLK para P_TRIG PBOX {instr_uid}")
instruction["scl"] = f"// ERROR: PBox {instr_uid} (P_TRIG) sin CLK inferido"
instruction["type"] += "_error"
return True # Procesado con error
if rlo_scl is None:
# print(f"DEBUG: PBox {instr_uid} CLK encontrado pero valor es None, esperando...")
return False # Dependencia (CLK) no resuelta aún
# --- Generar SCL ---
scl_comment = ""
result_scl = "" # El valor que PBox pone en scl_map['out']
if is_likely_p_trig:
# Formatear CLK si es variable (poco probable, suele ser resultado lógico)
# clk_signal_formatted = format_variable_name(rlo_scl) if ... else rlo_scl
clk_signal_formatted = (
rlo_scl # Asumir que ya está bien formateado o es una expresión
) )
# Añadir paréntesis si CLK es complejo
if (
" " in clk_signal_formatted
or "AND" in clk_signal_formatted
or "OR" in clk_signal_formatted
) and not (
clk_signal_formatted.startswith("(") and clk_signal_formatted.endswith(")")
):
clk_signal_formatted = f"({clk_signal_formatted})"
# Generar llamada a función de flanco (asumimos P_TRIG)
# Necesitamos un nombre para la instancia del flanco, usualmente una variable STAT
# Podríamos generarla automáticamente o requerirla en el XML.
# Generación automática:
stat_var_name = f"stat_{network_id}_{instr_uid}_ptrig"
stat_var_name_scl = format_variable_name(f'"{stat_var_name}"') # Poner comillas
# La generación SCL real depende de si usamos una función o lógica explícita.
# Usando función estándar (requiere declarar la instancia 'stat_var_name_scl' como P_TRIG en VAR_STAT):
# result_scl = f"{stat_var_name_scl}(CLK := {clk_signal_formatted})" # La función devuelve Q
# scl_comment = f"// P_TRIG {instr_uid}: {result_scl}"
# instruction["scl"] = f"{stat_var_name_scl}(CLK := {clk_signal_formatted}); // Generates edge pulse" # La llamada en sí
# result_scl = f"{stat_var_name_scl}.Q" # La salida es el pin Q de la instancia
# Usando lógica explícita (más portable si no se declaran instancias):
result_scl = f"{clk_signal_formatted} AND NOT {mem_bit_scl_formatted}"
scl_comment = f"// P_TRIG Logic {instr_uid}: {result_scl}"
# La actualización del bit de memoria se hace aparte
instruction["scl"] = ( instruction["scl"] = (
f"{mem_bit_scl_formatted} := {clk_signal_formatted}; // Update edge memory bit" f"// ERROR: Tipo de flanco inesperado {instr_type_original}"
) )
instruction["type"] = instr_type_original + "_error"
return True
else: # Si no es P_TRIG (ej. N_TRIG o simplemente pasando el bit) # La actualización del bit de memoria es igual para P_TRIG y N_TRIG estándar
# Aquí asumimos que es N_TRIG si tiene TemplateValue "Negated" o similar scl_mem_update = f"{stat_mem_bit_scl} := {clk_scl_formatted};"
is_negated_pbox = (
instruction.get("template_values", {}).get("Negated") == "true"
)
if is_negated_pbox:
# Lógica N_TRIG explícita
clk_signal_formatted = rlo_scl # Asumimos que CLK se infiere igual
if clk_signal_formatted is None:
return False # Esperar CLK
if ( # --- Almacenar Resultados ---
" " in clk_signal_formatted # El pulso resultante va al mapa SCL para que lo usen las instrucciones siguientes
or "AND" in clk_signal_formatted
or "OR" in clk_signal_formatted
) and not (
clk_signal_formatted.startswith("(")
and clk_signal_formatted.endswith(")")
):
clk_signal_formatted = f"({clk_signal_formatted})"
result_scl = f"NOT {clk_signal_formatted} AND {mem_bit_scl_formatted}"
scl_comment = f"// N_TRIG Logic {instr_uid}: {result_scl}"
instruction["scl"] = (
f"{mem_bit_scl_formatted} := {clk_signal_formatted}; // Update edge memory bit"
)
else:
# Comportamiento por defecto: pasar el bit de memoria (raro para PBox)
print(
f"Advertencia: PBox {instr_uid} no parece ser P_TRIG ni N_TRIG. Pasando bit de memoria."
)
result_scl = mem_bit_scl_formatted
scl_comment = f"// PBox {instr_uid} - Passing memory bit: {result_scl}"
instruction["scl"] = f"// {scl_comment}" # Solo comentario
# Actualizar el mapa SCL con el resultado booleano del flanco/paso
map_key_out = (network_id, instr_uid, "out") map_key_out = (network_id, instr_uid, "out")
scl_map[map_key_out] = result_scl scl_map[map_key_out] = result_scl
# ENO sigue al CLK (si existe) o es TRUE por defecto? Asumimos que sigue al CLK si es P/N_TRIG # La actualización de memoria es la acción principal de esta instrucción en SCL
map_key_eno = (network_id, instr_uid, "eno") instruction["scl"] = f"{scl_mem_update} {scl_comment}"
scl_map[map_key_eno] = rlo_scl if rlo_scl is not None else "TRUE" instruction["type"] = instr_type_original + SCL_SUFFIX
# El pin ENO normalmente sigue al CLK en los bloques de flanco estándar
map_key_eno = (network_id, instr_uid, "eno")
scl_map[map_key_eno] = clk_scl # Usar clk_scl original sin formateo extra
instruction["type"] = instr_type + SCL_SUFFIX
return True return True
@ -1238,25 +1108,26 @@ def process_json_to_scl(json_filepath):
process_eq, process_eq,
process_contact, process_contact,
process_o, process_o,
process_pbox, process_edge_detector, # Usar la nueva función unificada
process_add, process_add,
process_move, process_move,
process_call, process_call,
process_coil, process_coil,
# Añadir aquí nuevos procesadores base para otros tipos de instrucciones # ... otros procesadores base ...
] ]
# Crear mapa por nombre de tipo original (en minúsculas) # Crear mapa por nombre de tipo original (en minúsculas)
processor_map = {} processor_map = {}
for func in base_processors: for func in base_processors:
# Extraer tipo del nombre de la función (p.ej., 'process_contact' -> 'contact')
match = re.match(r"process_(\w+)", func.__name__) match = re.match(r"process_(\w+)", func.__name__)
if match: if match:
type_name = match.group(1).lower() type_name = match.group(1).lower()
# Manejar casos especiales como 'call' que puede ser FC o FB
if type_name == "call": if type_name == "call":
processor_map["call_fc"] = func processor_map["call_fc"] = func
processor_map["call_fb"] = func processor_map["call_fb"] = func
processor_map["call"] = func # Genérico por si acaso processor_map["call"] = func
elif type_name == "edge_detector": # Mapear PBox y NBox a la nueva función
processor_map["pbox"] = func
processor_map["nbox"] = func
else: else:
processor_map[type_name] = func processor_map[type_name] = func
@ -1308,20 +1179,10 @@ def process_json_to_scl(json_filepath):
if func_to_call: if func_to_call:
try: try:
changed = False
# Pasar la lista de lógica completa solo si es necesario (PBox) # Pasar la lista de lógica completa solo si es necesario (PBox)
if func_to_call == process_pbox: changed = func_to_call(
changed = func_to_call( instruction, network_id, scl_map, access_map
instruction, )
network_id,
scl_map,
access_map,
network_logic,
)
else:
changed = func_to_call(
instruction, network_id, scl_map, access_map
)
if changed: if changed:
made_change_in_base_pass = True made_change_in_base_pass = True

508
xcopy.py Normal file
View File

@ -0,0 +1,508 @@
# -*- coding: utf-8 -*-
import json
import os
import copy
import traceback
import re
# --- Constantes y Configuración ---
SCL_SUFFIX = "_scl"
GROUPED_COMMENT = "// Logic included in grouped IF"
# Global data variable
data = {}
# --- Helper Functions ---
# (get_scl_representation, format_variable_name, generate_temp_var_name, get_target_scl_name - sin cambios)
def get_scl_representation(source_info, network_id, scl_map, access_map):
# ... (código sin cambios)
if not source_info:
return None
if isinstance(source_info, list):
scl_parts = []
all_resolved = True
for sub_source in source_info:
sub_scl = get_scl_representation(
sub_source, network_id, scl_map, access_map
)
if sub_scl is None:
all_resolved = False
break
if (
sub_scl in ["TRUE", "FALSE"]
or (sub_scl.startswith('"') and sub_scl.endswith('"'))
or sub_scl.isdigit()
or (sub_scl.startswith("(") and sub_scl.endswith(")"))
):
scl_parts.append(sub_scl)
else:
scl_parts.append(f"({sub_scl})")
return (
" OR ".join(scl_parts)
if len(scl_parts) > 1
else (scl_parts[0] if scl_parts else "FALSE") if all_resolved else None
)
source_type = source_info.get("type")
if source_type == "powerrail":
return "TRUE"
elif source_type == "variable":
name = source_info.get("name")
return (
format_variable_name(name) # Asegura formato correcto aquí también
if name
else f"_ERR_VAR_NO_NAME_{source_info.get('uid')}_"
)
elif source_type == "constant":
dtype = str(source_info.get("datatype", "")).upper()
value = source_info.get("value")
try:
if dtype == "BOOL": return str(value).upper()
elif dtype in ["INT", "DINT", "SINT", "USINT", "UINT", "UDINT", "LINT", "ULINT", "WORD", "DWORD", "LWORD", "BYTE"]: return str(value)
elif dtype in ["REAL", "LREAL"]: s_val = str(value); return s_val if "." in s_val or "e" in s_val.lower() else s_val + ".0"
elif dtype == "STRING": str_val = str(value).replace("'", "''"); return f"'{str_val}'"
elif dtype == "TYPEDCONSTANT": return str(value) # Ej: T#5s
else: str_val = str(value).replace("'", "''"); return f"'{str_val}'"
except Exception as e: print(f"Advertencia: Error formateando constante {source_info}: {e}"); return f"_ERR_CONST_FORMAT_{source_info.get('uid')}_"
elif source_type == "connection":
map_key = (network_id, source_info.get("source_instruction_uid"), source_info.get("source_pin"))
return scl_map.get(map_key)
elif source_type == "unknown_source": print(f"Advertencia: Refiriendo a fuente desconocida UID: {source_info.get('uid')}"); return f"_ERR_UNKNOWN_SRC_{source_info.get('uid')}_"
else: print(f"Advertencia: Tipo de fuente desconocido: {source_info}"); return f"_ERR_INVALID_SRC_TYPE_"
def format_variable_name(name):
# ... (código sin cambios)
if not name: return "_INVALID_NAME_"
if name.startswith('"') and name.endswith('"'): return name
prefix = "";
if name.startswith("#"): prefix = "#"; name = name[1:]
if name and name[0].isdigit(): name = "_" + name
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
return prefix + name
def generate_temp_var_name(network_id, instr_uid, pin_name):
# ... (código sin cambios)
net_id_clean = str(network_id).replace("-", "_")
instr_uid_clean = str(instr_uid).replace("-", "_")
pin_name_clean = str(pin_name).replace("-", "_").lower()
return f"#_temp_{net_id_clean}_{instr_uid_clean}_{pin_name_clean}"
def get_target_scl_name(instruction, output_pin_name, network_id, default_to_temp=True):
# ... (código sin cambios)
instr_uid = instruction["instruction_uid"]
output_pin_data = instruction["outputs"].get(output_pin_name)
target_scl = None
if output_pin_data and isinstance(output_pin_data, list) and len(output_pin_data) == 1:
dest_access = output_pin_data[0]
if dest_access.get("type") == "variable":
target_scl = dest_access.get("name")
if target_scl: target_scl = format_variable_name(target_scl)
else: print(f"Error: Var destino {instr_uid}.{output_pin_name} sin nombre (UID: {dest_access.get('uid')}). {'Usando temp.' if default_to_temp else 'Ignorando.'}"); target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) if default_to_temp else None
elif dest_access.get("type") == "constant": print(f"Advertencia: Instr {instr_uid} escribe en const UID {dest_access.get('uid')}. {'Usando temp.' if default_to_temp else 'Ignorando.'}"); target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) if default_to_temp else None
else: print(f"Advertencia: Destino {instr_uid}.{output_pin_name} no es var/const: {dest_access.get('type')}. {'Usando temp.' if default_to_temp else 'Ignorando.'}"); target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name) if default_to_temp else None
elif default_to_temp: target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name)
if target_scl is None and not default_to_temp: return None
if target_scl is None and default_to_temp: target_scl = generate_temp_var_name(network_id, instr_uid, output_pin_name)
return target_scl
# --- Procesadores de Instrucciones ---
# (process_contact, process_eq, process_coil, process_convert, process_mod,
# process_add, process_move, process_o, process_call - sin cambios significativos)
# ... (resto de procesadores base aquí) ...
def process_contact(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
is_negated = instruction.get("negated_pins", {}).get("operand", False)
in_input = instruction["inputs"].get("in")
in_rlo_scl = "TRUE" if in_input is None else get_scl_representation(in_input, network_id, scl_map, access_map)
operand_scl = get_scl_representation(instruction["inputs"].get("operand"), network_id, scl_map, access_map)
if in_rlo_scl is None or operand_scl is None: return False
term = f"NOT {operand_scl}" if is_negated else operand_scl
if not (term.startswith('"') and term.endswith('"')):
if is_negated or (" " in term and not (term.startswith("(") and term.endswith(")"))): term = f"({term})"
new_rlo_scl = term if in_rlo_scl == "TRUE" else (f"({in_rlo_scl}) AND {term}" if ("AND" in in_rlo_scl or "OR" in in_rlo_scl) and not (in_rlo_scl.startswith("(") and in_rlo_scl.endswith(")")) else f"{in_rlo_scl} AND {term}")
map_key = (network_id, instr_uid, "out"); scl_map[map_key] = new_rlo_scl
instruction["scl"] = f"// RLO: {new_rlo_scl}"; instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_eq(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
in1_info = instruction["inputs"].get("in1"); in2_info = instruction["inputs"].get("in2")
in1_scl = get_scl_representation(in1_info, network_id, scl_map, access_map); in2_scl = get_scl_representation(in2_info, network_id, scl_map, access_map)
if in1_scl is None or in2_scl is None: return False
op1 = format_variable_name(in1_scl) if in1_info and in1_info.get("type") == "variable" else in1_scl
op2 = format_variable_name(in2_scl) if in2_info and in2_info.get("type") == "variable" else in2_scl
op1 = f"({op1})" if " " in op1 and not op1.startswith("(") else op1; op2 = f"({op2})" if " " in op2 and not op2.startswith("(") else op2
comparison_scl = f"{op1} = {op2}"; map_key_out = (network_id, instr_uid, "out"); scl_map[map_key_out] = comparison_scl
pre_input = instruction["inputs"].get("pre"); pre_scl = "TRUE" if pre_input is None else get_scl_representation(pre_input, network_id, scl_map, access_map)
if pre_scl is None: return False
map_key_eno = (network_id, instr_uid, "eno"); scl_map[map_key_eno] = pre_scl
instruction["scl"] = f"// Comparison Eq {instr_uid}: {comparison_scl}"; instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_coil(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
in_rlo_scl = get_scl_representation(instruction["inputs"].get("in"), network_id, scl_map, access_map)
operand_info = instruction["inputs"].get("operand"); operand_scl = get_scl_representation(operand_info, network_id, scl_map, access_map)
if in_rlo_scl is None or operand_scl is None: return False
if not (operand_info and operand_info.get("type") == "variable"): print(f"Error: Operando COIL {instr_uid} no es variable o falta información."); instruction["scl"] = f"// ERROR: Coil {instr_uid} operando no es variable o falta info"; instruction["type"] = instr_type + "_error"; return True
operand_scl_formatted = format_variable_name(operand_scl)
if in_rlo_scl == "(TRUE)": in_rlo_scl = "TRUE";
elif in_rlo_scl == "(FALSE)": in_rlo_scl = "FALSE"
scl_final = f"{operand_scl_formatted} := {in_rlo_scl};"; instruction["scl"] = scl_final; instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_convert(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"];
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
en_input = instruction["inputs"].get("en"); en_scl = get_scl_representation(en_input, network_id, scl_map, access_map) if en_input else "TRUE"
in_info = instruction["inputs"].get("in"); in_scl = get_scl_representation(in_info, network_id, scl_map, access_map)
if en_scl is None or in_scl is None: return False
target_scl = get_target_scl_name(instruction, "out", network_id, default_to_temp=True)
if target_scl is None: print(f"Error: Sin destino claro para CONVERT {instr_uid}"); instruction["scl"] = f"// ERROR: Convert {instr_uid} sin destino"; instruction["type"] += "_error"; return True
in_scl_formatted = format_variable_name(in_scl) if in_info and in_info.get("type") == "variable" else in_scl
conversion_expr = in_scl_formatted # Simplificación, asume asignación directa
scl_core = f"{target_scl} := {conversion_expr};"
scl_final = f"IF {en_scl} THEN\n {scl_core}\nEND_IF;" if en_scl != "TRUE" else scl_core
instruction["scl"] = scl_final; instruction["type"] = instr_type + SCL_SUFFIX
map_key_out = (network_id, instr_uid, "out"); scl_map[map_key_out] = target_scl
map_key_eno = (network_id, instr_uid, "eno"); scl_map[map_key_eno] = en_scl
return True
def process_mod(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"];
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
en_input = instruction["inputs"].get("en"); en_scl = get_scl_representation(en_input, network_id, scl_map, access_map) if en_input else "TRUE"
in1_info = instruction["inputs"].get("in1"); in2_info = instruction["inputs"].get("in2")
in1_scl = get_scl_representation(in1_info, network_id, scl_map, access_map); in2_scl = get_scl_representation(in2_info, network_id, scl_map, access_map)
if en_scl is None or in1_scl is None or in2_scl is None: return False
target_scl = get_target_scl_name(instruction, "out", network_id, default_to_temp=True)
if target_scl is None: print(f"Error: Sin destino MOD {instr_uid}"); instruction["scl"] = f"// ERROR: Mod {instr_uid} sin destino"; instruction["type"] += "_error"; return True
op1 = format_variable_name(in1_scl) if in1_info and in1_info.get("type") == "variable" else in1_scl
op2 = format_variable_name(in2_scl) if in2_info and in2_info.get("type") == "variable" else in2_scl
op1 = f"({op1})" if " " in op1 and not op1.startswith("(") else op1; op2 = f"({op2})" if " " in op2 and not op2.startswith("(") else op2
scl_core = f"{target_scl} := {op1} MOD {op2};"; scl_final = f"IF {en_scl} THEN\n {scl_core}\nEND_IF;" if en_scl != "TRUE" else scl_core
instruction["scl"] = scl_final; instruction["type"] = instr_type + SCL_SUFFIX
map_key_out = (network_id, instr_uid, "out"); scl_map[map_key_out] = target_scl
map_key_eno = (network_id, instr_uid, "eno"); scl_map[map_key_eno] = en_scl
return True
def process_add(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
en_input = instruction["inputs"].get("en"); en_scl = get_scl_representation(en_input, network_id, scl_map, access_map) if en_input else "TRUE"
in1_info = instruction["inputs"].get("in1"); in2_info = instruction["inputs"].get("in2")
in1_scl = get_scl_representation(in1_info, network_id, scl_map, access_map); in2_scl = get_scl_representation(in2_info, network_id, scl_map, access_map)
if en_scl is None or in1_scl is None or in2_scl is None: return False
target_scl = get_target_scl_name(instruction, "out", network_id, default_to_temp=True)
if target_scl is None: print(f"Error: Sin destino ADD {instr_uid}"); instruction["scl"] = f"// ERROR: Add {instr_uid} sin destino"; instruction["type"] += "_error"; return True
op1 = format_variable_name(in1_scl) if in1_info and in1_info.get("type") == "variable" else in1_scl
op2 = format_variable_name(in2_scl) if in2_info and in2_info.get("type") == "variable" else in2_scl
op1 = f"({op1})" if " " in op1 and not op1.startswith("(") else op1; op2 = f"({op2})" if " " in op2 and not op2.startswith("(") else op2
scl_core = f"{target_scl} := {op1} + {op2};"; scl_final = f"IF {en_scl} THEN\n {scl_core}\nEND_IF;" if en_scl != "TRUE" else scl_core
instruction["scl"] = scl_final; instruction["type"] = instr_type + SCL_SUFFIX
map_key_out = (network_id, instr_uid, "out"); scl_map[map_key_out] = target_scl
map_key_eno = (network_id, instr_uid, "eno"); scl_map[map_key_eno] = en_scl
return True
def process_move(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
en_input = instruction["inputs"].get("en"); en_scl = get_scl_representation(en_input, network_id, scl_map, access_map) if en_input else "TRUE"
in_info = instruction["inputs"].get("in"); in_scl = get_scl_representation(in_info, network_id, scl_map, access_map)
if en_scl is None or in_scl is None: return False
target_scl = get_target_scl_name(instruction, "out1", network_id, default_to_temp=False)
if target_scl is None: target_scl = get_target_scl_name(instruction, "out", network_id, default_to_temp=False)
if target_scl is None: print(f"Advertencia/Error: MOVE {instr_uid} sin destino claro en 'out' o 'out1'."); return False
in_scl_formatted = format_variable_name(in_scl) if in_info and in_info.get("type") == "variable" else in_scl
scl_core = f"{target_scl} := {in_scl_formatted};"; scl_final = f"IF {en_scl} THEN\n {scl_core}\nEND_IF;" if en_scl != "TRUE" else scl_core
instruction["scl"] = scl_final; instruction["type"] = instr_type + SCL_SUFFIX
map_key_out = (network_id, instr_uid, "out"); scl_map[map_key_out] = target_scl
map_key_out1 = (network_id, instr_uid, "out1"); scl_map[map_key_out1] = target_scl
map_key_eno = (network_id, instr_uid, "eno"); scl_map[map_key_eno] = en_scl
return True
# --- NUEVA FUNCIÓN UNIFICADA para PBox y NBox ---
def process_edge_detector(instruction, network_id, scl_map, access_map):
"""Genera SCL para PBox (P_TRIG) o NBox (N_TRIG)."""
instr_uid = instruction["instruction_uid"]
instr_type_original = instruction["type"] # PBox o NBox
if instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original:
return False
# Obtener CLK (señal de entrada) y MemBit (bit de memoria)
clk_input = instruction["inputs"].get("in")
mem_bit_input = instruction["inputs"].get("bit")
clk_scl = get_scl_representation(clk_input, network_id, scl_map, access_map)
mem_bit_scl_original = get_scl_representation(mem_bit_input, network_id, scl_map, access_map)
if clk_scl is None or mem_bit_scl_original is None:
# print(f"DEBUG Edge: Esperando dependencias para {instr_type_original} UID {instr_uid}")
return False # Dependencias no listas
# Validar que el bit de memoria sea una variable
if not (mem_bit_input and mem_bit_input.get("type") == "variable"):
print(f"Error: {instr_type_original} {instr_uid} 'bit' no es variable o falta información.")
instruction["scl"] = f"// ERROR: {instr_type_original} {instr_uid} 'bit' no es variable."
instruction["type"] = instr_type_original + "_error"
return True # Procesado con error
# --- Renombrar bit de memoria para VAR_STAT ---
# Quitar comillas existentes, añadir prefijo "stat_" y volver a añadir comillas
mem_bit_name_clean = mem_bit_scl_original.strip('"')
stat_mem_bit_scl = f'"stat_{mem_bit_name_clean}"' # Nombre SCL para la variable estática
# Asegurar paréntesis alrededor de CLK si es complejo
clk_scl_formatted = clk_scl
if (' ' in clk_scl or 'AND' in clk_scl or 'OR' in clk_scl) and not (clk_scl.startswith('(') and clk_scl.endswith(')')):
clk_scl_formatted = f"({clk_scl})"
# --- Generar Lógica SCL ---
result_scl = "FALSE" # SCL para la salida del flanco (pin 'out')
scl_comment = "" # Comentario informativo
if instr_type_original == "PBox": # Flanco Positivo (P_TRIG)
result_scl = f"{clk_scl_formatted} AND NOT {stat_mem_bit_scl}"
scl_comment = f"// P_TRIG: {result_scl}"
elif instr_type_original == "NBox": # Flanco Negativo (N_TRIG)
result_scl = f"NOT {clk_scl_formatted} AND {stat_mem_bit_scl}"
scl_comment = f"// N_TRIG: {result_scl}"
else:
# No debería ocurrir si el mapeo de procesadores es correcto
print(f"Error interno: process_edge_detector llamado para tipo inesperado {instr_type_original}")
instruction["scl"] = f"// ERROR: Tipo de flanco inesperado {instr_type_original}"
instruction["type"] = instr_type_original + "_error"
return True
# La actualización del bit de memoria es igual para P_TRIG y N_TRIG estándar
scl_mem_update = f"{stat_mem_bit_scl} := {clk_scl_formatted};"
# --- Almacenar Resultados ---
# El pulso resultante va al mapa SCL para que lo usen las instrucciones siguientes
map_key_out = (network_id, instr_uid, "out")
scl_map[map_key_out] = result_scl
# La actualización de memoria es la acción principal de esta instrucción en SCL
instruction["scl"] = f"{scl_mem_update} {scl_comment}"
instruction["type"] = instr_type_original + SCL_SUFFIX
# El pin ENO normalmente sigue al CLK en los bloques de flanco estándar
map_key_eno = (network_id, instr_uid, "eno")
scl_map[map_key_eno] = clk_scl # Usar clk_scl original sin formateo extra
return True
# --- FIN NUEVA FUNCIÓN UNIFICADA ---
def process_o(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"]
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
input_pins = sorted([pin for pin in instruction["inputs"] if pin.startswith("in")])
if not input_pins: print(f"Error: O {instr_uid} sin pines de entrada (inX)."); instruction["scl"] = f"// ERROR: O {instr_uid} sin pines inX"; instruction["type"] += "_error"; return True
scl_parts = []; all_resolved = True
for pin in input_pins:
in_scl = get_scl_representation(instruction["inputs"][pin], network_id, scl_map, access_map)
if in_scl is None: all_resolved = False; break
term = in_scl;
if (" " in term or "AND" in term) and not (term.startswith("(") and term.endswith(")")): term = f"({term})"
scl_parts.append(term)
if not all_resolved: return False
result_scl = "FALSE";
if scl_parts:
result_scl = " OR ".join(scl_parts)
if len(scl_parts) == 1: result_scl = scl_parts[0]
map_key_out = (network_id, instr_uid, "out"); scl_map[map_key_out] = result_scl
instruction["scl"] = f"// Logic O {instr_uid}: {result_scl}"; instruction["type"] = instr_type + SCL_SUFFIX
return True
def process_call(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction.get("type", "")
if instr_type.endswith(SCL_SUFFIX) or "_error" in instr_type: return False
block_name = instruction.get("block_name", f"UnknownCall_{instr_uid}"); block_type = instruction.get("block_type")
instance_db = instruction.get("instance_db"); instance_db_scl = format_variable_name(instance_db) if instance_db else None
block_name_scl = format_variable_name(block_name)
en_input = instruction["inputs"].get("en"); en_scl = get_scl_representation(en_input, network_id, scl_map, access_map) if en_input else "TRUE"
if en_scl is None: return False
scl_call_params = []; processed_inputs = {"en"}
for pin_name, source_info in instruction.get("inputs", {}).items():
if pin_name not in processed_inputs:
param_scl = get_scl_representation(source_info, network_id, scl_map, access_map)
if param_scl is None: return False
param_scl_formatted = format_variable_name(param_scl) if source_info.get("type") == "variable" else param_scl
scl_call_params.append(f"{format_variable_name(pin_name)} := {param_scl_formatted}")
processed_inputs.add(pin_name)
scl_call_body = ""; param_string = ", ".join(scl_call_params)
if block_type == "FB":
if not instance_db_scl: print(f"Error: Llamada a FB '{block_name_scl}' (UID {instr_uid}) sin DB de instancia especificado."); instruction["scl"] = f"// ERROR: FB Call {block_name_scl} sin instancia"; instruction["type"] = "Call_FB_error"; return True
scl_call_body = f"{instance_db_scl}({param_string});"
elif block_type == "FC": scl_call_body = f"{block_name_scl}({param_string});"
else: print(f"Advertencia: Tipo de bloque no soportado para Call UID {instr_uid}: {block_type}"); scl_call_body = f"// ERROR: Call a bloque tipo '{block_type}' no soportado: {block_name_scl}"; instruction["type"] = f"Call_{block_type}_error"
scl_final = "";
if en_scl != "TRUE": indented_call = "\\n".join([f" {line}" for line in scl_call_body.splitlines()]); scl_final = f"IF {en_scl} THEN\\n{indented_call}\\nEND_IF;"
else: scl_final = scl_call_body
instruction["scl"] = scl_final; instruction["type"] = f"Call_{block_type}_scl" if "_error" not in instruction["type"] else instruction["type"]
map_key_eno = (network_id, instr_uid, "eno"); scl_map[map_key_eno] = en_scl
return True
# --- Procesador de Agrupación (Sin cambios) ---
def process_group_ifs(instruction, network_id, scl_map, access_map):
# ... (código sin cambios) ...
instr_uid = instruction["instruction_uid"]; instr_type = instruction["type"]; instr_type_original = instr_type.replace("_scl", "").replace("_error", "")
made_change = False
if (not instr_type.endswith("_scl") or "_error" in instr_type or instruction.get("grouped", False) or instr_type_original not in ["Contact", "O", "Eq", "Ne", "Gt", "Lt", "Ge", "Le", "PBox", "NBox", "And", "Xor"]): return False # Añadido NBox aquí
current_scl = instruction.get("scl", "")
if (current_scl.strip().startswith("IF") and "END_IF;" in current_scl) or (current_scl.strip().startswith("//") and "IF" in current_scl): return False
map_key_out = (network_id, instr_uid, "out"); condition_scl = scl_map.get(map_key_out)
if condition_scl is None or condition_scl in ["TRUE", "FALSE"]: return False
grouped_instructions_cores = []; consumer_instr_list = []
network_logic = next((net["logic"] for net in data["networks"] if net["id"] == network_id), []);
if not network_logic: return False
groupable_types = ["Move", "Add", "Sub", "Mul", "Div", "Mod", "Convert", "Call_FC", "Call_FB"]
for consumer_instr in network_logic:
consumer_uid = consumer_instr["instruction_uid"]
if consumer_instr.get("grouped", False) or consumer_uid == instr_uid: continue
consumer_en = consumer_instr.get("inputs", {}).get("en"); consumer_type = consumer_instr.get("type", ""); consumer_type_original = consumer_type.replace("_scl", "").replace("_error", "")
is_enabled_by_us = False
if (isinstance(consumer_en, dict) and consumer_en.get("type") == "connection" and consumer_en.get("source_instruction_uid") == instr_uid and consumer_en.get("source_pin") == "out"): is_enabled_by_us = True
if (is_enabled_by_us and consumer_type.endswith("_scl") and consumer_type_original in groupable_types):
consumer_scl = consumer_instr.get("scl", ""); core_scl = None
if consumer_scl.strip().startswith("IF"):
match = re.search(r"IF\\s+.*\\s+THEN\\s*(.*?)\\s*END_IF;", consumer_scl, re.DOTALL | re.IGNORECASE)
if match: core_scl = match.group(1).strip()
elif consumer_scl and not consumer_scl.strip().startswith("//"): core_scl = consumer_scl.strip()
if core_scl: grouped_instructions_cores.append(core_scl); consumer_instr_list.append(consumer_instr)
if len(grouped_instructions_cores) > 1:
print(f"INFO: Agrupando {len(grouped_instructions_cores)} instrucciones bajo condición de {instr_type_original} UID {instr_uid} (Cond: {condition_scl})")
scl_grouped = [f"IF {condition_scl} THEN"]
for core_line in grouped_instructions_cores: indented_core = "\\n".join([f" {line.strip()}" for line in core_line.splitlines()]); scl_grouped.append(indented_core)
scl_grouped.append("END_IF;"); final_grouped_scl = "\\n".join(scl_grouped)
instruction["scl"] = final_grouped_scl
for consumer_instr in consumer_instr_list: consumer_instr["scl"] = f"{GROUPED_COMMENT} (by UID {instr_uid})"; consumer_instr["grouped"] = True
made_change = True
return made_change
# --- Bucle Principal de Procesamiento ---
def process_json_to_scl(json_filepath):
# ... (Inicio sin cambios) ...
if not os.path.exists(json_filepath): print(f"Error: JSON no encontrado: {json_filepath}"); return
print(f"Cargando JSON desde: {json_filepath}")
try: global data; data = json.load(f)
except Exception as e: print(f"Error al cargar JSON: {e}"); traceback.print_exc(); return
network_access_maps = {}
for network in data.get("networks", []):
net_id = network["id"]; current_access_map = {}
for instr in network.get("logic", []):
for _, source in instr.get("inputs", {}).items():
sources_to_check = source if isinstance(source, list) else ([source] if isinstance(source, dict) else [])
for src in sources_to_check:
if isinstance(src, dict) and src.get("uid") and src.get("type") in ["variable", "constant"]: current_access_map[src["uid"]] = src
for _, dest_list in instr.get("outputs", {}).items():
if isinstance(dest_list, list):
for dest in dest_list:
if isinstance(dest, dict) and dest.get("uid") and dest.get("type") in ["variable", "constant"]: current_access_map[dest["uid"]] = dest
network_access_maps[net_id] = current_access_map
scl_map = {}
max_passes = 30; passes = 0; processing_complete = False
# --- MODIFICACIÓN: Añadir process_edge_detector al mapa ---
base_processors = [
process_convert, process_mod, process_eq, process_contact, process_o,
process_edge_detector, # Usar la nueva función unificada
process_add, process_move, process_call, process_coil,
# ... otros procesadores base ...
]
processor_map = {}
for func in base_processors:
match = re.match(r"process_(\w+)", func.__name__)
if match:
type_name = match.group(1).lower()
if type_name == "call":
processor_map["call_fc"] = func; processor_map["call_fb"] = func; processor_map["call"] = func
elif type_name == "edge_detector": # Mapear PBox y NBox a la nueva función
processor_map["pbox"] = func
processor_map["nbox"] = func
else: processor_map[type_name] = func
# --- FIN MODIFICACIÓN ---
print("\\n--- Iniciando Bucle de Procesamiento Iterativo ---")
while passes < max_passes and not processing_complete:
# ... (Lógica del bucle iterativo sin cambios,
# ahora usará process_edge_detector para PBox y NBox
# a través del processor_map actualizado) ...
passes += 1; made_change_in_base_pass = False; made_change_in_group_pass = False
print(f"\\n--- Pase {passes} ---"); num_processed_this_pass = 0; num_grouped_this_pass = 0
for network in data.get("networks", []):
network_id = network["id"]; access_map = network_access_maps.get(network_id, {})
network_logic = network.get("logic", [])
for instruction in network_logic:
instr_uid = instruction.get("instruction_uid"); instr_type_original = instruction.get("type", "Unknown")
if (instr_type_original.endswith(SCL_SUFFIX) or "_error" in instr_type_original or instruction.get("grouped", False)): continue
lookup_key = instr_type_original.lower()
if instr_type_original == "Call":
block_type = instruction.get("block_type", "").upper()
if block_type == "FC": lookup_key = "call_fc"
elif block_type == "FB": lookup_key = "call_fb"
func_to_call = processor_map.get(lookup_key)
if func_to_call:
try:
# No necesita pasar network_logic excepto para la versión anterior de PBox
changed = func_to_call(instruction, network_id, scl_map, access_map)
if changed: made_change_in_base_pass = True; num_processed_this_pass += 1
except Exception as e: print(f"ERROR(Base) al procesar {instr_type_original} UID {instr_uid} con {func_to_call.__name__}: {e}"); traceback.print_exc(); instruction["scl"] = f"// ERROR en procesador base: {e}"; instruction["type"] = instr_type_original + "_error"; made_change_in_base_pass = True
if made_change_in_base_pass or passes == 1:
for network in data.get("networks", []):
network_id = network["id"]; access_map = network_access_maps.get(network_id, {}); network_logic = network.get("logic", [])
for instruction in network_logic:
if instruction["type"].endswith("_scl") and not instruction.get("grouped", False):
try:
group_changed = process_group_ifs(instruction, network_id, scl_map, access_map)
if group_changed: made_change_in_group_pass = True; num_grouped_this_pass += 1
except Exception as e: print(f"ERROR(Group) al intentar agrupar desde UID {instruction.get('instruction_uid')}: {e}"); traceback.print_exc()
if not made_change_in_base_pass and not made_change_in_group_pass: print(f"\\n--- No se hicieron más cambios en el pase {passes}. Proceso iterativo completado. ---"); processing_complete = True
else: print(f"--- Fin Pase {passes}: {num_processed_this_pass} procesados, {num_grouped_this_pass} agrupados. Continuando...")
if passes == max_passes and not processing_complete: print(f"\\n--- ADVERTENCIA: Límite de {max_passes} pases alcanzado. Puede haber dependencias no resueltas. ---")
# --- Verificación Final y Guardado (Sin cambios) ---
print("\\n--- Verificación Final de Instrucciones No Procesadas ---"); unprocessed_count = 0; unprocessed_details = []
for network in data.get("networks", []):
network_id = network.get("id", "Unknown ID"); network_title = network.get("title", f"Network {network_id}")
for instruction in network.get("logic", []):
instr_uid = instruction.get("instruction_uid", "Unknown UID"); instr_type = instruction.get("type", "Unknown Type"); is_grouped = instruction.get("grouped", False)
if (not instr_type.endswith(SCL_SUFFIX) and "_error" not in instr_type and not is_grouped):
unprocessed_count += 1; unprocessed_details.append(f" - Red '{network_title}' (ID: {network_id}), Instrucción UID: {instr_uid}, Tipo Original: '{instr_type}'")
if unprocessed_count > 0: print(f"ADVERTENCIA: Se encontraron {unprocessed_count} instrucciones que no pudieron ser procesadas a SCL:");
for detail in unprocessed_details: print(detail)
print(">>> Estos tipos de instrucción podrían necesitar un procesador específico en 'x2_process.py'.")
else: print("INFO: Todas las instrucciones fueron procesadas a SCL, marcadas como error o agrupadas exitosamente.")
output_filename = json_filepath.replace("_simplified.json", "_simplified_processed.json")
print(f"\\nGuardando JSON procesado en: {output_filename}")
try:
with open(output_filename, "w", encoding="utf-8") as f: json.dump(data, f, indent=4, ensure_ascii=False)
print("Guardado completado.")
except Exception as e: print(f"Error Crítico al guardar JSON procesado: {e}"); traceback.print_exc()
# --- Ejecución ---
if __name__ == "__main__":
# Asegurarse de usar el nombre base del XML de prueba
xml_filename_base = "TestLAD" # <--- CAMBIADO PARA USAR TestLAD
input_json_file = f"{xml_filename_base}_simplified.json"
if not os.path.exists(input_json_file): print(f"Error Fatal: El archivo de entrada JSON simplificado no existe: '{input_json_file}'"); print("Asegúrate de haber ejecutado 'x1_to_json.py' primero sobre el archivo XML correcto.")
else: process_json_to_scl(input_json_file)