# Simatic_XML_Parser_to_SCL/x1_to_json.py
#
# 1653 lines
# 67 KiB
# Python

# -*- coding: utf-8 -*-
import json
import argparse
import os
import re
from lxml import etree
import traceback
from collections import defaultdict
import copy # Importar copy para deepcopy
# --- XML namespaces ---
# Prefix -> namespace URI table. It is passed as ``namespaces=ns`` to the
# XPath queries in this module that use the iface:/flg:/st:/stl: prefixes.
ns = dict(
    iface="http://www.siemens.com/automation/Openness/SW/Interface/v5",
    flg="http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4",
    st="http://www.siemens.com/automation/Openness/SW/NetworkSource/StructuredText/v3",
    stl="http://www.siemens.com/automation/Openness/SW/NetworkSource/StatementList/v4",
)
# --- Helper functions (unchanged): get_multilingual_text, get_symbol_name, parse_access, parse_part, parse_call, reconstruct_scl_from_tokens, etc. ---
# --- (Include your unchanged helper functions here) ---
def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"):
    """Return the stripped text of a multilingual text item below *element*.

    The lookup order is: the item whose Culture equals *default_lang*, then
    the one for *fallback_lang*, then the first item of any culture.

    Args:
        element: Node to search below (queried with ``.xpath``); may be None.
        default_lang: Culture code tried first.
        fallback_lang: Culture code tried second.

    Returns:
        The stripped text of the first match, or ``""`` when *element* is
        None, nothing matches, or any exception is raised during the lookup
        (a warning is printed in that last case).
    """
    if element is None:
        return ""

    def first_text(xpath_expr):
        # Stripped text of the first hit, or None when there is no usable hit.
        hits = element.xpath(xpath_expr, namespaces=ns)
        if hits and hits[0].text is not None:
            return hits[0].text.strip()
        return None

    try:
        # Preferred culture first, then the fallback culture.
        for culture in (default_lang, fallback_lang):
            found = first_text(
                f".//iface:MultilingualTextItem[iface:AttributeList/iface:Culture='{culture}']/iface:AttributeList/iface:Text"
            )
            if found is not None:
                return found
        # Neither culture matched: accept the first text of any culture.
        found = first_text(
            ".//iface:MultilingualTextItem/iface:AttributeList/iface:Text"
        )
        if found is not None:
            return found
        # Empty MultilingualText or unexpected structure.
        return ""
    except Exception as e:
        print(f"Advertencia: Error extrayendo MultilingualText: {e}")
        return ""
def get_symbol_name(symbol_element):
    """Build a dotted, double-quoted name from a ``flg:Symbol`` element.

    The ``Name`` attribute of every direct ``flg:Component`` child is wrapped
    in double quotes and the pieces are joined with ``"."``.

    Args:
        symbol_element: Symbol node (queried with ``.xpath``); may be None.

    Returns:
        The composed name, or None when *symbol_element* is None, it has no
        matching components, or the lookup raises (a warning is printed).
    """
    if symbol_element is None:
        return None
    try:
        names = symbol_element.xpath("./flg:Component/@Name", namespaces=ns)
        if not names:
            return None
        quoted = [f'"{part}"' for part in names]
        return ".".join(quoted)
    except Exception as e:
        print(f"Advertencia: Excepción en get_symbol_name: {e}")
        return None
def parse_access(access_element):
    """Convert a FlgNet ``<Access>`` element into a plain dictionary.

    Args:
        access_element: Access node (needs ``.get`` and ``.xpath``); may be
            None.

    Returns:
        None when *access_element* is None, otherwise a dict that always has
        ``uid``, ``scope`` and ``type``:

        * ``type == "variable"``: ``name`` holds the quoted symbol path.
        * ``type == "error_parsing_symbol"``: a Symbol was present but no
          name could be built (``name`` is None).
        * ``type == "constant"``: ``datatype`` and ``value`` are set. The
          value is converted to int/bool/float for the known numeric and
          boolean datatypes and is otherwise left as the original string.
        * ``type == "error_parsing_constant"``: a Constant without a value.
        * ``type == "unknown_structure"``: neither Symbol nor Constant.
    """
    if access_element is None:
        return None
    uid = access_element.get("UId")
    scope = access_element.get("Scope")
    info = {"uid": uid, "scope": scope, "type": "unknown"}
    symbol = access_element.xpath("./flg:Symbol", namespaces=ns)
    constant = access_element.xpath("./flg:Constant", namespaces=ns)

    # --- Variable access ---
    if symbol:
        info["type"] = "variable"
        info["name"] = get_symbol_name(symbol[0])
        if info["name"] is None:
            info["type"] = "error_parsing_symbol"
            print(f"Error: No se pudo parsear nombre símbolo Access UID={uid}")
        return info

    # --- Neither Symbol nor Constant ---
    if not constant:
        info["type"] = "unknown_structure"
        print(f"Advertencia: Access UID={uid} no es Symbol ni Constant.")
        return info

    # --- Constant access ---
    info["type"] = "constant"
    const_type_elem = constant[0].xpath("./flg:ConstantType", namespaces=ns)
    const_val_elem = constant[0].xpath("./flg:ConstantValue", namespaces=ns)
    info["datatype"] = (
        const_type_elem[0].text.strip()
        if const_type_elem and const_type_elem[0].text is not None
        else "Unknown"
    )
    value_str = (
        const_val_elem[0].text.strip()
        if const_val_elem and const_val_elem[0].text is not None
        else None
    )
    if value_str is None:
        info["type"] = "error_parsing_constant"
        info["value"] = None
        print(f"Error: Constante sin valor Access UID={uid}")
        return info

    # Infer the datatype when the XML does not state it.
    if info["datatype"] == "Unknown":
        val_lower = value_str.lower()
        if val_lower in ("true", "false"):
            info["datatype"] = "Bool"
        elif value_str.isdigit() or (
            value_str.startswith("-") and value_str[1:].isdigit()
        ):
            info["datatype"] = "Int"
        elif "#" in value_str:
            # Checked before "." so typed literals that contain a dot
            # (e.g. T#2.5S) are not left as "Unknown".
            info["datatype"] = "TypedConstant"
        elif "." in value_str:
            try:
                float(value_str)
                info["datatype"] = "Real"
            except ValueError:
                pass  # Not a number: keep "Unknown".

    info["value"] = value_str  # Default: keep the original text.

    # Try a numeric/boolean conversion of the part after the last '#'.
    dtype_lower = info["datatype"].lower()
    segments = value_str.split("#")
    val_str_processed = segments[-1]
    # A numeric segment right before the digits is a radix (2#, 8#, 16#);
    # honour it instead of reading the digits as decimal.
    radix = 10
    if len(segments) > 1 and segments[-2] in ("2", "8", "16"):
        radix = int(segments[-2])
    try:
        if dtype_lower in (
            "int",
            "dint",
            "udint",
            "sint",
            "usint",
            "lint",
            "ulint",
            "word",
            "dword",
            "lword",
            "byte",
        ):
            info["value"] = int(val_str_processed, radix)
        elif dtype_lower == "bool":
            info["value"] = (
                val_str_processed.lower() == "true" or val_str_processed == "1"
            )
        elif dtype_lower in ("real", "lreal"):
            info["value"] = float(val_str_processed)
        # TypedConstant and every other datatype keep the string.
    except (ValueError, TypeError) as e:
        print(
            f"Advertencia: No se pudo convertir valor '{val_str_processed}' a {dtype_lower} UID={uid}. Error: {e}"
        )
        info["value"] = value_str  # Keep the original string.
    return info
def parse_part(part_element):
    """Convert a FlgNet ``<Part>`` element into a plain dictionary.

    Args:
        part_element: Part node (needs ``.get`` and ``.xpath``); may be None.

    Returns:
        None when *part_element* is None or lacks ``UId``/``Name``. Otherwise
        a dict with ``uid``, ``type`` (the Part's ``Name``),
        ``template_values`` (TemplateValue ``Name`` -> ``Type`` attribute)
        and ``negated_pins`` (pin name -> True for every Negated child).
    """
    if part_element is None:
        return None
    uid = part_element.get("UId")
    name = part_element.get("Name")
    if not uid or not name:
        print(
            f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}"
        )
        return None

    # Children are looked up both in the flg namespace and without a
    # namespace. A prefix-less XPath step only matches un-namespaced
    # elements, so the plain form alone misses children that inherit the
    # FlgNet default namespace (the same situation parse_call handles for
    # CallInfo).
    template_values = {}
    try:
        for tv in part_element.xpath(
            "./flg:TemplateValue | ./TemplateValue", namespaces=ns
        ):
            tv_name = tv.get("Name")
            tv_type = tv.get("Type")
            if tv_name and tv_type:
                template_values[tv_name] = tv_type
    except Exception as e:
        print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}")

    negated_pins = {}
    try:
        for negated_elem in part_element.xpath(
            "./flg:Negated | ./Negated", namespaces=ns
        ):
            negated_pin_name = negated_elem.get("Name")
            if negated_pin_name:
                negated_pins[negated_pin_name] = True
    except Exception as e:
        print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}")

    return {
        "uid": uid,
        "type": name,
        "template_values": template_values,
        "negated_pins": negated_pins,
    }
def parse_call(call_element):
    """Convert a FlgNet ``<Call>`` element into a plain dictionary.

    ``CallInfo`` is searched in the flg namespace first and, if absent,
    without a namespace. For ``BlockType == "FB"`` the instance's ``Scope``
    and the ``Name`` of its first Component are read as well.

    Args:
        call_element: Call node (needs ``.get`` and ``.xpath``); may be None.

    Returns:
        None when *call_element* is None, has no ``UId``, has no CallInfo, or
        the CallInfo lacks ``Name``/``BlockType``. Otherwise a dict with
        ``uid``, ``type`` (always ``"Call"``), ``block_name`` and
        ``block_type``, plus ``instance_db`` (double-quoted) and
        ``instance_scope`` when they were found.
    """
    if call_element is None:
        return None
    uid = call_element.get("UId")
    if not uid:
        print(
            f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}"
        )
        return None

    # Locate CallInfo: namespaced first, plain as a fallback.
    namespaced_hits = call_element.xpath("./flg:CallInfo", namespaces=ns)
    if namespaced_hits:
        call_info = namespaced_hits[0]
    else:
        print(f"Error: Call UID {uid} sin elemento flg:CallInfo.")
        plain_hits = call_element.xpath("./CallInfo")
        if not plain_hits:
            print(
                f"Error: Call UID {uid} sin elemento CallInfo (probado sin NS tambien)."
            )
            return None
        print(f"Advertencia: Call UID {uid} encontró CallInfo SIN namespace.")
        call_info = plain_hits[0]

    block_name = call_info.get("Name")
    block_type = call_info.get("BlockType")
    if not (block_name and block_type):
        print(f"Error: CallInfo para UID {uid} sin Name o BlockType.")
        return None

    # Only FB calls carry an instance.
    instance_name = None
    instance_scope = None
    if block_type == "FB":
        instances = call_info.xpath("./flg:Instance", namespaces=ns)
        if not instances:
            print(f"Advertencia: FB Call '{block_name}' UID {uid} sin <flg:Instance>.")
        else:
            instance = instances[0]
            instance_scope = instance.get("Scope")
            components = instance.xpath("./flg:Component", namespaces=ns)
            if not components:
                print(
                    f"Advertencia: No se encontró <flg:Component> en <flg:Instance> FB Call UID {uid}."
                )
            else:
                db_name_raw = components[0].get("Name")
                if db_name_raw:
                    instance_name = f'"{db_name_raw}"'  # Add quotes.
                else:
                    print(
                        f"Advertencia: <flg:Component> en <flg:Instance> FB Call UID {uid} sin 'Name'."
                    )

    call_data = {
        "uid": uid,
        "type": "Call",
        "block_name": block_name,
        "block_type": block_type,
    }
    if instance_name:
        call_data["instance_db"] = instance_name
    if instance_scope:
        call_data["instance_scope"] = instance_scope
    return call_data
# Access scopes whose text is rendered from a <Symbol> component chain.
_SCL_SYMBOL_SCOPES = frozenset(
    {
        "GlobalVariable",
        "LocalVariable",
        "TempVariable",
        "InOutVariable",
        "InputVariable",
        "OutputVariable",
        "ConstantVariable",
    }
)

# Literal prefix per lower-cased ConstantType.
_SCL_LITERAL_PREFIXES = {
    "time": "T#",
    "ltime": "LT#",
    "s5time": "S5T#",
    "date": "D#",
    "dtl": "DTL#",
    "dt": "DT#",
    "tod": "TOD#",
}


def _scl_symbol_text(symbol_elem):
    """Render the Component chain of an st:Symbol as dotted SCL text."""
    pieces = []
    components = symbol_elem.xpath("./st:Component", namespaces=ns)
    for position, comp in enumerate(components):
        name = comp.get("Name", "_ERR_COMP_")
        if position > 0:
            pieces.append(".")
        # HasQuotes is looked up on the enclosing Access element(s).
        quote_flag = comp.xpath(
            "ancestor::st:Access/st:BooleanAttribute[@Name='HasQuotes']/text()",
            namespaces=ns,
        )
        has_quotes = bool(quote_flag) and quote_flag[0].lower() == "true"
        is_temp = name.startswith("#")
        # Quote when flagged, or for a first component that is neither a
        # '#'-prefixed name nor already quoted.
        if has_quotes or (position == 0 and not is_temp and '"' not in name):
            pieces.append(f'"{name}"')
        else:
            pieces.append(name)
        index_nodes = comp.xpath("./st:Access", namespaces=ns)
        if index_nodes:
            # Each index is itself an Access element, so it must go through
            # the Access renderer (not the token walker, which would yield
            # an empty string for it).
            indices_text = [_scl_access_text(node) for node in index_nodes]
            pieces.append(f"[{','.join(indices_text)}]")
    return "".join(pieces)


def _scl_literal_text(access_elem):
    """Render the st:Constant of a LiteralConstant Access as SCL text."""
    constant_elem = access_elem.xpath("./st:Constant", namespaces=ns)
    if not constant_elem:
        return "/*_ERR_NOCONST_*/"
    val_elem = constant_elem[0].xpath("./st:ConstantValue/text()", namespaces=ns)
    type_elem = constant_elem[0].xpath("./st:ConstantType/text()", namespaces=ns)
    const_type = (
        type_elem[0].strip() if type_elem and type_elem[0] is not None else ""
    )
    const_val = (
        val_elem[0].strip()
        if val_elem and val_elem[0] is not None
        else "_ERR_CONSTVAL_"
    )
    kind = const_type.lower()
    if kind == "bool":
        return const_val.upper()
    if kind in ("string", "char"):
        # Single quotes inside the literal are doubled.
        replaced_val = const_val.replace("'", "''")
        return f"'{replaced_val}'"
    prefix = _SCL_LITERAL_PREFIXES.get(kind)
    if prefix:
        return f"{prefix}{const_val}"
    return const_val  # Int, Real, etc.


def _scl_access_text(access_elem):
    """Render one st:Access element (variable or literal) as SCL text."""
    scope = access_elem.get("Scope")
    if scope in _SCL_SYMBOL_SCOPES:
        symbol_elem = access_elem.xpath("./st:Symbol", namespaces=ns)
        if symbol_elem:
            return _scl_symbol_text(symbol_elem[0])
    elif scope == "LiteralConstant":
        return _scl_literal_text(access_elem)
    # Unhandled scope, or a variable scope without a Symbol child.
    return f"/*_ERR_Scope_{scope}_*/"


def _reindent_scl(scl_text):
    """Drop blank lines and re-indent *scl_text* by SCL block keywords."""
    indent_str = "  "  # Two spaces per indent level
    indent_level = 0
    output_lines = []
    for line in scl_text.split("\n"):
        trimmed_line = line.strip()
        if not trimmed_line:
            continue
        # Closing / continuation keywords step back before being emitted.
        if trimmed_line.startswith(("END_", "UNTIL", "ELSE", "ELSIF")):
            indent_level = max(0, indent_level - 1)
        output_lines.append(indent_str * indent_level + trimmed_line)
        # Opening keywords indent the lines that follow.
        if (
            trimmed_line.endswith(("THEN", "DO", "OF"))
            or trimmed_line == "ELSE"
            or trimmed_line.startswith(("FOR", "WHILE", "CASE", "REPEAT"))
        ):
            indent_level += 1
    return "\n".join(output_lines)


def reconstruct_scl_from_tokens(st_node):
    """Rebuild SCL source text from a ``<StructuredText>`` element.

    The direct children in the ``st`` namespace are walked in order:
    Token, Blank, NewLine, Access (variables and literal constants),
    Comment and LineComment. The joined text is then re-indented with a
    simple keyword-based pass that also removes blank lines.

    Args:
        st_node: StructuredText node (needs ``.xpath``); may be None.

    Returns:
        The reconstructed SCL text, or an error comment line when
        *st_node* is None.
    """
    if st_node is None:
        return "// Error: StructuredText node not found.\n"
    scl_parts = []
    for elem in st_node.xpath("./st:*", namespaces=ns):
        # Local tag name: the part after the '{namespace}' prefix.
        tag = elem.tag.rsplit("}", 1)[-1]
        if tag == "Token":
            scl_parts.append(elem.get("Text", ""))
        elif tag == "Blank":
            count = int(elem.get("Num", 1))
            if not scl_parts or not scl_parts[-1].endswith(" "):
                scl_parts.append(" " * count)
            elif count > 1:
                # Previous part already ends with a space: add one fewer.
                scl_parts.append(" " * (count - 1))
        elif tag == "NewLine":
            if scl_parts:
                scl_parts[-1] = scl_parts[-1].rstrip()
            scl_parts.append("\n")
        elif tag == "Access":
            scl_parts.append(_scl_access_text(elem))
        elif tag == "Comment" or tag == "LineComment":
            comment_text = get_multilingual_text(elem)
            if tag == "Comment":
                scl_parts.append(f"(* {comment_text} *)")
            else:
                scl_parts.append(f"// {comment_text}")
    return _reindent_scl("".join(scl_parts))
# STL Parser (using namespace stl)
def get_access_text(access_element):
    """Build a short textual operand for an STL ``<Access>`` element.

    The first matching child decides the rendering, tried in this order:
    Symbol, Constant, Label, Indirect, Address, CallInfo.

    Args:
        access_element: Access node (needs ``.get`` and ``.xpath``); may be
            None.

    Returns:
        The operand text; ``"_ERR_ACCESS_"`` for None; ``"_<Scope>_?"`` when
        no known child is present.
    """
    if access_element is None:
        return "_ERR_ACCESS_"
    scope = access_element.get("Scope")

    # --- Symbolic operand: dotted component chain with optional indices ---
    symbols = access_element.xpath("./stl:Symbol", namespaces=ns)
    if symbols:
        rendered = []
        chain = symbols[0].xpath("./stl:Component", namespaces=ns)
        for position, component in enumerate(chain):
            name = component.get("Name", "_ERR_COMP_")
            quote_flag = component.xpath(
                "ancestor::stl:Access/stl:BooleanAttribute[@Name='HasQuotes']/text()",
                namespaces=ns,
            )
            has_quotes = quote_flag and quote_flag[0].lower() == "true"
            if position > 0:
                rendered.append(".")
            # Quote when flagged, or for a first component that is neither
            # '#'-prefixed nor already quoted.
            needs_quotes = has_quotes or (
                position == 0 and not name.startswith("#") and '"' not in name
            )
            rendered.append(f'"{name}"' if needs_quotes else name)
            index_nodes = component.xpath("./stl:Access", namespaces=ns)
            if index_nodes:
                rendered.append(
                    "[" + ",".join(get_access_text(node) for node in index_nodes) + "]"
                )
        return "".join(rendered)

    # --- Constant operand ---
    constants = access_element.xpath("./stl:Constant", namespaces=ns)
    if constants:
        raw_value = constants[0].xpath("./stl:ConstantValue/text()", namespaces=ns)
        raw_type = constants[0].xpath("./stl:ConstantType/text()", namespaces=ns)
        const_type = ""
        if raw_type and raw_type[0] is not None:
            const_type = raw_type[0].strip()
        const_val = "_ERR_CONST_"
        if raw_value and raw_value[0] is not None:
            const_val = raw_value[0].strip()
        # Only these four types get a prefix here; others are emitted as-is.
        prefix_by_type = {"time": "T#", "s5time": "S5T#", "date": "D#", "dt": "DT#"}
        return prefix_by_type.get(const_type.lower(), "") + const_val

    # --- Jump label ---
    labels = access_element.xpath("./stl:Label", namespaces=ns)
    if labels:
        return labels[0].get("Name", "_ERR_LABEL_")

    # --- Register-indirect operand: <area><width>[<reg>,P#byte.bit] ---
    indirects = access_element.xpath("./stl:Indirect", namespaces=ns)
    if indirects:
        indirect = indirects[0]
        reg = indirect.get("Register", "AR?")
        offset_str = indirect.get("BitOffset", "0")
        area = indirect.get("Area", "DB")
        width = indirect.get("Width", "X")
        try:
            byte_offset, bit_in_byte = divmod(int(offset_str), 8)
            p_format_offset = f"P#{byte_offset}.{bit_in_byte}"
        except ValueError:
            p_format_offset = "P#?.?"
        width_map = {"Bit": "X", "Byte": "B", "Word": "W", "Double": "D", "Long": "D"}
        if width in width_map:
            width_char = width_map[width]
        else:
            # Unknown width name: fall back to its first character.
            width_char = width[0] if width else "?"
        return f"{area}{width_char}[{reg},{p_format_offset}]"

    # --- Absolute address ---
    addresses = access_element.xpath("./stl:Address", namespaces=ns)
    if addresses:
        address = addresses[0]
        area = address.get("Area", "??")
        bit_offset_str = address.get("BitOffset", "0")
        addr_type_str = address.get("Type", "Bool")
        try:
            bit_offset = int(bit_offset_str)
        except ValueError:
            return f"{area}?{bit_offset_str}?"
        byte_offset, bit_in_byte = divmod(bit_offset, 8)

        width_by_type = {"Byte": "B", "Word": "W"}
        width_by_type.update(
            dict.fromkeys(["DWord", "DInt", "Real", "Time", "DT"], "D")
        )
        width_by_type.update(
            dict.fromkeys(["LReal", "LTime", "LWord", "LInt", "ULInt"], "L")
        )
        addr_width = width_by_type.get(addr_type_str, "X")  # "X" is the default

        area_map = {
            "Input": "I",
            "Output": "Q",
            "Memory": "M",
            "PeripheryInput": "PI",
            "PeripheryOutput": "PQ",
            "DB": "DB",
            "DI": "DI",
            "Local": "L",
            "Timer": "T",
            "Counter": "C",
        }
        stl_area = area_map.get(area, area)

        if stl_area in ("T", "C"):
            return f"{stl_area}{byte_offset}"
        # NOTE(review): the ".bit" suffix is appended for every width, not
        # only for bit accesses - confirm this is the intended notation.
        plain = f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}"
        if stl_area in ("DB", "DI"):
            block_num = address.get("BlockNumber")
            if block_num:
                # Fully qualified form: block number, then the address.
                return f"{stl_area}{block_num}.{plain}"
        return plain

    # --- Call used as operand ---
    call_infos = access_element.xpath("./stl:CallInfo", namespaces=ns)
    if call_infos:
        name = call_infos[0].get("Name", "_ERR_CALL_")
        btype = call_infos[0].get("BlockType", "FC")
        instance_node = call_infos[0].xpath(
            "./stl:Instance/stl:Component/@Name", namespaces=ns
        )
        if btype == "FB" and instance_node:
            return f'"{instance_node[0]}"'  # Instance name for an FB call.
        return f'{btype} "{name}"'

    return f"_{scope}_?"  # Fallback
def get_comment_text_stl(comment_element):
    """Return the stripped text of an STL Comment/LineComment element.

    Only the first ``stl:Text`` child's text node is used.

    Args:
        comment_element: Comment node (needs ``.xpath``); may be None.

    Returns:
        The stripped text, or ``""`` when *comment_element* is None or has
        no ``stl:Text`` text.
    """
    if comment_element is None:
        return ""
    texts = comment_element.xpath("./stl:Text/text()", namespaces=ns)
    return texts[0].strip() if texts else ""
def reconstruct_stl_from_statementlist(statement_list_node):
    """Rebuild STL source text from a ``<StatementList>`` element.

    Each ``stl:StlStatement`` child becomes one output line made of an
    optional label, the instruction token, an optional operand, and any
    ``Inserted='true'`` comments appended after a tab. Comments that are not
    marked as inserted are emitted first as separate ``//`` lines. A token
    with text ``EMPTY_LINE`` produces an empty line.

    Args:
        statement_list_node: StatementList node (needs ``.xpath``); may be
            None.

    Returns:
        The lines joined with newlines, or an error comment line when
        *statement_list_node* is None.
    """
    if statement_list_node is None:
        return "// Error: StatementList node not found.\n"

    rendered = []
    for stmt in statement_list_node.xpath("./stl:StlStatement", namespaces=ns):
        pieces = []
        trailing = []  # fragments of the same-line comment

        # 1. Full-line comments that precede the statement.
        leading = stmt.xpath(
            "child::stl:Comment[not(@Inserted='true')] | child::stl:LineComment[not(@Inserted='true')]",
            namespaces=ns,
        )
        for lead in leading:
            lead_text = get_comment_text_stl(lead)
            if lead_text:
                rendered.extend(f"// {row}" for row in lead_text.splitlines())

        # 2. Label, plus comments attached to its declaration.
        label_str = ""
        declarations = stmt.xpath("./stl:LabelDeclaration", namespaces=ns)
        if declarations:
            names = declarations[0].xpath("./stl:Label/@Name", namespaces=ns)
            if names:
                label_str = f"{names[0]}:"
            for lcomm in declarations[0].xpath(
                "./stl:Comment[@Inserted='true'] | ./stl:LineComment[@Inserted='true']",
                namespaces=ns,
            ):
                trailing.append(f" // {get_comment_text_stl(lcomm)}")
        if label_str:
            pieces.append(label_str)

        # 3. Instruction token.
        instruction_str = ""
        tokens = stmt.xpath("./stl:StlToken", namespaces=ns)
        if tokens:
            token_text = tokens[0].get("Text", "_ERR_TOKEN_")
            if token_text == "EMPTY_LINE":
                rendered.append("")
                continue  # Nothing else is emitted for this statement.
            if token_text != "COMMENT":
                # "COMMENT" markers contribute no instruction text.
                instruction_str = token_text
            for tcomm in tokens[0].xpath(
                "./stl:Comment[@Inserted='true'] | ./stl:LineComment[@Inserted='true']",
                namespaces=ns,
            ):
                trailing.append(f" // {get_comment_text_stl(tcomm)}")
        if instruction_str:
            # A tab separates the instruction from a preceding label.
            pieces.append(("\t" + instruction_str) if label_str else instruction_str)

        # 4. Operand and the comments nested inside it.
        operands = stmt.xpath("./stl:Access", namespaces=ns)
        if operands:
            access_str = get_access_text(operands[0])
            for acc_comm in operands[0].xpath(
                "child::stl:Comment[@Inserted='true'] | child::stl:LineComment[@Inserted='true']",
                namespaces=ns,
            ):
                trailing.append(f" // {get_comment_text_stl(acc_comm)}")
            if access_str:
                pieces.append(access_str)

        # Assemble the line.
        current_line = " ".join(pieces)
        if trailing:
            current_line += "\t" + "".join(trailing).strip()
        if current_line.strip():
            rendered.append(current_line.rstrip())
    return "\n".join(rendered)
# DB Parser (using namespace iface)
def parse_interface_members(member_elements):
    """Recursively parse ``<Member>`` elements of an interface or struct.

    Args:
        member_elements: Iterable of Member nodes (each needs ``.get`` and
            ``.xpath``); an empty or None value yields an empty list.

    Returns:
        A list with one dict per member that has both ``Name`` and
        ``Datatype`` (others are skipped with a warning). Each dict holds
        ``name``, ``datatype``, ``remanence``, ``accessibility``,
        ``start_value``, ``comment``, ``children`` (nested members parsed the
        same way) and ``array_elements`` (Subelement ``Path`` -> start value,
        filled only for datatypes starting with ``array[``).
    """

    def start_value_of(node):
        # A ConstantName attribute wins over the element text; missing text
        # becomes an empty string.
        constant_name = node.get("ConstantName")
        if constant_name:
            return constant_name
        return "" if node.text is None else node.text

    parsed = []
    if not member_elements:
        return parsed

    for member in member_elements:
        name = member.get("Name")
        dtype = member.get("Datatype")
        remanence = member.get("Remanence", "NonRetain")
        accessibility = member.get("Accessibility", "Public")
        if not (name and dtype):
            print(
                "Advertencia: Miembro sin nombre o tipo de dato encontrado. Saltando."
            )
            continue

        entry = {
            "name": name,
            "datatype": dtype,
            "remanence": remanence,
            "accessibility": accessibility,
            "start_value": None,
            "comment": None,
            "children": [],
            "array_elements": {},
        }

        comments = member.xpath("./iface:Comment", namespaces=ns)
        if comments:
            entry["comment"] = get_multilingual_text(comments[0])

        start_nodes = member.xpath("./iface:StartValue", namespaces=ns)
        if start_nodes:
            entry["start_value"] = start_value_of(start_nodes[0])

        # --- Nested structs ---
        nested = member.xpath(
            "./iface:Sections/iface:Section/iface:Member", namespaces=ns
        )
        if nested:
            entry["children"] = parse_interface_members(nested)

        # --- Arrays: per-element start values keyed by Subelement Path ---
        if isinstance(dtype, str) and dtype.lower().startswith("array["):
            for sub in member.xpath("./iface:Subelement", namespaces=ns):
                path = sub.get("Path")
                sub_start = sub.xpath("./iface:StartValue", namespaces=ns)
                if path and sub_start:
                    entry["array_elements"][path] = start_value_of(sub_start[0])

        parsed.append(entry)
    return parsed
# --- Main Network Parsing Function ---
def parse_network(network_element):
"""
Parsea una red LAD/FBD, extrae lógica y añade conexiones EN implícitas.
Devuelve None o un diccionario con 'error' si falla FlgNet.
"""
if network_element is None:
return {
"id": "ERROR",
"title": "Invalid Network Element",
"logic": [],
"error": "Input element was None",
}
network_id = network_element.get("ID")
title_element = network_element.xpath(
".//iface:MultilingualText[@CompositionName='Title']", namespaces=ns
)
network_title = (
get_multilingual_text(title_element[0])
if title_element
else f"Network {network_id}"
)
comment_element = network_element.xpath(
"./ObjectList/MultilingualText[@CompositionName='Comment']", namespaces=ns
) # Corrected path?
network_comment = (
get_multilingual_text(comment_element[0]) if comment_element else ""
)
# Buscar NetworkSource y luego FlgNet (ambos usan namespace flg)
network_source_node = network_element.xpath(".//flg:NetworkSource", namespaces=ns)
if not network_source_node:
# Try finding FlgNet directly under CompileUnit if NetworkSource is missing (less common)
flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
if not flgnet_list:
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": [],
"error": "NetworkSource/FlgNet not found",
}
else:
flgnet = flgnet_list[0]
else:
flgnet_list = network_source_node[0].xpath("./flg:FlgNet", namespaces=ns)
if not flgnet_list:
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": [],
"error": "FlgNet not found inside NetworkSource",
}
else:
flgnet = flgnet_list[0]
# 1. Parse Access, Parts, Calls (use namespace flg)
access_map = {
acc_info["uid"]: acc_info
for acc in flgnet.xpath(".//flg:Access", namespaces=ns)
if (acc_info := parse_access(acc)) and acc_info["type"] != "unknown"
}
parts_and_calls_map = {}
instruction_elements = flgnet.xpath(".//flg:Part | .//flg:Call", namespaces=ns)
for element in instruction_elements:
parsed_info = None
tag_name = etree.QName(element.tag).localname
if tag_name == "Part":
parsed_info = parse_part(element)
elif tag_name == "Call":
parsed_info = parse_call(element) # parse_call ahora busca flg:CallInfo
if parsed_info and "uid" in parsed_info:
# Verifica si parse_call tuvo éxito (si no, devuelve None)
if tag_name == "Call" and parsed_info is None:
print(
f"Advertencia: Falló el parseo de Call UID={element.get('UId')}. Ignorando."
)
continue # Saltar esta instrucción si parse_call falló
parts_and_calls_map[parsed_info["uid"]] = parsed_info
elif tag_name == "Call" and parsed_info is None:
# Si parse_call devolvió None directamente
print(
f"Advertencia: Part/Call inválido ignorado en red {network_id} (UID={element.get('UId')})"
)
# 2. Parse Wires (use namespace flg)
wire_connections = defaultdict(list)
source_connections = defaultdict(list)
eno_outputs = defaultdict(list)
qname_powerrail = etree.QName(ns["flg"], "Powerrail")
qname_identcon = etree.QName(ns["flg"], "IdentCon")
qname_namecon = etree.QName(ns["flg"], "NameCon")
for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
children = wire.getchildren()
if len(children) < 2:
continue
source_elem = children[0]
source_uid, source_pin = None, None
if source_elem.tag == qname_powerrail:
source_uid, source_pin = "POWERRAIL", "out"
elif source_elem.tag == qname_identcon:
source_uid, source_pin = (
source_elem.get("UId"),
"value",
) # IdentCon usually represents an Access node output
elif source_elem.tag == qname_namecon:
source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name")
if source_uid is None:
continue
source_info = (source_uid, source_pin)
for dest_elem in children[1:]:
dest_uid, dest_pin = None, None
# Destination can also be IdentCon (Access node input) or NameCon (Instruction pin input)
if dest_elem.tag == qname_identcon:
dest_uid, dest_pin = (
dest_elem.get("UId"),
"value",
) # Input to an Access node? Unlikely. Usually NameCon. Let's assume NameCon primarily for destination.
elif dest_elem.tag == qname_namecon:
dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name")
if dest_uid is not None and dest_pin is not None:
dest_key = (dest_uid, dest_pin)
# Check if dest_uid is an instruction or an access node
# This logic seems okay, maps source to destination key
if source_info not in wire_connections[dest_key]:
wire_connections[dest_key].append(source_info)
# Build reverse map: source -> list of destinations
source_key = (source_uid, source_pin)
dest_info = (dest_uid, dest_pin)
if dest_info not in source_connections[source_key]:
source_connections[source_key].append(dest_info)
# Track ENO outputs specifically
if source_pin == "eno" and source_uid in parts_and_calls_map:
if dest_info not in eno_outputs[source_uid]:
eno_outputs[source_uid].append(dest_info)
# 3. Build Initial Logic Structure
all_logic_steps = {}
SCL_SUFFIX = "_sympy_processed" # Define suffix
functional_block_types = [
"Move",
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
"Call",
"Se",
"Sd",
"BLKMOV",
"TON",
"TOF",
"TP",
"CTU",
"CTD",
"CTUD",
]
rlo_generators = [
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"And",
"Xor",
"PBox",
"NBox",
"Not",
]
# --- CORRECCIÓN: Iterar sobre los UIDs que SÍ están en parts_and_calls_map ---
# --- Esto evita procesar UIDs de Calls que fallaron en parse_call ---
valid_instruction_uids = list(parts_and_calls_map.keys())
for instruction_uid in valid_instruction_uids:
instruction_info = parts_and_calls_map[instruction_uid]
# Make a deep copy to avoid modifying the original map entry
instruction_repr = copy.deepcopy(instruction_info)
instruction_repr["instruction_uid"] = instruction_uid # Ensure UID is present
instruction_repr["inputs"] = {}
instruction_repr["outputs"] = {}
original_type = instruction_repr["type"] # Type from parse_part/parse_call
current_type = original_type
input_pin_mapping = {}
output_pin_mapping = {}
# Base set of possible pins - can be expanded
possible_input_pins = set(["en", "in", "in1", "in2", "pre"])
# Dynamically add pins based on instruction type (simplified list)
if original_type in ["Contact", "Coil", "SCoil", "RCoil"]:
possible_input_pins.add("operand")
elif original_type in [
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
]:
possible_input_pins.update(["in1", "in2"])
elif original_type in ["TON", "TOF", "TP", "Se", "Sd", "SdCoil"]:
possible_input_pins.update(
["s", "tv", "r", "timer", "pt", "value", "operand"]
)
elif original_type in ["CTU", "CTD", "CTUD"]:
possible_input_pins.update(["cu", "cd", "r", "ld", "pv", "counter"])
elif original_type in ["PBox", "NBox"]:
possible_input_pins.update(["bit", "clk"])
elif original_type == "BLKMOV":
possible_input_pins.add("SRCBLK")
# Special Handling for Call Parameters
elif original_type == "Call":
# Find the original XML element for this call using the correct namespace
call_xml_element_list = flgnet.xpath(
f".//flg:Call[@UId='{instruction_uid}']", namespaces=ns
)
if call_xml_element_list:
call_xml_element = call_xml_element_list[0]
# --- USAR flg:CallInfo y flg:Parameter ---
call_info_node_list = call_xml_element.xpath(
"./flg:CallInfo", namespaces=ns
)
if call_info_node_list:
call_info_node = call_info_node_list[0]
call_param_names = call_info_node.xpath(
"./flg:Parameter/@Name", namespaces=ns
)
if call_param_names:
possible_input_pins.update(call_param_names)
# print(f"DEBUG Call UID={instruction_uid}: Found params: {call_param_names}. Possible pins now: {possible_input_pins}")
else:
print(
f"Advertencia: Call UID={instruction_uid}: No <flg:Parameter> tags found under <flg:CallInfo>."
)
else:
# Try without namespace as fallback? Unlikely needed if flg is default
call_info_node_list_no_ns = call_xml_element.xpath("./CallInfo")
if call_info_node_list_no_ns:
print(
f"Advertencia: Call UID={instruction_uid}: Found <CallInfo> WITHOUT namespace."
)
call_param_names_no_ns = call_info_node_list_no_ns[0].xpath(
"./Parameter/@Name"
)
if call_param_names_no_ns:
possible_input_pins.update(call_param_names_no_ns)
print(
f"DEBUG Call UID={instruction_uid} (no NS): Found params: {call_param_names_no_ns}. Possible pins now: {possible_input_pins}"
)
else:
print(
f"Advertencia: Call UID={instruction_uid}: No <Parameter> tags found under <CallInfo> (no NS)."
)
else:
print(
f"Error: Call UID={instruction_uid}: No <flg:CallInfo> (or CallInfo) element found."
)
else:
print(
f"Error: No se pudo encontrar el elemento <flg:Call> para UID={instruction_uid} en el XPath."
)
# Populate Inputs from Wire Connections
for pin_name in possible_input_pins:
dest_key = (instruction_uid, pin_name)
if dest_key in wire_connections:
sources_list = wire_connections[dest_key]
input_sources_repr = []
# print(f"DEBUG Wire Input: Instr={instruction_uid}, Pin={pin_name}, Sources={sources_list}") # Debug
for source_uid, source_pin in sources_list:
if source_uid == "POWERRAIL":
input_sources_repr.append({"type": "powerrail"})
elif source_uid in access_map:
input_sources_repr.append(copy.deepcopy(access_map[source_uid]))
elif (
source_uid in parts_and_calls_map
): # Check if source is a valid instruction
source_instr_info = parts_and_calls_map[source_uid]
input_sources_repr.append(
{
"type": "connection",
"source_instruction_type": source_instr_info["type"],
"source_instruction_uid": source_uid,
"source_pin": source_pin, # Use the actual source pin name
}
)
else:
# Source UID not found in instructions or access nodes
print(
f"Advertencia: Fuente desconocida UID={source_uid} conectada a {instruction_uid}.{pin_name}"
)
input_sources_repr.append(
{"type": "unknown_source", "uid": source_uid}
)
# Apply input pin mapping if needed (e.g. SdCoil)
json_pin_name = input_pin_mapping.get(pin_name, pin_name)
instruction_repr["inputs"][json_pin_name] = (
input_sources_repr[0]
if len(input_sources_repr) == 1
else input_sources_repr
)
# print(f"DEBUG Populated Input: Instr={instruction_uid}, Pin={json_pin_name}, Value={instruction_repr['inputs'][json_pin_name]}") # Debug
# Populate Outputs (Simplified - just record direct variable assignments)
possible_output_pins = set(
[
"out",
"out1",
"Q",
"q",
"eno",
"RET_VAL",
"DSTBLK",
"rt",
"cv",
"QU",
"QD",
"ET",
]
)
if original_type == "BLKMOV":
possible_output_pins.add("DSTBLK")
for pin_name in possible_output_pins:
source_key = (instruction_uid, pin_name)
if source_key in source_connections:
json_pin_name = output_pin_mapping.get(pin_name, pin_name)
if json_pin_name not in instruction_repr["outputs"]:
instruction_repr["outputs"][json_pin_name] = []
for dest_uid, dest_pin in source_connections[source_key]:
if (
dest_uid in access_map
): # Only track connections to variables/constants
dest_operand_copy = copy.deepcopy(access_map[dest_uid])
if (
dest_operand_copy
not in instruction_repr["outputs"][json_pin_name]
):
instruction_repr["outputs"][json_pin_name].append(
dest_operand_copy
)
all_logic_steps[instruction_uid] = instruction_repr
# 4. EN Inference (Simplified logic as before)
# --- (Esta sección puede permanecer igual, opera sobre all_logic_steps) ---
processed_blocks_en_inference = set()
try:
sorted_uids_for_en = sorted(
all_logic_steps.keys(),
key=lambda x: int(x) if x.isdigit() else float("inf"),
)
except ValueError:
sorted_uids_for_en = sorted(all_logic_steps.keys()) # Fallback sort
ordered_logic_list_for_en = [
all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps
]
for i, instruction in enumerate(ordered_logic_list_for_en):
part_uid = instruction["instruction_uid"]
# Leer el tipo actual de la instrucción ya parseada
part_type_original = (
instruction.get("type", "").replace(SCL_SUFFIX, "").replace("_error", "")
)
# La lógica de inferencia EN no cambia
if (
part_type_original in functional_block_types
and "en" not in instruction["inputs"]
and part_uid not in processed_blocks_en_inference
):
inferred_en_source = None
if i > 0:
# Look backwards
for j in range(i - 1, -1, -1):
prev_instr = ordered_logic_list_for_en[j]
prev_uid = prev_instr["instruction_uid"]
prev_type_original = (
prev_instr.get("type", "")
.replace(SCL_SUFFIX, "")
.replace("_error", "")
)
if prev_type_original in rlo_generators: # Found RLO source
inferred_en_source = {
"type": "connection",
"source_instruction_uid": prev_uid,
"source_instruction_type": prev_type_original,
"source_pin": "out",
}
break
elif (
prev_type_original in functional_block_types
): # Found block with potential ENO
if (prev_uid, "eno") in source_connections:
inferred_en_source = {
"type": "connection",
"source_instruction_uid": prev_uid,
"source_instruction_type": prev_type_original,
"source_pin": "eno",
}
break # Stop searching
elif prev_type_original in [
"Coil",
"SCoil",
"RCoil",
"SetCoil",
"ResetCoil",
"SdCoil",
]:
break # Coils terminate flow
if inferred_en_source is None:
inferred_en_source = {"type": "powerrail"}
# Update the instruction in the main dictionary
if part_uid in all_logic_steps:
all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source
processed_blocks_en_inference.add(part_uid)
# 5. ENO Logic (Simplified as before)
# --- (Esta sección puede permanecer igual) ---
for source_instr_uid, eno_destinations in eno_outputs.items():
if source_instr_uid not in all_logic_steps:
continue
all_logic_steps[source_instr_uid]["eno_destinations"] = eno_destinations
# 6. Order and Return
# --- (Esta sección puede permanecer igual) ---
final_logic_list = [
all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps
]
network_lang = "Unknown"
if network_element is not None:
attr_list_net = network_element.xpath("./AttributeList")
if attr_list_net:
lang_node_net = attr_list_net[0].xpath("./ProgrammingLanguage/text()")
if lang_node_net:
network_lang = lang_node_net[0].strip()
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"language": network_lang,
"logic": final_logic_list,
}
# --- Main conversion function (convert_xml_to_json) ---
# --- (Keep convert_xml_to_json as before, ---
# --- making sure it calls the updated version of parse_network) ---
def _find_main_block(root):
    """Locate the first FC/FB/GlobalDB/OB block element anywhere under *root*.

    Returns a ``(block_element, block_type)`` tuple, or ``(None, None)`` when
    the document contains none of the four supported block tags.
    """
    block_list = root.xpath(
        "//*[local-name()='SW.Blocks.FC' or local-name()='SW.Blocks.FB' or local-name()='SW.Blocks.GlobalDB' or local-name()='SW.Blocks.OB']"
    )
    if not block_list:
        return None, None
    the_block = block_list[0]
    block_tag_name = etree.QName(the_block.tag).localname
    block_type_found = {
        "SW.Blocks.FC": "FC",
        "SW.Blocks.FB": "FB",
        "SW.Blocks.GlobalDB": "GlobalDB",
        "SW.Blocks.OB": "OB",
    }.get(block_tag_name)
    print(
        f"Paso 2: Bloque {block_tag_name} encontrado (ID={the_block.get('ID')})."
    )
    return the_block, block_type_found


def _read_block_attributes(the_block, block_type_found):
    """Read name, number and programming language from the block's AttributeList.

    Returns ``(attribute_list_node, name, number, language)`` where
    ``attribute_list_node`` is the (possibly empty) XPath result list, so the
    caller can reuse it to look up the ``Interface`` child.
    """
    # AttributeList and its Name/Number/ProgrammingLanguage children are
    # queried without a namespace prefix.
    attribute_list_node = the_block.xpath("./AttributeList")
    block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown"
    if not attribute_list_node:
        print(
            f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}."
        )
        if block_type_found == "GlobalDB":
            block_lang_val = "DB"
        return attribute_list_node, block_name_val, block_number_val, block_lang_val

    attr_list = attribute_list_node[0]
    name_node = attr_list.xpath("./Name/text()")
    if name_node:
        block_name_val = name_node[0].strip()
    num_node = attr_list.xpath("./Number/text()")
    try:
        block_number_val = int(num_node[0]) if num_node else None
    except ValueError:
        # A non-numeric <Number> is reported as "no number".
        block_number_val = None
    lang_node = attr_list.xpath("./ProgrammingLanguage/text()")
    if lang_node:
        block_lang_val = lang_node[0].strip()
    else:
        block_lang_val = "DB" if block_type_found == "GlobalDB" else "Unknown"
    print(
        f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'"
    )
    return attribute_list_node, block_name_val, block_number_val, block_lang_val


def _extract_interface(the_block, attribute_list_node, block_type_found):
    """Build the ``{section_name: members}`` interface dictionary for the block."""
    interface = {}
    # <Interface> is looked up without a namespace; Section/Member use "iface".
    interface_node_list = (
        attribute_list_node[0].xpath("./Interface") if attribute_list_node else []
    )
    if interface_node_list:
        interface_node = interface_node_list[0]
        print("Paso 4: Nodo Interface encontrado.")
        all_sections = interface_node.xpath(".//iface:Section", namespaces=ns)
        if all_sections:
            for section in all_sections:
                section_name = section.get("Name")
                if not section_name:
                    continue
                members_in_section = section.xpath("./iface:Member", namespaces=ns)
                if members_in_section:
                    interface[section_name] = parse_interface_members(
                        members_in_section
                    )
        else:
            print(
                "Advertencia: Nodo Interface no contiene secciones <iface:Section>."
            )
        if not interface:
            print(
                "Advertencia: Interface encontrada pero sin secciones procesables."
            )
    elif block_type_found == "GlobalDB":
        # Special handling for a DB without <Interface>: search the whole
        # block for a "Static" section.
        static_members = the_block.xpath(
            ".//iface:Section[@Name='Static']/iface:Member", namespaces=ns
        )
        if static_members:
            print("Paso 4: Encontrada sección Static para GlobalDB.")
            interface["Static"] = parse_interface_members(static_members)
        else:
            print("Advertencia: No se encontró sección 'Static' para GlobalDB.")
    else:
        print(
            f"Advertencia: No se encontró <Interface> para bloque {block_type_found}."
        )
    if not interface:
        print("Advertencia: No se pudo extraer información de la interfaz.")
    return interface


def _raw_text_network(network_elem, network_id, language, child_xpath, reconstruct):
    """Build the network dict for a textual language (SCL or STL).

    *child_xpath* selects the source node below ``flg:NetworkSource`` and
    *reconstruct* turns that node into text. When the node is missing, a
    placeholder comment is stored instead of the source text.
    """
    network_source_node = network_elem.xpath(".//flg:NetworkSource", namespaces=ns)
    text_node = (
        network_source_node[0].xpath(child_xpath, namespaces=ns)
        if network_source_node
        else None
    )
    source_text = f"// {language} extraction failed: Node not found.\n"
    if text_node:
        source_text = reconstruct(text_node[0])
    return {
        "id": network_id,
        "language": language,
        "logic": [
            {
                "instruction_uid": f"{language}_{network_id}",
                "type": f"RAW_{language}_CHUNK",
                language.lower(): source_text,
            }
        ],
    }


def _parse_compile_unit(network_elem, network_id):
    """Convert one ``SW.Blocks.CompileUnit`` element into its network dict."""
    # Network language comes from AttributeList/ProgrammingLanguage; LAD is
    # assumed when it is absent.
    programming_language = "LAD"
    attribute_list = network_elem.xpath("./AttributeList")
    if attribute_list:
        lang_node = attribute_list[0].xpath("./ProgrammingLanguage/text()")
        if lang_node:
            programming_language = lang_node[0].strip()
    print(f" - Procesando Red ID={network_id}, Lenguaje={programming_language}")

    if programming_language in ["LAD", "FBD", "GRAPH"]:
        parsed_network_data = parse_network(network_elem)
        if not parsed_network_data:
            print(
                f" Error fatal: parse_network devolvió None para Red {network_id}"
            )
            parsed_network_data = {
                "id": network_id,
                "language": programming_language,
                "logic": [],
                "error": "parse_network failed",
            }
        else:
            if parsed_network_data.get("error"):
                # The network is kept together with its error so later
                # stages (x2/x3) can see it.
                print(
                    f" Error parseando Red {network_id}: {parsed_network_data['error']}"
                )
            parsed_network_data["language"] = programming_language
    elif programming_language == "SCL":
        parsed_network_data = _raw_text_network(
            network_elem,
            network_id,
            "SCL",
            "./st:StructuredText",
            reconstruct_scl_from_tokens,
        )
    elif programming_language == "STL":
        parsed_network_data = _raw_text_network(
            network_elem,
            network_id,
            "STL",
            "./stl:StatementList",
            reconstruct_stl_from_statementlist,
        )
    else:
        parsed_network_data = {
            "id": network_id,
            "language": programming_language,
            "logic": [
                {
                    "instruction_uid": f"UNS_{network_id}",
                    "type": "UNSUPPORTED_LANG",
                    "info": f"Language {programming_language} not supported",
                }
            ],
            "error": "Unsupported language",
        }

    # Attach title and comment; these overwrite any values already present.
    title_element = network_elem.xpath(
        ".//iface:MultilingualText[@CompositionName='Title']",
        namespaces=ns,
    )
    parsed_network_data["title"] = (
        get_multilingual_text(title_element[0])
        if title_element
        else f"Network {network_id}"
    )
    comment_element = network_elem.xpath(
        "./ObjectList/MultilingualText[@CompositionName='Comment']",
        namespaces=ns,
    )  # path is relative to the CompileUnit
    parsed_network_data["comment"] = (
        get_multilingual_text(comment_element[0]) if comment_element else ""
    )
    return parsed_network_data


def _extract_networks(the_block, block_type_found):
    """Return the list of parsed networks found in the block's ObjectList."""
    networks = []
    object_list_node = the_block.xpath("./ObjectList")
    if object_list_node:
        compile_units = object_list_node[0].xpath("./SW.Blocks.CompileUnit")
        print(
            f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit."
        )
        for network_elem in compile_units:
            network_id = network_elem.get("ID")
            if not network_id:
                # Compile units without an ID are skipped.
                continue
            networks.append(_parse_compile_unit(network_elem, network_id))
        if not compile_units and block_type_found != "GlobalDB":
            print(
                f"Advertencia: ObjectList para {block_type_found} sin SW.Blocks.CompileUnit."
            )
    elif block_type_found == "GlobalDB":
        print("Paso 5: Saltando búsqueda de CompileUnits para GlobalDB.")
    else:
        print(
            f"Advertencia: No se encontró ObjectList para el bloque {block_type_found}."
        )
    return networks


def _write_result_json(result, json_filepath):
    """Serialize *result* to *json_filepath*; return True on success, False on failure."""
    try:
        with open(json_filepath, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=4, ensure_ascii=False)
        print("Paso 6: Escritura completada.")
        print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'")
    except IOError as e:
        print(
            f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"
        )
        return False
    except TypeError as e:
        print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")
        return False
    return True


def convert_xml_to_json(xml_filepath, json_filepath):
    """Convert a block XML export into the simplified JSON representation.

    The XML is parsed, the first FC/FB/GlobalDB/OB block is located, and its
    attributes, comment, interface sections and networks (compile units) are
    collected into a dictionary that is written to *json_filepath*.

    Progress, warnings and errors are reported with ``print``. Exceptions are
    not propagated: they are caught and printed.

    Args:
        xml_filepath: Path of the XML file to read.
        json_filepath: Path of the JSON file to write.

    Returns:
        ``True`` when the JSON file was written, ``False`` when the input file
        is missing, no supported block is found, the XML is invalid, the JSON
        cannot be written, or any unexpected error occurs.
    """
    print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
    if not os.path.exists(xml_filepath):
        print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
        return False
    try:
        print("Paso 1: Parseando archivo XML...")
        parser = etree.XMLParser(remove_blank_text=True)
        tree = etree.parse(xml_filepath, parser)
        root = tree.getroot()
        print("Paso 1: Parseo XML completado.")

        # --- Locate the main block (FC, FB, GlobalDB, OB) ---
        print(
            "Paso 2: Buscando el bloque SW.Blocks.FC, SW.Blocks.FB, SW.Blocks.GlobalDB o SW.Blocks.OB..."
        )
        the_block, block_type_found = _find_main_block(root)
        if the_block is None:
            print(
                "Error Crítico: No se encontró el elemento raíz del bloque (<SW.Blocks.FC>, <SW.Blocks.FB>, <SW.Blocks.GlobalDB> o <SW.Blocks.OB>) usando XPath."
            )
            return False

        # --- Block attributes ---
        print("Paso 3: Extrayendo atributos del bloque...")
        (
            attribute_list_node,
            block_name_val,
            block_number_val,
            block_lang_val,
        ) = _read_block_attributes(the_block, block_type_found)

        # --- Block comment (ObjectList/MultilingualText, queried un-namespaced) ---
        block_comment_val = ""
        comment_node_list = the_block.xpath(
            "./ObjectList/MultilingualText[@CompositionName='Comment']"
        )
        if comment_node_list:
            block_comment_val = get_multilingual_text(comment_node_list[0])
            print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")

        result = {
            "block_name": block_name_val,
            "block_number": block_number_val,
            "language": block_lang_val,
            "block_type": block_type_found,
            "block_comment": block_comment_val,
            "interface": {},
            "networks": [],
        }

        # --- Interface ---
        print("Paso 4: Extrayendo la interfaz del bloque...")
        result["interface"] = _extract_interface(
            the_block, attribute_list_node, block_type_found
        )

        # --- Networks (CompileUnits) ---
        print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...")
        result["networks"] = _extract_networks(the_block, block_type_found)

        # --- Write JSON ---
        print("Paso 6: Escribiendo el resultado en el archivo JSON...")
        if not result["interface"]:
            print("ADVERTENCIA FINAL: 'interface' está vacía.")
        if not result["networks"] and block_type_found != "GlobalDB":
            print("ADVERTENCIA FINAL: 'networks' está vacía.")
        return _write_result_json(result, json_filepath)
    except etree.XMLSyntaxError as e:
        print(
            f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}"
        )
        return False
    except Exception as e:
        # Top-level boundary: report the failure instead of propagating it.
        print(f"Error Crítico: Error inesperado durante la conversión: {e}")
        traceback.print_exc()
        return False
if __name__ == "__main__":
    # Command-line entry point: takes the XML path as its only argument and
    # writes "<name>_simplified.json" next to the input file.
    # argparse, os and traceback are already imported at the top of the file.
    import sys

    parser = argparse.ArgumentParser(
        description="Convert Simatic XML (LAD/FBD/SCL/STL/OB/DB) to simplified JSON. Expects XML filepath as argument."
    )
    parser.add_argument(
        "xml_filepath",
        help="Path to the input XML file passed from the main script (x0_main.py).",
    )
    args = parser.parse_args()
    xml_input_file = args.xml_filepath
    if not os.path.exists(xml_input_file):
        print(
            f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'",
            file=sys.stderr,
        )
        sys.exit(1)
    xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0]
    output_dir = os.path.dirname(xml_input_file)
    # dirname is "" for a bare filename, and os.makedirs("") raises
    # FileNotFoundError; the current directory needs no creation.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    json_output_file = os.path.join(output_dir, f"{xml_filename_base}_simplified.json")
    print(
        f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'"
    )
    try:
        conversion_status = convert_xml_to_json(xml_input_file, json_output_file)
    except Exception as e:
        print(
            f"Error Crítico (x1) durante la conversión de '{xml_input_file}': {e}",
            file=sys.stderr,
        )
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)  # Failure
    # An explicit False return is treated as failure; any other value
    # (including None) counts as success.
    sys.exit(1 if conversion_status is False else 0)