283 lines
12 KiB
Python
283 lines
12 KiB
Python
import json
|
|
import os
|
|
from lxml import etree
|
|
|
|
# --- Namespaces ---
# Prefix -> URI map passed as ``namespaces=`` to the XPath calls in this file.
# Only explicit prefixes are registered: there is deliberately no entry for
# un-namespaced elements or for a default namespace.
ns = dict(
    sw='http://www.siemens.com/automation/Openness/SW/Interface/v5',
    flg='http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4',
)
|
|
|
|
# --- Helper Functions ---
|
|
|
|
def get_multilingual_text(element, default_lang='en-US', fallback_lang='it-IT'):
    """Return the first usable text stored under a ``MultilingualText`` element.

    Cultures are tried in priority order: ``default_lang``, then
    ``fallback_lang``, then any ``MultilingualTextItem`` regardless of its
    culture. Entries whose text is missing or blank are skipped, so an empty
    translation in a preferred culture does not hide a populated one further
    down the priority list.

    Args:
        element: Object exposing an lxml-style ``xpath()`` method, or ``None``.
        default_lang: Culture code tried first.
        fallback_lang: Culture code tried second.

    Returns:
        The stripped text, or ``""`` when ``element`` is ``None``, when no
        non-blank text exists, or when the lookup itself fails.
    """
    if element is None:
        return ""

    # These elements are addressed without a prefix, so no namespace map is
    # needed. The culture is bound as an XPath variable instead of being
    # spliced into the expression, so quotes in the code cannot break it.
    by_culture = ".//MultilingualTextItem[AttributeList/Culture=$culture]/AttributeList/Text"
    any_culture = ".//MultilingualTextItem/AttributeList/Text"

    try:
        for culture in (default_lang, fallback_lang, None):
            if culture is None:
                text_nodes = element.xpath(any_culture)
            else:
                text_nodes = element.xpath(by_culture, culture=culture)
            for node in text_nodes:
                text = (node.text or "").strip()
                if text:
                    return text
    except Exception as exc:
        # Best effort by design: a malformed title/comment must not abort the
        # whole conversion, so report the problem and fall back to "".
        print(f"Advertencia: Error extrayendo MultilingualText: {exc}")
    return ""
|
|
|
|
def get_symbol_name(symbol_element):
    """Build the dotted symbol name from a ``Symbol`` element's components.

    Each direct ``Component`` child contributes its ``Name`` attribute; a
    component whose name contains a space is wrapped in double quotes.

    Args:
        symbol_element: Object exposing an lxml-style ``xpath()`` method, or
            ``None``.

    Returns:
        ``None`` when ``symbol_element`` is ``None``; otherwise the component
        names joined with ``"."`` (an empty string when there are none).
    """
    if symbol_element is None:
        return None

    # Match by local name: a Component written without a prefix still
    # inherits any default namespace declared on an ancestor, and a bare
    # "./Component" step only matches elements in *no* namespace.
    component_names = symbol_element.xpath("./*[local-name()='Component']/@Name")
    return ".".join(
        f'"{name}"' if ' ' in name else name
        for name in component_names
    )
|
|
|
|
# PLC data types whose constant values are converted to ``int`` / ``float``.
_INTEGER_DATATYPES = frozenset({
    'sint', 'usint', 'int', 'uint', 'dint', 'udint', 'lint', 'ulint',
    'byte', 'word', 'dword', 'lword',
})
_REAL_DATATYPES = frozenset({'real', 'lreal'})


def _convert_constant_value(datatype, raw_value):
    """Convert a constant's text to ``int``, ``float`` or ``bool`` when possible.

    Literals may carry ``#``-separated prefixes in front of the digits: a type
    prefix (``DINT#60``) and/or a numeric radix (``16#FF``). A numeric prefix
    directly before the digits is honoured as the base for integer types.

    Returns ``raw_value`` unchanged when the data type is not handled or the
    text cannot be converted, and ``None`` when ``raw_value`` is ``None``.
    """
    if raw_value is None:
        return None

    kind = (datatype or '').lower()
    *prefixes, literal = raw_value.split('#')

    try:
        if kind in _INTEGER_DATATYPES:
            radix = prefixes[-1].strip() if prefixes else ''
            base = int(radix) if radix.isdigit() else 10
            return int(literal, base)
        if kind in _REAL_DATATYPES:
            return float(literal)
    except (ValueError, TypeError):
        # Keep the original text when it is not a plain number.
        return raw_value

    if kind == 'bool':
        return literal.strip().lower() in ('true', '1')
    return raw_value


def parse_access(access_element):
    """Parse an ``Access`` element into a variable or constant description.

    Args:
        access_element: Object exposing lxml-style ``get()`` and ``xpath()``.

    Returns:
        A dict that always holds ``'uid'``, ``'scope'`` and ``'type'``:

        * ``type == 'variable'``: adds ``'name'`` (see ``get_symbol_name``).
        * ``type == 'constant'``: adds ``'datatype'`` (``'Unknown'`` when
          there is no ``ConstantType`` child), ``'value_str'`` (the raw text
          or ``None``) and ``'value'`` (converted where possible, otherwise
          equal to ``'value_str'``).
        * ``type == 'unknown_access'``: neither a ``Symbol`` nor a
          ``Constant`` direct child was found.
    """
    info = {
        'uid': access_element.get('UId'),
        'scope': access_element.get('Scope'),
    }

    # Children are matched by local name: elements written without a prefix
    # still inherit any default namespace declared on an ancestor, which a
    # bare "./Symbol" step would not match.
    symbols = access_element.xpath("./*[local-name()='Symbol']")
    constants = access_element.xpath("./*[local-name()='Constant']")

    if symbols:
        info['type'] = 'variable'
        info['name'] = get_symbol_name(symbols[0])
    elif constants:
        constant = constants[0]
        type_nodes = constant.xpath("./*[local-name()='ConstantType']")
        value_nodes = constant.xpath("./*[local-name()='ConstantValue']")

        info['datatype'] = type_nodes[0].text if type_nodes else 'Unknown'
        info['value_str'] = value_nodes[0].text if value_nodes else None  # raw text
        info['value'] = _convert_constant_value(info['datatype'], info['value_str'])
        info['type'] = 'constant'
    else:
        # Could be e.g. a TypedConstant wrapper; not handled here.
        info['type'] = 'unknown_access'

    return info
|
|
|
|
|
|
def parse_part(part_element):
    """Parse a ``Part`` element (an instruction) into a plain dict.

    Args:
        part_element: Object exposing lxml-style ``get()`` and ``xpath()``.

    Returns:
        A dict with ``'uid'`` and ``'name'`` (the ``UId`` and ``Name``
        attributes) and ``'template_values'``, mapping each direct
        ``TemplateValue`` child's ``Name`` attribute to its ``Type`` attribute.
    """
    # Match by local name: a TemplateValue written without a prefix still
    # inherits any default namespace declared on an ancestor, which a bare
    # "./TemplateValue" step would not match.
    template_nodes = part_element.xpath("./*[local-name()='TemplateValue']")
    return {
        'uid': part_element.get('UId'),
        'name': part_element.get('Name'),
        'template_values': {
            node.get('Name'): node.get('Type') for node in template_nodes
        },
    }
|
|
|
|
# --- Main Parsing Logic ---
|
|
|
|
def _wire_endpoint(elem, flg_uri, allow_powerrail):
    """Map one child of a ``Wire`` to a ``(uid, pin)`` pair.

    ``IdentCon`` yields ``(UId, 'value')``, ``NameCon`` yields ``(UId, Name)``
    and, only when ``allow_powerrail`` is true, ``Powerrail`` yields
    ``('POWERRAIL', 'out')``. Anything else yields ``(None, None)``.
    """
    tag = elem.tag  # Clark notation "{uri}local" for namespaced elements
    if allow_powerrail and tag == f"{{{flg_uri}}}Powerrail":
        return 'POWERRAIL', 'out'
    if tag == f"{{{flg_uri}}}IdentCon":
        return elem.get('UId'), 'value'  # implicit pin of an Access
    if tag == f"{{{flg_uri}}}NameCon":
        return elem.get('UId'), elem.get('Name')
    return None, None


def parse_network(network_element):
    """Parse one network (``CompileUnit``) into a simplified logic listing.

    Args:
        network_element: lxml element of the network.

    Returns:
        ``{'id', 'title', 'logic'}`` where ``logic`` holds one dict per
        ``Part`` with keys ``'instruction_uid'``, ``'type'``, ``'inputs'``
        (pin -> source description) and ``'outputs'`` (pin -> list of Access
        descriptions the pin is wired to). When the network contains no
        ``flg:FlgNet`` the dict has an empty ``logic`` list and an extra
        ``'error'`` key.
    """
    network_id = network_element.get('ID')

    # ObjectList / MultilingualText are addressed without a prefix.
    title_nodes = network_element.xpath(
        "./ObjectList/MultilingualText[@CompositionName='Title']", namespaces=ns)
    if title_nodes:
        network_title = get_multilingual_text(title_nodes[0])
    else:
        network_title = f"Network {network_id}"

    flgnets = network_element.xpath(".//flg:FlgNet", namespaces=ns)
    if not flgnets:
        return {'id': network_id, 'title': network_title, 'logic': [], 'error': 'FlgNet not found'}
    flgnet = flgnets[0]

    # 1. Index every Access and Part by its UId.
    access_map = {}
    for access in flgnet.xpath(".//flg:Access", namespaces=ns):
        access_info = parse_access(access)
        access_map[access_info['uid']] = access_info

    parts_map = {}
    for part in flgnet.xpath(".//flg:Part", namespaces=ns):
        part_info = parse_part(part)
        parts_map[part_info['uid']] = part_info

    # 2. Build the connection map: (dest_uid, dest_pin) -> (source_uid, source_pin).
    flg_uri = ns['flg']
    wire_connections = {}
    for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
        endpoints = list(wire)
        if not endpoints:
            continue  # ignore empty wires

        source_uid, source_pin = _wire_endpoint(endpoints[0], flg_uri, True)
        if source_uid is None and source_pin != 'out':
            continue  # first child is not a usable source

        # The first child is taken as the source and every following child as
        # a destination, so a wire that fans out to several pins keeps all of
        # its connections.
        # NOTE(review): assumes "source first, then all sinks" ordering of
        # Wire children; confirm against the export format.
        for dest_elem in endpoints[1:]:
            dest_uid, dest_pin = _wire_endpoint(dest_elem, flg_uri, False)
            if dest_uid and dest_pin:
                wire_connections[(dest_uid, dest_pin)] = (source_uid, source_pin)

    # Index the connections once instead of rescanning them for every part.
    inputs_by_dest = {}    # dest uid -> {dest pin: (source uid, source pin)}
    outputs_by_source = {}  # source uid -> {source pin: [Access info, ...]}
    for (dest_uid, dest_pin), source in wire_connections.items():
        inputs_by_dest.setdefault(dest_uid, {})[dest_pin] = source
        if dest_uid in access_map:
            source_uid, source_pin = source
            outputs_by_source.setdefault(source_uid, {}).setdefault(
                source_pin, []).append(access_map[dest_uid])

    # 3. Describe every instruction together with its resolved connections.
    network_logic = []
    for part_uid, part_info in parts_map.items():
        inputs = {}
        for dest_pin, (source_uid, source_pin) in inputs_by_dest.get(part_uid, {}).items():
            if source_uid == 'POWERRAIL':
                inputs[dest_pin] = {'type': 'powerrail'}
            elif source_uid in access_map:
                inputs[dest_pin] = access_map[source_uid]
            elif source_uid in parts_map:
                inputs[dest_pin] = {
                    'type': 'connection',
                    'source_instruction_uid': source_uid,
                    'source_instruction_type': parts_map[source_uid]['name'],
                    'source_pin': source_pin,
                }
            else:
                # Broken or unresolved connection.
                inputs[dest_pin] = {'type': 'unknown_source', 'uid': source_uid}

        network_logic.append({
            'instruction_uid': part_uid,
            'type': part_info['name'],
            'inputs': inputs,
            # Only outputs wired to an Access are reported.
            'outputs': dict(outputs_by_source.get(part_uid, {})),
        })

    return {'id': network_id, 'title': network_title, 'logic': network_logic}
|
|
|
|
|
|
def convert_xml_to_json(xml_filepath, json_filepath):
    """Convert an exported block XML file into a simplified JSON file.

    Reads ``xml_filepath``, locates the first ``SW.Blocks.FC`` element,
    extracts its name, number, programming language, interface sections and
    networks, and writes the result to ``json_filepath`` as UTF-8 JSON.

    Problems (missing file, missing block element, XML syntax errors or any
    other exception during processing) are reported with ``print`` rather than
    raised.

    Args:
        xml_filepath: Path of the XML file to read.
        json_filepath: Path of the JSON file to write.

    Returns:
        None.
    """
    if not os.path.exists(xml_filepath):
        print(f"Error: Archivo XML no encontrado en {xml_filepath}")
        return

    try:
        root = etree.parse(xml_filepath).getroot()

        # local-name() keeps the block lookup independent of namespaces.
        fc_blocks = root.xpath("//*[local-name()='SW.Blocks.FC']")
        if not fc_blocks:
            print("Error: No se encontró el elemento <SW.Blocks.FC>")
            return
        fc_block = fc_blocks[0]

        # --- General block information (un-prefixed elements) ---
        block_name = fc_block.xpath("./AttributeList/Name/text()")
        block_number = fc_block.xpath("./AttributeList/Number/text()")
        block_lang = fc_block.xpath("./AttributeList/ProgrammingLanguage/text()")

        # Strip before the digit check so surrounding whitespace in the
        # element text does not turn a valid number into None.
        number_text = block_number[0].strip() if block_number else ""

        result = {
            "block_name": block_name[0].strip() if block_name else "Unknown",
            "block_number": int(number_text) if number_text.isdigit() else None,
            "language": block_lang[0].strip() if block_lang else "Unknown",
            "interface": {},
            "networks": [],
        }

        # --- Interface ---
        # Interface is matched by local name so the lookup works whether or
        # not the element itself carries the 'sw' namespace; Sections and its
        # descendants are addressed with the sw: prefix.
        # NOTE(review): exports appear to declare the namespace on Sections,
        # leaving Interface un-namespaced; confirm against a real file.
        interface_sections = fc_block.xpath(
            ".//*[local-name()='Interface']/sw:Sections/sw:Section", namespaces=ns)
        for section in interface_sections:
            result["interface"][section.get('Name')] = [
                {"name": member.get('Name'), "datatype": member.get('Datatype')}
                for member in section.xpath("./sw:Member", namespaces=ns)
            ]

        # --- Network logic (un-prefixed CompileUnit, flg: content) ---
        networks = fc_block.xpath("./ObjectList/SW.Blocks.CompileUnit", namespaces=ns)
        for network_elem in networks:
            result["networks"].append(parse_network(network_elem))

        if not networks:
            print("Advertencia: No se encontraron redes (SW.Blocks.CompileUnit). Verifica la estructura del XML.")

        # --- Write the result ---
        with open(json_filepath, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=4, ensure_ascii=False)

        print(f"Conversión completada. Archivo JSON guardado en: {json_filepath}")

    except etree.XMLSyntaxError as e:
        print(f"Error de sintaxis XML: {e}")
    except Exception as e:
        # Top-level boundary of the script: report everything, with traceback.
        print(f"Ocurrió un error inesperado durante el procesamiento: {e}")
        import traceback
        traceback.print_exc()
|
|
|
|
|
|
# --- Execution ---
if __name__ == "__main__":
    import sys

    # Usage: <script> [xml_file] [json_file]
    # With no arguments the original default paths are used. When only the
    # XML path is given, the JSON path is derived from it with the same
    # "<name>_simplified.json" pattern as the defaults.
    cli_args = sys.argv[1:]
    xml_file = cli_args[0] if cli_args else 'BlenderRun_ProdTime.xml'
    if len(cli_args) > 1:
        json_file = cli_args[1]
    else:
        json_file = os.path.splitext(xml_file)[0] + '_simplified.json'

    convert_xml_to_json(xml_file, json_file)