Simatic_XML_Parser_to_SCL/x1_to_json.py

721 lines
30 KiB
Python

# -*- coding: utf-8 -*-
import json
import os
from lxml import etree
import traceback
from collections import defaultdict
# --- Namespaces ---
ns = {
"iface": "http://www.siemens.com/automation/Openness/SW/Interface/v5",
"flg": "http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4",
}
# --- Helper Functions ---
# (get_multilingual_text, get_symbol_name, parse_access, parse_part - sin cambios)
# ... (código de estas funciones aquí) ...
def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"):
if element is None:
return ""
try:
xpath_expr = (
f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{default_lang}']]"
f"/*[local-name()='AttributeList']/*[local-name()='Text']"
)
text_items = element.xpath(xpath_expr)
if text_items and text_items[0].text is not None:
return text_items[0].text.strip()
xpath_expr = (
f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{fallback_lang}']]"
f"/*[local-name()='AttributeList']/*[local-name()='Text']"
)
text_items = element.xpath(xpath_expr)
if text_items and text_items[0].text is not None:
return text_items[0].text.strip()
xpath_expr = f".//*[local-name()='MultilingualTextItem']/*[local-name()='AttributeList']/*[local-name()='Text']"
text_items = element.xpath(xpath_expr)
if text_items and text_items[0].text is not None:
return text_items[0].text.strip()
return ""
except Exception as e:
print(f"Advertencia: Error extrayendo MultilingualText: {e}")
return ""
def get_symbol_name(symbol_element):
if symbol_element is None:
return None
try:
components = symbol_element.xpath("./*[local-name()='Component']/@Name")
return ".".join(f'"{c}"' for c in components) if components else None
except Exception as e:
print(f"Advertencia: Excepción en get_symbol_name: {e}")
return None
def parse_access(access_element):
if access_element is None:
return None
uid = access_element.get("UId")
scope = access_element.get("Scope")
info = {"uid": uid, "scope": scope, "type": "unknown"}
symbol = access_element.xpath("./*[local-name()='Symbol']")
constant = access_element.xpath("./*[local-name()='Constant']")
if symbol:
info["type"] = "variable"
info["name"] = get_symbol_name(symbol[0])
if info["name"] is None:
info["type"] = "error_parsing_symbol"
print(f"Error: No se pudo parsear nombre símbolo Access UID={uid}")
return info
elif constant:
info["type"] = "constant"
const_type_elem = constant[0].xpath("./*[local-name()='ConstantType']")
const_val_elem = constant[0].xpath("./*[local-name()='ConstantValue']")
info["datatype"] = (
const_type_elem[0].text
if const_type_elem and const_type_elem[0].text is not None
else "Unknown"
)
value_str = (
const_val_elem[0].text
if const_val_elem and const_val_elem[0].text is not None
else None
)
if value_str is None:
info["type"] = "error_parsing_constant"
info["value"] = None
print(f"Error: Constante sin valor Access UID={uid}")
return info
if info["datatype"] == "Unknown":
val_lower = value_str.lower()
if val_lower in ["true", "false"]:
info["datatype"] = "Bool"
elif value_str.isdigit() or (
value_str.startswith("-") and value_str[1:].isdigit()
):
info["datatype"] = "Int"
elif "." in value_str:
try:
float(value_str)
info["datatype"] = "Real"
except ValueError:
pass
elif "#" in value_str:
info["datatype"] = "TypedConstant"
info["value"] = value_str
dtype_lower = info["datatype"].lower()
val_str_processed = value_str.split("#")[-1] if "#" in value_str else value_str
try:
if dtype_lower in [
"int",
"dint",
"udint",
"sint",
"usint",
"lint",
"ulint",
"word",
"dword",
"lword",
"byte",
]:
info["value"] = int(val_str_processed)
elif dtype_lower == "bool":
info["value"] = (
val_str_processed.lower() == "true" or val_str_processed == "1"
)
elif dtype_lower in ["real", "lreal"]:
info["value"] = float(val_str_processed)
elif dtype_lower == "typedconstant":
info["value"] = value_str
except (ValueError, TypeError) as e:
print(
f"Advertencia: No se pudo convertir valor '{val_str_processed}' a {dtype_lower} UID={uid}. Error: {e}"
)
info["value"] = value_str
else:
info["type"] = "unknown_structure"
print(f"Advertencia: Access UID={uid} no es Symbol ni Constant.")
return info
if info["type"] == "variable" and info.get("name") is None:
print(f"Error Interno: parse_access var sin nombre UID {uid}.")
info["type"] = "error_no_name"
return info
return info
def parse_part(part_element):
"""
Parsea un elemento Part (instrucción) y extrae UID, nombre,
valores de plantilla y pines negados.
"""
if part_element is None: return None
uid = part_element.get('UId'); name = part_element.get('Name')
if not uid or not name: print(f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}"); return None
template_values = {}
try:
for tv in part_element.xpath("./*[local-name()='TemplateValue']"):
tv_name = tv.get('Name'); tv_type = tv.get('Type')
if tv_name and tv_type: template_values[tv_name] = tv_type
except Exception as e: print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}")
# --- INICIO NUEVA LÓGICA PARA NEGACIÓN ---
negated_pins = {} # Diccionario para pines negados: {pin_name: True}
try:
for negated_elem in part_element.xpath("./*[local-name()='Negated']"):
negated_pin_name = negated_elem.get('Name')
if negated_pin_name:
negated_pins[negated_pin_name] = True
# print(f"DEBUG: Pin negado detectado: {name} UID {uid}, Pin: {negated_pin_name}") # Debug
except Exception as e: print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}")
return {
'uid': uid,
'type': name, # Usar 'type' para consistencia
'template_values': template_values,
'negated_pins': negated_pins # Añadir diccionario de pines negados
}
# --- NUEVA FUNCIÓN parse_call ---
def parse_call(call_element):
"""
Parsea un elemento Call (llamada a FC/FB) y extrae su información.
"""
if call_element is None:
return None
uid = call_element.get("UId")
if not uid:
print(
f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}"
)
return None
call_info_elem = call_element.xpath("./*[local-name()='CallInfo']")
if not call_info_elem:
print(f"Error: Call UID {uid} sin elemento CallInfo.")
return None
call_info = call_info_elem[0]
block_name = call_info.get("Name")
block_type = call_info.get("BlockType") # FC, FB, etc.
instance_name = None
instance_scope = None
# Si es una llamada a FB, puede tener información de instancia
instance_elem = call_info.xpath("./*[local-name()='Instance']")
if instance_elem:
instance = instance_elem[0]
instance_scope = instance.get("Scope") # Ej: GlobalVariable, InstanceDB
# El nombre de la instancia DB suele estar en Component dentro de Symbol
symbol_elem = instance.xpath("./*[local-name()='Symbol']")
if symbol_elem:
instance_name = get_symbol_name(symbol_elem[0])
if not block_name or not block_type:
print(f"Error: CallInfo para UID {uid} sin Name o BlockType.")
return None
call_data = {
"uid": uid,
"type": "Call", # Tipo genérico para nuestra estructura JSON
"block_name": block_name,
"block_type": block_type,
}
if instance_name:
call_data["instance_db"] = instance_name
if instance_scope:
call_data["instance_scope"] = instance_scope
return call_data
# --- FIN NUEVA FUNCIÓN ---
# --- Función parse_network MODIFICADA ---
def parse_network(network_element):
"""
Parsea una red, extrae lógica (incluyendo Calls) y añade conexiones EN implícitas.
"""
if network_element is None:
return {
"id": "ERROR",
"title": "Invalid Network Element",
"comment": "",
"logic": [],
"error": "Input element was None",
}
network_id = network_element.get("ID")
title_element = network_element.xpath(
".//*[local-name()='MultilingualText'][@CompositionName='Title']"
)
network_title = (
get_multilingual_text(title_element[0])
if title_element
else f"Network {network_id}"
)
comment_title_element = network_element.xpath(
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
)
network_comment = (
get_multilingual_text(comment_title_element[0]) if comment_title_element else ""
)
flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
if not flgnet_list:
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": [],
"error": "FlgNet not found",
}
flgnet = flgnet_list[0]
# 1. Parsear Access, Parts y Calls
access_map = {
acc_info["uid"]: acc_info
for acc in flgnet.xpath(".//flg:Access", namespaces=ns)
if (acc_info := parse_access(acc))
}
# --- MODIFICADO: Unir Parts y Calls en un solo mapa ---
parts_and_calls_map = {}
instruction_elements = flgnet.xpath(
".//flg:Part | .//flg:Call", namespaces=ns
) # Buscar ambos tipos
for element in instruction_elements:
parsed_info = None
if element.tag.endswith("Part"):
parsed_info = parse_part(element)
elif element.tag.endswith("Call"):
parsed_info = parse_call(element)
if parsed_info and "uid" in parsed_info:
parts_and_calls_map[parsed_info["uid"]] = parsed_info
else:
print(
f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}"
)
# --- FIN MODIFICADO ---
# 2. Parsear Wires y construir mapas de conexiones
wire_connections = defaultdict(list)
source_connections = defaultdict(list)
eno_outputs = defaultdict(list)
flg_ns_uri = ns["flg"]
for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
source_uid, source_pin, dest_uid, dest_pin = None, None, None, None
children = wire.getchildren()
if len(children) < 2:
continue
source_elem, dest_elem = children[0], children[1]
if source_elem.tag == etree.QName(flg_ns_uri, "Powerrail"):
source_uid, source_pin = "POWERRAIL", "out"
elif source_elem.tag == etree.QName(flg_ns_uri, "IdentCon"):
source_uid, source_pin = source_elem.get("UId"), "value"
elif source_elem.tag == etree.QName(flg_ns_uri, "NameCon"):
source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name")
if dest_elem.tag == etree.QName(flg_ns_uri, "IdentCon"):
dest_uid, dest_pin = dest_elem.get("UId"), "value"
elif dest_elem.tag == etree.QName(flg_ns_uri, "NameCon"):
dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name")
if dest_uid and dest_pin and source_uid is not None:
dest_key = (dest_uid, dest_pin)
source_key = (source_uid, source_pin)
source_info = (source_uid, source_pin)
dest_info = (dest_uid, dest_pin)
if source_info not in wire_connections[dest_key]:
wire_connections[dest_key].append(source_info)
if dest_info not in source_connections[source_key]:
source_connections[source_key].append(dest_info)
if (
source_pin == "eno" and source_uid in parts_and_calls_map
): # Usar mapa combinado
if dest_info not in eno_outputs[source_uid]:
eno_outputs[source_uid].append(dest_info)
# 3. Construir la representación lógica INICIAL
all_logic_steps = {}
functional_block_types = [
"Move",
"Add",
"Sub",
"Mul",
"Div",
"Mod",
"Convert",
"Call",
] # Añadir Call
rlo_generators = [
"Contact",
"O",
"Eq",
"Ne",
"Gt",
"Lt",
"Ge",
"Le",
"And",
"Xor",
"PBox",
]
# --- MODIFICADO: Iterar sobre el mapa combinado ---
for instruction_uid, instruction_info in parts_and_calls_map.items():
# Usar directamente la info parseada (part_info o call_info)
instruction_repr = {
"instruction_uid": instruction_uid,
**instruction_info,
} # Copiar datos base
instruction_repr["inputs"] = {}
instruction_repr["outputs"] = {}
# --- FIN MODIFICADO ---
# Rellenar inputs explícitos
# Añadir más pines si las llamadas a FB los usan (ej: parámetros FC/FB)
possible_pins = set(
["en", "in", "in1", "in2", "in3", "in4", "bit", "operand", "pre", "clk"]
)
# Añadir pines específicos de la llamada si es un FB? Más complejo.
for dest_pin_name in possible_pins:
dest_key = (instruction_uid, dest_pin_name)
if dest_key in wire_connections:
sources_list = wire_connections[dest_key]
input_sources_repr = []
for source_uid, source_pin in sources_list:
if source_uid == "POWERRAIL":
input_sources_repr.append({"type": "powerrail"})
elif source_uid in access_map:
input_sources_repr.append(access_map[source_uid])
elif source_uid in parts_and_calls_map: # Usar mapa combinado
input_sources_repr.append(
{
"type": "connection",
# Usar el tipo del mapa combinado
"source_instruction_type": parts_and_calls_map[
source_uid
]["type"],
"source_instruction_uid": source_uid,
"source_pin": source_pin,
}
)
else:
input_sources_repr.append(
{"type": "unknown_source", "uid": source_uid}
)
if len(input_sources_repr) == 1:
instruction_repr["inputs"][dest_pin_name] = input_sources_repr[0]
elif len(input_sources_repr) > 1:
instruction_repr["inputs"][dest_pin_name] = input_sources_repr
# Rellenar outputs explícitos (hacia Access)
for source_pin_name in [
"out",
"out1",
"Q",
"eno",
]: # Añadir salidas comunes de FC/FB si es necesario
source_key = (instruction_uid, source_pin_name)
if source_key in source_connections:
for dest_uid, dest_pin in source_connections[source_key]:
if dest_uid in access_map:
if source_pin_name not in instruction_repr["outputs"]:
instruction_repr["outputs"][source_pin_name] = []
if (
access_map[dest_uid]
not in instruction_repr["outputs"][source_pin_name]
):
instruction_repr["outputs"][source_pin_name].append(
access_map[dest_uid]
)
all_logic_steps[instruction_uid] = instruction_repr
# --- 4. INFERENCIA Y PROPAGACIÓN DE CONEXIONES 'EN' IMPLÍCITAS ---
# (Esta lógica probablemente necesite ajustes para considerar 'Call' como bloque funcional)
# print(f"DEBUG: Iniciando inferencia EN para Red {network_id}...")
processed_blocks_en_inference = set()
something_changed = True
inference_passes = 0
max_inference_passes = len(all_logic_steps) + 5
while something_changed and inference_passes < max_inference_passes:
something_changed = False
inference_passes += 1
try:
sorted_uids_for_pass = sorted(
all_logic_steps.keys(),
key=lambda x: int(x) if x.isdigit() else float("inf"),
)
except ValueError:
sorted_uids_for_pass = sorted(all_logic_steps.keys())
current_logic_list = [
all_logic_steps[uid]
for uid in sorted_uids_for_pass
if uid in all_logic_steps
]
for i, instruction in enumerate(
current_logic_list
): # Usar enumerate para obtener índice
part_uid = instruction["instruction_uid"]
part_type = instruction["type"] # Ahora puede ser 'Call'
# Si es un bloque funcional (incluyendo Call) sin 'en' explícito
if (
part_type in functional_block_types
and "en" not in instruction["inputs"]
and part_uid not in processed_blocks_en_inference
):
inferred_en_source = None
my_index = i # Ya tenemos el índice
if my_index > 0:
for j in range(my_index - 1, -1, -1): # Buscar hacia atrás
prev_instr = current_logic_list[j]
prev_uid = prev_instr["instruction_uid"]
prev_type = prev_instr["type"]
if prev_type in rlo_generators:
inferred_en_source = {
"type": "connection",
"source_instruction_uid": prev_uid,
"source_instruction_type": prev_type,
"source_pin": "out",
}
break
elif prev_type in functional_block_types:
source_key_eno = (prev_uid, "eno")
if source_key_eno in source_connections:
inferred_en_source = {
"type": "connection",
"source_instruction_uid": prev_uid,
"source_instruction_type": prev_type,
"source_pin": "eno",
}
break
if inferred_en_source:
# Asegurarse de que 'instruction' se refiera al diccionario original
all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source
processed_blocks_en_inference.add(part_uid)
something_changed = True
# --- 5. Añadir lógica ENO interesante ---
# (Necesita usar parts_and_calls_map ahora)
for source_instr_uid, eno_destinations in eno_outputs.items():
if source_instr_uid not in all_logic_steps:
continue
interesting_eno_logic = []
for dest_uid, dest_pin in eno_destinations:
is_direct_en_connection = (
dest_uid in parts_and_calls_map and dest_pin == "en"
) # Check en mapa combinado
if not is_direct_en_connection:
target_info = {"target_pin": dest_pin}
if dest_uid in parts_and_calls_map:
target_info.update(
{
"target_type": "instruction",
"target_uid": dest_uid,
"target_name": parts_and_calls_map[dest_uid].get(
"name", parts_and_calls_map[dest_uid].get("type")
),
}
) # Usar 'name' si existe (Part) o 'type' (Call)
elif dest_uid in access_map:
target_info.update(
{
"target_type": "operand",
"target_details": access_map[dest_uid],
}
)
else:
target_info.update(
{"target_type": "unknown", "target_uid": dest_uid}
)
interesting_eno_logic.append(target_info)
if interesting_eno_logic:
all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic
# --- 6. Ordenar Lógica Final y Devolver ---
try:
sorted_uids = sorted(
all_logic_steps.keys(),
key=lambda x: int(x) if x.isdigit() else float("inf"),
)
except ValueError:
print(f"Advertencia: UIDs no numéricos red {network_id}. Orden alfabético.")
sorted_uids = sorted(all_logic_steps.keys())
network_logic = [
all_logic_steps[uid] for uid in sorted_uids if uid in all_logic_steps
]
return {
"id": network_id,
"title": network_title,
"comment": network_comment,
"logic": network_logic,
}
# --- Función Principal convert_xml_to_json (sin cambios) ---
def convert_xml_to_json(xml_filepath, json_filepath):
print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
if not os.path.exists(xml_filepath):
print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
return
try:
print("Paso 1: Parseando archivo XML...")
parser = etree.XMLParser(remove_blank_text=True)
tree = etree.parse(xml_filepath, parser)
root = tree.getroot()
print("Paso 1: Parseo XML completado.")
print("Paso 2: Buscando el bloque SW.Blocks.FC...")
fc_block_list = root.xpath("//*[local-name()='SW.Blocks.FC']")
if not fc_block_list:
print("Error Crítico: No se encontró <SW.Blocks.FC>.")
return
fc_block = fc_block_list[0]
print(f"Paso 2: Bloque SW.Blocks.FC encontrado (ID={fc_block.get('ID')}).")
print("Paso 3: Extrayendo atributos del bloque...")
attribute_list_node = fc_block.xpath("./*[local-name()='AttributeList']")
block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown"
if attribute_list_node:
attr_list = attribute_list_node[0]
name_node = attr_list.xpath("./*[local-name()='Name']/text()")
block_name_val = name_node[0].strip() if name_node else block_name_val
num_node = attr_list.xpath("./*[local-name()='Number']/text()")
block_number_val = (
int(num_node[0])
if num_node and num_node[0].isdigit()
else block_number_val
)
lang_node = attr_list.xpath(
"./*[local-name()='ProgrammingLanguage']/text()"
)
block_lang_val = lang_node[0].strip() if lang_node else block_lang_val
print(
f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'"
)
else:
print("Advertencia: No se encontró AttributeList para el bloque FC.")
block_comment_val = ""
comment_node_list = fc_block.xpath(
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
)
if comment_node_list:
block_comment_val = get_multilingual_text(comment_node_list[0])
print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")
result = {
"block_name": block_name_val,
"block_number": block_number_val,
"language": block_lang_val,
"block_comment": block_comment_val,
"interface": {},
"networks": [],
}
print("Paso 4: Extrayendo la interfaz del bloque...")
if attribute_list_node:
interface_node_list = attribute_list_node[0].xpath(
"./*[local-name()='Interface']"
)
if interface_node_list:
interface_node = interface_node_list[0]
print("Paso 4: Nodo Interface encontrado.")
for section in interface_node.xpath(".//iface:Section", namespaces=ns):
section_name = section.get("Name")
members = []
for member in section.xpath("./iface:Member", namespaces=ns):
member_name = member.get("Name")
member_dtype = member.get("Datatype")
if member_name and member_dtype:
members.append(
{"name": member_name, "datatype": member_dtype}
)
if members:
result["interface"][section_name] = members
if not result["interface"]:
print("Advertencia: Interface sin secciones iface:Section válidas.")
else:
print(
"Advertencia: No se encontró <Interface> DENTRO de <AttributeList>."
)
if not result["interface"]:
print("Advertencia: No se pudo extraer información de la interfaz.")
print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...")
networks_processed_count = 0
object_list_node = fc_block.xpath("./*[local-name()='ObjectList']")
if object_list_node:
compile_units = object_list_node[0].xpath(
"./*[local-name()='SW.Blocks.CompileUnit']"
)
print(
f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit."
)
for network_elem in compile_units:
networks_processed_count += 1
# print(f"DEBUG: Procesando red #{networks_processed_count} (ID={network_elem.get('ID')})...")
parsed_network = parse_network(
network_elem
) # Llamada a la función modificada
if parsed_network and parsed_network.get("error") is None:
result["networks"].append(parsed_network)
elif parsed_network:
print(
f"Error: Falló parseo red ID={parsed_network.get('id')}: {parsed_network.get('error')}"
)
result["networks"].append(parsed_network)
else:
print(
f"Error: parse_network devolvió None para CompileUnit (ID={network_elem.get('ID')})."
)
if networks_processed_count == 0:
print("Advertencia: ObjectList sin SW.Blocks.CompileUnit.")
else:
print("Advertencia: No se encontró ObjectList.")
print("Paso 6: Escribiendo el resultado en el archivo JSON...")
if not result["interface"]:
print("ADVERTENCIA FINAL: 'interface' está vacía.")
if not result["networks"]:
print("ADVERTENCIA FINAL: 'networks' está vacía.")
# else: # Chequeo ENO logic
# eno_logic_found = any(instr.get('eno_logic') for net in result.get('networks', []) if net.get('error') is None for instr in net.get('logic', []))
# if eno_logic_found: print("INFO FINAL: Lógica ENO interesante detectada.")
# else: print("INFO FINAL: No se detectó lógica ENO interesante.")
try:
with open(json_filepath, "w", encoding="utf-8") as f:
json.dump(result, f, indent=4, ensure_ascii=False)
print(f"Paso 6: Escritura completada.")
print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'")
except IOError as e:
print(
f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"
)
except TypeError as e:
print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")
except etree.XMLSyntaxError as e:
print(f"Error Crítico: Sintaxis XML en '{xml_filepath}'. Detalles: {e}")
except Exception as e:
print(f"Error Crítico: Error inesperado: {e}")
print("--- Traceback ---")
traceback.print_exc()
print("--- Fin Traceback ---")
# --- Punto de Entrada Principal ---
if __name__ == "__main__":
xml_file = "TestLAD.xml" # CAMBIAR AL NUEVO ARCHIVO XML
json_file = xml_file.replace(
".xml", "_simplified.json"
) # Nombre de salida dinámico
convert_xml_to_json(xml_file, json_file)