Intentando con FC mas complejas

This commit is contained in:
Miguel 2025-04-18 18:12:33 +02:00
parent 48467ac7ea
commit 3f3dd1ed29
11 changed files with 18567 additions and 1378 deletions

View File

@ -16,13 +16,41 @@
"id": "9", "id": "9",
"title": "PID Reset Integral", "title": "PID Reset Integral",
"comment": "", "comment": "",
"logic": [] "logic": [
{
"instruction_uid": "21",
"uid": "21",
"type": "Call",
"block_name": "BlenderPID_PIDResInteg",
"block_type": "FC",
"inputs": {
"en": {
"type": "powerrail"
}
},
"outputs": {}
}
]
}, },
{ {
"id": "1A", "id": "1A",
"title": "Ctrl Init Errors", "title": "Ctrl Init Errors",
"comment": "", "comment": "",
"logic": [] "logic": [
{
"instruction_uid": "21",
"uid": "21",
"type": "Call",
"block_name": "BlenderCtrl_InitErrors",
"block_type": "FC",
"inputs": {
"en": {
"type": "powerrail"
}
},
"outputs": {}
}
]
}, },
{ {
"id": "2B", "id": "2B",
@ -31,7 +59,11 @@
"logic": [ "logic": [
{ {
"instruction_uid": "23", "instruction_uid": "23",
"uid": "23",
"type": "Move", "type": "Move",
"template_values": {
"Card": "Cardinality"
},
"inputs": { "inputs": {
"en": { "en": {
"type": "powerrail" "type": "powerrail"

View File

@ -0,0 +1,96 @@
{
"block_name": "BlenderCtrl_ProdModeInit",
"block_number": 2012,
"language": "LAD",
"block_comment": "",
"interface": {
"Return": [
{
"name": "Ret_Val",
"datatype": "Void"
}
]
},
"networks": [
{
"id": "9",
"title": "PID Reset Integral",
"comment": "",
"logic": [
{
"instruction_uid": "21",
"uid": "21",
"type": "Call_FC_scl",
"block_name": "BlenderPID_PIDResInteg",
"block_type": "FC",
"inputs": {
"en": {
"type": "powerrail"
}
},
"outputs": {},
"scl": "BlenderPID_PIDResInteg();"
}
]
},
{
"id": "1A",
"title": "Ctrl Init Errors",
"comment": "",
"logic": [
{
"instruction_uid": "21",
"uid": "21",
"type": "Call_FC_scl",
"block_name": "BlenderCtrl_InitErrors",
"block_type": "FC",
"inputs": {
"en": {
"type": "powerrail"
}
},
"outputs": {},
"scl": "BlenderCtrl_InitErrors();"
}
]
},
{
"id": "2B",
"title": "RunOut Counter",
"comment": "",
"logic": [
{
"instruction_uid": "23",
"uid": "23",
"type": "Move_scl",
"template_values": {
"Card": "Cardinality"
},
"inputs": {
"en": {
"type": "powerrail"
},
"in": {
"uid": "21",
"scope": "LiteralConstant",
"type": "constant",
"datatype": "Real",
"value": 0.0
}
},
"outputs": {
"out1": [
{
"uid": "22",
"scope": "GlobalVariable",
"type": "variable",
"name": "\"HMI_Variables_Status\".\"Analog_Values\".\"TP301RunOutCount\""
}
]
},
"scl": "\"HMI_Variables_Status\".\"Analog_Values\".\"TP301RunOutCount\" := 0.0;"
}
]
}
]
}

View File

@ -0,0 +1,35 @@
// Block Name (Original): BlenderCtrl_ProdModeInit
// Block Number: 2012
// Original Language: LAD
FUNCTION_BLOCK "BlenderCtrl_ProdModeInit"
{ S7_Optimized_Access := 'TRUE' }
VERSION : 0.1
VAR_INPUT
END_VAR
VAR_OUTPUT
END_VAR
VAR_IN_OUT
END_VAR
VAR_TEMP
END_VAR
BEGIN
// Network 1: PID Reset Integral
BlenderPID_PIDResInteg();
// Network 2: Ctrl Init Errors
BlenderCtrl_InitErrors();
// Network 3: RunOut Counter
"HMI_Variables_Status"."Analog_Values"."TP301RunOutCount" := 0.0;
END_FUNCTION_BLOCK

10030
BlenderCtrl__Main.xml Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,392 @@
// Block Name (Original): BlenderCtrl__Main
// Block Number: 2000
// Original Language: LAD
FUNCTION_BLOCK "BlenderCtrl__Main"
{ S7_Optimized_Access := 'TRUE' }
VERSION : 0.1
VAR_INPUT
END_VAR
VAR_OUTPUT
END_VAR
VAR_IN_OUT
END_VAR
VAR_TEMP
All_Auto_RETVAL : Int;
Reset_SP_Word_RETVAL : Int;
mResetWaterTot : Bool;
mResetSyrupTot : Bool;
mResetCO2Tot : Bool;
mResetProductTot : Bool;
Block_Move_Err : Int;
END_VAR
BEGIN
// Network 1: Clock Generation
Clock Signal();
// Network 2: Machine Init
BlenderCtrl_MachineInit();
// Network 3: Filler Head
// RLO: "AUX FALSE"
// Network 4: Emergency Pressed
// RLO: "gIN_VoltageOk"
// Network 5: Air and CO2 pressure ok and auxiliary ok
// RLO: "gIN_LinePressCO2Ok"
// RLO: "gWorkshopTest"
// RLO: "gWorkshopTest" AND "gWorkshop_Co2_Presence"
// RLO: ("gWorkshopTest" AND "gWorkshop_Co2_Presence") AND "gWorkshop_CIP_Signals"
// RLO: ("gIN_LinePressCO2Ok" OR (("gWorkshopTest" AND "gWorkshop_Co2_Presence") AND "gWorkshop_CIP_Signals")) AND "HMI_Digital"."_PAL_S11"."Filtered"
// RLO: "Disable_Bit"
// RLO: ((("gIN_LinePressCO2Ok" OR (("gWorkshopTest" AND "gWorkshop_Co2_Presence") AND "gWorkshop_CIP_Signals")) AND "HMI_Digital"."_PAL_S11"."Filtered") OR "Disable_Bit") AND "gIN_VoltageOk"
"gBlenderSuppliesOk" := ((("gIN_LinePressCO2Ok" OR (("gWorkshopTest" AND "gWorkshop_Co2_Presence") AND "gWorkshop_CIP_Signals")) AND "HMI_Digital"."_PAL_S11"."Filtered") OR "Disable_Bit") AND "gIN_VoltageOk";
// Network 6: Blender State Num
"HMI_Variables_Status"."Procedures"."BlenderStateNum" := 0;
// Network 7: Delay Power On
// RLO: "FirstScan"
// Network 8: Production Mode
// RLO: "HMI_Variables_Status"."System"."Blender_Prod_CIP"
"gBlenderProdMode" := "HMI_Variables_Status"."System"."Blender_Prod_CIP";
// Network 9: CIp Mode
// RLO: "HMI_Variables_Status"."System"."Blender_Prod_CIP"
"gBlenderCIPMode" := "HMI_Variables_Status"."System"."Blender_Prod_CIP";
IF "HMI_Variables_Status"."System"."Blender_Prod_CIP" THEN
"HMI_Variables_Status"."Procedures"."BlenderStateNum" := 19;
END_IF;
// Network 10: Error Faults
// RLO: "AUX FALSE"
// Network 11: Filler Bottle Count Used to push Product
// RLO: "System_RunOut_Variables"."ProdPipeRunOutWaterCount"
"System_RunOut_Variables"."ProdPipeRunOutFillerBott" := "System_RunOut_Variables"."ProdPipeRunOutWaterCount";
// Network 12: Water Bypass Enable
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_StillWaterByPass"
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_ByPassDeair"
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_ByPassDeair" AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_Deaireation"
// RLO: ("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_StillWaterByPass" OR ("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_ByPassDeair" AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_Deaireation")) AND "Blender_Variables_Pers"."gWaterRecipe"
// RLO: (("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_StillWaterByPass" OR ("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_ByPassDeair" AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_Deaireation")) AND "Blender_Variables_Pers"."gWaterRecipe") AND "Blender_Variables_Pers"."gCarboStillRecipe"
"gStillWaterByPassEn" := (("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_StillWaterByPass" OR ("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_ByPassDeair" AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_Deaireation")) AND "Blender_Variables_Pers"."gWaterRecipe") AND "Blender_Variables_Pers"."gCarboStillRecipe";
// Network 13: Still Water Bypass
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_BlendFillSystem"
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_BlendFillSystem" AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_StillWaterByPass"
// RLO: ("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_BlendFillSystem" AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_StillWaterByPass") AND "Blender_Variables_Pers"."gWaterRecipe"
// RLO: (("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_BlendFillSystem" AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_StillWaterByPass") AND "Blender_Variables_Pers"."gWaterRecipe") AND "Blender_Variables_Pers"."gCarboStillRecipe"
"gBlendFiStillWaterByPass" := (("HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_BlendFillSystem" AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_StillWaterByPass") AND "Blender_Variables_Pers"."gWaterRecipe") AND "Blender_Variables_Pers"."gCarboStillRecipe";
// Network 14: Manual Syrup Drain Valve Open - Operator Alarm
// RLO: "gSyrupRoomEn"
// RLO: "gSyrupRoomEn" AND "gIN_HVP301_Aux"
// RLO: ("gSyrupRoomEn" AND "gIN_HVP301_Aux") AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_FastChangeOverEnabled"
// RLO: (("gSyrupRoomEn" AND "gIN_HVP301_Aux") AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_FastChangeOverEnabled") AND "Procedure_Variables"."FTP302Line_Preparation"."Done"
// RLO: ((("gSyrupRoomEn" AND "gIN_HVP301_Aux") AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_FastChangeOverEnabled") AND "Procedure_Variables"."FTP302Line_Preparation"."Done") AND "Procedure_Variables"."Syr_RunOut"."Done"
// RLO: "gBlenderCIPMode"
// RLO: "gBlenderCIPMode" AND "gIN_CIP_CIPRunning"
// RLO: ("gBlenderCIPMode" AND "gIN_CIP_CIPRunning") AND "Procedure_Variables"."Blender_Run"."Running"
"gHVP301_Open" := (((("gSyrupRoomEn" AND "gIN_HVP301_Aux") AND "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_FastChangeOverEnabled") AND "Procedure_Variables"."FTP302Line_Preparation"."Done") AND "Procedure_Variables"."Syr_RunOut"."Done") OR (("gBlenderCIPMode" AND "gIN_CIP_CIPRunning") AND "Procedure_Variables"."Blender_Run"."Running");
// Network 15: Manual Syrup Drain Valve Open - Operator Alarm
// RLO: "gIN_HVM302_Aux"
// Network 16: Maselli Control
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_BrixMeter"
// Cond: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_MeterType" = 6
IF "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_MeterType" = 6 THEN
Maselli_PA_Control();
END_IF;
// Network 17: mPDS Control
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_BrixMeter"
// Cond: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_MeterType" = 5
IF "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_MeterType" = 5 THEN
mPDS_PA_Control();
END_IF;
// Network 18: mPDS Syrup Control
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_SyrBrixMeter"
IF "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_SyrBrixMeter" THEN
mPDS_SYR_PA_Control();
END_IF;
// Network 19: Co2 Analog Input
// GetProdBrixCO2_FromAnalogIn
// CALL "GetProdBrixCO2_FromAn"
// NOP 0
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_BrixMeter"
// Cond: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_MeterType" = 3
IF "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_MeterType" = 3 THEN
GetProdBrixCO2_Anal_Inpt();
END_IF;
// Network 20: Quality
ProductQuality();
// Network 21: Input Data
// ERROR: Call a bloque no soportado: Input
// Network 22: Sel Brix Source Check
SelCheckBrixSource();
// Network 23: Check Water Cooling System Temperature
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_InverterRecirPumpPPM306"
IF "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_InverterRecirPumpPPM306" THEN
CTRLCoolingSystem();
END_IF;
// Network 24: Tank Level
TankLevel();
// Network 25: Production ONS
// RLO: "gBlenderProdMode"
// PBox 26 - Passing bit: "M19001"
// RLO: "M19001" AND "mDelayPowerOnTmr"
"gProductionONS" := "M19001" AND "mDelayPowerOnTmr";
// Network 26: Blender Prod Mode Init
// RLO: "gProductionONS"
// RLO: "Procedure_Variables"."Blender_Rinse"."ONS_Done"
// RLO: ("gProductionONS" OR "Procedure_Variables"."Blender_Rinse"."ONS_Done") AND "Blender_Variables_Pers"."gBlenderStarted"
IF ("gProductionONS" OR "Procedure_Variables"."Blender_Rinse"."ONS_Done") AND "Blender_Variables_Pers"."gBlenderStarted" THEN
BlenderCtrl_ProdModeInit();
END_IF;
// Network 27: Rinse ONS
// RLO: "HMI_Variables_Status"."System"."Blender_Prod_CIP"
// PBox 26 - Passing bit: "M19002"
// RLO: "M19002" AND "mDelayPowerOnTmr"
"gRinseONS" := "M19002" AND "mDelayPowerOnTmr";
// Network 28: CIP ONS
// RLO: "gBlenderCIPMode"
// PBox 26 - Passing bit: "M19003"
// RLO: "M19003" AND "mDelayPowerOnTmr"
"gCIPONS" := "M19003" AND "mDelayPowerOnTmr";
// Network 29: CIp Mode Init
// RLO: "gCIPONS"
IF "gCIPONS" THEN
BlenderCtrl_CIPModeInit();
END_IF;
// Network 30: Reset SPWords
BlenderCtrl_ResetSPWord();
// Network 31: Blender Run Control
BlenderRun__Control();
// Network 32: Tank Pressure Control
Prod Tank PressCtrl();
// Network 33: Balaiage
Baialage();
// Network 34: First Production
// ERROR: Call a bloque no soportado: ProcedureFirstProduction
// Network 35: CIP MAIN Calling
CIPMain();
// Network 36: Blender Rinse
BlenderRinse();
// Network 37: Safeties
Safeties();
// Network 38: Instrument Scanner
Instrument_Scanner();
// Network 39: Vacuum Control
VacuumCtrl();
// Network 40: Syrup Room Control
SyrupRoomCtrl();
// Network 41: Blend Procedure Data
// RLO: "mDelayPowerOnTmr"
IF "mDelayPowerOnTmr" THEN
// ERROR: Call a bloque no soportado: Procedure
END_IF;
// Network 42: Pneumatic Valve Control
Pneumatic Valve Ctrl();
// Network 43: Pumps Control
PumpsControl();
// Network 44: Prod Report Manager
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_Report"
IF "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_Report" THEN
ProdReportManager();
END_IF;
// Network 45: Outputs
Output();
// Network 46: SLIM BLOCK
SLIM_Block();
// Network 47: Interlocking Panel 1
Interlocking_Panel_1();
// Network 48: Filler Control
FillerControl();
// Network 49: Blender Ctrl Update PWORD
BlenderCtrl_UpdatePWord();
// Network 50: ResetTotalizer
// RLO: "gBlendResetTotalizer"
// Network 51: ResetWaterTot
// RLO: "gFTN301_ResetTot"
// RLO: "mResetTotalizerTmr"
// Network 52: Water VFM Reset Totalizer
// RLO: "gFTN301_ResetTot"
// Network 53: ResetCO2Tot
// RLO: "gFTP302_ResetTot"
// RLO: "mResetTotalizerTmr"
// Network 54: Syrup MFM Reset Totalizer
// RLO: "gFTP302_ResetTot"
// Network 55: ResetProductTot
// RLO: "gFTM303_ResetTot"
// RLO: "mResetTotalizerTmr"
// Network 56: CO2 MFM Reset Tot
// RLO: "gFTM303_ResetTot"
// Network 57: ResetCO2Tot
// RLO: "gProductMFMResetTot"
// RLO: "mResetTotalizerTmr"
// Network 58: Reset Totalizer
// RLO: "gProductMFMResetTot"
// Network 59: Reset Totalizer
// RLO: "gBlendResetTotalizer"
// Network 60: Blender Ctrl Command
// RLO: "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_Simulation"
IF "HMI_Blender_Parameters"."Processor_Options"."Blender_OPT"."_Simulation" THEN
BlenderCtrl_MFM Command();
END_IF;
// Network 61: DP Global Diag
CPU_DP Global Diag();
// Network 62: Profibus
Profibus Network();
// Network 63: Valve Fault
ModValveFault();
// Network 64: All Auto
// RLO: "HMI_Variables_Cmd"."Commands_From_HMI"."F7_DeviceControl"."Command"
// RLO: "HMI_Variables_Cmd"."Commands_From_HMI"."F7_DeviceControl"."Command" AND "HMI_Variables_Cmd"."Commands_From_HMI"."F7_DeviceControl"."Enable"
// Network 65: Ctrl HMI Manual Active
BlenderCtrl_ManualActive();
// Network 66: Mod Copy Recipe
// RLO: "HMI_Variables_Cmd"."Recipe"."Main_Page"
// RLO: "HMI_Variables_Cmd"."Recipe"."Main_Page" AND "mFP_Recip_Main_Page"
"mAux_FP_M700_1" := "HMI_Variables_Cmd"."Recipe"."Main_Page" AND "mFP_Recip_Main_Page";
// RLO: "HMI_Variables_Cmd"."Recipe"."Main_Page"
// RLO: "HMI_Variables_Cmd"."Recipe"."Main_Page" AND "HMI_Variables_Cmd"."Recipe"."Edit"
// RLO: "mAux_FP_M700_1"
// Network 67: to HMI - Recipe Management
// RLO: "AUX TRUE"
IF "AUX TRUE" THEN
// ERROR: Call a bloque no soportado: RecipeManagement - Prod
END_IF;
// Network 68: Recipe Calculation
RecipeCalculation();
END_FUNCTION_BLOCK

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,7 @@ import json
import os import os
from lxml import etree from lxml import etree
import traceback import traceback
from collections import defaultdict # Para facilitar el manejo de conexiones from collections import defaultdict
# --- Namespaces --- # --- Namespaces ---
ns = { ns = {
@ -14,6 +14,7 @@ ns = {
# --- Helper Functions --- # --- Helper Functions ---
# (get_multilingual_text, get_symbol_name, parse_access, parse_part - sin cambios) # (get_multilingual_text, get_symbol_name, parse_access, parse_part - sin cambios)
# ... (código de estas funciones aquí) ...
def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"): def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"):
if element is None: if element is None:
return "" return ""
@ -100,7 +101,7 @@ def parse_access(access_element):
float(value_str) float(value_str)
info["datatype"] = "Real" info["datatype"] = "Real"
except ValueError: except ValueError:
pass # Si no es float, sigue siendo Unknown (o lo que fuera antes) pass
elif "#" in value_str: elif "#" in value_str:
info["datatype"] = "TypedConstant" info["datatype"] = "TypedConstant"
info["value"] = value_str info["value"] = value_str
@ -164,276 +165,549 @@ def parse_part(part_element):
template_values[tv_name] = tv_type template_values[tv_name] = tv_type
except Exception as e: except Exception as e:
print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}") print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}")
return {"uid": uid, "name": name, "template_values": template_values} return {
"uid": uid,
"type": name,
"template_values": template_values,
} # Cambiado Name a type
# --- NUEVA FUNCIÓN parse_call ---
def parse_call(call_element):
"""
Parsea un elemento Call (llamada a FC/FB) y extrae su información.
"""
if call_element is None:
return None
uid = call_element.get("UId")
if not uid:
print(
f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}"
)
return None
call_info_elem = call_element.xpath("./*[local-name()='CallInfo']")
if not call_info_elem:
print(f"Error: Call UID {uid} sin elemento CallInfo.")
return None
call_info = call_info_elem[0]
block_name = call_info.get("Name")
block_type = call_info.get("BlockType") # FC, FB, etc.
instance_name = None
instance_scope = None
# Si es una llamada a FB, puede tener información de instancia
instance_elem = call_info.xpath("./*[local-name()='Instance']")
if instance_elem:
instance = instance_elem[0]
instance_scope = instance.get("Scope") # Ej: GlobalVariable, InstanceDB
# El nombre de la instancia DB suele estar en Component dentro de Symbol
symbol_elem = instance.xpath("./*[local-name()='Symbol']")
if symbol_elem:
instance_name = get_symbol_name(symbol_elem[0])
if not block_name or not block_type:
print(f"Error: CallInfo para UID {uid} sin Name o BlockType.")
return None
call_data = {
"uid": uid,
"type": "Call", # Tipo genérico para nuestra estructura JSON
"block_name": block_name,
"block_type": block_type,
}
if instance_name:
call_data["instance_db"] = instance_name
if instance_scope:
call_data["instance_scope"] = instance_scope
return call_data
# --- FIN NUEVA FUNCIÓN ---
# --- Función parse_network MODIFICADA ---
def parse_network(network_element): def parse_network(network_element):
""" """
Parsea una red (CompileUnit), extrae lógica, infiere conexiones EN implícitas, Parsea una red, extrae lógica (incluyendo Calls) y añade conexiones EN implícitas.
y añade lógica ENO interesante.
""" """
if network_element is None: if network_element is None:
print("Error: parse_network llamado con network_element=None") return {
return {'id': 'ERROR', 'title': 'Invalid Network Element', 'comment': '', 'logic': [], 'error': 'Input element was None'} "id": "ERROR",
"title": "Invalid Network Element",
"comment": "",
"logic": [],
"error": "Input element was None",
}
network_id = network_element.get("ID")
title_element = network_element.xpath(
".//*[local-name()='MultilingualText'][@CompositionName='Title']"
)
network_title = (
get_multilingual_text(title_element[0])
if title_element
else f"Network {network_id}"
)
comment_title_element = network_element.xpath(
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
)
network_comment = (
get_multilingual_text(comment_title_element[0]) if comment_title_element else ""
)
network_id = network_element.get('ID')
# print(f"--- Parseando Red ID={network_id} ---") # Descomentar para depuración detallada
# Extrae el título de la red
title_element = network_element.xpath(".//*[local-name()='MultilingualText'][@CompositionName='Title']")
network_title = get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}"
# Extrae el comentario de la red
network_comment = ""
comment_title_element = network_element.xpath("./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']")
if comment_title_element:
network_comment = get_multilingual_text(comment_title_element[0])
# Encuentra FlgNet
flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns) flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
if not flgnet_list: if not flgnet_list:
print(f"Error: No se encontró FlgNet en la red ID={network_id}") return {
return {'id': network_id, 'title': network_title, 'comment': network_comment, 'logic': [], 'error': 'FlgNet not found'} "id": network_id,
"title": network_title,
"comment": network_comment,
"logic": [],
"error": "FlgNet not found",
}
flgnet = flgnet_list[0] flgnet = flgnet_list[0]
# 1. Parsear Access y Parts # 1. Parsear Access, Parts y Calls
access_map = {acc_info['uid']: acc_info for acc in flgnet.xpath(".//flg:Access", namespaces=ns) if (acc_info := parse_access(acc))} access_map = {
parts_map = {part_info['uid']: part_info for part in flgnet.xpath(".//flg:Part", namespaces=ns) if (part_info := parse_part(part))} acc_info["uid"]: acc_info
# print(f"DEBUG: Red {network_id} - Access={len(access_map)}, Parts={len(parts_map)}") # Debug for acc in flgnet.xpath(".//flg:Access", namespaces=ns)
if (acc_info := parse_access(acc))
}
# --- MODIFICADO: Unir Parts y Calls en un solo mapa ---
parts_and_calls_map = {}
instruction_elements = flgnet.xpath(
".//flg:Part | .//flg:Call", namespaces=ns
) # Buscar ambos tipos
for element in instruction_elements:
parsed_info = None
if element.tag.endswith("Part"):
parsed_info = parse_part(element)
elif element.tag.endswith("Call"):
parsed_info = parse_call(element)
if parsed_info and "uid" in parsed_info:
parts_and_calls_map[parsed_info["uid"]] = parsed_info
else:
print(
f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}"
)
# --- FIN MODIFICADO ---
# 2. Parsear Wires y construir mapas de conexiones # 2. Parsear Wires y construir mapas de conexiones
wire_connections = defaultdict(list) # key=(dest_uid, dest_pin), value=list of (src_uid, src_pin) wire_connections = defaultdict(list)
source_connections = defaultdict(list) # key=(src_uid, src_pin), value=list of (dest_uid, dest_pin) source_connections = defaultdict(list)
eno_outputs = defaultdict(list) # key=src_uid, value=list of (dest_uid, dest_pin) from ENO eno_outputs = defaultdict(list)
flg_ns_uri = ns['flg'] flg_ns_uri = ns["flg"]
for wire in flgnet.xpath(".//flg:Wire", namespaces=ns): for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
source_uid, source_pin, dest_uid, dest_pin = None, None, None, None source_uid, source_pin, dest_uid, dest_pin = None, None, None, None
children = wire.getchildren() children = wire.getchildren()
if len(children) < 2: continue if len(children) < 2:
continue
source_elem, dest_elem = children[0], children[1] source_elem, dest_elem = children[0], children[1]
if source_elem.tag == etree.QName(flg_ns_uri, "Powerrail"):
# Determina la fuente source_uid, source_pin = "POWERRAIL", "out"
if source_elem.tag == etree.QName(flg_ns_uri, 'Powerrail'): source_uid, source_pin = 'POWERRAIL', 'out' elif source_elem.tag == etree.QName(flg_ns_uri, "IdentCon"):
elif source_elem.tag == etree.QName(flg_ns_uri, 'IdentCon'): source_uid, source_pin = source_elem.get('UId'), 'value' source_uid, source_pin = source_elem.get("UId"), "value"
elif source_elem.tag == etree.QName(flg_ns_uri, 'NameCon'): source_uid, source_pin = source_elem.get('UId'), source_elem.get('Name') elif source_elem.tag == etree.QName(flg_ns_uri, "NameCon"):
source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name")
# Determina el destino if dest_elem.tag == etree.QName(flg_ns_uri, "IdentCon"):
if dest_elem.tag == etree.QName(flg_ns_uri, 'IdentCon'): dest_uid, dest_pin = dest_elem.get('UId'), 'value' dest_uid, dest_pin = dest_elem.get("UId"), "value"
elif dest_elem.tag == etree.QName(flg_ns_uri, 'NameCon'): dest_uid, dest_pin = dest_elem.get('UId'), dest_elem.get('Name') elif dest_elem.tag == etree.QName(flg_ns_uri, "NameCon"):
dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name")
# Registrar conexiones si son válidas
if dest_uid and dest_pin and source_uid is not None: if dest_uid and dest_pin and source_uid is not None:
dest_key = (dest_uid, dest_pin); source_key = (source_uid, source_pin) dest_key = (dest_uid, dest_pin)
source_info = (source_uid, source_pin); dest_info = (dest_uid, dest_pin) source_key = (source_uid, source_pin)
source_info = (source_uid, source_pin)
if source_info not in wire_connections[dest_key]: wire_connections[dest_key].append(source_info) dest_info = (dest_uid, dest_pin)
if dest_info not in source_connections[source_key]: source_connections[source_key].append(dest_info) if source_info not in wire_connections[dest_key]:
wire_connections[dest_key].append(source_info)
# Registrar conexiones que SALEN de un pin ENO if dest_info not in source_connections[source_key]:
if source_pin == 'eno' and source_uid in parts_map: source_connections[source_key].append(dest_info)
if dest_info not in eno_outputs[source_uid]: if (
eno_outputs[source_uid].append(dest_info) source_pin == "eno" and source_uid in parts_and_calls_map
): # Usar mapa combinado
if dest_info not in eno_outputs[source_uid]:
eno_outputs[source_uid].append(dest_info)
# 3. Construir la representación lógica INICIAL # 3. Construir la representación lógica INICIAL
all_logic_steps = {} all_logic_steps = {}
functional_block_types = ['Move', 'Add', 'Sub', 'Mul', 'Div', 'Mod', 'Convert'] # Ampliar si es necesario functional_block_types = [
rlo_generators = ['Contact', 'O', 'Eq', 'Ne', 'Gt', 'Lt', 'Ge', 'Le', 'And', 'Xor', 'PBox'] # Ampliar si es necesario "Move",
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
"Call",
] # Añadir Call
rlo_generators = [
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"And",
"Xor",
"PBox",
]
# --- MODIFICADO: Iterar sobre el mapa combinado ---
for instruction_uid, instruction_info in parts_and_calls_map.items():
# Usar directamente la info parseada (part_info o call_info)
instruction_repr = {
"instruction_uid": instruction_uid,
**instruction_info,
} # Copiar datos base
instruction_repr["inputs"] = {}
instruction_repr["outputs"] = {}
# --- FIN MODIFICADO ---
for part_uid, part_info in parts_map.items():
instruction_repr = {'instruction_uid': part_uid, 'type': part_info['name'], 'inputs': {}, 'outputs': {}}
# Rellenar inputs explícitos # Rellenar inputs explícitos
for dest_pin_name in ['en', 'in', 'in1', 'in2', 'in3', 'in4', 'bit', 'operand', 'pre', 'clk']: # Añadir pines comunes # Añadir más pines si las llamadas a FB los usan (ej: parámetros FC/FB)
dest_key = (part_uid, dest_pin_name) possible_pins = set(
if dest_key in wire_connections: ["en", "in", "in1", "in2", "in3", "in4", "bit", "operand", "pre", "clk"]
sources_list = wire_connections[dest_key] )
input_sources_repr = [] # Añadir pines específicos de la llamada si es un FB? Más complejo.
for source_uid, source_pin in sources_list: for dest_pin_name in possible_pins:
if source_uid == 'POWERRAIL': input_sources_repr.append({'type': 'powerrail'}) dest_key = (instruction_uid, dest_pin_name)
elif source_uid in access_map: input_sources_repr.append(access_map[source_uid]) if dest_key in wire_connections:
elif source_uid in parts_map: sources_list = wire_connections[dest_key]
input_sources_repr.append({'type': 'connection', 'source_instruction_uid': source_uid, input_sources_repr = []
'source_instruction_type': parts_map[source_uid]['name'], 'source_pin': source_pin}) for source_uid, source_pin in sources_list:
else: input_sources_repr.append({'type': 'unknown_source', 'uid': source_uid}) if source_uid == "POWERRAIL":
if len(input_sources_repr) == 1: instruction_repr['inputs'][dest_pin_name] = input_sources_repr[0] input_sources_repr.append({"type": "powerrail"})
elif len(input_sources_repr) > 1: instruction_repr['inputs'][dest_pin_name] = input_sources_repr # Rama OR/Multiple elif source_uid in access_map:
input_sources_repr.append(access_map[source_uid])
elif source_uid in parts_and_calls_map: # Usar mapa combinado
input_sources_repr.append(
{
"type": "connection",
# Usar el tipo del mapa combinado
"source_instruction_type": parts_and_calls_map[
source_uid
]["type"],
"source_instruction_uid": source_uid,
"source_pin": source_pin,
}
)
else:
input_sources_repr.append(
{"type": "unknown_source", "uid": source_uid}
)
if len(input_sources_repr) == 1:
instruction_repr["inputs"][dest_pin_name] = input_sources_repr[0]
elif len(input_sources_repr) > 1:
instruction_repr["inputs"][dest_pin_name] = input_sources_repr
# Rellenar outputs explícitos (hacia Access) # Rellenar outputs explícitos (hacia Access)
for source_pin_name in ['out', 'out1', 'Q', 'eno']: # Añadir pines comunes for source_pin_name in [
source_key = (part_uid, source_pin_name) "out",
if source_key in source_connections: "out1",
for dest_uid, dest_pin in source_connections[source_key]: "Q",
if dest_uid in access_map: "eno",
if source_pin_name not in instruction_repr['outputs']: instruction_repr['outputs'][source_pin_name] = [] ]: # Añadir salidas comunes de FC/FB si es necesario
if access_map[dest_uid] not in instruction_repr['outputs'][source_pin_name]: source_key = (instruction_uid, source_pin_name)
instruction_repr['outputs'][source_pin_name].append(access_map[dest_uid]) if source_key in source_connections:
for dest_uid, dest_pin in source_connections[source_key]:
if dest_uid in access_map:
if source_pin_name not in instruction_repr["outputs"]:
instruction_repr["outputs"][source_pin_name] = []
if (
access_map[dest_uid]
not in instruction_repr["outputs"][source_pin_name]
):
instruction_repr["outputs"][source_pin_name].append(
access_map[dest_uid]
)
all_logic_steps[part_uid] = instruction_repr all_logic_steps[instruction_uid] = instruction_repr
# --- 4. INFERENCIA Y PROPAGACIÓN DE CONEXIONES 'EN' IMPLÍCITAS --- # --- 4. INFERENCIA Y PROPAGACIÓN DE CONEXIONES 'EN' IMPLÍCITAS ---
# print(f"DEBUG: Iniciando inferencia EN para Red {network_id}...") # Debug # (Esta lógica probablemente necesite ajustes para considerar 'Call' como bloque funcional)
processed_blocks_en_inference = set() # Evitar procesar el mismo bloque múltiples veces en inferencia EN # print(f"DEBUG: Iniciando inferencia EN para Red {network_id}...")
processed_blocks_en_inference = set()
something_changed = True something_changed = True
inference_passes = 0 inference_passes = 0
max_inference_passes = len(all_logic_steps) + 5 max_inference_passes = len(all_logic_steps) + 5
while something_changed and inference_passes < max_inference_passes: while something_changed and inference_passes < max_inference_passes:
something_changed = False something_changed = False
inference_passes += 1 inference_passes += 1
# print(f"DEBUG: Pase de inferencia EN {inference_passes}...") # Debug
# Ordenar por UID para intentar procesar en orden lógico
try: try:
sorted_uids_for_pass = sorted(all_logic_steps.keys(), key=lambda x: int(x) if x.isdigit() else float('inf')) sorted_uids_for_pass = sorted(
all_logic_steps.keys(),
key=lambda x: int(x) if x.isdigit() else float("inf"),
)
except ValueError: except ValueError:
sorted_uids_for_pass = sorted(all_logic_steps.keys()) sorted_uids_for_pass = sorted(all_logic_steps.keys())
for part_uid in sorted_uids_for_pass: current_logic_list = [
if part_uid not in all_logic_steps: continue # Seguridad all_logic_steps[uid]
instruction = all_logic_steps[part_uid] for uid in sorted_uids_for_pass
part_type = instruction['type'] if uid in all_logic_steps
]
if part_type in functional_block_types and 'en' not in instruction['inputs'] and part_uid not in processed_blocks_en_inference: for i, instruction in enumerate(
# print(f"DEBUG: Intentando inferir EN para {part_type} UID {part_uid}") # Debug current_logic_list
): # Usar enumerate para obtener índice
part_uid = instruction["instruction_uid"]
part_type = instruction["type"] # Ahora puede ser 'Call'
# Si es un bloque funcional (incluyendo Call) sin 'en' explícito
if (
part_type in functional_block_types
and "en" not in instruction["inputs"]
and part_uid not in processed_blocks_en_inference
):
inferred_en_source = None inferred_en_source = None
# Usar la lista ordenada por UID para buscar atrás my_index = i # Ya tenemos el índice
my_index = -1
current_logic_list = [all_logic_steps[uid] for uid in sorted_uids_for_pass if uid in all_logic_steps] # Lista actual ordenada
for i, instr in enumerate(current_logic_list):
if instr['instruction_uid'] == part_uid:
my_index = i
break
if my_index > 0: if my_index > 0:
for i in range(my_index - 1, -1, -1): for j in range(my_index - 1, -1, -1): # Buscar hacia atrás
prev_instr = current_logic_list[i] prev_instr = current_logic_list[j]
prev_uid = prev_instr['instruction_uid'] prev_uid = prev_instr["instruction_uid"]
prev_type = prev_instr['type'] prev_type = prev_instr["type"]
if prev_type in rlo_generators: if prev_type in rlo_generators:
inferred_en_source = {'type': 'connection', 'source_instruction_uid': prev_uid, 'source_instruction_type': prev_type, 'source_pin': 'out'} inferred_en_source = {
# print(f"DEBUG: Inferido EN para {part_uid} desde RLO de {prev_type} {prev_uid}.out") # Debug "type": "connection",
break "source_instruction_uid": prev_uid,
elif prev_type in functional_block_types: "source_instruction_type": prev_type,
source_key_eno = (prev_uid, 'eno') "source_pin": "out",
# Verificar si el ENO del bloque anterior está conectado a *algo* }
if source_key_eno in source_connections: break
inferred_en_source = {'type': 'connection', 'source_instruction_uid': prev_uid, 'source_instruction_type': prev_type, 'source_pin': 'eno'} elif prev_type in functional_block_types:
# print(f"DEBUG: Inferido EN para {part_uid} desde ENO de {prev_type} {prev_uid}.eno") # Debug source_key_eno = (prev_uid, "eno")
break if source_key_eno in source_connections:
# Si no hay conexión ENO explícita, podríamos asumir que sigue el RLO del EN de ese bloque? Más complejo. inferred_en_source = {
# Por ahora, solo usamos ENO si está conectado. "type": "connection",
"source_instruction_uid": prev_uid,
"source_instruction_type": prev_type,
"source_pin": "eno",
}
break
if inferred_en_source: if inferred_en_source:
instruction['inputs']['en'] = inferred_en_source # Asegurarse de que 'instruction' se refiera al diccionario original
# print(f"INFO: Conexión EN inferida añadida a {part_type} UID {part_uid}") # Info all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source
processed_blocks_en_inference.add(part_uid) processed_blocks_en_inference.add(part_uid)
something_changed = True something_changed = True
# else:
# print(f"DEBUG: No se pudo inferir EN para {part_type} UID {part_uid}") # Debug
# --- 5. Añadir lógica ENO interesante --- # --- 5. Añadir lógica ENO interesante ---
# (Necesita usar parts_and_calls_map ahora)
for source_instr_uid, eno_destinations in eno_outputs.items(): for source_instr_uid, eno_destinations in eno_outputs.items():
if source_instr_uid not in all_logic_steps: continue if source_instr_uid not in all_logic_steps:
continue
interesting_eno_logic = [] interesting_eno_logic = []
for dest_uid, dest_pin in eno_destinations: for dest_uid, dest_pin in eno_destinations:
is_direct_en_connection = (dest_uid in parts_map and dest_pin == 'en') is_direct_en_connection = (
dest_uid in parts_and_calls_map and dest_pin == "en"
) # Check en mapa combinado
if not is_direct_en_connection: if not is_direct_en_connection:
target_info = {'target_pin': dest_pin} target_info = {"target_pin": dest_pin}
if dest_uid in parts_map: target_info.update({'target_type': 'instruction', 'target_uid': dest_uid, 'target_name': parts_map[dest_uid]['name']}) if dest_uid in parts_and_calls_map:
elif dest_uid in access_map: target_info.update({'target_type': 'operand', 'target_details': access_map[dest_uid]}) target_info.update(
else: target_info.update({'target_type': 'unknown', 'target_uid': dest_uid}) {
"target_type": "instruction",
"target_uid": dest_uid,
"target_name": parts_and_calls_map[dest_uid].get(
"name", parts_and_calls_map[dest_uid].get("type")
),
}
) # Usar 'name' si existe (Part) o 'type' (Call)
elif dest_uid in access_map:
target_info.update(
{
"target_type": "operand",
"target_details": access_map[dest_uid],
}
)
else:
target_info.update(
{"target_type": "unknown", "target_uid": dest_uid}
)
interesting_eno_logic.append(target_info) interesting_eno_logic.append(target_info)
if interesting_eno_logic: if interesting_eno_logic:
all_logic_steps[source_instr_uid]['eno_logic'] = interesting_eno_logic all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic
# print(f"DEBUG: Red {network_id} - Añadida lógica ENO interesante para {source_instr_uid}") # Debug
# --- 6. Ordenar Lógica Final y Devolver --- # --- 6. Ordenar Lógica Final y Devolver ---
try: sorted_uids = sorted(all_logic_steps.keys(), key=lambda x: int(x) if x.isdigit() else float('inf'))
except ValueError: print(f"Advertencia: UIDs no numéricos red {network_id}. Orden alfabético."); sorted_uids = sorted(all_logic_steps.keys())
network_logic = [all_logic_steps[uid] for uid in sorted_uids if uid in all_logic_steps]
# print(f"--- Fin Parseo Red ID={network_id} ---") # Debug
return {'id': network_id, 'title': network_title, 'comment': network_comment, 'logic': network_logic}
# --- Función Principal convert_xml_to_json (sin cambios respecto a la versión anterior) ---
def convert_xml_to_json(xml_filepath, json_filepath):
"""
Función principal que orquesta la conversión del archivo XML de Openness
a un archivo JSON simplificado que representa la estructura del bloque FC,
incluyendo comentarios, inferencia EN y lógica ENO no trivial.
"""
print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
if not os.path.exists(xml_filepath): print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'"); return
try: try:
print("Paso 1: Parseando archivo XML..."); parser = etree.XMLParser(remove_blank_text=True); tree = etree.parse(xml_filepath, parser); root = tree.getroot(); print("Paso 1: Parseo XML completado.") sorted_uids = sorted(
print("Paso 2: Buscando el bloque SW.Blocks.FC..."); fc_block_list = root.xpath("//*[local-name()='SW.Blocks.FC']"); all_logic_steps.keys(),
if not fc_block_list: print("Error Crítico: No se encontró <SW.Blocks.FC>."); return key=lambda x: int(x) if x.isdigit() else float("inf"),
fc_block = fc_block_list[0]; print(f"Paso 2: Bloque SW.Blocks.FC encontrado (ID={fc_block.get('ID')}).") )
print("Paso 3: Extrayendo atributos del bloque..."); attribute_list_node = fc_block.xpath("./*[local-name()='AttributeList']") except ValueError:
print(f"Advertencia: UIDs no numéricos red {network_id}. Orden alfabético.")
sorted_uids = sorted(all_logic_steps.keys())
network_logic = [
all_logic_steps[uid] for uid in sorted_uids if uid in all_logic_steps
]
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": network_logic,
}
# --- Función Principal convert_xml_to_json (sin cambios) ---
def convert_xml_to_json(xml_filepath, json_filepath):
print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
if not os.path.exists(xml_filepath):
print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
return
try:
print("Paso 1: Parseando archivo XML...")
parser = etree.XMLParser(remove_blank_text=True)
tree = etree.parse(xml_filepath, parser)
root = tree.getroot()
print("Paso 1: Parseo XML completado.")
print("Paso 2: Buscando el bloque SW.Blocks.FC...")
fc_block_list = root.xpath("//*[local-name()='SW.Blocks.FC']")
if not fc_block_list:
print("Error Crítico: No se encontró <SW.Blocks.FC>.")
return
fc_block = fc_block_list[0]
print(f"Paso 2: Bloque SW.Blocks.FC encontrado (ID={fc_block.get('ID')}).")
print("Paso 3: Extrayendo atributos del bloque...")
attribute_list_node = fc_block.xpath("./*[local-name()='AttributeList']")
block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown" block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown"
if attribute_list_node: if attribute_list_node:
attr_list = attribute_list_node[0] attr_list = attribute_list_node[0]
name_node = attr_list.xpath("./*[local-name()='Name']/text()"); block_name_val = name_node[0].strip() if name_node else block_name_val name_node = attr_list.xpath("./*[local-name()='Name']/text()")
num_node = attr_list.xpath("./*[local-name()='Number']/text()"); block_number_val = int(num_node[0]) if num_node and num_node[0].isdigit() else block_number_val block_name_val = name_node[0].strip() if name_node else block_name_val
lang_node = attr_list.xpath("./*[local-name()='ProgrammingLanguage']/text()"); block_lang_val = lang_node[0].strip() if lang_node else block_lang_val num_node = attr_list.xpath("./*[local-name()='Number']/text()")
print(f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'") block_number_val = (
else: print("Advertencia: No se encontró AttributeList para el bloque FC.") int(num_node[0])
block_comment_val = ""; comment_node_list = fc_block.xpath("./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']") if num_node and num_node[0].isdigit()
if comment_node_list: block_comment_val = get_multilingual_text(comment_node_list[0]); print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'") else block_number_val
result = {"block_name": block_name_val, "block_number": block_number_val, "language": block_lang_val, "block_comment": block_comment_val, "interface": {}, "networks": []} )
lang_node = attr_list.xpath(
"./*[local-name()='ProgrammingLanguage']/text()"
)
block_lang_val = lang_node[0].strip() if lang_node else block_lang_val
print(
f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'"
)
else:
print("Advertencia: No se encontró AttributeList para el bloque FC.")
block_comment_val = ""
comment_node_list = fc_block.xpath(
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
)
if comment_node_list:
block_comment_val = get_multilingual_text(comment_node_list[0])
print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")
result = {
"block_name": block_name_val,
"block_number": block_number_val,
"language": block_lang_val,
"block_comment": block_comment_val,
"interface": {},
"networks": [],
}
print("Paso 4: Extrayendo la interfaz del bloque...") print("Paso 4: Extrayendo la interfaz del bloque...")
if attribute_list_node: if attribute_list_node:
interface_node_list = attribute_list_node[0].xpath("./*[local-name()='Interface']") interface_node_list = attribute_list_node[0].xpath(
if interface_node_list: "./*[local-name()='Interface']"
interface_node = interface_node_list[0]; print("Paso 4: Nodo Interface encontrado.") )
for section in interface_node.xpath(".//iface:Section", namespaces=ns): if interface_node_list:
section_name = section.get('Name'); members = [] interface_node = interface_node_list[0]
for member in section.xpath("./iface:Member", namespaces=ns): print("Paso 4: Nodo Interface encontrado.")
member_name = member.get('Name'); member_dtype = member.get('Datatype') for section in interface_node.xpath(".//iface:Section", namespaces=ns):
if member_name and member_dtype: members.append({"name": member_name, "datatype": member_dtype}) section_name = section.get("Name")
if members: result["interface"][section_name] = members members = []
if not result["interface"]: print("Advertencia: Interface sin secciones iface:Section válidas.") for member in section.xpath("./iface:Member", namespaces=ns):
else: print("Advertencia: No se encontró <Interface> DENTRO de <AttributeList>.") member_name = member.get("Name")
if not result["interface"]: print("Advertencia: No se pudo extraer información de la interfaz.") member_dtype = member.get("Datatype")
if member_name and member_dtype:
members.append(
{"name": member_name, "datatype": member_dtype}
)
if members:
result["interface"][section_name] = members
if not result["interface"]:
print("Advertencia: Interface sin secciones iface:Section válidas.")
else:
print(
"Advertencia: No se encontró <Interface> DENTRO de <AttributeList>."
)
if not result["interface"]:
print("Advertencia: No se pudo extraer información de la interfaz.")
print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...") print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...")
networks_processed_count = 0 networks_processed_count = 0
object_list_node = fc_block.xpath("./*[local-name()='ObjectList']") object_list_node = fc_block.xpath("./*[local-name()='ObjectList']")
if object_list_node: if object_list_node:
compile_units = object_list_node[0].xpath("./*[local-name()='SW.Blocks.CompileUnit']") compile_units = object_list_node[0].xpath(
print(f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit.") "./*[local-name()='SW.Blocks.CompileUnit']"
)
print(
f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit."
)
for network_elem in compile_units: for network_elem in compile_units:
networks_processed_count += 1 networks_processed_count += 1
# print(f"DEBUG: Procesando red #{networks_processed_count} (ID={network_elem.get('ID')})...") # print(f"DEBUG: Procesando red #{networks_processed_count} (ID={network_elem.get('ID')})...")
parsed_network = parse_network(network_elem) # Llamada a la función modificada parsed_network = parse_network(
if parsed_network and parsed_network.get('error') is None: result["networks"].append(parsed_network) network_elem
elif parsed_network: print(f"Error: Falló parseo red ID={parsed_network.get('id')}: {parsed_network.get('error')}"); result["networks"].append(parsed_network) ) # Llamada a la función modificada
else: print(f"Error: parse_network devolvió None para CompileUnit (ID={network_elem.get('ID')}).") if parsed_network and parsed_network.get("error") is None:
if networks_processed_count == 0: print("Advertencia: ObjectList sin SW.Blocks.CompileUnit.") result["networks"].append(parsed_network)
else: print("Advertencia: No se encontró ObjectList.") elif parsed_network:
print(
f"Error: Falló parseo red ID={parsed_network.get('id')}: {parsed_network.get('error')}"
)
result["networks"].append(parsed_network)
else:
print(
f"Error: parse_network devolvió None para CompileUnit (ID={network_elem.get('ID')})."
)
if networks_processed_count == 0:
print("Advertencia: ObjectList sin SW.Blocks.CompileUnit.")
else:
print("Advertencia: No se encontró ObjectList.")
print("Paso 6: Escribiendo el resultado en el archivo JSON...") print("Paso 6: Escribiendo el resultado en el archivo JSON...")
if not result["interface"]: print("ADVERTENCIA FINAL: 'interface' está vacía.") if not result["interface"]:
if not result["networks"]: print("ADVERTENCIA FINAL: 'networks' está vacía.") print("ADVERTENCIA FINAL: 'interface' está vacía.")
if not result["networks"]:
print("ADVERTENCIA FINAL: 'networks' está vacía.")
# else: # Chequeo ENO logic # else: # Chequeo ENO logic
# eno_logic_found = any(instr.get('eno_logic') for net in result.get('networks', []) if net.get('error') is None for instr in net.get('logic', [])) # eno_logic_found = any(instr.get('eno_logic') for net in result.get('networks', []) if net.get('error') is None for instr in net.get('logic', []))
# if eno_logic_found: print("INFO FINAL: Lógica ENO interesante detectada.") # if eno_logic_found: print("INFO FINAL: Lógica ENO interesante detectada.")
# else: print("INFO FINAL: No se detectó lógica ENO interesante.") # else: print("INFO FINAL: No se detectó lógica ENO interesante.")
try: try:
with open(json_filepath, 'w', encoding='utf-8') as f: json.dump(result, f, indent=4, ensure_ascii=False) with open(json_filepath, "w", encoding="utf-8") as f:
print(f"Paso 6: Escritura completada."); print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'") json.dump(result, f, indent=4, ensure_ascii=False)
except IOError as e: print(f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}") print(f"Paso 6: Escritura completada.")
except TypeError as e: print(f"Error Crítico: Problema al serializar a JSON. Error: {e}") print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'")
except IOError as e:
print(
f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"
)
except TypeError as e:
print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")
except etree.XMLSyntaxError as e:
print(f"Error Crítico: Sintaxis XML en '{xml_filepath}'. Detalles: {e}")
except Exception as e:
print(f"Error Crítico: Error inesperado: {e}")
print("--- Traceback ---")
traceback.print_exc()
print("--- Fin Traceback ---")
except etree.XMLSyntaxError as e: print(f"Error Crítico: Sintaxis XML en '{xml_filepath}'. Detalles: {e}")
except Exception as e: print(f"Error Crítico: Error inesperado: {e}"); print("--- Traceback ---"); traceback.print_exc(); print("--- Fin Traceback ---")
# --- Punto de Entrada Principal --- # --- Punto de Entrada Principal ---
if __name__ == "__main__": if __name__ == "__main__":
xml_file = 'BlenderCtrl_ProdModeInit.xml' xml_file = "BlenderCtrl__Main.xml" # CAMBIAR AL NUEVO ARCHIVO XML
json_file = 'BlenderCtrl_ProdModeInit_simplified.json' json_file = xml_file.replace(
".xml", "_simplified.json"
) # Nombre de salida dinámico
convert_xml_to_json(xml_file, json_file) convert_xml_to_json(xml_file, json_file)

View File

@ -98,6 +98,29 @@ def get_scl_representation(source_info, network_id, scl_map, access_map):
return f"_ERR_INVALID_SRC_TYPE_" return f"_ERR_INVALID_SRC_TYPE_"
def format_variable_name(name):
"""Limpia el nombre de la variable quitando comillas y espacios."""
if not name:
return "_INVALID_NAME_"
# Quita comillas dobles iniciales/finales
name = name.strip('"')
# Reemplaza comillas dobles internas y puntos por guión bajo
# ¡Cuidado! Reemplazar '.' puede ser problemático para accesos a DBs/UDTs
# Quizás solo reemplazar si está dentro de comillas? Más complejo.
# Por ahora, mantenemos el reemplazo simple, asumiendo nombres de bloque limpios.
name = name.replace('"."', "_").replace(
".", "_"
) # <-- REVISAR SI ESTO ES SEGURO PARA TODOS LOS NOMBRES
# Quita comillas restantes (si las hubiera)
name = name.replace('"', "")
# Asegurarse de que no empiece con número
if name and name[0].isdigit():
name = "_" + name
# Devolver entre comillas si el nombre original las tenía o contiene caracteres no estándar
# (Simplificación: por ahora no añadimos comillas automáticas aquí)
return name
def generate_temp_var_name(network_id, instr_uid, pin_name): def generate_temp_var_name(network_id, instr_uid, pin_name):
net_id_clean = str(network_id).replace("-", "_") net_id_clean = str(network_id).replace("-", "_")
instr_uid_clean = str(instr_uid).replace("-", "_") instr_uid_clean = str(instr_uid).replace("-", "_")
@ -566,6 +589,82 @@ def process_o(instruction, network_id, scl_map, access_map):
return True return True
def process_call(instruction, network_id, scl_map, access_map):
"""Traduce una llamada a FC/FB a SCL."""
instr_uid = instruction["instruction_uid"]
instr_type = instruction["type"] # Será 'Call'
if instruction.get("type", "").endswith(SCL_SUFFIX) or "_error" in instruction.get(
"type", ""
):
return False # Usar get con default
block_name_scl = format_variable_name(
instruction.get("block_name", "UnknownCall")
) # Limpiar nombre
block_type = instruction.get("block_type")
instance_db = instruction.get("instance_db") # Será None para FCs
# print(f"DEBUG: Intentando procesar CALL - UID: {instr_uid} Block: {block_name_scl}")
# --- Manejo de EN ---
en_input = instruction["inputs"].get("en")
en_scl = (
get_scl_representation(en_input, network_id, scl_map, access_map)
if en_input
else "TRUE"
)
if en_scl is None:
# print(f"DEBUG: Dependencia EN no resuelta para CALL UID: {instr_uid}")
return False
# --- Fin Manejo de EN ---
# --- Construcción de la Llamada SCL ---
scl_call_params = []
# TODO: Procesar otros parámetros de entrada/salida si existieran
# para FC/FB con parámetros, se necesitaría iterar sobre instruction['inputs']
# y instruction['outputs'] buscando conexiones a pines específicos de la llamada.
scl_final_call = ""
if block_type == "FB" and instance_db:
# Llamada a FB con DB de instancia
scl_final_call = (
f"{instance_db}();" # Simplificado, añadir parámetros si los hay
)
elif block_type == "FC":
# Llamada a FC
scl_final_call = (
f"{block_name_scl}();" # Simplificado, añadir parámetros si los hay
)
else:
print(
f"Advertencia: Tipo de bloque no soportado para Call UID {instr_uid}: {block_type}"
)
scl_final_call = f"// ERROR: Call a bloque no soportado: {block_name_scl}"
instruction["type"] += "_error" # Marcar como error parcial
# --- Aplicar Condición EN ---
scl_final = ""
if en_scl != "TRUE":
# Indentar la llamada dentro del IF
indented_call = "\n".join([f" {line}" for line in scl_final_call.splitlines()])
scl_final = f"IF {en_scl} THEN\n{indented_call}\nEND_IF;"
else:
scl_final = scl_final_call
# --- Actualizar JSON y Mapa SCL ---
instruction["scl"] = scl_final
# Cambiar el tipo para marcar como procesado
instruction["type"] = f"Call_{block_type}_scl" # Ej: Call_FC_scl
# Actualizar scl_map con el estado ENO (igual a EN para llamadas simples)
map_key_eno = (network_id, instr_uid, "eno")
scl_map[map_key_eno] = en_scl
# print(f"INFO: Call UID: {instr_uid} procesado. SCL: {scl_final.splitlines()[0]}...")
return True
# --- NUEVO: Procesador de Agrupación (Refinado) --- # --- NUEVO: Procesador de Agrupación (Refinado) ---
def process_group_ifs(instruction, network_id, scl_map, access_map): def process_group_ifs(instruction, network_id, scl_map, access_map):
""" """
@ -753,6 +852,7 @@ def process_json_to_scl(json_filepath):
process_pbox, process_pbox,
process_add, process_add,
process_move, process_move,
process_call,
process_coil, process_coil,
] ]
processor_map = { processor_map = {
@ -842,7 +942,7 @@ def process_json_to_scl(json_filepath):
print(f"\n--- Límite de {max_passes} pases alcanzado. ---") print(f"\n--- Límite de {max_passes} pases alcanzado. ---")
# --- Guardar JSON Final --- # --- Guardar JSON Final ---
output_filename = json_filepath.replace(".json", "_scl_processed.json") output_filename = json_filepath.replace(".json", "_processed.json")
print(f"\nGuardando JSON procesado en: {output_filename}") print(f"\nGuardando JSON procesado en: {output_filename}")
try: try:
with open(output_filename, "w", encoding="utf-8") as f: with open(output_filename, "w", encoding="utf-8") as f:
@ -854,5 +954,8 @@ def process_json_to_scl(json_filepath):
# --- Ejecución --- # --- Ejecución ---
if __name__ == "__main__": if __name__ == "__main__":
input_json_file = "BlenderCtrl_ProdModeInit_simplified.json" # Asegúrate que este es el generado por x1_to_json.py MODIFICADO xml_file = "BlenderCtrl__Main.xml" # CAMBIAR AL NUEVO ARCHIVO XML
input_json_file = xml_file.replace(
".xml", "_simplified.json"
) # Nombre de salida dinámico
process_json_to_scl(input_json_file) process_json_to_scl(input_json_file)

View File

@ -196,7 +196,13 @@ def generate_scl(processed_json_filepath, output_scl_filepath):
# --- Ejecución --- # --- Ejecución ---
if __name__ == "__main__": if __name__ == "__main__":
input_json_file = 'BlenderRun_ProdTime_simplified_scl_processed.json' # Usar el JSON procesado
output_scl_file = input_json_file.replace('_simplified_scl_processed.json', '.scl') # Nombre de salida xml_file = "BlenderCtrl__Main.xml" # CAMBIAR AL NUEVO ARCHIVO XML
input_json_file = xml_file.replace(
".xml", "_simplified_processed.json"
) # Nombre de salida dinámico
output_scl_file = input_json_file.replace(
".json", ".scl"
) # Nombre de salida dinámico
generate_scl(input_json_file, output_scl_file) generate_scl(input_json_file, output_scl_file)