# Converts a Siemens Openness XML block export (FC/FB with FlgNet networks) into a simplified JSON file.
# -*- coding: utf-8 -*-
|
|
import json
|
|
import os
|
|
from lxml import etree
|
|
import traceback
|
|
from collections import defaultdict
|
|
|
|
# --- Namespaces ---
# Prefix -> namespace URI map handed to lxml as ``xpath(..., namespaces=ns)``.
# "iface" is used for the block interface sections/members and "flg" for the
# FlgNet network elements (Access, Part, Call, Wire, ...).
ns = {
    "iface": "http://www.siemens.com/automation/Openness/SW/Interface/v5",
    "flg": "http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4",
}
|
|
|
|
|
|
# --- Helper Functions ---
|
|
def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"):
    """Return the stripped text of a MultilingualText element.

    The lookup is attempted in three steps and stops at the first step whose
    first matching ``Text`` node has non-``None`` text:

    1. the item whose ``Culture`` equals ``default_lang``;
    2. the item whose ``Culture`` equals ``fallback_lang``;
    3. the first item of any culture.

    Args:
        element: lxml element to search below (any depth); may be ``None``.
        default_lang: preferred culture code.
        fallback_lang: culture code tried when the preferred one is missing.

    Returns:
        The stripped text, or ``""`` when ``element`` is ``None``, nothing
        matches, or the XPath evaluation fails (a warning is printed then).
    """
    if element is None:
        return ""

    text_path = "/*[local-name()='AttributeList']/*[local-name()='Text']"
    # The culture is bound as an XPath variable ($lang) instead of being
    # interpolated into the expression, so quotes in it cannot break the query.
    by_culture = (
        ".//*[local-name()='MultilingualTextItem']"
        "[*[local-name()='AttributeList']/*[local-name()='Culture' and text()=$lang]]"
        + text_path
    )
    any_culture = ".//*[local-name()='MultilingualTextItem']" + text_path
    lookups = (
        (by_culture, {"lang": str(default_lang)}),
        (by_culture, {"lang": str(fallback_lang)}),
        (any_culture, {}),
    )

    try:
        for expression, variables in lookups:
            text_items = element.xpath(expression, **variables)
            if text_items and text_items[0].text is not None:
                return text_items[0].text.strip()
        return ""
    except Exception as e:
        # Best-effort helper: a broken text node must not abort the conversion.
        print(f"Advertencia: Error extrayendo MultilingualText: {e}")
        return ""
|
|
|
|
def get_symbol_name(symbol_element):
    """Build the quoted, dot-separated name of a Symbol element.

    Each direct ``Component`` child contributes its ``Name`` attribute wrapped
    in double quotes; the parts are joined with ``.`` (e.g. ``"DB"."Member"``).

    Args:
        symbol_element: lxml ``Symbol`` element, or ``None``.

    Returns:
        The composed name, or ``None`` when the element is ``None``, has no
        ``Component`` children, or the XPath query fails (a warning is printed).
    """
    if symbol_element is None:
        return None

    try:
        components = symbol_element.xpath("./*[local-name()='Component']/@Name")
    except Exception as e:
        print(f"Advertencia: Excepción en get_symbol_name: {e}")
        return None

    if not components:
        return None
    return ".".join(f'"{c}"' for c in components)
|
|
|
|
# Datatypes (lower-case) whose constant values are converted to Python ints.
_INTEGER_DATATYPES = frozenset({
    'int', 'dint', 'udint', 'sint', 'usint', 'lint', 'ulint',
    'word', 'dword', 'lword', 'byte',
})

# Radix prefixes accepted in front of an integer literal (e.g. ``16#FF``).
_RADIX_PREFIXES = {"2": 2, "8": 8, "10": 10, "16": 16}


def _infer_constant_datatype(value_str):
    """Guess a datatype name from the literal text when none was declared."""
    if value_str.lower() in ("true", "false"):
        return "Bool"
    if value_str.isdigit() or (value_str.startswith('-') and value_str[1:].isdigit()):
        return "Int"
    if '.' in value_str:
        # A dotted value that is not a plain float (e.g. a time literal) stays
        # "Unknown"; it is deliberately not reclassified as a typed constant.
        try:
            float(value_str)
        except ValueError:
            return "Unknown"
        return "Real"
    if '#' in value_str:
        return "TypedConstant"
    return "Unknown"


def _convert_constant_value(value_str, datatype):
    """Convert a constant literal to int/bool/float according to ``datatype``.

    Raises ``ValueError`` when the literal does not fit the datatype. Values
    of any other datatype are returned unchanged as text.
    """
    dtype_lower = datatype.lower()
    segments = value_str.split('#')
    literal = segments[-1]  # text after the last '#', or the whole value
    if dtype_lower in _INTEGER_DATATYPES:
        # "16#10" is hexadecimal 16, not decimal 10: honour the radix prefix
        # that directly precedes the digits.
        radix = _RADIX_PREFIXES.get(segments[-2], 10) if len(segments) >= 2 else 10
        return int(literal, radix)
    if dtype_lower == 'bool':
        return literal.lower() == 'true' or literal == '1'
    if dtype_lower in ('real', 'lreal'):
        return float(literal)
    return value_str


def parse_access(access_element):
    """Parse an ``Access`` element into a plain dict.

    Args:
        access_element: lxml ``Access`` element, or ``None``.

    Returns:
        ``None`` for a ``None`` input, otherwise a dict with ``uid``, ``scope``
        and ``type``:

        * ``"variable"`` plus ``name`` when a ``Symbol`` child is present
          (``"error_parsing_symbol"`` if its name cannot be built);
        * ``"constant"`` plus ``datatype`` and ``value`` when a ``Constant``
          child is present (``"error_parsing_constant"`` with ``value`` of
          ``None`` if it has no value). ``value`` is converted to
          int/bool/float when the datatype allows it, otherwise it stays the
          original text;
        * ``"unknown_structure"`` when neither child exists.
    """
    if access_element is None:
        return None

    uid = access_element.get("UId")
    scope = access_element.get("Scope")
    info = {"uid": uid, "scope": scope, "type": "unknown"}

    symbol = access_element.xpath("./*[local-name()='Symbol']")
    constant = access_element.xpath("./*[local-name()='Constant']")

    if symbol:
        info["type"] = "variable"
        info["name"] = get_symbol_name(symbol[0])
        if info["name"] is None:
            info["type"] = "error_parsing_symbol"
            print(f"Error: No se pudo parsear nombre símbolo Access UID={uid}")
        return info

    if not constant:
        info["type"] = "unknown_structure"
        print(f"Advertencia: Access UID={uid} no es Symbol ni Constant.")
        return info

    info["type"] = "constant"
    const_type_elem = constant[0].xpath("./*[local-name()='ConstantType']")
    const_val_elem = constant[0].xpath("./*[local-name()='ConstantValue']")

    has_type = bool(const_type_elem) and const_type_elem[0].text is not None
    info["datatype"] = const_type_elem[0].text if has_type else "Unknown"

    has_value = bool(const_val_elem) and const_val_elem[0].text is not None
    value_str = const_val_elem[0].text if has_value else None
    if value_str is None:
        info["type"] = "error_parsing_constant"
        info["value"] = None
        print(f"Error: Constante sin valor Access UID={uid}")
        return info

    if info["datatype"] == "Unknown":
        info["datatype"] = _infer_constant_datatype(value_str)

    try:
        info["value"] = _convert_constant_value(value_str, info["datatype"])
    except (ValueError, TypeError) as e:
        # Keep the raw text so the constant is still usable downstream.
        val_str_processed = value_str.split('#')[-1]
        dtype_lower = info["datatype"].lower()
        print(f"Advertencia: No se pudo convertir valor '{val_str_processed}' a {dtype_lower} UID={uid}. Error: {e}")
        info["value"] = value_str
    return info
|
|
|
|
def parse_part(part_element):
    """Parse a ``Part`` element (a built-in instruction) into a plain dict.

    Args:
        part_element: lxml ``Part`` element, or ``None``.

    Returns:
        ``None`` when the element is ``None`` or lacks ``UId``/``Name``;
        otherwise a dict with:

        * ``uid``: the ``UId`` attribute;
        * ``type``: the ``Name`` attribute (instruction type);
        * ``template_values``: ``{Name: Type}`` of direct ``TemplateValue``
          children that carry both attributes;
        * ``negated_pins``: ``{pin_name: True}`` for direct ``Negated``
          children that carry a ``Name``.
    """
    if part_element is None:
        return None

    uid = part_element.get('UId')
    name = part_element.get('Name')
    if not uid or not name:
        print(f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}")
        return None

    template_values = {}
    try:
        for tv in part_element.xpath("./*[local-name()='TemplateValue']"):
            tv_name = tv.get('Name')
            tv_type = tv.get('Type')
            if tv_name and tv_type:
                template_values[tv_name] = tv_type
    except Exception as e:
        # Best effort: keep whatever was collected before the failure.
        print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}")

    negated_pins = {}
    try:
        for negated_elem in part_element.xpath("./*[local-name()='Negated']"):
            negated_pin_name = negated_elem.get('Name')
            if negated_pin_name:
                negated_pins[negated_pin_name] = True
    except Exception as e:
        print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}")

    return {'uid': uid, 'type': name, 'template_values': template_values, 'negated_pins': negated_pins}
|
|
|
|
def parse_call(call_element):
    """Parse a ``Call`` element (FC/FB call) into a plain dict.

    Args:
        call_element: lxml ``Call`` element, or ``None``.

    Returns:
        ``None`` when the element is ``None``, has no ``UId``, has no
        ``CallInfo`` child, or the ``CallInfo`` lacks ``Name``/``BlockType``.
        Otherwise a dict with ``uid``, ``type`` (always ``"Call"``),
        ``block_name`` and ``block_type``. For ``BlockType == "FB"`` the dict
        additionally gets ``instance_scope`` (the ``Scope`` of the ``Instance``
        child) and ``instance_db`` (the double-quoted ``Name`` of the first
        ``Component`` directly under ``Instance``) when those are available.
    """
    if call_element is None:
        return None

    uid = call_element.get("UId")
    if not uid:
        print(f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}")
        return None

    call_info_elem = call_element.xpath("./*[local-name()='CallInfo']")
    if not call_info_elem:
        print(f"Error: Call UID {uid} sin elemento CallInfo.")
        return None

    call_info = call_info_elem[0]
    block_name = call_info.get("Name")
    block_type = call_info.get("BlockType")
    if not block_name or not block_type:
        print(f"Error: CallInfo para UID {uid} sin Name o BlockType.")
        return None

    instance_name = None
    instance_scope = None
    if block_type == "FB":
        instance_elem_list = call_info.xpath("./*[local-name()='Instance']")
        if instance_elem_list:
            instance_elem = instance_elem_list[0]
            instance_scope = instance_elem.get("Scope")
            # Only a Component placed directly under Instance names the DB.
            component_elem_list = instance_elem.xpath("./*[local-name()='Component']")
            if component_elem_list:
                db_name_raw = component_elem_list[0].get('Name')
                if db_name_raw:
                    instance_name = f'"{db_name_raw}"'  # quoted like symbol names
                else:
                    print(f"Advertencia: <Component> dentro de <Instance> para FB Call UID {uid} no tiene atributo 'Name'.")
            else:
                print(f"Advertencia: No se encontró <Component> dentro de <Instance> para FB Call UID {uid}. No se pudo obtener el nombre del DB.")
        else:
            print(f"Advertencia: FB Call '{block_name}' UID {uid} no tiene elemento <Instance>.")

    call_data = {"uid": uid, "type": "Call", "block_name": block_name, "block_type": block_type}
    if instance_name:
        call_data["instance_db"] = instance_name
    if instance_scope:
        call_data["instance_scope"] = instance_scope
    return call_data
|
|
|
|
# --- Función parse_network con XPath corregido para Title/Comment ---
|
|
# --- Función parse_network MODIFICADA (maneja multi-destino en Wire) ---
|
|
def parse_network(network_element):
|
|
"""
|
|
Parsea una red, extrae lógica y añade conexiones EN implícitas.
|
|
Maneja wires con múltiples destinos.
|
|
"""
|
|
if network_element is None:
|
|
return {"id": "ERROR", "title": "Invalid Network Element", "comment": "", "logic": [], "error": "Input element was None"}
|
|
|
|
network_id = network_element.get("ID")
|
|
|
|
# --- Extracción Título/Comentario (sin cambios respecto a la última versión) ---
|
|
title_element = network_element.xpath(
|
|
".//*[local-name()='MultilingualText'][@CompositionName='Title']"
|
|
)
|
|
network_title = get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}"
|
|
comment_element = network_element.xpath(
|
|
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
|
|
)
|
|
network_comment = get_multilingual_text(comment_element[0]) if comment_element else ""
|
|
|
|
flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
|
|
if not flgnet_list:
|
|
# print(f"Advertencia: FlgNet no encontrado en Red ID={network_id}. Puede estar vacía o ser comentario.")
|
|
return {"id": network_id, "title": network_title, "comment": network_comment, "logic": [], "error": "FlgNet not found"}
|
|
flgnet = flgnet_list[0]
|
|
|
|
# 1. Parsear Access, Parts y Calls (sin cambios)
|
|
access_map = {acc_info["uid"]: acc_info for acc in flgnet.xpath(".//flg:Access", namespaces=ns) if (acc_info := parse_access(acc)) and acc_info['type'] != 'unknown'}
|
|
parts_and_calls_map = {}
|
|
instruction_elements = flgnet.xpath(".//flg:Part | .//flg:Call", namespaces=ns)
|
|
for element in instruction_elements:
|
|
parsed_info = None
|
|
tag_name = etree.QName(element.tag).localname # Obtener nombre local de la etiqueta
|
|
if tag_name == "Part": parsed_info = parse_part(element)
|
|
elif tag_name == "Call": parsed_info = parse_call(element)
|
|
if parsed_info and "uid" in parsed_info: parts_and_calls_map[parsed_info["uid"]] = parsed_info
|
|
else: print(f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}")
|
|
|
|
# --- 2. Parsear Wires (MODIFICADO para multi-destino) ---
|
|
wire_connections = defaultdict(list) # (dest_uid, dest_pin) -> [(src_uid, src_pin), ...]
|
|
source_connections = defaultdict(list) # (src_uid, src_pin) -> [(dest_uid, dest_pin), ...]
|
|
eno_outputs = defaultdict(list) # src_uid -> [(dest_uid, dest_pin), ...] (conexiones DESDE eno)
|
|
|
|
flg_ns_uri = ns["flg"] # Cache namespace URI
|
|
qname_powerrail = etree.QName(flg_ns_uri, "Powerrail")
|
|
qname_identcon = etree.QName(flg_ns_uri, "IdentCon")
|
|
qname_namecon = etree.QName(flg_ns_uri, "NameCon")
|
|
|
|
for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
|
|
children = wire.getchildren()
|
|
if len(children) < 2: continue # Ignorar wires sin fuente y al menos un destino
|
|
|
|
source_elem = children[0]
|
|
source_uid, source_pin = None, None
|
|
|
|
# Determinar fuente
|
|
if source_elem.tag == qname_powerrail: source_uid, source_pin = "POWERRAIL", "out"
|
|
elif source_elem.tag == qname_identcon: source_uid, source_pin = source_elem.get("UId"), "value" # Acceso a variable/constante
|
|
elif source_elem.tag == qname_namecon: source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name") # Salida de instrucción
|
|
|
|
if source_uid is None: continue # No se pudo determinar la fuente
|
|
|
|
source_info = (source_uid, source_pin) # Par de fuente
|
|
|
|
# Iterar sobre TODOS los posibles destinos (desde el segundo hijo en adelante)
|
|
for dest_elem in children[1:]:
|
|
dest_uid, dest_pin = None, None
|
|
|
|
# Determinar destino
|
|
if dest_elem.tag == qname_identcon: dest_uid, dest_pin = dest_elem.get("UId"), "value" # Entrada a variable/constante (Coil, etc.)
|
|
elif dest_elem.tag == qname_namecon: dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name") # Entrada a instrucción
|
|
|
|
# Guardar conexiones si son válidas
|
|
if dest_uid is not None and dest_pin is not None:
|
|
# Mapa de Conexiones (Destino -> [Fuentes])
|
|
dest_key = (dest_uid, dest_pin)
|
|
if source_info not in wire_connections[dest_key]:
|
|
wire_connections[dest_key].append(source_info)
|
|
|
|
# Mapa de Fuentes (Fuente -> [Destinos])
|
|
source_key = (source_uid, source_pin)
|
|
dest_info = (dest_uid, dest_pin)
|
|
if dest_info not in source_connections[source_key]:
|
|
source_connections[source_key].append(dest_info)
|
|
|
|
# Registrar conexiones que SALEN de un pin 'eno'
|
|
if source_pin == "eno" and source_uid in parts_and_calls_map:
|
|
if dest_info not in eno_outputs[source_uid]:
|
|
eno_outputs[source_uid].append(dest_info)
|
|
# else: # Debug opcional si un elemento no es destino válido
|
|
# print(f"Advertencia: Elemento en Wire {wire.get('UId')} no es destino válido: {etree.tostring(dest_elem)}")
|
|
# --- FIN MODIFICACIÓN Wire ---
|
|
|
|
# 3. Construcción Lógica Inicial (sin cambios)
|
|
all_logic_steps = {}
|
|
functional_block_types = ['Move', 'Add', 'Sub', 'Mul', 'Div', 'Mod', 'Convert', 'Call', 'Se', 'Sd', 'BLKMOV']
|
|
rlo_generators = ['Contact', 'O', 'Eq', 'Ne', 'Gt', 'Lt', 'Ge', 'Le', 'And', 'Xor', 'PBox', 'NBox']
|
|
for instruction_uid, instruction_info in parts_and_calls_map.items():
|
|
instruction_repr = {"instruction_uid": instruction_uid, **instruction_info}; instruction_repr["inputs"] = {}; instruction_repr["outputs"] = {}
|
|
possible_input_pins = set(['en', 'in', 'in1', 'in2', 'in3', 'in4', 's', 'r', 'clk', 'cu', 'cd', 'ld', 'pv', 'tv', 'bit', 'operand', 'pre', 'SRCBLK'])
|
|
for dest_pin_name in possible_input_pins:
|
|
dest_key = (instruction_uid, dest_pin_name)
|
|
if dest_key in wire_connections:
|
|
sources_list = wire_connections[dest_key]; input_sources_repr = []
|
|
for source_uid, source_pin in sources_list:
|
|
if source_uid == "POWERRAIL": input_sources_repr.append({"type": "powerrail"})
|
|
elif source_uid in access_map: input_sources_repr.append(access_map[source_uid])
|
|
elif source_uid in parts_and_calls_map: input_sources_repr.append({"type": "connection", "source_instruction_type": parts_and_calls_map[source_uid]["type"], "source_instruction_uid": source_uid, "source_pin": source_pin})
|
|
else: input_sources_repr.append({"type": "unknown_source", "uid": source_uid})
|
|
if len(input_sources_repr) == 1: instruction_repr["inputs"][dest_pin_name] = input_sources_repr[0]
|
|
elif len(input_sources_repr) > 1: instruction_repr["inputs"][dest_pin_name] = input_sources_repr
|
|
possible_output_pins = set(['out', 'out1', 'Q', 'eno', 'RET_VAL', 'DSTBLK', 'q', 'rt', 'rtbcd', 'cv', 'cvbcd'])
|
|
for source_pin_name in possible_output_pins:
|
|
source_key = (instruction_uid, source_pin_name)
|
|
if source_key in source_connections:
|
|
for dest_uid, dest_pin in source_connections[source_key]:
|
|
if dest_uid in access_map:
|
|
if source_pin_name not in instruction_repr["outputs"]: instruction_repr["outputs"][source_pin_name] = []
|
|
if access_map[dest_uid] not in instruction_repr["outputs"][source_pin_name]: instruction_repr["outputs"][source_pin_name].append(access_map[dest_uid])
|
|
all_logic_steps[instruction_uid] = instruction_repr
|
|
|
|
# 4. Inferencia EN (sin cambios)
|
|
processed_blocks_en_inference = set(); something_changed = True; inference_passes = 0; max_inference_passes = len(all_logic_steps) + 5
|
|
try: sorted_uids_for_en = sorted(all_logic_steps.keys(), key=lambda x: int(x) if x.isdigit() else float('inf'))
|
|
except ValueError: sorted_uids_for_en = sorted(all_logic_steps.keys())
|
|
ordered_logic_list_for_en = [all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps]
|
|
while something_changed and inference_passes < max_inference_passes:
|
|
something_changed = False; inference_passes += 1
|
|
for i, instruction in enumerate(ordered_logic_list_for_en):
|
|
part_uid = instruction["instruction_uid"]; part_type_original = instruction["type"].replace('_scl', '').replace('_error', '')
|
|
if (part_type_original in functional_block_types and "en" not in instruction["inputs"] and part_uid not in processed_blocks_en_inference):
|
|
inferred_en_source = None
|
|
if i > 0:
|
|
for j in range(i - 1, -1, -1):
|
|
prev_instr = ordered_logic_list_for_en[j]; prev_uid = prev_instr["instruction_uid"]; prev_type_original = prev_instr["type"].replace('_scl', '').replace('_error', '')
|
|
if prev_type_original in rlo_generators: inferred_en_source = {"type": "connection", "source_instruction_uid": prev_uid, "source_instruction_type": prev_type_original, "source_pin": "out"}; break
|
|
elif prev_type_original in functional_block_types:
|
|
source_key_eno = (prev_uid, "eno")
|
|
if source_key_eno in source_connections: inferred_en_source = {"type": "connection", "source_instruction_uid": prev_uid, "source_instruction_type": prev_type_original, "source_pin": "eno"}; break
|
|
else: continue
|
|
elif prev_type_original in ['Coil', 'SCoil', 'RCoil', 'SetCoil', 'ResetCoil', 'SdCoil']: break
|
|
if inferred_en_source: all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source; processed_blocks_en_inference.add(part_uid); something_changed = True
|
|
|
|
# 5. Añadir lógica ENO interesante (sin cambios)
|
|
for source_instr_uid, eno_destinations in eno_outputs.items():
|
|
if source_instr_uid not in all_logic_steps: continue
|
|
interesting_eno_logic = []
|
|
for dest_uid, dest_pin in eno_destinations:
|
|
is_direct_en_connection = False
|
|
if dest_uid in parts_and_calls_map and dest_pin == 'en':
|
|
try:
|
|
source_idx = sorted_uids_for_en.index(source_instr_uid); dest_idx = sorted_uids_for_en.index(dest_uid)
|
|
if dest_idx == source_idx + 1 and parts_and_calls_map[dest_uid]['type'] in functional_block_types: is_direct_en_connection = True
|
|
except ValueError: pass
|
|
if not is_direct_en_connection:
|
|
target_info = {"target_pin": dest_pin}
|
|
if dest_uid in parts_and_calls_map: target_info.update({"target_type": "instruction", "target_uid": dest_uid, "target_name": parts_and_calls_map[dest_uid].get("name", parts_and_calls_map[dest_uid].get("type"))})
|
|
elif dest_uid in access_map: target_info.update({"target_type": "operand", "target_details": access_map[dest_uid]})
|
|
else: target_info.update({"target_type": "unknown", "target_uid": dest_uid})
|
|
interesting_eno_logic.append(target_info)
|
|
if interesting_eno_logic: all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic
|
|
|
|
# 6. Ordenar y Devolver (sin cambios)
|
|
network_logic_final = [all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps]
|
|
return {"id": network_id, "title": network_title, "comment": network_comment, "logic": network_logic_final}
|
|
"""
|
|
Parsea una red, extrae lógica y añade conexiones EN implícitas.
|
|
"""
|
|
if network_element is None:
|
|
return {"id": "ERROR", "title": "Invalid Network Element", "comment": "", "logic": [], "error": "Input element was None"}
|
|
|
|
network_id = network_element.get("ID")
|
|
|
|
# --- CORRECCIÓN XPath para Title y Comment ---
|
|
# Usar local-name() como en la versión original que funcionaba para esto
|
|
title_element = network_element.xpath(
|
|
".//*[local-name()='MultilingualText'][@CompositionName='Title']" # Busca en cualquier nivel descendiente
|
|
)
|
|
network_title = get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}"
|
|
|
|
comment_element = network_element.xpath(
|
|
"./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']" # Busca directamente bajo ObjectList
|
|
)
|
|
network_comment = get_multilingual_text(comment_element[0]) if comment_element else ""
|
|
# --- FIN CORRECCIÓN XPath ---
|
|
|
|
flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
|
|
if not flgnet_list:
|
|
return {"id": network_id, "title": network_title, "comment": network_comment, "logic": [], "error": "FlgNet not found"}
|
|
flgnet = flgnet_list[0]
|
|
|
|
# 1. Parsear Access, Parts y Calls (usando parse_call corregido)
|
|
access_map = {acc_info["uid"]: acc_info for acc in flgnet.xpath(".//flg:Access", namespaces=ns) if (acc_info := parse_access(acc)) and acc_info['type'] != 'unknown'}
|
|
parts_and_calls_map = {}
|
|
instruction_elements = flgnet.xpath(".//flg:Part | .//flg:Call", namespaces=ns)
|
|
for element in instruction_elements:
|
|
parsed_info = None
|
|
if element.tag == etree.QName(ns["flg"], "Part"): parsed_info = parse_part(element)
|
|
elif element.tag == etree.QName(ns["flg"], "Call"): parsed_info = parse_call(element) # Usa la versión con fix DB
|
|
if parsed_info and "uid" in parsed_info: parts_and_calls_map[parsed_info["uid"]] = parsed_info
|
|
else: print(f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}")
|
|
|
|
# 2. Parsear Wires (sin cambios)
|
|
wire_connections = defaultdict(list); source_connections = defaultdict(list); eno_outputs = defaultdict(list)
|
|
flg_ns_uri = ns["flg"]
|
|
for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
|
|
source_uid, source_pin, dest_uid, dest_pin = None, None, None, None
|
|
children = wire.getchildren();
|
|
if len(children) < 2: continue
|
|
source_elem, dest_elem = children[0], children[1]
|
|
if source_elem.tag == etree.QName(flg_ns_uri, "Powerrail"): source_uid, source_pin = "POWERRAIL", "out"
|
|
elif source_elem.tag == etree.QName(flg_ns_uri, "IdentCon"): source_uid, source_pin = source_elem.get("UId"), "value"
|
|
elif source_elem.tag == etree.QName(flg_ns_uri, "NameCon"): source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name")
|
|
if dest_elem.tag == etree.QName(flg_ns_uri, "IdentCon"): dest_uid, dest_pin = dest_elem.get("UId"), "value"
|
|
elif dest_elem.tag == etree.QName(flg_ns_uri, "NameCon"): dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name")
|
|
if source_uid is not None and source_pin is not None and dest_uid is not None and dest_pin is not None:
|
|
dest_key = (dest_uid, dest_pin); source_info = (source_uid, source_pin)
|
|
if source_info not in wire_connections[dest_key]: wire_connections[dest_key].append(source_info)
|
|
source_key = (source_uid, source_pin); dest_info = (dest_uid, dest_pin)
|
|
if dest_info not in source_connections[source_key]: source_connections[source_key].append(dest_info)
|
|
if source_pin == "eno" and source_uid in parts_and_calls_map:
|
|
if dest_info not in eno_outputs[source_uid]: eno_outputs[source_uid].append(dest_info)
|
|
|
|
# 3. Construcción Lógica Inicial (sin cambios)
|
|
all_logic_steps = {}
|
|
functional_block_types = ['Move', 'Add', 'Sub', 'Mul', 'Div', 'Mod', 'Convert', 'Call', 'Se', 'Sd', 'BLKMOV']
|
|
rlo_generators = ['Contact', 'O', 'Eq', 'Ne', 'Gt', 'Lt', 'Ge', 'Le', 'And', 'Xor', 'PBox', 'NBox']
|
|
for instruction_uid, instruction_info in parts_and_calls_map.items():
|
|
instruction_repr = {"instruction_uid": instruction_uid, **instruction_info}; instruction_repr["inputs"] = {}; instruction_repr["outputs"] = {}
|
|
possible_input_pins = set(['en', 'in', 'in1', 'in2', 'in3', 'in4', 's', 'r', 'clk', 'cu', 'cd', 'ld', 'pv', 'tv', 'bit', 'operand', 'pre', 'SRCBLK'])
|
|
for dest_pin_name in possible_input_pins:
|
|
dest_key = (instruction_uid, dest_pin_name)
|
|
if dest_key in wire_connections:
|
|
sources_list = wire_connections[dest_key]; input_sources_repr = []
|
|
for source_uid, source_pin in sources_list:
|
|
if source_uid == "POWERRAIL": input_sources_repr.append({"type": "powerrail"})
|
|
elif source_uid in access_map: input_sources_repr.append(access_map[source_uid])
|
|
elif source_uid in parts_and_calls_map: input_sources_repr.append({"type": "connection", "source_instruction_type": parts_and_calls_map[source_uid]["type"], "source_instruction_uid": source_uid, "source_pin": source_pin})
|
|
else: input_sources_repr.append({"type": "unknown_source", "uid": source_uid})
|
|
if len(input_sources_repr) == 1: instruction_repr["inputs"][dest_pin_name] = input_sources_repr[0]
|
|
elif len(input_sources_repr) > 1: instruction_repr["inputs"][dest_pin_name] = input_sources_repr
|
|
possible_output_pins = set(['out', 'out1', 'Q', 'eno', 'RET_VAL', 'DSTBLK', 'q', 'rt', 'rtbcd', 'cv', 'cvbcd'])
|
|
for source_pin_name in possible_output_pins:
|
|
source_key = (instruction_uid, source_pin_name)
|
|
if source_key in source_connections:
|
|
for dest_uid, dest_pin in source_connections[source_key]:
|
|
if dest_uid in access_map:
|
|
if source_pin_name not in instruction_repr["outputs"]: instruction_repr["outputs"][source_pin_name] = []
|
|
if access_map[dest_uid] not in instruction_repr["outputs"][source_pin_name]: instruction_repr["outputs"][source_pin_name].append(access_map[dest_uid])
|
|
all_logic_steps[instruction_uid] = instruction_repr
|
|
|
|
# 4. Inferencia EN (sin cambios)
|
|
processed_blocks_en_inference = set(); something_changed = True; inference_passes = 0; max_inference_passes = len(all_logic_steps) + 5
|
|
try: sorted_uids_for_en = sorted(all_logic_steps.keys(), key=lambda x: int(x) if x.isdigit() else float('inf'))
|
|
except ValueError: sorted_uids_for_en = sorted(all_logic_steps.keys())
|
|
ordered_logic_list_for_en = [all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps]
|
|
while something_changed and inference_passes < max_inference_passes:
|
|
something_changed = False; inference_passes += 1
|
|
for i, instruction in enumerate(ordered_logic_list_for_en):
|
|
part_uid = instruction["instruction_uid"]; part_type_original = instruction["type"].replace('_scl', '').replace('_error', '')
|
|
if (part_type_original in functional_block_types and "en" not in instruction["inputs"] and part_uid not in processed_blocks_en_inference):
|
|
inferred_en_source = None
|
|
if i > 0:
|
|
for j in range(i - 1, -1, -1):
|
|
prev_instr = ordered_logic_list_for_en[j]; prev_uid = prev_instr["instruction_uid"]; prev_type_original = prev_instr["type"].replace('_scl', '').replace('_error', '')
|
|
if prev_type_original in rlo_generators: inferred_en_source = {"type": "connection", "source_instruction_uid": prev_uid, "source_instruction_type": prev_type_original, "source_pin": "out"}; break
|
|
elif prev_type_original in functional_block_types:
|
|
source_key_eno = (prev_uid, "eno")
|
|
if source_key_eno in source_connections: inferred_en_source = {"type": "connection", "source_instruction_uid": prev_uid, "source_instruction_type": prev_type_original, "source_pin": "eno"}; break
|
|
else: continue
|
|
elif prev_type_original in ['Coil', 'SCoil', 'RCoil', 'SetCoil', 'ResetCoil', 'SdCoil']: break
|
|
if inferred_en_source: all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source; processed_blocks_en_inference.add(part_uid); something_changed = True
|
|
|
|
# 5. Añadir lógica ENO interesante (sin cambios)
|
|
for source_instr_uid, eno_destinations in eno_outputs.items():
|
|
if source_instr_uid not in all_logic_steps: continue
|
|
interesting_eno_logic = []
|
|
for dest_uid, dest_pin in eno_destinations:
|
|
is_direct_en_connection = False
|
|
if dest_uid in parts_and_calls_map and dest_pin == 'en':
|
|
try:
|
|
source_idx = sorted_uids_for_en.index(source_instr_uid); dest_idx = sorted_uids_for_en.index(dest_uid)
|
|
if dest_idx == source_idx + 1 and parts_and_calls_map[dest_uid]['type'] in functional_block_types: is_direct_en_connection = True
|
|
except ValueError: pass
|
|
if not is_direct_en_connection:
|
|
target_info = {"target_pin": dest_pin}
|
|
if dest_uid in parts_and_calls_map: target_info.update({"target_type": "instruction", "target_uid": dest_uid, "target_name": parts_and_calls_map[dest_uid].get("name", parts_and_calls_map[dest_uid].get("type"))})
|
|
elif dest_uid in access_map: target_info.update({"target_type": "operand", "target_details": access_map[dest_uid]})
|
|
else: target_info.update({"target_type": "unknown", "target_uid": dest_uid})
|
|
interesting_eno_logic.append(target_info)
|
|
if interesting_eno_logic: all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic
|
|
|
|
# 6. Ordenar y Devolver (sin cambios)
|
|
network_logic_final = [all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps]
|
|
return {"id": network_id, "title": network_title, "comment": network_comment, "logic": network_logic_final}
|
|
|
|
# --- Función Principal convert_xml_to_json (sin cambios en su flujo general) ---
|
|
def _extract_interface_sections(interface_node):
    """Return {section name: [{'name', 'datatype'}, ...]} for non-empty iface:Section elements."""
    sections = {}
    for section in interface_node.xpath(".//iface:Section", namespaces=ns):
        section_name = section.get("Name")
        if not section_name:
            continue
        members = [
            {"name": member.get("Name"), "datatype": member.get("Datatype")}
            for member in section.xpath("./iface:Member", namespaces=ns)
            if member.get("Name") and member.get("Datatype")
        ]
        if members:
            sections[section_name] = members
    return sections


def convert_xml_to_json(xml_filepath, json_filepath):
    """Convert a block export XML file into a simplified JSON file.

    The first ``SW.Blocks.FC`` element is used, or the first ``SW.Blocks.FB``
    when there is no FC. The JSON holds ``block_name``, ``block_number``,
    ``language``, ``block_comment``, ``interface`` and ``networks`` (only
    networks parsed without an ``error``).

    Progress, warnings and errors are printed; nothing is raised and the
    function always returns ``None``. No file is written when the input file
    is missing, no block is found, or the XML cannot be parsed.

    Args:
        xml_filepath: path of the XML file to read.
        json_filepath: path of the JSON file to write (UTF-8, indent 4).
    """
    print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
    if not os.path.exists(xml_filepath):
        print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
        return

    try:
        print("Paso 1: Parseando archivo XML...")
        parser = etree.XMLParser(remove_blank_text=True)
        tree = etree.parse(xml_filepath, parser)
        root = tree.getroot()
        print("Paso 1: Parseo XML completado.")

        print("Paso 2: Buscando el bloque SW.Blocks.FC...")
        block_list = root.xpath("//*[local-name()='SW.Blocks.FC']")
        block_type_found = "FC"
        if not block_list:
            # No FC in the file: fall back to an FB.
            block_list = root.xpath("//*[local-name()='SW.Blocks.FB']")
            block_type_found = "FB"
            if not block_list:
                print("Error Crítico: No se encontró <SW.Blocks.FC> ni <SW.Blocks.FB>.")
                return
            print("Advertencia: Se encontró <SW.Blocks.FB> en lugar de <SW.Blocks.FC>.")
        the_block = block_list[0]
        print(f"Paso 2: Bloque SW.Blocks.{block_type_found} encontrado (ID={the_block.get('ID')}).")

        print("Paso 3: Extrayendo atributos del bloque...")
        attribute_list_node = the_block.xpath("./*[local-name()='AttributeList']")
        block_name_val, block_number_val, block_lang_val = "Unknown", None, "Unknown"
        if attribute_list_node:
            attr_list = attribute_list_node[0]
            name_node = attr_list.xpath("./*[local-name()='Name']/text()")
            if name_node:
                block_name_val = name_node[0].strip()
            num_node = attr_list.xpath("./*[local-name()='Number']/text()")
            try:
                block_number_val = int(num_node[0]) if num_node else None
            except ValueError:
                block_number_val = None
            lang_node = attr_list.xpath("./*[local-name()='ProgrammingLanguage']/text()")
            if lang_node:
                block_lang_val = lang_node[0].strip()
            print(f"Paso 3: Atributos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'")
        else:
            print(f"Advertencia: No se encontró AttributeList para el bloque {block_type_found}.")

        block_comment_val = ""
        comment_node_list = the_block.xpath("./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']")
        if comment_node_list:
            block_comment_val = get_multilingual_text(comment_node_list[0])
            print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")

        result = {
            "block_name": block_name_val,
            "block_number": block_number_val,
            "language": block_lang_val,
            "block_comment": block_comment_val,
            "interface": {},
            "networks": [],
        }

        print("Paso 4: Extrayendo la interfaz del bloque...")
        if attribute_list_node:
            interface_node_list = attribute_list_node[0].xpath(".//*[local-name()='Interface']")
            if interface_node_list:
                print("Paso 4: Nodo Interface encontrado.")
                result["interface"] = _extract_interface_sections(interface_node_list[0])
                if not result["interface"]:
                    print("Advertencia: Interface sin secciones iface:Section válidas.")
            else:
                print("Advertencia: No se encontró <Interface> dentro de <AttributeList>.")
        if not result["interface"]:
            print("Advertencia: No se pudo extraer información de la interfaz.")

        print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...")
        object_list_node = the_block.xpath("./*[local-name()='ObjectList']")
        if object_list_node:
            compile_units = object_list_node[0].xpath("./*[local-name()='SW.Blocks.CompileUnit']")
            print(f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit.")
            for network_elem in compile_units:
                parsed_network = parse_network(network_elem)
                if parsed_network and parsed_network.get("error") is None:
                    result["networks"].append(parsed_network)
                elif parsed_network:
                    print(f"Error: Falló parseo red ID={parsed_network.get('id')}: {parsed_network.get('error')}")
                else:
                    print(f"Error Fatal: parse_network devolvió None para CompileUnit ID={network_elem.get('ID')}.")
            if not compile_units:
                print("Advertencia: ObjectList sin SW.Blocks.CompileUnit.")
        else:
            print("Advertencia: No se encontró ObjectList para el bloque.")

        print("Paso 6: Escribiendo el resultado en el archivo JSON...")
        if not result["interface"]:
            print("ADVERTENCIA FINAL: 'interface' está vacía.")
        if not result["networks"]:
            print("ADVERTENCIA FINAL: 'networks' está vacía.")
        try:
            with open(json_filepath, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=4, ensure_ascii=False)
            print("Paso 6: Escritura completada.")
            print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'")
        except IOError as e:
            print(f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}")
        except TypeError as e:
            print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")

    except etree.XMLSyntaxError as e:
        print(f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}")
    except Exception as e:
        # Top-level boundary: report the failure with a traceback, do not re-raise.
        print(f"Error Crítico: Error inesperado durante la conversión: {e}")
        print("--- Traceback ---")
        traceback.print_exc()
        print("--- Fin Traceback ---")
|
|
|
|
# --- Punto de Entrada Principal ---
|
|
if __name__ == "__main__":
|
|
xml_filename_base = "TestLAD"
|
|
xml_file = f"{xml_filename_base}.xml"
|
|
json_file = f"{xml_filename_base}_simplified.json"
|
|
convert_xml_to_json(xml_file, json_file) |