Simatic_XML_Parser_to_SCL/x1_to_json.py

1380 lines
59 KiB
Python

# -*- coding: utf-8 -*-
import json
import argparse
import os
import re
from lxml import etree
import traceback
from collections import defaultdict
# --- Namespaces ---
# Se añade el namespace 'st' para Structured Text
ns = {
"iface": "http://www.siemens.com/automation/Openness/SW/Interface/v5",
"flg": "http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4",
"st": "http://www.siemens.com/automation/Openness/SW/NetworkSource/StructuredText/v3",
"stl": "http://www.siemens.com/automation/Openness/SW/NetworkSource/StatementList/v4",
}
# --- Helper Functions ---
def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"):
# (Sin cambios respecto a la versión anterior)
if element is None:
return ""
try:
xpath_expr = (
f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{default_lang}']]"
f"/*[local-name()='AttributeList']/*[local-name()='Text']"
)
text_items = element.xpath(xpath_expr)
if text_items and text_items[0].text is not None:
return text_items[0].text.strip()
xpath_expr = (
f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{fallback_lang}']]"
f"/*[local-name()='AttributeList']/*[local-name()='Text']"
)
text_items = element.xpath(xpath_expr)
if text_items and text_items[0].text is not None:
return text_items[0].text.strip()
xpath_expr = f".//*[local-name()='MultilingualTextItem']/*[local-name()='AttributeList']/*[local-name()='Text']"
text_items = element.xpath(xpath_expr)
if text_items and text_items[0].text is not None:
return text_items[0].text.strip()
return ""
except Exception as e:
print(f"Advertencia: Error extrayendo MultilingualText: {e}")
return ""
def get_symbol_name(symbol_element):
# (Sin cambios respecto a la versión anterior)
if symbol_element is None:
return None
try:
components = symbol_element.xpath("./*[local-name()='Component']/@Name")
return ".".join(f'"{c}"' for c in components) if components else None
except Exception as e:
print(f"Advertencia: Excepción en get_symbol_name: {e}")
return None
def parse_access(access_element):
# (Sin cambios respecto a la versión anterior)
if access_element is None:
return None
uid = access_element.get("UId")
scope = access_element.get("Scope")
info = {"uid": uid, "scope": scope, "type": "unknown"}
symbol = access_element.xpath("./*[local-name()='Symbol']")
constant = access_element.xpath("./*[local-name()='Constant']")
if symbol:
info["type"] = "variable"
info["name"] = get_symbol_name(symbol[0])
if info["name"] is None:
info["type"] = "error_parsing_symbol"
print(f"Error: No se pudo parsear nombre símbolo Access UID={uid}")
return info
elif constant:
info["type"] = "constant"
const_type_elem = constant[0].xpath("./*[local-name()='ConstantType']")
const_val_elem = constant[0].xpath("./*[local-name()='ConstantValue']")
info["datatype"] = (
const_type_elem[0].text
if const_type_elem and const_type_elem[0].text is not None
else "Unknown"
)
value_str = (
const_val_elem[0].text
if const_val_elem and const_val_elem[0].text is not None
else None
)
if value_str is None:
info["type"] = "error_parsing_constant"
info["value"] = None
print(f"Error: Constante sin valor Access UID={uid}")
return info
if info["datatype"] == "Unknown":
val_lower = value_str.lower()
if val_lower in ["true", "false"]:
info["datatype"] = "Bool"
elif value_str.isdigit() or (
value_str.startswith("-") and value_str[1:].isdigit()
):
info["datatype"] = "Int"
elif "." in value_str:
try:
float(value_str)
info["datatype"] = "Real"
except ValueError:
pass
elif "#" in value_str:
info["datatype"] = "TypedConstant"
info["value"] = value_str
dtype_lower = info["datatype"].lower()
val_str_processed = value_str.split("#")[-1] if "#" in value_str else value_str
try:
if dtype_lower in [
"int",
"dint",
"udint",
"sint",
"usint",
"lint",
"ulint",
"word",
"dword",
"lword",
"byte",
]:
info["value"] = int(val_str_processed)
elif dtype_lower == "bool":
info["value"] = (
val_str_processed.lower() == "true" or val_str_processed == "1"
)
elif dtype_lower in ["real", "lreal"]:
info["value"] = float(val_str_processed)
elif dtype_lower == "typedconstant":
info["value"] = value_str
except (ValueError, TypeError) as e:
print(
f"Advertencia: No se pudo convertir valor '{val_str_processed}' a {dtype_lower} UID={uid}. Error: {e}"
)
info["value"] = value_str
else:
info["type"] = "unknown_structure"
print(f"Advertencia: Access UID={uid} no es Symbol ni Constant.")
return info
if info["type"] == "variable" and info.get("name") is None:
print(f"Error Interno: parse_access var sin nombre UID {uid}.")
info["type"] = "error_no_name"
return info
return info
def parse_part(part_element):
# (Sin cambios respecto a la versión anterior)
if part_element is None:
return None
uid = part_element.get("UId")
name = part_element.get("Name")
if not uid or not name:
print(
f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}"
)
return None
template_values = {}
try:
for tv in part_element.xpath("./*[local-name()='TemplateValue']"):
tv_name = tv.get("Name")
tv_type = tv.get("Type")
if tv_name and tv_type:
template_values[tv_name] = tv_type
except Exception as e:
print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}")
negated_pins = {}
try:
for negated_elem in part_element.xpath("./*[local-name()='Negated']"):
negated_pin_name = negated_elem.get("Name")
if negated_pin_name:
negated_pins[negated_pin_name] = True
except Exception as e:
print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}")
return {
"uid": uid,
"type": name,
"template_values": template_values,
"negated_pins": negated_pins,
}
def parse_call(call_element):
# (Mantiene la corrección para DB de instancia)
if call_element is None:
return None
uid = call_element.get("UId")
if not uid:
print(
f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}"
)
return None
call_info_elem = call_element.xpath("./*[local-name()='CallInfo']")
if not call_info_elem:
print(f"Error: Call UID {uid} sin elemento CallInfo.")
return None
call_info = call_info_elem[0]
block_name = call_info.get("Name")
block_type = call_info.get("BlockType")
instance_name = None
instance_scope = None
if not block_name or not block_type:
print(f"Error: CallInfo para UID {uid} sin Name o BlockType.")
return None
if block_type == "FB":
instance_elem_list = call_info.xpath("./*[local-name()='Instance']")
if instance_elem_list:
instance_elem = instance_elem_list[0]
instance_scope = instance_elem.get("Scope")
component_elem_list = instance_elem.xpath(
"./*[local-name()='Component']"
) # Busca Component directo
if component_elem_list:
component_elem = component_elem_list[0]
db_name_raw = component_elem.get("Name")
if db_name_raw:
instance_name = f'"{db_name_raw}"' # Añade comillas
else:
print(
f"Advertencia: <Component> dentro de <Instance> para FB Call UID {uid} no tiene atributo 'Name'."
)
else:
print(
f"Advertencia: No se encontró <Component> dentro de <Instance> para FB Call UID {uid}. No se pudo obtener el nombre del DB."
)
else:
print(
f"Advertencia: FB Call '{block_name}' UID {uid} no tiene elemento <Instance>."
)
call_data = {
"uid": uid,
"type": "Call",
"block_name": block_name,
"block_type": block_type,
}
if instance_name:
call_data["instance_db"] = instance_name
if instance_scope:
call_data["instance_scope"] = instance_scope
return call_data
# SCL (Structured Text) Parser
def reconstruct_scl_from_tokens(st_node):
"""
Reconstruye SCL desde <StructuredText>, mejorando el manejo de
variables, constantes literales, tokens básicos, espacios y saltos de línea.
"""
if st_node is None:
return "// Error: StructuredText node not found.\n"
scl_parts = []
children = st_node.xpath("./st:*", namespaces=ns)
for elem in children:
tag = etree.QName(elem.tag).localname
if tag == "Token":
scl_parts.append(elem.get("Text", ""))
elif tag == "Blank":
# Añadir espacios simples, evitar múltiples si ya hay uno antes/después
if not scl_parts or not scl_parts[-1].endswith(' '):
scl_parts.append(" " * int(elem.get("Num", 1)))
elif int(elem.get("Num", 1)) > 1: # Añadir extras si son más de 1
scl_parts.append(" " * (int(elem.get("Num", 1))-1))
elif tag == "NewLine":
# Limpiar espacios antes del salto de línea real
if scl_parts:
scl_parts[-1] = scl_parts[-1].rstrip()
scl_parts.append("\n")
elif tag == "Access":
scope = elem.get("Scope")
access_str = f"/*_ERR_Scope_{scope}_*/" # Fallback más informativo
if scope in ["GlobalVariable", "LocalVariable", "TempVariable", "InOutVariable", "InputVariable", "OutputVariable", "ConstantVariable"]: # Tipos comunes de variables
symbol_elem = elem.xpath("./st:Symbol", namespaces=ns)
if symbol_elem:
components = symbol_elem[0].xpath("./st:Component", namespaces=ns)
symbol_text_parts = []
for i, comp in enumerate(components):
name = comp.get("Name", "_ERR_COMP_")
# Añadir punto si no es el primer componente
if i > 0:
symbol_text_parts.append(".")
# Reconstrucción de comillas (heurística)
has_quotes_elem = comp.xpath("../st:BooleanAttribute[@Name='HasQuotes']/text()", namespaces=ns)
has_quotes = has_quotes_elem and has_quotes_elem[0].lower() == "true"
is_temp = name.startswith('#')
if has_quotes or (i == 0 and not is_temp): # Comillas si HasQuotes o primer componente (no temp)
symbol_text_parts.append(f'"{name}"')
else:
symbol_text_parts.append(name)
# Manejar índices de array (RECURSIVO)
index_access = comp.xpath("./st:Access", namespaces=ns)
if index_access:
# Llama recursivamente para obtener el texto de cada índice
indices_text = [reconstruct_scl_from_tokens(idx_node) for idx_node in index_access]
symbol_text_parts.append(f"[{','.join(indices_text)}]")
access_str = "".join(symbol_text_parts)
elif scope == "LiteralConstant":
constant_elem = elem.xpath("./st:Constant", namespaces=ns)
if constant_elem:
val_elem = constant_elem[0].xpath("./st:ConstantValue/text()", namespaces=ns)
type_elem = constant_elem[0].xpath("./st:ConstantType/text()", namespaces=ns)
const_type = type_elem[0] if type_elem else ""
const_val = val_elem[0] if val_elem else "_ERR_CONSTVAL_"
# **CORRECCIÓN CLAVE**: Usar el valor extraído
access_str = const_val
# Opcional: añadir prefijos T#, L#, etc. si es necesario
# if const_type == "Time": access_str = f"T#{const_val}"
# elif const_type == "LTime": access_str = f"LT#{const_val}"
# ... otros tipos ...
else:
access_str = "/*_ERR_NOCONST_*/"
# --- Añadir más manejo de scopes aquí si es necesario ---
# elif scope == "Call": access_str = reconstruct_call(elem)
# elif scope == "Expression": access_str = reconstruct_expression(elem)
scl_parts.append(access_str)
elif tag == "Comment" or tag == "LineComment":
comment_text = "".join(elem.xpath(".//text()")).strip()
if tag == "Comment":
scl_parts.append(f"(* {comment_text} *)")
else:
scl_parts.append(f"// {comment_text}")
# else: Ignorar otros nodos
# Unir partes, limpiar espacios extra alrededor de operadores y saltos de línea
full_scl = "".join(scl_parts)
# Re-indentar líneas después de IF/THEN, etc. (Simplificado)
output_lines = []
indent_level = 0
for line in full_scl.split('\n'):
line = line.strip()
if not line: continue # Saltar líneas vacías
# Reducir indentación antes de procesar END_IF, ELSE, etc. (simplificado)
if line.startswith(('END_IF', 'END_WHILE', 'END_FOR', 'END_CASE', 'ELSE', 'ELSIF')):
indent_level = max(0, indent_level - 1)
output_lines.append(" " * indent_level + line) # Aplicar indentación
# Aumentar indentación después de IF, WHILE, FOR, CASE, ELSE, ELSIF (simplificado)
if line.endswith('THEN') or line.endswith('DO') or line.endswith('OF') or line == 'ELSE':
indent_level += 1
# Nota: Esto no maneja bloques BEGIN/END dentro de SCL
return "\n".join(output_lines)
# STL (Statement List) Parser
def get_access_text(access_element):
"""Reconstruye una representación textual simple de un Access en STL."""
if access_element is None:
return "_ERR_ACCESS_"
scope = access_element.get("Scope")
# Intenta reconstruir el símbolo
# CORREGIDO: Añadido namespaces=ns
symbol_elem = access_element.xpath("./stl:Symbol", namespaces=ns)
if symbol_elem:
# CORREGIDO: Añadido namespaces=ns
components = symbol_elem[0].xpath("./stl:Component", namespaces=ns)
parts = []
for comp in components:
name = comp.get("Name", "_ERR_COMP_")
# CORREGIDO: Añadido namespaces=ns
has_quotes_elem = comp.xpath("../stl:BooleanAttribute[@Name='HasQuotes']/text()", namespaces=ns)
has_quotes = has_quotes_elem and has_quotes_elem[0].lower() == "true"
# Usar nombre tal cual por ahora
parts.append(name)
# Añadir índices si existen
# CORREGIDO: Añadido namespaces=ns
index_access = comp.xpath("./stl:Access", namespaces=ns)
if index_access:
indices = [get_access_text(ia) for ia in index_access]
parts.append(f"[{','.join(indices)}]")
return ".".join(parts)
# Intenta reconstruir constante
# CORREGIDO: Añadido namespaces=ns
constant_elem = access_element.xpath("./stl:Constant", namespaces=ns)
if constant_elem:
# CORREGIDO: Añadido namespaces=ns
val_elem = constant_elem[0].xpath("./stl:ConstantValue/text()", namespaces=ns)
type_elem = constant_elem[0].xpath("./stl:ConstantType/text()", namespaces=ns) # Obtener tipo para mejor formato
const_type = type_elem[0] if type_elem else ""
const_val = val_elem[0] if val_elem else "_ERR_CONST_"
# Añadir prefijo de tipo si es necesario (ej. T# , L#) - Simplificado
if const_type == "Time": return f"T#{const_val}"
if const_type == "ARef": return f"{const_val}" # No necesita prefijo
# Añadir más tipos si es necesario
return const_val # Valor directo para otros tipos
# Intenta reconstruir etiqueta
# CORREGIDO: Añadido namespaces=ns
label_elem = access_element.xpath("./stl:Label", namespaces=ns)
if label_elem:
name = label_elem[0].get("Name", "_ERR_LABEL_")
return name
# Intenta reconstruir acceso indirecto (simplificado)
# CORREGIDO: Añadido namespaces=ns
indirect_elem = access_element.xpath("./stl:Indirect", namespaces=ns)
if indirect_elem:
reg = indirect_elem[0].get("Register", "AR?")
offset_str = indirect_elem[0].get("BitOffset", "0")
area = indirect_elem[0].get("Area", "DB")
width = indirect_elem[0].get("Width", "X")
# Convertir BitOffset a formato P#Byte.Bit
try:
bit_offset = int(offset_str)
byte_offset = bit_offset // 8
bit_in_byte = bit_offset % 8
p_format_offset = f"P#{byte_offset}.{bit_in_byte}"
except ValueError:
p_format_offset = "P#?.?"
# Formatear ancho
width_map = {"Bit": "X", "Byte": "B", "Word": "W", "Double": "D"}
width_char = width_map.get(width, width[0] if width else "?") # Usa primera letra si no mapeado
return f"{area}{width_char}[{reg},{p_format_offset}]"
# Intenta reconstruir dirección absoluta
# CORREGIDO: Añadido namespaces=ns
address_elem = access_element.xpath("./stl:Address", namespaces=ns)
if address_elem:
area = address_elem[0].get("Area", "??")
bit_offset_str = address_elem[0].get("BitOffset", "0")
addr_type_str = address_elem[0].get("Type", "Bool") # Obtener tipo para ancho
try:
bit_offset = int(bit_offset_str)
byte_offset = bit_offset // 8
bit_in_byte = bit_offset % 8
# Determinar ancho basado en tipo (simplificación)
addr_width = "X" # Default a Bit
if addr_type_str == "Byte": addr_width = "B"
elif addr_type_str == "Word": addr_width = "W"
elif addr_type_str in ["DWord", "DInt"]: addr_width = "D"
# Añadir más tipos si es necesario (Real, etc.)
# Mapear Area para STL estándar
area_map = { "Input": "I", "Output": "Q", "Memory": "M",
"PeripheryInput": "PI", "PeripheryOutput": "PQ",
"DB": "DB", "DI": "DI", "Local": "L", # L no siempre válido aquí
"Timer": "T", "Counter": "C" }
stl_area = area_map.get(area, area)
# Manejar DB/DI que necesitan número de bloque
if stl_area in ["DB", "DI"]:
block_num = address_elem[0].get("BlockNumber")
if block_num:
return f"{stl_area}{block_num}.{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: DB1.DBX0.1
else: # Acceso con registro DB/DI
return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: DBX0.1
elif stl_area in ["T", "C"]:
return f"{stl_area}{byte_offset}" # Los timers/contadores solo usan el número
else: # I, Q, M, L, PI, PQ
return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}" # Ej: M10.1, I0.0
except ValueError:
return f"{area}?{bit_offset_str}?"
return f"_{scope}_?" # Fallback
def get_comment_text(comment_element):
"""Extrae texto de un LineComment o Comment."""
if comment_element is None: return ""
# Usar get_multilingual_text si los comentarios son multilingües
# Si no, extraer texto directamente
ml_texts = comment_element.xpath(".//mlt:MultilingualTextItem/mlt:AttributeList/mlt:Text/text()",
namespaces={'mlt': "http://www.siemens.com/automation/Openness/SW/Interface/v5"}) # Asumiendo ns
if ml_texts:
# Podrías intentar obtener un idioma específico o simplemente el primero
return ml_texts[0].strip() if ml_texts else ""
# Fallback a texto directo si no hay estructura multilingüe
text_nodes = comment_element.xpath("./text()")
return "".join(text_nodes).strip()
def reconstruct_stl_from_statementlist(statement_list_node):
"""Reconstruye el código STL como una cadena de texto desde <StatementList>."""
if statement_list_node is None:
return "// Error: StatementList node not found.\n"
stl_lines = []
# CORREGIDO: Añadido namespaces=ns
statements = statement_list_node.xpath("./stl:StlStatement", namespaces=ns)
for stmt in statements:
line_parts = []
line_comment = "" # Comentario al final de la línea
# 1. Comentarios al inicio de la línea (como líneas separadas //)
# CORREGIDO: Añadido namespaces=ns
initial_comments = stmt.xpath("child::stl:Comment | child::stl:LineComment", namespaces=ns)
for comm in initial_comments:
comment_text = get_comment_text(comm)
if comment_text:
# Dividir comentarios multilínea en varias líneas //
for comment_line in comment_text.splitlines():
stl_lines.append(f"// {comment_line}")
# 2. Etiqueta (si existe)
# CORREGIDO: Añadido namespaces=ns
label_decl = stmt.xpath("./stl:LabelDeclaration", namespaces=ns)
label_str = ""
if label_decl:
# CORREGIDO: Añadido namespaces=ns
label_name_nodes = label_decl[0].xpath("./stl:Label/@Name", namespaces=ns)
if label_name_nodes:
label_str = f"{label_name_nodes[0]}:"
# Buscar comentarios DENTRO de LabelDeclaration pero después de Label
# CORREGIDO: Añadido namespaces=ns
label_comments = label_decl[0].xpath("./stl:Comment | ./stl:LineComment", namespaces=ns)
for lcomm in label_comments:
comment_text = get_comment_text(lcomm)
if comment_text: line_comment += f" // {comment_text}" # Añadir al comentario de línea
# 3. Token de Instrucción STL
# CORREGIDO: Añadido namespaces=ns
instruction_token = stmt.xpath("./stl:StlToken", namespaces=ns)
instruction_str = ""
if instruction_token:
token_text = instruction_token[0].get("Text", "_ERR_TOKEN_")
instruction_str = token_text
# Comentarios asociados directamente al token
# CORREGIDO: Añadido namespaces=ns
token_comments = instruction_token[0].xpath("./stl:Comment | ./stl:LineComment", namespaces=ns)
for tcomm in token_comments:
comment_text = get_comment_text(tcomm)
if comment_text: line_comment += f" // {comment_text}" # Añadir al comentario de línea
# 4. Acceso/Operando STL
# CORREGIDO: Añadido namespaces=ns
access_elem = stmt.xpath("./stl:Access", namespaces=ns)
access_str = ""
if access_elem:
access_text = get_access_text(access_elem[0])
access_str = access_text
# Comentarios DENTRO del Access (pueden ser de línea o bloque)
# CORREGIDO: Añadido namespaces=ns
access_comments = access_elem[0].xpath("child::stl:LineComment | child::stl:Comment", namespaces=ns)
for acc_comm in access_comments:
comment_text = get_comment_text(acc_comm)
if comment_text: line_comment += f" // {comment_text}" # Añadir al comentario de línea
# Construir la línea: Etiqueta (si hay) + Tab + Instrucción + Espacio + Operando (si hay) + Comentario(s)
current_line = ""
if label_str:
current_line += label_str
if instruction_str:
if current_line: # Si ya había etiqueta, añadir tabulador
current_line += "\t"
current_line += instruction_str
if access_str:
if current_line: # Si ya había algo, añadir espacio
current_line += " "
current_line += access_str
if line_comment:
# Añadir espacio antes del comentario si hay código en la línea
if current_line.strip():
current_line += f" {line_comment}"
else: # Si la línea estaba vacía (solo comentarios iniciales), poner el comentario de línea
current_line = line_comment
# Añadir la línea construida solo si no está vacía
if current_line.strip():
stl_lines.append(current_line.rstrip()) # Eliminar espacios finales
return "\n".join(stl_lines)
# --- Main Parsing Function ---
def parse_network(network_element):
"""
Parsea una red, extrae lógica y añade conexiones EN implícitas.
Maneja wires con múltiples destinos.
"""
if network_element is None:
return {
"id": "ERROR",
"title": "Invalid Network Element",
"comment": "",
"logic": [],
"error": "Input element was None",
}
network_id = network_element.get("ID")
# --- Extracción Título/Comentario (sin cambios respecto a la última versión) ---
title_element = network_element.xpath(
".//*[local-name()='MultilingualText'][@CompositionName='Title']"
)
network_title = (
get_multilingual_text(title_element[0])
if title_element
else f"Network {network_id}"
)
comment_element = network_element.xpath(
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
)
network_comment = (
get_multilingual_text(comment_element[0]) if comment_element else ""
)
flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
if not flgnet_list:
# print(f"Advertencia: FlgNet no encontrado en Red ID={network_id}. Puede estar vacía o ser comentario.")
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": [],
"error": "FlgNet not found",
}
flgnet = flgnet_list[0]
# 1. Parsear Access, Parts y Calls (sin cambios)
access_map = {
acc_info["uid"]: acc_info
for acc in flgnet.xpath(".//flg:Access", namespaces=ns)
if (acc_info := parse_access(acc)) and acc_info["type"] != "unknown"
}
parts_and_calls_map = {}
instruction_elements = flgnet.xpath(".//flg:Part | .//flg:Call", namespaces=ns)
for element in instruction_elements:
parsed_info = None
tag_name = etree.QName(
element.tag
).localname # Obtener nombre local de la etiqueta
if tag_name == "Part":
parsed_info = parse_part(element)
elif tag_name == "Call":
parsed_info = parse_call(element)
if parsed_info and "uid" in parsed_info:
parts_and_calls_map[parsed_info["uid"]] = parsed_info
else:
print(
f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}"
)
# --- 2. Parsear Wires (MODIFICADO para multi-destino) ---
wire_connections = defaultdict(
list
) # (dest_uid, dest_pin) -> [(src_uid, src_pin), ...]
source_connections = defaultdict(
list
) # (src_uid, src_pin) -> [(dest_uid, dest_pin), ...]
eno_outputs = defaultdict(
list
) # src_uid -> [(dest_uid, dest_pin), ...] (conexiones DESDE eno)
flg_ns_uri = ns["flg"] # Cache namespace URI
qname_powerrail = etree.QName(flg_ns_uri, "Powerrail")
qname_identcon = etree.QName(flg_ns_uri, "IdentCon")
qname_namecon = etree.QName(flg_ns_uri, "NameCon")
for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
children = wire.getchildren()
if len(children) < 2:
continue # Ignorar wires sin fuente y al menos un destino
source_elem = children[0]
source_uid, source_pin = None, None
# Determinar fuente
if source_elem.tag == qname_powerrail:
source_uid, source_pin = "POWERRAIL", "out"
elif source_elem.tag == qname_identcon:
source_uid, source_pin = (
source_elem.get("UId"),
"value",
) # Acceso a variable/constante
elif source_elem.tag == qname_namecon:
source_uid, source_pin = source_elem.get("UId"), source_elem.get(
"Name"
) # Salida de instrucción
if source_uid is None:
continue # No se pudo determinar la fuente
source_info = (source_uid, source_pin) # Par de fuente
# Iterar sobre TODOS los posibles destinos (desde el segundo hijo en adelante)
for dest_elem in children[1:]:
dest_uid, dest_pin = None, None
# Determinar destino
if dest_elem.tag == qname_identcon:
dest_uid, dest_pin = (
dest_elem.get("UId"),
"value",
) # Entrada a variable/constante (Coil, etc.)
elif dest_elem.tag == qname_namecon:
dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get(
"Name"
) # Entrada a instrucción
# Guardar conexiones si son válidas
if dest_uid is not None and dest_pin is not None:
# Mapa de Conexiones (Destino -> [Fuentes])
dest_key = (dest_uid, dest_pin)
if source_info not in wire_connections[dest_key]:
wire_connections[dest_key].append(source_info)
# Mapa de Fuentes (Fuente -> [Destinos])
source_key = (source_uid, source_pin)
dest_info = (dest_uid, dest_pin)
if dest_info not in source_connections[source_key]:
source_connections[source_key].append(dest_info)
# Registrar conexiones que SALEN de un pin 'eno'
if source_pin == "eno" and source_uid in parts_and_calls_map:
if dest_info not in eno_outputs[source_uid]:
eno_outputs[source_uid].append(dest_info)
# else: # Debug opcional si un elemento no es destino válido
# print(f"Advertencia: Elemento en Wire {wire.get('UId')} no es destino válido: {etree.tostring(dest_elem)}")
# --- FIN MODIFICACIÓN Wire ---
# 3. Construcción Lógica Inicial (sin cambios)
all_logic_steps = {}
functional_block_types = [
"Move",
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
"Call",
"Se",
"Sd",
"BLKMOV",
]
rlo_generators = [
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"And",
"Xor",
"PBox",
"NBox",
]
for instruction_uid, instruction_info in parts_and_calls_map.items():
# Copiar info básica
instruction_repr = {"instruction_uid": instruction_uid, **instruction_info}
instruction_repr["inputs"] = {}
instruction_repr["outputs"] = {}
# --- INICIO: Manejo Especial SdCoil y otros Timers ---
original_type = instruction_info["type"]
current_type = original_type
input_pin_mapping = {} # Mapa XML pin -> JSON pin
output_pin_mapping = {} # Mapa XML pin -> JSON pin
if original_type == "SdCoil":
print(
f" Advertencia: Reinterpretando 'SdCoil' (UID: {instruction_uid}) como 'Se' (Pulse Timer)."
)
current_type = "Se" # Tratarlo como Se (TP)
input_pin_mapping = {
"in": "s", # Pin XML 'in' mapea a JSON 's' (Start)
"operand": "timer", # Pin XML 'operand' mapea a JSON 'timer' (Instance)
"value": "tv", # Pin XML 'value' mapea a JSON 'tv' (Time Value)
}
output_pin_mapping = {"out": "q"} # Pin XML 'out' mapea a JSON 'q' (Output)
elif original_type in ["Se", "Sd"]:
# Mapear pines estándar de Se/Sd para consistencia con TP/TON
input_pin_mapping = {"s": "s", "tv": "tv", "r": "r", "timer": "timer"}
output_pin_mapping = {
"q": "q",
"rt": "rt", # "rtbcd": "rtbcd" (ignorar BCD)
}
# Añadir otros mapeos si son necesarios para otros bloques (ej. Contadores)
# elif original_type == "CTU":
# input_pin_mapping = {"cu": "cu", "r": "r", "pv": "pv", "counter": "counter"} # 'counter' es inventado para instancia
# output_pin_mapping = {"qu": "qu", "cv": "cv"}
# --- FIN Manejo Especial ---
instruction_repr["type"] = current_type # Actualizar el tipo si se cambió
# Mapear Entradas usando el mapeo de pines
possible_input_pins = set(
[
"en",
"in",
"in1",
"in2",
"s",
"r",
"tv",
"value",
"operand",
"timer",
"bit",
"clk",
"pv",
"cu",
"cd",
"ld",
"pre",
"SRCBLK",
]
) # Ampliar con pines conocidos
for xml_pin_name in possible_input_pins:
dest_key = (instruction_uid, xml_pin_name)
if dest_key in wire_connections:
sources_list = wire_connections[dest_key]
input_sources_repr = []
# ... (lógica existente para obtener input_sources_repr de sources_list) ...
for source_uid, source_pin in sources_list:
if source_uid == "POWERRAIL":
input_sources_repr.append({"type": "powerrail"})
elif source_uid in access_map:
input_sources_repr.append(access_map[source_uid])
elif source_uid in parts_and_calls_map:
source_instr_info = parts_and_calls_map[source_uid]
source_original_type = source_instr_info["type"]
# Obtener el mapeo de salida para el tipo de la fuente (si existe)
source_output_mapping = {}
if source_original_type == "SdCoil":
source_output_mapping = {"out": "q"}
elif source_original_type in ["Se", "Sd"]:
source_output_mapping = {"q": "q", "rt": "rt"}
# Usar el pin mapeado si existe, sino el original
mapped_source_pin = source_output_mapping.get(source_pin, source_pin)
input_sources_repr.append({
"type": "connection",
"source_instruction_type": source_original_type, # Guardar tipo original puede ser útil
"source_instruction_uid": source_uid,
"source_pin": mapped_source_pin # <-- USAR PIN MAPEADO
})
else:
input_sources_repr.append(
{"type": "unknown_source", "uid": source_uid}
)
# Usar el nombre de pin mapeado para el JSON
json_pin_name = input_pin_mapping.get(xml_pin_name, xml_pin_name)
if len(input_sources_repr) == 1:
instruction_repr["inputs"][json_pin_name] = input_sources_repr[0]
elif len(input_sources_repr) > 1:
instruction_repr["inputs"][json_pin_name] = input_sources_repr
# Mapear Salidas usando el mapeo de pines
possible_output_pins = set(
[
"out",
"out1",
"Q",
"q",
"eno",
"RET_VAL",
"DSTBLK",
"rt",
"rtbcd",
"cv",
"cvbcd",
"QU",
"QD",
]
)
for xml_pin_name in possible_output_pins:
source_key = (instruction_uid, xml_pin_name)
if source_key in source_connections:
# Usar el nombre de pin mapeado para el JSON
json_pin_name = output_pin_mapping.get(xml_pin_name, xml_pin_name)
if json_pin_name not in instruction_repr["outputs"]:
instruction_repr["outputs"][json_pin_name] = []
for dest_uid, dest_pin in source_connections[source_key]:
if dest_uid in access_map:
if (
access_map[dest_uid]
not in instruction_repr["outputs"][json_pin_name]
):
instruction_repr["outputs"][json_pin_name].append(
access_map[dest_uid]
)
all_logic_steps[instruction_uid] = instruction_repr
# 4. Inferencia EN (sin cambios)
processed_blocks_en_inference = set()
something_changed = True
inference_passes = 0
max_inference_passes = len(all_logic_steps) + 5
try:
sorted_uids_for_en = sorted(
all_logic_steps.keys(),
key=lambda x: int(x) if x.isdigit() else float("inf"),
)
except ValueError:
sorted_uids_for_en = sorted(all_logic_steps.keys())
ordered_logic_list_for_en = [
all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps
]
while something_changed and inference_passes < max_inference_passes:
something_changed = False
inference_passes += 1
for i, instruction in enumerate(ordered_logic_list_for_en):
part_uid = instruction["instruction_uid"]
part_type_original = (
instruction["type"].replace("_scl", "").replace("_error", "")
)
if (
part_type_original in functional_block_types
and "en" not in instruction["inputs"]
and part_uid not in processed_blocks_en_inference
):
inferred_en_source = None
if i > 0:
for j in range(i - 1, -1, -1):
prev_instr = ordered_logic_list_for_en[j]
prev_uid = prev_instr["instruction_uid"]
prev_type_original = (
prev_instr["type"].replace("_scl", "").replace("_error", "")
)
if prev_type_original in rlo_generators:
inferred_en_source = {
"type": "connection",
"source_instruction_uid": prev_uid,
"source_instruction_type": prev_type_original,
"source_pin": "out",
}
break
elif prev_type_original in functional_block_types:
source_key_eno = (prev_uid, "eno")
if source_key_eno in source_connections:
inferred_en_source = {
"type": "connection",
"source_instruction_uid": prev_uid,
"source_instruction_type": prev_type_original,
"source_pin": "eno",
}
break
else:
continue
elif prev_type_original in [
"Coil",
"SCoil",
"RCoil",
"SetCoil",
"ResetCoil",
"SdCoil",
]:
break
if inferred_en_source:
all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source
processed_blocks_en_inference.add(part_uid)
something_changed = True
# 5. Añadir lógica ENO interesante (sin cambios)
for source_instr_uid, eno_destinations in eno_outputs.items():
if source_instr_uid not in all_logic_steps:
continue
interesting_eno_logic = []
for dest_uid, dest_pin in eno_destinations:
is_direct_en_connection = False
if dest_uid in parts_and_calls_map and dest_pin == "en":
try:
source_idx = sorted_uids_for_en.index(source_instr_uid)
dest_idx = sorted_uids_for_en.index(dest_uid)
if (
dest_idx == source_idx + 1
and parts_and_calls_map[dest_uid]["type"]
in functional_block_types
):
is_direct_en_connection = True
except ValueError:
pass
if not is_direct_en_connection:
target_info = {"target_pin": dest_pin}
if dest_uid in parts_and_calls_map:
target_info.update(
{
"target_type": "instruction",
"target_uid": dest_uid,
"target_name": parts_and_calls_map[dest_uid].get(
"name", parts_and_calls_map[dest_uid].get("type")
),
}
)
elif dest_uid in access_map:
target_info.update(
{
"target_type": "operand",
"target_details": access_map[dest_uid],
}
)
else:
target_info.update(
{"target_type": "unknown", "target_uid": dest_uid}
)
interesting_eno_logic.append(target_info)
if interesting_eno_logic:
all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic
# 6. Ordenar y Devolver (sin cambios)
network_logic_final = [
all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps
]
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": network_logic_final,
}
def convert_xml_to_json(xml_filepath, json_filepath):
print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
if not os.path.exists(xml_filepath):
print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
return
try:
print("Paso 1: Parseando archivo XML...")
parser = etree.XMLParser(remove_blank_text=True)
tree = etree.parse(xml_filepath, parser)
root = tree.getroot()
print("Paso 1: Parseo XML completado.")
print("Paso 2: Buscando el bloque SW.Blocks.FC...") # Asume FC primero
block_list = root.xpath("//*[local-name()='SW.Blocks.FC']")
block_type_found = "FC"
if not block_list:
block_list = root.xpath(
"//*[local-name()='SW.Blocks.FB']"
) # Busca FB si no hay FC
block_type_found = "FB"
if not block_list:
print("Error Crítico: No se encontró <SW.Blocks.FC> ni <SW.Blocks.FB>.")
return
else:
print(
"Advertencia: Se encontró <SW.Blocks.FB> en lugar de <SW.Blocks.FC>."
)
the_block = block_list[0]
print(
f"Paso 2: Bloque SW.Blocks.{block_type_found} encontrado (ID={the_block.get('ID')})."
)
print("Paso 3: Extrayendo atributos del bloque...")
attribute_list_node = the_block.xpath("./*[local-name()='AttributeList']")
block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown"
if attribute_list_node:
attr_list = attribute_list_node[0]
name_node = attr_list.xpath("./*[local-name()='Name']/text()")
block_name_val = name_node[0].strip() if name_node else block_name_val
num_node = attr_list.xpath("./*[local-name()='Number']/text()")
try:
block_number_val = int(num_node[0]) if num_node else None
except ValueError:
block_number_val = None
lang_node = attr_list.xpath(
"./*[local-name()='ProgrammingLanguage']/text()"
)
block_lang_val = lang_node[0].strip() if lang_node else block_lang_val
print(
f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'"
)
else:
print(
f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}."
)
block_comment_val = ""
comment_node_list = the_block.xpath(
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
)
if comment_node_list:
block_comment_val = get_multilingual_text(comment_node_list[0])
print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")
result = {
"block_name": block_name_val,
"block_number": block_number_val,
"language": block_lang_val,
"block_comment": block_comment_val,
"interface": {},
"networks": [],
}
print("Paso 4: Extrayendo la interfaz del bloque...")
if attribute_list_node:
interface_node_list = attribute_list_node[0].xpath(
".//*[local-name()='Interface']"
)
if interface_node_list:
interface_node = interface_node_list[0]
print("Paso 4: Nodo Interface encontrado.")
for section in interface_node.xpath(".//iface:Section", namespaces=ns):
section_name = section.get("Name")
if not section_name:
continue
members = []
for member in section.xpath("./iface:Member", namespaces=ns):
member_name = member.get("Name")
member_dtype = member.get("Datatype")
if member_name and member_dtype:
members.append(
{"name": member_name, "datatype": member_dtype}
)
if members:
result["interface"][section_name] = members
if not result["interface"]:
print("Advertencia: Interface sin secciones iface:Section válidas.")
else:
print(
"Advertencia: No se encontró <Interface> dentro de <AttributeList>."
)
if not result["interface"]:
print("Advertencia: No se pudo extraer información de la interfaz.")
print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...")
networks_processed_count = 0
result["networks"] = [] # Initialize networks list here
object_list_node = the_block.xpath("./*[local-name()='ObjectList']")
if object_list_node:
compile_units = object_list_node[0].xpath(
"./*[local-name()='SW.Blocks.CompileUnit']"
)
print(
f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit."
)
for network_elem in compile_units:
networks_processed_count += 1
network_id = network_elem.get("ID")
if not network_id:
print(" Advertencia: Se encontró CompileUnit sin ID. Saltando.")
continue
# --- Detectar lenguaje de la red ---
attribute_list = network_elem.xpath("./*[local-name()='AttributeList']")
programming_language = "LAD" # Default a LAD si no se especifica
network_source_node = None # Nodo <NetworkSource>
if attribute_list:
lang_node = attribute_list[0].xpath(
"./*[local-name()='ProgrammingLanguage']/text()"
)
if lang_node:
programming_language = lang_node[0].strip()
# Obtener el nodo NetworkSource para pasarlo a los parsers
network_source_list = attribute_list[0].xpath(
"./*[local-name()='NetworkSource']"
)
if network_source_list:
network_source_node = network_source_list[0]
print(
f" - Procesando Red ID={network_id}, Lenguaje={programming_language}"
)
# --- Extraer título y comentario (común) ---
title_element = network_elem.xpath(
".//*[local-name()='MultilingualText'][@CompositionName='Title']"
)
network_title = (
get_multilingual_text(title_element[0])
if title_element
else f"Network {network_id}"
)
comment_element = network_elem.xpath(
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
)
network_comment = (
get_multilingual_text(comment_element[0]) if comment_element else ""
)
# --- Procesar según el lenguaje ---
parsed_network_data = None
if programming_language == "SCL":
structured_text_node = (
network_source_node.xpath("./st:StructuredText", namespaces=ns)
if network_source_node is not None
else None
)
reconstructed_scl = f"// SCL extraction failed for Network {network_id}: StructuredText node not found.\n"
if structured_text_node:
print(
f" Reconstruyendo SCL desde tokens para red {network_id}..."
)
reconstructed_scl = reconstruct_scl_from_tokens(
structured_text_node[0]
)
# print(f" ... SCL reconstruido (parcial):\n{reconstructed_scl[:200]}...") # Preview opcional
else:
print(
f" Advertencia: No se encontró nodo <StructuredText> para red SCL {network_id}."
)
parsed_network_data = {
"id": network_id,
"title": network_title,
"comment": network_comment,
"language": "SCL",
"logic": [
{
"instruction_uid": f"SCL_{network_id}", # UID inventado
"type": "RAW_SCL_CHUNK",
"scl": reconstructed_scl,
}
],
}
# --- NUEVO MANEJO STL ---
elif programming_language == "STL":
statement_list_node = (
network_source_node.xpath("./stl:StatementList", namespaces=ns)
if network_source_node is not None
else None
)
reconstructed_stl = f"// STL extraction failed for Network {network_id}: StatementList node not found.\n"
if statement_list_node:
print(f" Reconstruyendo STL desde StatementList para red {network_id}...")
# Llama a la nueva función de reconstrucción STL
reconstructed_stl = reconstruct_stl_from_statementlist(statement_list_node[0])
# print(f" ... STL reconstruido (parcial):\n{reconstructed_stl[:200]}...") # Preview opcional
else:
print(f" Advertencia: No se encontró nodo <StatementList> para red STL {network_id}.")
# Guardar como un chunk de texto crudo
parsed_network_data = {
"id": network_id,
"title": network_title,
"comment": network_comment,
"language": "STL", # Indicar que es STL
"logic": [
{
"instruction_uid": f"STL_{network_id}", # UID inventado
"type": "RAW_STL_CHUNK", # Nuevo tipo para identificarlo
"stl": reconstructed_stl, # Guardar el texto reconstruido
}
],
}
elif programming_language in ["LAD", "FBD"]:
# Para LAD/FBD, llamar a parse_network (que espera FlgNet dentro de NetworkSource)
# parse_network ya maneja su propio título/comentario si es necesario, pero podemos pasar los extraídos
# Nota: parse_network espera el *CompileUnit* element, no el NetworkSource
parsed_network_data = parse_network(network_elem)
if parsed_network_data:
parsed_network_data["language"] = (
programming_language # Asegurar que el lenguaje se guarda
)
if parsed_network_data.get("error"):
print(
f" Error al parsear red {programming_language} ID={network_id}: {parsed_network_data['error']}"
)
# parsed_network_data = None # Descomentar para omitir redes con error
else:
print(
f" Error: parse_network devolvió None para red {programming_language} ID={network_id}"
)
else:
# Manejar otros lenguajes o casos inesperados
print(
f" Advertencia: Lenguaje no soportado '{programming_language}' en red ID={network_id}. Creando placeholder."
)
parsed_network_data = {
"id": network_id,
"title": network_title,
"comment": network_comment,
"language": programming_language,
"logic": [
{
"instruction_uid": f"UNS_{network_id}",
"type": "UNSUPPORTED_LANG",
"scl": f"// Network {network_id} uses unsupported language: {programming_language}\n",
}
],
}
# Añadir la red procesada (si es válida) al resultado
if parsed_network_data:
result["networks"].append(parsed_network_data)
# --- Fin del bucle for network_elem ---
if networks_processed_count == 0:
print(
"Advertencia: ObjectList no contenía elementos SW.Blocks.CompileUnit."
)
else:
print("Advertencia: No se encontró ObjectList para el bloque.")
print("Paso 6: Escribiendo el resultado en el archivo JSON...")
if not result["interface"]:
print("ADVERTENCIA FINAL: 'interface' está vacía.")
if not result["networks"]:
print("ADVERTENCIA FINAL: 'networks' está vacía.")
try:
with open(json_filepath, "w", encoding="utf-8") as f:
json.dump(result, f, indent=4, ensure_ascii=False)
print("Paso 6: Escritura completada.")
print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'")
except IOError as e:
print(
f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"
)
except TypeError as e:
print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")
except etree.XMLSyntaxError as e:
print(
f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}"
)
except Exception as e:
print(f"Error Crítico: Error inesperado durante la conversión: {e}")
print("--- Traceback ---")
traceback.print_exc()
print("--- Fin Traceback ---")
if __name__ == "__main__":
# Imports necesarios solo para la ejecución como script principal
import argparse
import os
import sys
parser = argparse.ArgumentParser(
description="Convert Simatic XML LAD/FBD to simplified JSON."
)
parser.add_argument(
"xml_filepath",
nargs="?", # Argumento opcional
default="TestLAD.xml", # Valor por defecto si se ejecuta sin argumentos
help="Path to the input XML file (default: TestLAD.xml)",
)
args = parser.parse_args()
xml_input_file = args.xml_filepath
# Verificar si el archivo de entrada existe
if not os.path.exists(xml_input_file):
print(f"Error Crítico: Archivo XML no encontrado: '{xml_input_file}'")
sys.exit(1) # Salir si el archivo no existe
# Derivar nombre base para archivo de salida JSON
# os.path.basename obtiene el nombre del archivo de la ruta
# os.path.splitext divide el nombre y la extensión
xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0]
# Construir la ruta de salida en el mismo directorio que el script o el XML
output_dir = os.path.dirname(
xml_input_file
) # O usar os.path.dirname(__file__) para el directorio del script
json_output_file = os.path.join(output_dir, f"{xml_filename_base}_simplified.json")
# Llamar a la función principal con los nombres de archivo derivados
convert_xml_to_json(xml_input_file, json_output_file)