# -*- coding: utf-8 -*-
"""Convert a Siemens TIA Portal Openness XML export of an FC block to JSON.

The script reads the first ``SW.Blocks.FC`` element of the XML file, extracts
its attributes, comment, interface sections and the FlgNet logic of every
``SW.Blocks.CompileUnit`` (network), and writes a simplified JSON document.
"""
import json
import os
import traceback

from lxml import etree

# --- Namespaces ---
ns = {
    'iface': 'http://www.siemens.com/automation/Openness/SW/Interface/v5',
    'flg': 'http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4'
}

# Fully qualified ("{namespace}tag") names of the wire endpoints understood by
# parse_network; computed once instead of once per wire.
_TAG_POWERRAIL = etree.QName(ns['flg'], 'Powerrail').text
_TAG_IDENTCON = etree.QName(ns['flg'], 'IdentCon').text
_TAG_NAMECON = etree.QName(ns['flg'], 'NameCon').text

# Lower-cased datatypes whose constant values are converted to ``int``.
_INTEGER_DATATYPES = frozenset({
    'int', 'dint', 'udint', 'sint', 'usint', 'lint', 'ulint',
    'word', 'dword', 'lword', 'byte',
})

# Radix prefixes accepted in "<radix>#<digits>" integer literals.
_SUPPORTED_RADIXES = (2, 8, 10, 16)


# --- Helper Functions ---

def get_multilingual_text(element, default_lang='en-US', fallback_lang='it-IT'):
    """Extract the text of a MultilingualText element, preferring some cultures.

    The lookup order is: the item whose Culture equals ``default_lang``, then
    the item whose Culture equals ``fallback_lang``, then the first text item
    of any culture.

    Args:
        element: lxml element to search in (may be None).
        default_lang: Culture code tried first.
        fallback_lang: Culture code tried second.

    Returns:
        The stripped text, or "" when ``element`` is None, nothing is found,
        or an error occurs (a warning is printed in that case).
    """
    if element is None:
        return ""

    item = "*[local-name()='MultilingualTextItem']"
    text_path = "/*[local-name()='AttributeList']/*[local-name()='Text']"
    queries = [
        f".//{item}[*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{lang}']]{text_path}"
        for lang in (default_lang, fallback_lang)
    ]
    # Last resort: the first text item regardless of its culture.
    queries.append(f".//{item}{text_path}")

    try:
        for query in queries:
            text_items = element.xpath(query)
            if text_items and text_items[0].text is not None:
                return text_items[0].text.strip()
        return ""
    except Exception as e:
        print(f"Advertencia: Error extrayendo MultilingualText desde {etree.tostring(element, encoding='unicode')[:100]}...: {e}")
        return ""


def get_symbol_name(symbol_element):
    """Build the full symbol (variable) name from its Component children.

    Each direct ``Component`` child contributes its ``Name`` attribute wrapped
    in double quotes; the parts are joined with dots.

    Returns:
        The joined name, or None when ``symbol_element`` is None, it has no
        Component names, or an error occurs (a warning is printed).
    """
    if symbol_element is None:
        return None
    try:
        components = symbol_element.xpath("./*[local-name()='Component']/@Name")
        if not components:
            return None
        return ".".join(f'"{c}"' for c in components)
    except Exception as e:
        print(f"Advertencia: Excepción en get_symbol_name para {etree.tostring(symbol_element, encoding='unicode')[:100]}...: {e}")
        return None


def _child_text(parent, local_name):
    """Return the text of the first direct child named ``local_name``, or None."""
    children = parent.xpath(f"./*[local-name()='{local_name}']")
    if children and children[0].text is not None:
        return children[0].text
    return None


def _infer_constant_datatype(value_str):
    """Guess a datatype label for a constant that carries no ConstantType."""
    if value_str.lower() in ('true', 'false'):
        return 'Bool'
    if value_str.isdigit() or (value_str.startswith('-') and value_str[1:].isdigit()):
        return 'Int'
    if '.' in value_str:
        try:
            float(value_str)
        except ValueError:
            # Not a plain float (e.g. "T#1.5s"): fall through to the '#' test.
            pass
        else:
            return 'Real'
    if '#' in value_str:
        return 'TypedConstant'
    return 'Unknown'


def _parse_int_literal(value_str):
    """Convert an integer literal, honouring an optional ``<radix>#`` prefix.

    The digits are whatever follows the last '#'. If the segment right before
    them is numeric it is taken as the radix ("16#FF", "W#16#FF"); any other
    prefix ("INT#5") is ignored and base 10 is used.

    Raises:
        ValueError: if the radix is unsupported or the digits are invalid.
    """
    prefix, sep, digits = value_str.rpartition('#')
    radix = 10
    if sep:
        radix_text = prefix.rpartition('#')[2]
        if radix_text.isdigit():
            radix = int(radix_text)
            if radix not in _SUPPORTED_RADIXES:
                raise ValueError(f"unsupported radix {radix}")
    return int(digits, radix)


def parse_access(access_element):
    """Parse an Access element (an operand) into a description dict.

    Returns:
        None when ``access_element`` is None; otherwise a dict with 'uid',
        'scope' and 'type'. 'type' is 'variable' (plus 'name'), 'constant'
        (plus 'datatype' and 'value'), or one of the error markers
        'error_parsing_symbol', 'error_parsing_constant', 'unknown_structure'.
    """
    if access_element is None:
        return None

    uid = access_element.get('UId')
    info = {'uid': uid, 'scope': access_element.get('Scope'), 'type': 'unknown'}
    symbol = access_element.xpath("./*[local-name()='Symbol']")
    constant = access_element.xpath("./*[local-name()='Constant']")

    if symbol:
        info['type'] = 'variable'
        info['name'] = get_symbol_name(symbol[0])
        if info['name'] is None:
            info['type'] = 'error_parsing_symbol'
            print(f"Error: No se pudo parsear el nombre del símbolo para Access UID={uid}")
        return info

    if not constant:
        info['type'] = 'unknown_structure'
        print(f"Advertencia: Access UID={uid} no es ni Symbol ni Constant.")
        return info

    info['type'] = 'constant'
    declared_type = _child_text(constant[0], 'ConstantType')
    info['datatype'] = declared_type if declared_type is not None else 'Unknown'
    value_str = _child_text(constant[0], 'ConstantValue')
    if value_str is None:
        info['type'] = 'error_parsing_constant'
        info['value'] = None
        print(f"Error: Constante sin valor encontrada para Access UID={uid}")
        return info

    if info['datatype'] == 'Unknown':
        info['datatype'] = _infer_constant_datatype(value_str)

    # Default: keep the raw string; only well-known datatypes are converted.
    info['value'] = value_str
    dtype_lower = info['datatype'].lower()
    # Text after the last '#', or the whole value when there is no '#'.
    val_str_processed = value_str.rpartition('#')[2]
    try:
        if dtype_lower in _INTEGER_DATATYPES:
            info['value'] = _parse_int_literal(value_str)
        elif dtype_lower == 'bool':
            info['value'] = val_str_processed.lower() == 'true' or val_str_processed == '1'
        elif dtype_lower in ('real', 'lreal'):
            info['value'] = float(val_str_processed)
    except (ValueError, TypeError) as e:
        print(f"Advertencia: No se pudo convertir el valor '{val_str_processed}' a tipo {dtype_lower} para UID={uid}. Se mantiene como string. Error: {e}")
        info['value'] = value_str
    return info


def parse_part(part_element):
    """Parse a Part element (an instruction) into a description dict.

    Returns:
        None when ``part_element`` is None or lacks a UId/Name attribute;
        otherwise a dict with 'uid', 'name' and 'template_values' (a mapping of
        TemplateValue Name -> Type for children that have both attributes).
    """
    if part_element is None:
        return None

    uid = part_element.get('UId')
    name = part_element.get('Name')
    if not uid or not name:
        print(f"Error: Part encontrado sin UID o Name: {etree.tostring(part_element, encoding='unicode')}")
        return None

    template_values = {}
    try:
        for tv in part_element.xpath("./*[local-name()='TemplateValue']"):
            tv_name = tv.get('Name')
            tv_type = tv.get('Type')
            if tv_name and tv_type:
                template_values[tv_name] = tv_type
    except Exception as e:
        print(f"Advertencia: Error extrayendo TemplateValues para Part UID={uid}: {e}")

    return {
        'uid': uid,
        'name': name,
        'template_values': template_values,  # kept although not used by this script
    }


# --- Main Parsing Logic ---

def _wire_endpoint(elem, allow_powerrail):
    """Return ``(uid, pin)`` for a wire endpoint, or ``(None, None)``.

    IdentCon endpoints use the fixed pin name 'value'; NameCon endpoints use
    their Name attribute. Powerrail is only recognised when ``allow_powerrail``
    is true (i.e. for the source endpoint).
    """
    tag = elem.tag
    if allow_powerrail and tag == _TAG_POWERRAIL:
        return 'POWERRAIL', 'out'
    if tag == _TAG_IDENTCON:
        return elem.get('UId'), 'value'
    if tag == _TAG_NAMECON:
        return elem.get('UId'), elem.get('Name')
    return None, None


def _collect_wires(flgnet, parts_map):
    """Read every Wire of ``flgnet``.

    Returns:
        A pair ``(wire_connections, eno_outputs)`` where
        ``wire_connections`` maps (dest_uid, dest_pin) -> list of
        (source_uid, source_pin), and ``eno_outputs`` maps the uid of a part
        -> list of (dest_uid, dest_pin) reached from its 'eno' pin.
    """
    wire_connections = {}
    eno_outputs = {}
    for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
        endpoints = list(wire)
        if len(endpoints) < 2:
            continue
        source_uid, source_pin = _wire_endpoint(endpoints[0], allow_powerrail=True)
        # The first child is the source; every following child is treated as
        # a destination so that branching wires keep all their connections.
        for dest_elem in endpoints[1:]:
            dest_uid, dest_pin = _wire_endpoint(dest_elem, allow_powerrail=False)
            if not (dest_uid and dest_pin):
                # Unrecognised or incomplete destination: nothing to record.
                continue
            if source_uid is not None:
                sources = wire_connections.setdefault((dest_uid, dest_pin), [])
                if (source_uid, source_pin) not in sources:
                    sources.append((source_uid, source_pin))
            if source_pin == 'eno' and source_uid in parts_map:
                destinations = eno_outputs.setdefault(source_uid, [])
                if (dest_uid, dest_pin) not in destinations:
                    destinations.append((dest_uid, dest_pin))
    return wire_connections, eno_outputs


def _describe_source(source_uid, source_pin, access_map, parts_map):
    """Return the JSON-ready description of one input source."""
    if source_uid == 'POWERRAIL':
        return {'type': 'powerrail'}
    if source_uid in access_map:
        return access_map[source_uid]
    if source_uid in parts_map:
        return {
            'type': 'connection',
            'source_instruction_uid': source_uid,
            'source_instruction_type': parts_map[source_uid]['name'],
            'source_pin': source_pin,
        }
    return {'type': 'unknown_source', 'uid': source_uid}


def _build_logic_steps(parts_map, access_map, wire_connections):
    """Build one instruction dict per part, filling its inputs and outputs.

    Inputs are the sources wired into the part's pins (a single source is
    stored directly, several as a list). Outputs list, per source pin, the
    operands (Access entries) that the part drives.
    """
    steps = {
        part_uid: {'instruction_uid': part_uid, 'type': part_info['name'], 'inputs': {}, 'outputs': {}}
        for part_uid, part_info in parts_map.items()
    }
    for (dest_uid, dest_pin), sources in wire_connections.items():
        if dest_uid in steps:
            described = [
                _describe_source(source_uid, source_pin, access_map, parts_map)
                for source_uid, source_pin in sources
            ]
            steps[dest_uid]['inputs'][dest_pin] = described[0] if len(described) == 1 else described
        if dest_uid in access_map:
            operand = access_map[dest_uid]
            for source_uid, source_pin in sources:
                if source_uid in steps:
                    driven = steps[source_uid]['outputs'].setdefault(source_pin, [])
                    if operand not in driven:
                        driven.append(operand)
    return steps


def _attach_eno_logic(steps, eno_outputs, parts_map, access_map):
    """Add an 'eno_logic' list to parts whose ENO feeds more than a plain EN.

    A connection from ENO to the 'en' pin of another part is considered
    trivial and is not reported; every other destination is.
    """
    for source_instr_uid, eno_destinations in eno_outputs.items():
        if source_instr_uid not in steps:
            continue
        interesting = []
        for dest_uid, dest_pin in eno_destinations:
            if dest_uid in parts_map and dest_pin == 'en':
                continue
            target_info = {'target_pin': dest_pin}
            if dest_uid in parts_map:
                target_info['target_type'] = 'instruction'
                target_info['target_uid'] = dest_uid
                target_info['target_name'] = parts_map[dest_uid]['name']
            elif dest_uid in access_map:
                target_info['target_type'] = 'operand'
                target_info['target_details'] = access_map[dest_uid]
            else:
                target_info['target_type'] = 'unknown'
                target_info['target_uid'] = dest_uid
            interesting.append(target_info)
        if interesting:
            steps[source_instr_uid]['eno_logic'] = interesting


def parse_network(network_element):
    """Parse a SW.Blocks.CompileUnit (one logic network).

    Returns:
        A dict with 'id', 'title', 'comment' and 'logic' (list of instruction
        dicts ordered by numeric UId). When the input is None or no FlgNet is
        found, 'logic' is empty and an 'error' key describes the problem.
    """
    if network_element is None:
        print("Error: parse_network llamado con network_element=None")
        return {'id': 'ERROR', 'title': 'Invalid Network Element', 'comment': '', 'logic': [], 'error': 'Input element was None'}

    network_id = network_element.get('ID')

    title_element = network_element.xpath(".//*[local-name()='MultilingualText'][@CompositionName='Title']")
    network_title = get_multilingual_text(title_element[0]) if title_element else f"Network {network_id}"

    network_comment = ""
    comment_element = network_element.xpath("./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']")
    if comment_element:
        network_comment = get_multilingual_text(comment_element[0])

    flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
    if not flgnet_list:
        print(f"Error: No se encontró FlgNet en la red ID={network_id}")
        return {'id': network_id, 'title': network_title, 'comment': network_comment, 'logic': [], 'error': 'FlgNet not found'}
    flgnet = flgnet_list[0]

    # 1. Operands (Access) and instructions (Part), keyed by UId.
    access_map = {}
    for acc in flgnet.xpath(".//flg:Access", namespaces=ns):
        acc_info = parse_access(acc)
        if acc_info and 'uid' in acc_info:
            access_map[acc_info['uid']] = acc_info

    parts_map = {}
    for part in flgnet.xpath(".//flg:Part", namespaces=ns):
        part_info = parse_part(part)
        if part_info and 'uid' in part_info:
            parts_map[part_info['uid']] = part_info

    # 2. Wires: input connections and connections leaving ENO pins.
    wire_connections, eno_outputs = _collect_wires(flgnet, parts_map)

    # 3. Per-instruction representation, plus non-trivial ENO logic.
    all_logic_steps = _build_logic_steps(parts_map, access_map, wire_connections)
    _attach_eno_logic(all_logic_steps, eno_outputs, parts_map, access_map)

    # 4. Order by numeric UId; non-numeric UIds sort after the numeric ones.
    try:
        sorted_uids = sorted(all_logic_steps, key=lambda x: int(x) if x.isdigit() else float('inf'))
    except ValueError:
        print(f"Advertencia: UIDs no puramente numéricos en red {network_id}. Ordenando alfabéticamente.")
        sorted_uids = sorted(all_logic_steps)
    network_logic = [all_logic_steps[uid] for uid in sorted_uids]

    return {'id': network_id, 'title': network_title, 'comment': network_comment, 'logic': network_logic}


def _extract_interface(attribute_list_node):
    """Return ``{section_name: [member dicts]}`` read from the block interface.

    ``attribute_list_node`` is the (possibly empty) XPath result list for the
    block's AttributeList. Progress and warnings are printed along the way.
    """
    interface = {}
    interface_found = False
    if attribute_list_node:
        interface_node_list = attribute_list_node[0].xpath("./*[local-name()='Interface']")
        if interface_node_list:
            interface_found = True
            print("Paso 4: Nodo Interface encontrado dentro de AttributeList.")
            for section in interface_node_list[0].xpath(".//iface:Section", namespaces=ns):
                members = []
                for member in section.xpath("./iface:Member", namespaces=ns):
                    member_name = member.get('Name')
                    member_dtype = member.get('Datatype')
                    if member_name and member_dtype:
                        members.append({"name": member_name, "datatype": member_dtype})
                if members:
                    interface[section.get('Name')] = members
            if not interface:
                print("Advertencia: Nodo Interface encontrado, pero no contenía secciones iface:Section válidas.")
        else:
            print("Advertencia: No se encontró el nodo <Interface> DENTRO de <AttributeList>.")
    if not interface_found and not interface:
        print("Advertencia: No se pudo extraer ninguna información de la interfaz.")
    return interface


def _extract_networks(fc_block):
    """Parse every CompileUnit under the block's ObjectList.

    Networks that failed to parse are still included (with their 'error'
    key) so the problem is visible in the JSON output.
    """
    networks = []
    object_list_node = fc_block.xpath("./*[local-name()='ObjectList']")
    if not object_list_node:
        print("Advertencia: No se encontró ObjectList para el bloque FC.")
        return networks

    compile_units = object_list_node[0].xpath("./*[local-name()='SW.Blocks.CompileUnit']")
    print(f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit.")
    for index, network_elem in enumerate(compile_units, start=1):
        print(f"DEBUG: Procesando red #{index} (ID={network_elem.get('ID')})...")
        parsed_network = parse_network(network_elem)
        if not parsed_network:
            print(f"Error: parse_network devolvió None para un CompileUnit (ID={network_elem.get('ID')}).")
            continue
        if parsed_network.get('error') is not None:
            print(f"Error: Falló el parseo de la red ID={parsed_network.get('id')}: {parsed_network.get('error')}")
        networks.append(parsed_network)
    if not compile_units:
        print("Advertencia: ObjectList encontrado, pero no contenía SW.Blocks.CompileUnit.")
    return networks


def convert_xml_to_json(xml_filepath, json_filepath):
    """Convert an Openness XML export of an FC block into a simplified JSON file.

    Args:
        xml_filepath: Path of the XML file to read.
        json_filepath: Path of the JSON file to write (UTF-8, indented).

    Returns:
        None. Progress, warnings and errors are reported with ``print``; no
        exception is propagated to the caller.
    """
    print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
    if not os.path.exists(xml_filepath):
        print(f"Error Crítico: Archivo XML no encontrado en '{xml_filepath}'")
        return

    try:
        print("Paso 1: Parseando archivo XML...")
        parser = etree.XMLParser(remove_blank_text=True)
        tree = etree.parse(xml_filepath, parser)
        root = tree.getroot()
        print("Paso 1: Parseo XML completado.")

        print("Paso 2: Buscando el bloque SW.Blocks.FC...")
        fc_block_list = root.xpath("//*[local-name()='SW.Blocks.FC']")
        if not fc_block_list:
            print("Error Crítico: No se encontró el elemento <SW.Blocks.FC> en el archivo.")
            return
        fc_block = fc_block_list[0]
        print(f"Paso 2: Bloque SW.Blocks.FC encontrado (ID={fc_block.get('ID')}).")

        print("Paso 3: Extrayendo atributos del bloque...")
        attribute_list_node = fc_block.xpath("./*[local-name()='AttributeList']")
        block_name_val = "Unknown"
        block_number_val = None
        block_lang_val = "Unknown"
        if attribute_list_node:
            attr_list = attribute_list_node[0]
            name_node = attr_list.xpath("./*[local-name()='Name']/text()")
            if name_node:
                block_name_val = name_node[0].strip()
            num_node = attr_list.xpath("./*[local-name()='Number']/text()")
            if num_node:
                # Strip first so surrounding whitespace does not hide the number.
                number_text = num_node[0].strip()
                if number_text.isdigit():
                    block_number_val = int(number_text)
            lang_node = attr_list.xpath("./*[local-name()='ProgrammingLanguage']/text()")
            if lang_node:
                block_lang_val = lang_node[0].strip()
            print(f"Paso 3: Atributos extraídos: Nombre='{block_name_val}', Número={block_number_val}, Lenguaje='{block_lang_val}'")
        else:
            print("Advertencia: No se encontró AttributeList para el bloque FC.")

        block_comment_val = ""
        comment_node_list = fc_block.xpath("./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']")
        if comment_node_list:
            block_comment_val = get_multilingual_text(comment_node_list[0])
            print(f"Paso 3b: Comentario del bloque extraído: '{block_comment_val[:50]}...'")

        result = {
            "block_name": block_name_val,
            "block_number": block_number_val,
            "language": block_lang_val,
            "block_comment": block_comment_val,
            "interface": {},
            "networks": [],
        }

        print("Paso 4: Extrayendo la interfaz del bloque...")
        result["interface"] = _extract_interface(attribute_list_node)

        print("Paso 5: Extrayendo la lógica de las redes (CompileUnits)...")
        result["networks"] = _extract_networks(fc_block)

        print("Paso 6: Escribiendo el resultado en el archivo JSON...")
        if not result["interface"]:
            print("ADVERTENCIA FINAL: La sección 'interface' está vacía.")
        if not result["networks"]:
            print("ADVERTENCIA FINAL: La sección 'networks' está vacía.")
        else:
            # Only networks that parsed without error are inspected.
            eno_logic_found = any(
                instr.get('eno_logic')
                for net in result["networks"] if net.get('error') is None
                for instr in net.get('logic', [])
            )
            if eno_logic_found:
                print("INFO FINAL: Se detectó lógica ENO interesante en al menos una instrucción.")
            else:
                print("INFO FINAL: No se detectó lógica ENO interesante (solo conexiones directas ENO->EN o ENO no conectado).")

        try:
            with open(json_filepath, 'w', encoding='utf-8') as f:
                json.dump(result, f, indent=4, ensure_ascii=False)
            print("Paso 6: Escritura completada.")
            print(f"Conversión finalizada con éxito. Archivo JSON guardado en: '{json_filepath}'")
        except IOError as e:
            print(f"Error Crítico: No se pudo escribir el archivo JSON en '{json_filepath}'. Error: {e}")
        except TypeError as e:
            print(f"Error Crítico: Problema al serializar datos a JSON. Error: {e}")

    except etree.XMLSyntaxError as e:
        print(f"Error Crítico: Error de sintaxis en el archivo XML '{xml_filepath}'. Detalles: {e}")
    except Exception as e:
        # Top-level boundary: report everything else with a traceback.
        print(f"Error Crítico: Ocurrió un error inesperado durante el procesamiento: {e}")
        print("--- Traceback ---")
        traceback.print_exc()
        print("--- Fin Traceback ---")


# --- Main entry point ---
if __name__ == "__main__":
    xml_file = 'BlenderRun_ProdTime.xml'
    json_file = 'BlenderRun_ProdTime_simplified.json'
    convert_xml_to_json(xml_file, json_file)