From 099553f4b43569b13334023a412a045cdfbcf045 Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 18 Apr 2025 10:52:53 +0200 Subject: [PATCH] Buena version --- BlenderRun_ProdTime_simplified.json | 8 +- to_json.py | 714 ++++++++++++++++++++-------- 2 files changed, 514 insertions(+), 208 deletions(-) diff --git a/BlenderRun_ProdTime_simplified.json b/BlenderRun_ProdTime_simplified.json index 2b8c642..798a314 100644 --- a/BlenderRun_ProdTime_simplified.json +++ b/BlenderRun_ProdTime_simplified.json @@ -787,7 +787,7 @@ "uid": "24", "scope": "TypedConstant", "type": "constant", - "datatype": "Unknown", + "datatype": "TypedConstant", "value": "DINT#60" } }, @@ -822,7 +822,7 @@ "uid": "27", "scope": "TypedConstant", "type": "constant", - "datatype": "Unknown", + "datatype": "TypedConstant", "value": "DINT#0" } }, @@ -1003,7 +1003,7 @@ "uid": "25", "scope": "TypedConstant", "type": "constant", - "datatype": "Unknown", + "datatype": "TypedConstant", "value": "DINT#60" } }, @@ -1038,7 +1038,7 @@ "uid": "28", "scope": "TypedConstant", "type": "constant", - "datatype": "Unknown", + "datatype": "TypedConstant", "value": "DINT#0" } }, diff --git a/to_json.py b/to_json.py index d22e004..b984d9b 100644 --- a/to_json.py +++ b/to_json.py @@ -2,296 +2,602 @@ import json import os from lxml import etree +import traceback # Para obtener detalles de excepciones # --- Namespaces --- +# Namespaces usados comúnmente en los archivos XML de TIA Portal Openness ns = { + # Namespace principal para elementos de la interfaz del bloque (Input, Output, Temp, etc.) 'iface': 'http://www.siemens.com/automation/Openness/SW/Interface/v5', + # Namespace para elementos dentro de la lógica de red LAD/FBD (FlgNet) 'flg': 'http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4' + # Podrían añadirse otros si fueran necesarios (p.ej., para SCL, DBs) } # --- Helper Functions --- -# (get_multilingual_text, get_symbol_name, parse_access, parse_part sin cambios respecto a la última versión) + def get_multilingual_text(element, default_lang='en-US', fallback_lang='it-IT'): - """Intenta extraer texto de un MultilingualText, priorizando idiomas.""" - if element is None: return "" + """ + Intenta extraer texto de un elemento MultilingualText, priorizando idiomas. + Busca directamente los Text dentro de Items/AttributeList/Text bajo las Culture especificadas. + """ + if element is None: + # print("DEBUG: get_multilingual_text llamado con element=None") + return "" try: - xpath_expr = f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture']='{default_lang}']/*[local-name()='AttributeList']/*[local-name()='Text']" - text_item = element.xpath(xpath_expr) - if text_item and text_item[0].text: return text_item[0].text.strip() - xpath_expr = f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture']='{fallback_lang}']/*[local-name()='AttributeList']/*[local-name()='Text']" - text_item = element.xpath(xpath_expr) - if text_item and text_item[0].text: return text_item[0].text.strip() + # Intenta encontrar el idioma por defecto + xpath_expr = f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{default_lang}']]" \ + f"/*[local-name()='AttributeList']/*[local-name()='Text']" + text_items = element.xpath(xpath_expr) + if text_items and text_items[0].text is not None: + # print(f"DEBUG: Texto encontrado para {default_lang}: {text_items[0].text.strip()}") + return text_items[0].text.strip() + + # Si no, intenta encontrar el idioma de fallback + xpath_expr = f".//*[local-name()='MultilingualTextItem'][*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{fallback_lang}']]" \ + f"/*[local-name()='AttributeList']/*[local-name()='Text']" + text_items = element.xpath(xpath_expr) + if text_items and text_items[0].text is not None: + # print(f"DEBUG: Texto encontrado para {fallback_lang}: {text_items[0].text.strip()}") + return text_items[0].text.strip() + + # Si no, toma el primer texto que encuentre xpath_expr = f".//*[local-name()='MultilingualTextItem']/*[local-name()='AttributeList']/*[local-name()='Text']" - text_item = element.xpath(xpath_expr) - if text_item and text_item[0].text: return text_item[0].text.strip() - except Exception as e: print(f"Advertencia: Error extrayendo MultilingualText: {e}") - return "" + text_items = element.xpath(xpath_expr) + if text_items and text_items[0].text is not None: + # print(f"DEBUG: Texto encontrado (primer disponible): {text_items[0].text.strip()}") + return text_items[0].text.strip() + + # print("DEBUG: No se encontró ningún texto en MultilingualTextItem") + return "" # Devuelve cadena vacía si no se encuentra nada + except Exception as e: + print(f"Advertencia: Error extrayendo MultilingualText desde {etree.tostring(element, encoding='unicode')[:100]}...: {e}") + return "" def get_symbol_name(symbol_element): - """Construye el nombre completo del símbolo a partir de sus componentes.""" - if symbol_element is None: return None + """ + Construye el nombre completo del símbolo (variable) a partir de sus elementos Component. + Encapsula cada componente entre comillas dobles y los une con puntos. + """ + if symbol_element is None: + # print("DEBUG: get_symbol_name llamado con symbol_element=None") + return None try: + # Selecciona el atributo 'Name' de cada elemento 'Component' hijo directo del Symbol components = symbol_element.xpath("./*[local-name()='Component']/@Name") - if components: return ".".join(f'"{c}"' for c in components) + if components: + # Une los componentes, asegurándose de que cada uno esté entre comillas dobles + full_name = ".".join(f'"{c}"' for c in components) + # print(f"DEBUG: Nombre de símbolo construido: {full_name}") + return full_name else: - # print(f"DEBUG: No se encontraron 'Component' en: {etree.tostring(symbol_element).decode()}") - return None + # print(f"Advertencia: No se encontraron 'Component' en Symbol: {etree.tostring(symbol_element, encoding='unicode')}") + return None # Indica que no se pudo formar un nombre except Exception as e: - # print(f"DEBUG: Excepción en get_symbol_name: {e}") + print(f"Advertencia: Excepción en get_symbol_name para {etree.tostring(symbol_element, encoding='unicode')[:100]}...: {e}") return None def parse_access(access_element): - """Parsea un elemento Access para obtener información de variable o constante.""" - info = {'uid': access_element.get('UId'), 'scope': access_element.get('Scope'), 'type': 'unknown'} + """ + Parsea un elemento Access (acceso a operando) para obtener información + detallada sobre si es una variable (Symbol) o una constante (Constant). + Devuelve un diccionario con la información o None si hay un error crítico. + """ + if access_element is None: + # print("DEBUG: parse_access llamado con access_element=None") + return None + + uid = access_element.get('UId') + scope = access_element.get('Scope') + # print(f"DEBUG: Parseando Access UID={uid}, Scope={scope}") + + info = {'uid': uid, 'scope': scope, 'type': 'unknown'} # Inicializa info + + # Intenta encontrar un elemento Symbol (indica una variable) symbol = access_element.xpath("./*[local-name()='Symbol']") + # Intenta encontrar un elemento Constant (indica un valor literal) constant = access_element.xpath("./*[local-name()='Constant']") + if symbol: - info['type'] = 'variable'; info['name'] = get_symbol_name(symbol[0]) - if info['name'] is None: info['type'] = 'error_parsing_symbol' + info['type'] = 'variable' + info['name'] = get_symbol_name(symbol[0]) + if info['name'] is None: + # Si get_symbol_name falló, marca como error y reporta + info['type'] = 'error_parsing_symbol' + print(f"Error: No se pudo parsear el nombre del símbolo para Access UID={uid}") + return info # Devolver la info con el error marcado + # print(f"DEBUG: Access UID={uid} es Variable: {info['name']}") + elif constant: info['type'] = 'constant' + # Extrae el tipo de dato de la constante const_type_elem = constant[0].xpath("./*[local-name()='ConstantType']") const_val_elem = constant[0].xpath("./*[local-name()='ConstantValue']") - info['datatype'] = const_type_elem[0].text if const_type_elem and const_type_elem[0].text is not None else 'Unknown' - info['value_str'] = const_val_elem[0].text if const_val_elem and const_val_elem[0].text is not None else None - if info['datatype'] == 'Unknown' and info['value_str'] is not None: - if info['value_str'].lower() in ['true', 'false']: info['datatype'] = 'Bool' - elif info['value_str'].isdigit() or (info['value_str'].startswith('-') and info['value_str'][1:].isdigit()): info['datatype'] = 'Int' - elif '.' in info['value_str']: - try: float(info['value_str']); info['datatype'] = 'Real' - except ValueError: pass - if info['value_str'] is not None: - info['value'] = info['value_str'] - dtype_lower = info['datatype'].lower() - val_str_processed = info['value_str'].split('#')[-1] if '#' in info['value_str'] else info['value_str'] - try: - if dtype_lower in ['int', 'dint', 'udint', 'sint', 'usint', 'lint', 'ulint', 'word', 'dword', 'lword', 'byte']: info['value'] = int(val_str_processed) - elif dtype_lower == 'bool': info['value'] = val_str_processed.lower() == 'true' or val_str_processed == '1' - elif dtype_lower in ['real', 'lreal']: info['value'] = float(val_str_processed) - except (ValueError, TypeError): info['value'] = info['value_str'] - if 'value_str' in info: del info['value_str'] - else: info['type'] = 'error_parsing_constant'; info['value'] = None - else: info['type'] = 'unknown_structure' - # Verifica si realmente se pudo parsear el nombre para variables - if info['type'] == 'variable' and info.get('name') is None: - # Si get_symbol_name falló y marcó como None, mantenemos error_parsing_symbol - if info.get('type') != 'error_parsing_symbol': - print(f"Advertencia: parse_access terminó con tipo 'variable' pero sin nombre para UID {info['uid']}.") - info['type'] = 'error_no_name' # Nuevo estado de error - return info + # Obtiene el texto del tipo de dato, o 'Unknown' si no está presente + info['datatype'] = const_type_elem[0].text if const_type_elem and const_type_elem[0].text is not None else 'Unknown' + # Obtiene el texto del valor, o None si no está presente + value_str = const_val_elem[0].text if const_val_elem and const_val_elem[0].text is not None else None + + if value_str is None: + # Si no hay valor, marca como error y reporta + info['type'] = 'error_parsing_constant' + info['value'] = None + print(f"Error: Constante sin valor encontrada para Access UID={uid}") + return info # Devolver la info con el error marcado + + # Intenta inferir el tipo si es 'Unknown' + if info['datatype'] == 'Unknown': + val_lower = value_str.lower() + if val_lower in ['true', 'false']: info['datatype'] = 'Bool' + elif value_str.isdigit() or (value_str.startswith('-') and value_str[1:].isdigit()): info['datatype'] = 'Int' # Asume Int base + elif '.' in value_str: + try: + float(value_str) + info['datatype'] = 'Real' # Asume Real base + except ValueError: pass # Sigue siendo Unknown si no parece número real + elif '#' in value_str: + info['datatype'] = 'TypedConstant' # Ej: DINT#60 + # print(f"DEBUG: Tipo de constante inferido para UID={uid}: {info['datatype']} (desde '{value_str}')") + + + # Intenta convertir el valor a un tipo Python nativo + info['value'] = value_str # Valor por defecto es el string original + dtype_lower = info['datatype'].lower() + # Procesa el valor, quitando prefijos tipo 'DINT#' si existen + val_str_processed = value_str.split('#')[-1] if '#' in value_str else value_str + try: + if dtype_lower in ['int', 'dint', 'udint', 'sint', 'usint', 'lint', 'ulint', 'word', 'dword', 'lword', 'byte']: + info['value'] = int(val_str_processed) + elif dtype_lower == 'bool': + info['value'] = val_str_processed.lower() == 'true' or val_str_processed == '1' + elif dtype_lower in ['real', 'lreal']: + info['value'] = float(val_str_processed) + # Para TypedConstant u otros, mantenemos el string original como valor, pero registramos el tipo + elif dtype_lower == 'typedconstant': + # Podríamos intentar extraer el tipo y valor aquí si fuera necesario + info['value'] = value_str # Mantiene el string original + # Nota: Los tipos Time, Date, String, etc., se mantendrán como strings + except (ValueError, TypeError) as e: + # Si la conversión falla, mantenemos el string original y reportamos + print(f"Advertencia: No se pudo convertir el valor '{val_str_processed}' a tipo {dtype_lower} para UID={uid}. Se mantiene como string. Error: {e}") + info['value'] = value_str # Asegura que se mantenga el string original si falla la conversión + + # print(f"DEBUG: Access UID={uid} es Constante: Tipo={info['datatype']}, Valor={info['value']}") + + else: + # Si no es ni Symbol ni Constant, es una estructura desconocida + info['type'] = 'unknown_structure' + print(f"Advertencia: Access UID={uid} no es ni Symbol ni Constant.") + return info # Devolver la info con el tipo 'unknown_structure' + + # Verificación final: si es variable, debe tener nombre + if info['type'] == 'variable' and info.get('name') is None: + # Esto no debería ocurrir si get_symbol_name funciona, pero es una salvaguarda + print(f"Error Interno: parse_access terminó con tipo 'variable' pero sin nombre para UID {uid}.") + info['type'] = 'error_no_name' # Marca un estado de error específico + return info + + return info # Devuelve el diccionario con la información parseada def parse_part(part_element): - """Parsea un elemento Part (instrucción).""" + """ + Parsea un elemento Part (representa una instrucción como Add, Move, Contact, Coil) + y extrae su UID, nombre y valores de plantilla (TemplateValue). + Devuelve un diccionario con la información o None si el elemento es inválido. + """ + if part_element is None: + # print("DEBUG: parse_part llamado con part_element=None") + return None + + uid = part_element.get('UId') + name = part_element.get('Name') + # print(f"DEBUG: Parseando Part UID={uid}, Name={name}") + + if not uid or not name: + print(f"Error: Part encontrado sin UID o Name: {etree.tostring(part_element, encoding='unicode')}") + return None # Ignora partes inválidas + + # Extrae los TemplateValue si existen (información adicional sobre la instrucción) + template_values = {} + try: + # Selecciona los atributos Name y Type de cada TemplateValue hijo + for tv in part_element.xpath("./*[local-name()='TemplateValue']"): + tv_name = tv.get('Name') + tv_type = tv.get('Type') + if tv_name and tv_type: + template_values[tv_name] = tv_type + # print(f"DEBUG: TemplateValues para Part UID={uid}: {template_values}") + except Exception as e: + print(f"Advertencia: Error extrayendo TemplateValues para Part UID={uid}: {e}") + return { - 'uid': part_element.get('UId'), 'name': part_element.get('Name'), - 'template_values': {tv.get('Name'): tv.get('Type') for tv in part_element.xpath("./*[local-name()='TemplateValue']")} + 'uid': uid, + 'name': name, + 'template_values': template_values } # --- Main Parsing Logic --- def parse_network(network_element): - """Parsea una red (CompileUnit) y extrae su lógica simplificada.""" - # (parse_network sin cambios respecto a la última versión) - network_logic = [] + """ + Parsea un elemento SW.Blocks.CompileUnit (representa una red de lógica) + y extrae su ID, título y la lógica interna simplificada en formato JSON. + """ + if network_element is None: + print("Error: parse_network llamado con network_element=None") + return {'id': 'ERROR', 'title': 'Invalid Network Element', 'logic': [], 'error': 'Input element was None'} + network_id = network_element.get('ID') + # print(f"--- Parseando Red ID={network_id} ---") + + # Extrae el título de la red usando la función auxiliar title_element = network_element.xpath(".//*[local-name()='MultilingualText'][@CompositionName='Title']") network_title = get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}" - flgnet = network_element.xpath(".//flg:FlgNet", namespaces=ns) - if not flgnet: return {'id': network_id, 'title': network_title, 'logic': [], 'error': 'FlgNet not found'} - flgnet = flgnet[0] - access_map = {acc_info['uid']: acc_info for acc in flgnet.xpath(".//flg:Access", namespaces=ns) if (acc_info := parse_access(acc))} - parts_map = {part_info['uid']: part_info for part in flgnet.xpath(".//flg:Part", namespaces=ns) if (part_info := parse_part(part))} - wire_connections = {} - flg_ns_uri = ns['flg'] + # print(f"DEBUG: Título de la Red: {network_title}") + + # Encuentra el contenedor FlgNet que tiene la lógica LAD/FBD + # Usa '//' para buscar en cualquier nivel descendiente y el namespace 'flg' + flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns) + if not flgnet_list: + print(f"Error: No se encontró FlgNet en la red ID={network_id}") + return {'id': network_id, 'title': network_title, 'logic': [], 'error': 'FlgNet not found'} + flgnet = flgnet_list[0] # Toma el primer FlgNet encontrado + + # 1. Parsea todos los Access (operandos) y Parts (instrucciones) dentro de FlgNet + access_map = {} + for acc in flgnet.xpath(".//flg:Access", namespaces=ns): + acc_info = parse_access(acc) + if acc_info and 'uid' in acc_info: + access_map[acc_info['uid']] = acc_info + else: + print(f"Advertencia: Se ignoró un Access inválido en la red {network_id}") + + parts_map = {} + for part in flgnet.xpath(".//flg:Part", namespaces=ns): + part_info = parse_part(part) + if part_info and 'uid' in part_info: + parts_map[part_info['uid']] = part_info + else: + print(f"Advertencia: Se ignoró un Part inválido en la red {network_id}") + + # print(f"DEBUG: Red {network_id}: {len(access_map)} Access encontrados, {len(parts_map)} Parts encontrados.") + + # 2. Parsea todas las conexiones (Wires) para entender el flujo + wire_connections = {} # Diccionario: clave=(dest_uid, dest_pin), valor=lista de (source_uid, source_pin) + flg_ns_uri = ns['flg'] # Obtiene la URI del namespace 'flg' para comparaciones de tags + for wire in flgnet.xpath(".//flg:Wire", namespaces=ns): + # print(f"DEBUG: Procesando Wire: {etree.tostring(wire, encoding='unicode').strip()}") source_uid, source_pin, dest_uid, dest_pin = None, None, None, None - children = wire.getchildren() - if not children: continue - source_elem, dest_elem = children[0], children[1] if len(children) > 1 else None - if source_elem.tag == etree.QName(flg_ns_uri, 'Powerrail'): source_uid, source_pin = 'POWERRAIL', 'out' - elif source_elem.tag == etree.QName(flg_ns_uri, 'IdentCon'): source_uid, source_pin = source_elem.get('UId'), 'value' - elif source_elem.tag == etree.QName(flg_ns_uri, 'NameCon'): source_uid, source_pin = source_elem.get('UId'), source_elem.get('Name') - if dest_elem is not None: - if dest_elem.tag == etree.QName(flg_ns_uri, 'IdentCon'): dest_uid, dest_pin = dest_elem.get('UId'), 'value' - elif dest_elem.tag == etree.QName(flg_ns_uri, 'NameCon'): dest_uid, dest_pin = dest_elem.get('UId'), dest_elem.get('Name') - if dest_uid and dest_pin and (source_uid is not None or source_pin == 'out'): + children = wire.getchildren() # Obtiene los hijos directos del Wire (conexiones) + + if len(children) < 2: + # print(f"Advertencia: Wire con menos de 2 hijos ignorado en red {network_id}: {etree.tostring(wire, encoding='unicode')}") + continue # Un wire necesita al menos un origen y un destino + + source_elem, dest_elem = children[0], children[1] + + # Determina la fuente de la conexión + # Utiliza etree.QName para comparar tags incluyendo el namespace + if source_elem.tag == etree.QName(flg_ns_uri, 'Powerrail'): + source_uid, source_pin = 'POWERRAIL', 'out' # Fuente es la barra de potencia + elif source_elem.tag == etree.QName(flg_ns_uri, 'IdentCon'): + # Conexión a un operando (Access) + source_uid, source_pin = source_elem.get('UId'), 'value' # Usamos 'value' como pin genérico para Access + elif source_elem.tag == etree.QName(flg_ns_uri, 'NameCon'): + # Conexión desde un pin específico de una instrucción (Part) + source_uid, source_pin = source_elem.get('UId'), source_elem.get('Name') + else: + # print(f"Advertencia: Tipo de fuente de Wire desconocido '{source_elem.tag}' en red {network_id}") + pass # Ignora fuentes desconocidas por ahora + + # Determina el destino de la conexión + if dest_elem.tag == etree.QName(flg_ns_uri, 'IdentCon'): + # Conexión a un operando (Access) - Generalmente una asignación de salida + dest_uid, dest_pin = dest_elem.get('UId'), 'value' # Usamos 'value' como pin genérico + elif dest_elem.tag == etree.QName(flg_ns_uri, 'NameCon'): + # Conexión a un pin específico de una instrucción (Part) - Generalmente una entrada + dest_uid, dest_pin = dest_elem.get('UId'), dest_elem.get('Name') + else: + # print(f"Advertencia: Tipo de destino de Wire desconocido '{dest_elem.tag}' en red {network_id}") + pass # Ignora destinos desconocidos + + # Si tenemos un destino válido y una fuente válida, registra la conexión + if dest_uid and dest_pin and source_uid is not None: dest_key = (dest_uid, dest_pin) source_info = (source_uid, source_pin) - if dest_key not in wire_connections: wire_connections[dest_key] = [] - wire_connections[dest_key].append(source_info) - all_logic_steps = {} + # print(f"DEBUG: Wire Conexión: De ({source_uid}, {source_pin}) a ({dest_uid}, {dest_pin})") + + # Añade la fuente a la lista de fuentes para ese pin de destino + if dest_key not in wire_connections: + wire_connections[dest_key] = [] + if source_info not in wire_connections[dest_key]: # Evita duplicados si el XML fuera redundante + wire_connections[dest_key].append(source_info) + + # print(f"DEBUG: Red {network_id}: {len(wire_connections)} Conexiones de destino procesadas.") + + # 3. Construye la representación lógica de cada instrucción (Part) + all_logic_steps = {} # Diccionario para almacenar la representación JSON de cada instrucción for part_uid, part_info in parts_map.items(): - instruction_repr = {'instruction_uid': part_uid, 'type': part_info['name'], 'inputs': {}, 'outputs': {}} + instruction_repr = { + 'instruction_uid': part_uid, + 'type': part_info['name'], + 'inputs': {}, # Pines de entrada de la instrucción + 'outputs': {} # Pines de salida de la instrucción + } + + # Busca todas las conexiones que *llegan* a esta instrucción (Part) for (conn_dest_uid, conn_dest_pin), sources_list in wire_connections.items(): - if conn_dest_uid == part_uid: - input_sources = [] + if conn_dest_uid == part_uid: # Si el destino es esta instrucción + # print(f"DEBUG: Part UID={part_uid}, Pin Entrada='{conn_dest_pin}', Fuentes={sources_list}") + input_sources_repr = [] # Lista para representar las fuentes de este pin for source_uid, source_pin in sources_list: - if source_uid == 'POWERRAIL': input_sources.append({'type': 'powerrail'}) - elif source_uid in access_map: input_sources.append(access_map[source_uid]) - elif source_uid in parts_map: input_sources.append({'type': 'connection', 'source_instruction_uid': source_uid, 'source_instruction_type': parts_map[source_uid]['name'], 'source_pin': source_pin}) - else: input_sources.append({'type': 'unknown_source', 'uid': source_uid}) - if len(input_sources) == 1: instruction_repr['inputs'][conn_dest_pin] = input_sources[0] - else: instruction_repr['inputs'][conn_dest_pin] = {'type': 'OR_Branch', 'sources': input_sources} + if source_uid == 'POWERRAIL': + input_sources_repr.append({'type': 'powerrail'}) + elif source_uid in access_map: + # La fuente es una variable o constante + input_sources_repr.append(access_map[source_uid]) + elif source_uid in parts_map: + # La fuente es la salida de otra instrucción + input_sources_repr.append({ + 'type': 'connection', + 'source_instruction_uid': source_uid, + 'source_instruction_type': parts_map[source_uid]['name'], + 'source_pin': source_pin + }) + else: + # Fuente desconocida (error o caso no manejado) + input_sources_repr.append({'type': 'unknown_source', 'uid': source_uid}) + print(f"Advertencia: Fuente desconocida UID={source_uid} encontrada como entrada para Part UID={part_uid}, Pin={conn_dest_pin}") + + # Asigna la representación de las fuentes al pin de entrada correspondiente + # Si solo hay una fuente, la asigna directamente. + # Si hay múltiples fuentes (caso de ramas OR en LAD), las agrupa. + # NOTA: Esta lógica de OR_Branch es una simplificación y podría no cubrir todos los casos complejos. + # Asumimos que múltiples wires llegando al mismo pin de entrada implican un OR. + if len(input_sources_repr) == 1: + instruction_repr['inputs'][conn_dest_pin] = input_sources_repr[0] + elif len(input_sources_repr) > 1: + # Si hay varias fuentes para el mismo pin de entrada, las marcamos + # print(f"DEBUG: Múltiples fuentes para Part UID={part_uid}, Pin={conn_dest_pin}. Asumiendo OR.") + # Comprobamos si todas las fuentes son simples conexiones o powerrail + is_simple_or = all(s['type'] in ['powerrail', 'connection'] for s in input_sources_repr) + if is_simple_or: + # Si son conexiones simples, mantenemos la lista (implica OR) + instruction_repr['inputs'][conn_dest_pin] = input_sources_repr + else: + # Si hay operandos directos (variables/constantes), esto es más complejo. + # Por ahora, lo marcamos como una rama OR genérica. + # Una mejora sería analizar la estructura LAD/FBD más a fondo. + print(f"Advertencia: Rama OR compleja detectada para Part UID={part_uid}, Pin={conn_dest_pin}. Simplificando a lista.") + instruction_repr['inputs'][conn_dest_pin] = input_sources_repr + # else: no sources found for this input pin (podría ser normal si no está conectado) + + + # Busca todas las conexiones que *salen* de esta instrucción y van a un Access (asignación) for (conn_dest_uid, conn_dest_pin), sources_list in wire_connections.items(): for source_uid, source_pin in sources_list: - if source_uid == part_uid and conn_dest_uid in access_map: - if source_pin not in instruction_repr['outputs']: instruction_repr['outputs'][source_pin] = [] - if access_map[conn_dest_uid] not in instruction_repr['outputs'][source_pin]: instruction_repr['outputs'][source_pin].append(access_map[conn_dest_uid]) + if source_uid == part_uid and conn_dest_uid in access_map: # Si la fuente es esta instrucción y el destino es un Access + # print(f"DEBUG: Part UID={part_uid}, Pin Salida='{source_pin}', Destino Access UID={conn_dest_uid}") + # Agrega el Access de destino a la lista de salidas de este pin + if source_pin not in instruction_repr['outputs']: + instruction_repr['outputs'][source_pin] = [] + # Evita añadir duplicados si el XML es redundante + if access_map[conn_dest_uid] not in instruction_repr['outputs'][source_pin]: + instruction_repr['outputs'][source_pin].append(access_map[conn_dest_uid]) + + # Almacena la representación JSON de la instrucción all_logic_steps[part_uid] = instruction_repr - sorted_uids = sorted(all_logic_steps.keys(), key=lambda x: int(x) if x.isdigit() else float('inf')) + + # 4. Ordena las instrucciones por UID (intenta mantener un orden lógico si los UIDs son numéricos) + # Esto es una heurística, el orden real de ejecución depende del PLC. + try: + # Intenta convertir UIDs a enteros para ordenar numéricamente, + # poniendo los no numéricos al final. + sorted_uids = sorted(all_logic_steps.keys(), key=lambda x: int(x) if x.isdigit() else float('inf')) + except ValueError: + # Si falla la conversión a int (UIDs no numéricos complejos), ordena alfabéticamente. + print(f"Advertencia: UIDs no puramente numéricos en red {network_id}. Ordenando alfabéticamente.") + sorted_uids = sorted(all_logic_steps.keys()) + + # Construye la lista final de lógica ordenada network_logic = [all_logic_steps[uid] for uid in sorted_uids if uid in all_logic_steps] + + # print(f"--- Fin Parseo Red ID={network_id} ---") return {'id': network_id, 'title': network_title, 'logic': network_logic} def convert_xml_to_json(xml_filepath, json_filepath): - """Función principal para convertir el XML a JSON.""" + """ + Función principal que orquesta la conversión del archivo XML de Openness + a un archivo JSON simplificado que representa la estructura del bloque FC. + """ + print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...") + if not os.path.exists(xml_filepath): - print(f"Error: Archivo XML no encontrado en {xml_filepath}") - return + print(f"Error Crítico: Archivo XML no encontrado en '{xml_filepath}'") + return # Termina si el archivo no existe try: - tree = etree.parse(xml_filepath) + # Parsea el archivo XML + print("Paso 1: Parseando archivo XML...") + parser = etree.XMLParser(remove_blank_text=True) # Intenta limpiar espacios irrelevantes + tree = etree.parse(xml_filepath, parser) root = tree.getroot() + print("Paso 1: Parseo XML completado.") - # Buscar FC usando local-name() + # Encuentra el bloque FC principal usando local-name() para evitar problemas de namespace + # // busca en todo el árbol + print("Paso 2: Buscando el bloque SW.Blocks.FC...") fc_block_list = root.xpath("//*[local-name()='SW.Blocks.FC']") if not fc_block_list: - print("Error: No se encontró el elemento ") - return - fc_block = fc_block_list[0] + print("Error Crítico: No se encontró el elemento en el archivo.") + return # Termina si no encuentra el bloque FC + fc_block = fc_block_list[0] # Asume que solo hay un bloque FC principal en este archivo + print(f"Paso 2: Bloque SW.Blocks.FC encontrado (ID={fc_block.get('ID')}).") - # Obtener el namespace real de fc_block (si tiene uno) - fc_block_ns_uri = fc_block.tag.split('}')[0][1:] if '}' in fc_block.tag else None - temp_ns = ns.copy() # Copiar namespaces base - fc_prefix = 'fcns' # Prefijo temporal para el namespace de fc_block - if fc_block_ns_uri: - temp_ns[fc_prefix] = fc_block_ns_uri - # Construir el tag con prefijo para búsquedas relativas si hay namespace - interface_tag = f"{fc_prefix}:Interface" - attribute_list_tag = f"{fc_prefix}:AttributeList" - object_list_tag = f"{fc_prefix}:ObjectList" + + # Extrae los atributos básicos del bloque desde su AttributeList + print("Paso 3: Extrayendo atributos del bloque (Nombre, Número, Lenguaje)...") + # Busca AttributeList como hijo directo de fc_block + attribute_list_node = fc_block.xpath("./*[local-name()='AttributeList']") + block_name_val = "Unknown" + block_number_val = None + block_lang_val = "Unknown" + + if attribute_list_node: + attr_list = attribute_list_node[0] + # Extrae Nombre + name_node = attr_list.xpath("./*[local-name()='Name']/text()") + if name_node: block_name_val = name_node[0].strip() + # Extrae Número + num_node = attr_list.xpath("./*[local-name()='Number']/text()") + if num_node and num_node[0].isdigit(): block_number_val = int(num_node[0]) + # Extrae Lenguaje + lang_node = attr_list.xpath("./*[local-name()='ProgrammingLanguage']/text()") + if lang_node: block_lang_val = lang_node[0].strip() + print(f"Paso 3: Atributos extraídos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'") else: - # Si fc_block no tiene namespace, buscar hijos sin prefijo - interface_tag = "Interface" - attribute_list_tag = "AttributeList" - object_list_tag = "ObjectList" + print("Advertencia: No se encontró AttributeList para el bloque FC.") - print(f"DEBUG: Buscando hijos de {fc_block.tag} (Namespace URI: {fc_block_ns_uri})") - print(f"DEBUG: Usando tag para Interface: {interface_tag}") - print(f"DEBUG: Usando tag para AttributeList: {attribute_list_tag}") - print(f"DEBUG: Usando tag para ObjectList: {object_list_tag}") - - # Usar el tag construido (con o sin prefijo) para buscar AttributeList y sus hijos - block_name = fc_block.xpath(f"./{attribute_list_tag}/*[local-name()='Name']/text()", namespaces=temp_ns) - block_number = fc_block.xpath(f"./{attribute_list_tag}/*[local-name()='Number']/text()", namespaces=temp_ns) - block_lang = fc_block.xpath(f"./{attribute_list_tag}/*[local-name()='ProgrammingLanguage']/text()", namespaces=temp_ns) + # Inicializa la estructura del resultado JSON result = { - "block_name": block_name[0].strip() if block_name else "Unknown", - "block_number": int(block_number[0]) if block_number and block_number[0].isdigit() else None, - "language": block_lang[0].strip() if block_lang else "Unknown", - "interface": {}, - "networks": [] + "block_name": block_name_val, + "block_number": block_number_val, + "language": block_lang_val, + "interface": {}, # Diccionario para las secciones de la interfaz + "networks": [] # Lista para las redes lógicas } - # --- CORREGIDA NUEVAMENTE: Extracción de Interfaz --- - # Buscar Interface DENTRO de AttributeList - # Usamos local-name() para robustez - interface_node_list = fc_block.xpath(f"./*[local-name()='AttributeList']/*[local-name()='Interface']") - if interface_node_list: - interface_node = interface_node_list[0] # Tomar el primer nodo Interface encontrado - print(f"DEBUG: Nodo Interface encontrado DENTRO de AttributeList!") - # Dentro de Interface, Sections/Section/Member usan el namespace 'iface' - interface_sections = interface_node.xpath(".//iface:Section", namespaces=ns) # Usar ns original aquí - if not interface_sections: - print("Advertencia: Nodo Interface encontrado, pero no se encontraron iface:Section dentro.") - section_count = 0 - for section in interface_sections: - section_count += 1 - section_name = section.get('Name') - members = [] - member_count = 0 - for member in section.xpath("./iface:Member", namespaces=ns): # Usar ns original - member_count += 1 - members.append({ - "name": member.get('Name'), - "datatype": member.get('Datatype') - }) - if members: - print(f"DEBUG: Sección '{section_name}' encontrada con {member_count} miembros.") - result["interface"][section_name] = members - else: - print(f"DEBUG: Sección '{section_name}' encontrada pero sin miembros iface:Member.") + # Extrae la Interfaz del bloque + print("Paso 4: Extrayendo la interfaz del bloque...") + interface_found = False + # Busca Interface DENTRO de AttributeList (corrección clave) + if attribute_list_node: # Solo busca si encontramos AttributeList + interface_node_list = attribute_list_node[0].xpath("./*[local-name()='Interface']") + if interface_node_list: + interface_node = interface_node_list[0] + interface_found = True + print("Paso 4: Nodo Interface encontrado dentro de AttributeList.") + # Ahora busca las secciones DENTRO de Interface usando el namespace 'iface' + section_count = 0 + # Usa './/' para buscar Sections en cualquier nivel dentro de Interface + for section in interface_node.xpath(".//iface:Section", namespaces=ns): + section_count += 1 + section_name = section.get('Name') + members = [] + member_count = 0 + # Busca Members como hijos directos de Section usando 'iface' + for member in section.xpath("./iface:Member", namespaces=ns): + member_count +=1 + member_name = member.get('Name') + member_dtype = member.get('Datatype') + if member_name and member_dtype: + members.append({"name": member_name, "datatype": member_dtype}) + else: + print(f"Advertencia: Miembro inválido encontrado en sección '{section_name}' (sin nombre o tipo).") - if section_count == 0 and not result["interface"]: - print("Advertencia: Nodo Interface encontrado, pero sin secciones iface:Section válidas.") + if members: + print(f"DEBUG: Interfaz - Sección '{section_name}' encontrada con {member_count} miembros.") + result["interface"][section_name] = members + # else: print(f"DEBUG: Interfaz - Sección '{section_name}' encontrada pero sin miembros válidos.") - else: - # Si no se encontró Interface DENTRO de AttributeList - print("Advertencia: No se encontró el nodo DENTRO de .") - # Si no se encontró Interface DENTRO de AttributeList - print("Advertencia: No se encontró el nodo DENTRO de .") - # Sacar la expresión XPath fuera de la f-string para evitar SyntaxError - attribute_list_nodes = fc_block.xpath("./*[local-name()='AttributeList']") - if attribute_list_nodes: - attribute_list_content = etree.tostring(attribute_list_nodes[0], pretty_print=True).decode() + if section_count == 0: + print("Advertencia: Nodo Interface encontrado, pero no contenía secciones iface:Section válidas.") else: - attribute_list_content = 'AttributeList no encontrado' - print(f"DEBUG: Contenido de AttributeList: {attribute_list_content}") + print("Advertencia: No se encontró el nodo DENTRO de .") + # else: # Ya se advirtió que no se encontró AttributeList - # --- Extracción Lógica de Redes --- - # Buscar ObjectList usando tag construido, luego CompileUnit sin prefijo - object_list_node = fc_block.xpath(f"./{object_list_tag}", namespaces=temp_ns) - networks = [] + if not interface_found and not result["interface"]: + print("Advertencia: No se pudo extraer ninguna información de la interfaz.") + + + # Extrae la lógica de las Redes (CompileUnits) + print("Paso 5: Extrayendo la lógica de las redes (CompileUnits)...") + networks_processed_count = 0 + # Busca ObjectList como hijo directo de fc_block + object_list_node = fc_block.xpath("./*[local-name()='ObjectList']") if object_list_node: - networks = object_list_node[0].xpath("./*[local-name()='SW.Blocks.CompileUnit']") # Buscar CompileUnit por local-name + # Busca CompileUnits dentro de ObjectList + compile_units = object_list_node[0].xpath("./*[local-name()='SW.Blocks.CompileUnit']") + print(f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit.") + for network_elem in compile_units: + networks_processed_count += 1 + print(f"DEBUG: Procesando red #{networks_processed_count} (ID={network_elem.get('ID')})...") + parsed_network = parse_network(network_elem) + # Solo añade la red si el parseo no devolvió un error grave + if parsed_network and parsed_network.get('error') is None: + result["networks"].append(parsed_network) + elif parsed_network: + print(f"Error: Falló el parseo de la red ID={parsed_network.get('id')}: {parsed_network.get('error')}") + result["networks"].append(parsed_network) # Añadir incluso con error para depurar + else: + print(f"Error: parse_network devolvió None para un CompileUnit (ID={network_elem.get('ID')}).") - network_count = 0 - for network_elem in networks: - network_count += 1 - parsed_network = parse_network(network_elem) - result["networks"].append(parsed_network) - - if network_count == 0: - print("Advertencia: No se encontraron redes (SW.Blocks.CompileUnit).") + if networks_processed_count == 0: + print("Advertencia: ObjectList encontrado, pero no contenía SW.Blocks.CompileUnit.") + else: + print("Advertencia: No se encontró ObjectList para el bloque FC.") - # --- Escribir resultado a JSON --- - # Añadir un chequeo final antes de escribir + # Escribe el resultado al archivo JSON + print("Paso 6: Escribiendo el resultado en el archivo JSON...") + # Realiza chequeos finales antes de escribir if not result["interface"]: print("ADVERTENCIA FINAL: La sección 'interface' está vacía en el JSON resultante.") - variable_names_found = any( - acc.get('type') == 'variable' and acc.get('name') is not None - for net in result.get('networks', []) - for instr in net.get('logic', []) - for pin_data in instr.get('inputs', {}).values() if isinstance(pin_data, dict) - for acc in ([pin_data] if pin_data.get('type') != 'OR_Branch' else pin_data.get('sources', [])) - ) or any( - acc.get('type') == 'variable' and acc.get('name') is not None - for net in result.get('networks', []) - for instr in net.get('logic', []) - for pin_data_list in instr.get('outputs', {}).values() if isinstance(pin_data_list, list) - for acc in pin_data_list if isinstance(acc, dict) - ) - if not variable_names_found: - print("ADVERTENCIA FINAL: Parece que no se extrajeron nombres de variables en las redes.") + if not result["networks"]: + print("ADVERTENCIA FINAL: La sección 'networks' está vacía en el JSON resultante.") + else: + # Verifica si se extrajeron nombres de variables en las redes + variable_names_found = any( + acc.get('type') == 'variable' and acc.get('name') is not None + for net in result.get('networks', []) if net.get('error') is None # Solo redes válidas + for instr in net.get('logic', []) + for pin_data in instr.get('inputs', {}).values() + # Maneja entradas que son listas (OR) o diccionarios + for acc in (pin_data if isinstance(pin_data, list) else [pin_data] if isinstance(pin_data, dict) else []) + if isinstance(acc, dict) # Asegura que acc sea un diccionario parseado + ) or any( + acc.get('type') == 'variable' and acc.get('name') is not None + for net in result.get('networks', []) if net.get('error') is None # Solo redes válidas + for instr in net.get('logic', []) + for pin_data_list in instr.get('outputs', {}).values() if isinstance(pin_data_list, list) + for acc in pin_data_list if isinstance(acc, dict) # Asegura que acc sea un diccionario + ) + if not variable_names_found: + print("ADVERTENCIA FINAL: No parece haberse extraído ningún nombre de variable válido en las redes procesadas.") + else: + print("INFO FINAL: Se detectaron nombres de variables en las redes procesadas.") - with open(json_filepath, 'w', encoding='utf-8') as f: - json.dump(result, f, indent=4, ensure_ascii=False) + # Escribe el archivo + try: + with open(json_filepath, 'w', encoding='utf-8') as f: + # Usa indent=4 para formato legible, ensure_ascii=False para caracteres UTF-8 + json.dump(result, f, indent=4, ensure_ascii=False) + print(f"Paso 6: Escritura completada.") + print(f"Conversión finalizada con éxito. Archivo JSON guardado en: '{json_filepath}'") + except IOError as e: + print(f"Error Crítico: No se pudo escribir el archivo JSON en '{json_filepath}'. Error: {e}") + except TypeError as e: + print(f"Error Crítico: Problema al serializar datos a JSON (posiblemente datos no serializables). Error: {e}") - print(f"Conversión completada. Archivo JSON guardado en: {json_filepath}") except etree.XMLSyntaxError as e: - print(f"Error de sintaxis XML: {e}") + print(f"Error Crítico: Error de sintaxis en el archivo XML '{xml_filepath}'. Detalles: {e}") except Exception as e: - print(f"Ocurrió un error inesperado durante el procesamiento: {e}") - import traceback - traceback.print_exc() + print(f"Error Crítico: Ocurrió un error inesperado durante el procesamiento: {e}") + print("--- Traceback ---") + traceback.print_exc() # Imprime la traza completa de la excepción + print("--- Fin Traceback ---") -# --- Ejecución --- +# --- Punto de Entrada Principal --- if __name__ == "__main__": + # Define los nombres de los archivos de entrada y salida xml_file = 'BlenderRun_ProdTime.xml' json_file = 'BlenderRun_ProdTime_simplified.json' + + # Llama a la función principal de conversión convert_xml_to_json(xml_file, json_file) \ No newline at end of file