# -*- coding: utf-8 -*-
import json
import argparse
import os
import re
from lxml import etree
import traceback
from collections import defaultdict

# --- Namespaces ---
# The 'st' namespace is included for Structured Text
ns = {
    "iface": "http://www.siemens.com/automation/Openness/SW/Interface/v5",
    "flg": "http://www.siemens.com/automation/Openness/SW/NetworkSource/FlgNet/v4",
    "st": "http://www.siemens.com/automation/Openness/SW/NetworkSource/StructuredText/v3",
    "stl": "http://www.siemens.com/automation/Openness/SW/NetworkSource/StatementList/v4",
}


# --- Helper Functions ---
def get_multilingual_text(element, default_lang="en-US", fallback_lang="it-IT"):
    """Return the stripped text of a MultilingualText element.

    Lookup order: the item whose Culture equals ``default_lang``, then the
    item whose Culture equals ``fallback_lang``, then the first item of any
    culture. Returns "" when ``element`` is None, when nothing usable is
    found, or when the lookup raises (a warning is printed in that case).
    """
    if element is None:
        return ""

    item_head = ".//*[local-name()='MultilingualTextItem']"
    text_tail = "/*[local-name()='AttributeList']/*[local-name()='Text']"
    try:
        # One query per preferred culture, followed by the culture-agnostic one.
        queries = [
            f"{item_head}[*[local-name()='AttributeList']/*[local-name()='Culture' and text()='{culture}']]{text_tail}"
            for culture in (default_lang, fallback_lang)
        ]
        queries.append(f"{item_head}{text_tail}")

        for query in queries:
            matches = element.xpath(query)
            # Only the first match is considered; an empty first match falls
            # through to the next query.
            if matches and matches[0].text is not None:
                return matches[0].text.strip()
        return ""
    except Exception as e:
        print(f"Advertencia: Error extrayendo MultilingualText: {e}")
        return ""


def get_symbol_name(symbol_element):
    """Build a dotted, double-quoted name from a Symbol's Component children.

    Returns e.g. '"DB"."Member"', or None when ``symbol_element`` is None,
    has no Component/@Name attributes, or the lookup raises.
    """
    if symbol_element is None:
        return None
    try:
        names = symbol_element.xpath("./*[local-name()='Component']/@Name")
        if not names:
            return None
        return ".".join(f'"{n}"' for n in names)
    except Exception as e:
        print(f"Advertencia: Excepción en get_symbol_name: {e}")
        return None
def _parse_plc_integer(literal):
    """Convert an integer literal to int, honouring an optional radix prefix.

    Accepts plain decimals ("42"), radix literals ("16#FF", "2#1010") and
    typed radix literals ("W#16#FF"). A non-numeric prefix ("L#100") is
    ignored and the digits are read as base 10. Raises ValueError when the
    digits are not valid for the radix.
    """
    fields = literal.split("#")
    radix = int(fields[-2]) if len(fields) > 1 and fields[-2].isdigit() else 10
    return int(fields[-1], radix)


def parse_access(access_element):
    """Parse an Access element into a dict describing a variable or constant.

    Returns None when ``access_element`` is None. Otherwise returns a dict
    with "uid", "scope" and "type", where "type" is one of:

    * "variable" (plus "name") for a Symbol child,
    * "constant" (plus "datatype" and "value") for a Constant child,
    * "error_parsing_symbol" / "error_parsing_constant" on malformed input,
    * "unknown_structure" when neither child is present.

    Constant values are converted to int/bool/float according to their
    datatype; when conversion fails the raw string is kept and a warning is
    printed.
    """
    if access_element is None:
        return None

    uid = access_element.get("UId")
    scope = access_element.get("Scope")
    info = {"uid": uid, "scope": scope, "type": "unknown"}
    symbol = access_element.xpath("./*[local-name()='Symbol']")
    constant = access_element.xpath("./*[local-name()='Constant']")

    if symbol:
        info["type"] = "variable"
        info["name"] = get_symbol_name(symbol[0])
        if info["name"] is None:
            info["type"] = "error_parsing_symbol"
            print(f"Error: No se pudo parsear nombre símbolo Access UID={uid}")
        return info

    if not constant:
        info["type"] = "unknown_structure"
        print(f"Advertencia: Access UID={uid} no es Symbol ni Constant.")
        return info

    info["type"] = "constant"
    const_type_elem = constant[0].xpath("./*[local-name()='ConstantType']")
    const_val_elem = constant[0].xpath("./*[local-name()='ConstantValue']")
    info["datatype"] = (
        const_type_elem[0].text
        if const_type_elem and const_type_elem[0].text is not None
        else "Unknown"
    )
    value_str = (
        const_val_elem[0].text
        if const_val_elem and const_val_elem[0].text is not None
        else None
    )
    if value_str is None:
        info["type"] = "error_parsing_constant"
        info["value"] = None
        print(f"Error: Constante sin valor Access UID={uid}")
        return info

    # No explicit ConstantType: guess the datatype from the literal's shape.
    if info["datatype"] == "Unknown":
        val_lower = value_str.lower()
        if val_lower in ["true", "false"]:
            info["datatype"] = "Bool"
        elif value_str.isdigit() or (
            value_str.startswith("-") and value_str[1:].isdigit()
        ):
            info["datatype"] = "Int"
        elif "." in value_str:
            try:
                float(value_str)
                info["datatype"] = "Real"
            except ValueError:
                pass
        elif "#" in value_str:
            info["datatype"] = "TypedConstant"

    info["value"] = value_str
    dtype_lower = info["datatype"].lower()
    # Text after the last '#', used for bool/float conversion and in warnings.
    val_str_processed = value_str.split("#")[-1] if "#" in value_str else value_str
    try:
        if dtype_lower in [
            "int",
            "dint",
            "udint",
            "sint",
            "usint",
            "lint",
            "ulint",
            "word",
            "dword",
            "lword",
            "byte",
        ]:
            # Radix-aware: "16#FF" becomes 255 instead of failing base-10
            # parsing and being left as a string.
            info["value"] = _parse_plc_integer(value_str)
        elif dtype_lower == "bool":
            info["value"] = (
                val_str_processed.lower() == "true" or val_str_processed == "1"
            )
        elif dtype_lower in ["real", "lreal"]:
            info["value"] = float(val_str_processed)
        elif dtype_lower == "typedconstant":
            info["value"] = value_str
    except (ValueError, TypeError) as e:
        print(
            f"Advertencia: No se pudo convertir valor '{val_str_processed}' a {dtype_lower} UID={uid}. Error: {e}"
        )
        info["value"] = value_str

    return info
def parse_part(part_element):
    """Parse a Part element into a plain dict.

    Returns None when ``part_element`` is None or lacks a UId or Name
    attribute. Otherwise returns a dict with "uid", "type" (the Part's Name),
    "template_values" (TemplateValue Name -> Type) and "negated_pins"
    (pin name -> True for every Negated child).
    """
    if part_element is None:
        return None

    uid, name = part_element.get("UId"), part_element.get("Name")
    if not (uid and name):
        print(
            f"Error: Part sin UID o Name: {etree.tostring(part_element, encoding='unicode')}"
        )
        return None

    # TemplateValue children: entries missing either attribute are skipped.
    template_values = {}
    try:
        for template in part_element.xpath("./*[local-name()='TemplateValue']"):
            key, value = template.get("Name"), template.get("Type")
            if key and value:
                template_values[key] = value
    except Exception as e:
        print(f"Advertencia: Error extrayendo TemplateValues Part UID={uid}: {e}")

    # Negated children: each names a pin whose signal is inverted.
    negated_pins = {}
    try:
        for negation in part_element.xpath("./*[local-name()='Negated']"):
            if pin := negation.get("Name"):
                negated_pins[pin] = True
    except Exception as e:
        print(f"Advertencia: Error extrayendo Negated Pins Part UID={uid}: {e}")

    return {
        "uid": uid,
        "type": name,
        "template_values": template_values,
        "negated_pins": negated_pins,
    }
def parse_call(call_element):
    """Parse a Call element into a plain dict.

    Returns None when ``call_element`` is None, has no UId, has no CallInfo
    child, or the CallInfo lacks Name or BlockType. Otherwise returns a dict
    with "uid", "type" ("Call"), "block_name" and "block_type". For FB calls
    the instance DB name (wrapped in double quotes) and the instance scope
    are added as "instance_db" / "instance_scope" when they can be read.
    """
    if call_element is None:
        return None

    uid = call_element.get("UId")
    if not uid:
        print(
            f"Error: Call encontrado sin UID: {etree.tostring(call_element, encoding='unicode')}"
        )
        return None

    call_info_elem = call_element.xpath("./*[local-name()='CallInfo']")
    if not call_info_elem:
        print(f"Error: Call UID {uid} sin elemento CallInfo.")
        return None

    call_info = call_info_elem[0]
    block_name = call_info.get("Name")
    block_type = call_info.get("BlockType")
    if not block_name or not block_type:
        print(f"Error: CallInfo para UID {uid} sin Name o BlockType.")
        return None

    instance_name = None
    instance_scope = None
    if block_type == "FB":
        instance_elem_list = call_info.xpath("./*[local-name()='Instance']")
        if instance_elem_list:
            instance_elem = instance_elem_list[0]
            instance_scope = instance_elem.get("Scope")
            # Only a Component that is a direct child of Instance is used.
            component_elem_list = instance_elem.xpath("./*[local-name()='Component']")
            if component_elem_list:
                db_name_raw = component_elem_list[0].get("Name")
                if db_name_raw:
                    instance_name = f'"{db_name_raw}"'  # quoted DB name
                else:
                    # The element names had been stripped from these
                    # warnings, leaving them unreadable; they are spelled
                    # out again (without angle brackets).
                    print(
                        f"Advertencia: Component dentro de Instance para FB Call UID {uid} no tiene atributo 'Name'."
                    )
            else:
                print(
                    f"Advertencia: No se encontró Component dentro de Instance para FB Call UID {uid}. No se pudo obtener el nombre del DB."
                )
        else:
            print(
                f"Advertencia: FB Call '{block_name}' UID {uid} no tiene elemento Instance."
            )

    call_data = {
        "uid": uid,
        "type": "Call",
        "block_name": block_name,
        "block_type": block_type,
    }
    if instance_name:
        call_data["instance_db"] = instance_name
    if instance_scope:
        call_data["instance_scope"] = instance_scope
    return call_data
# SCL (Structured Text) Parser
def _scl_access_text(access_elem):
    """Render one SCL Access element (variable or literal constant) as text.

    Array indices are nested Access elements under a Component; they are
    rendered by calling this helper recursively.
    """
    scope = access_elem.get("Scope")
    fallback = f"/*_ERR_Scope_{scope}_*/"  # emitted for unhandled scopes

    if scope in (
        "GlobalVariable",
        "LocalVariable",
        "TempVariable",
        "InOutVariable",
        "InputVariable",
        "OutputVariable",
        "ConstantVariable",
    ):
        symbol_elem = access_elem.xpath("./st:Symbol", namespaces=ns)
        if not symbol_elem:
            return fallback
        pieces = []
        components = symbol_elem[0].xpath("./st:Component", namespaces=ns)
        for i, comp in enumerate(components):
            name = comp.get("Name", "_ERR_COMP_")
            if i > 0:
                pieces.append(".")
            # Quote heuristic: quote when a HasQuotes attribute says so, or
            # for the first component unless it starts with '#'.
            # NOTE(review): the lookup goes through the Component's parent;
            # confirm that is where HasQuotes lives in the exported XML.
            has_quotes_elem = comp.xpath(
                "../st:BooleanAttribute[@Name='HasQuotes']/text()", namespaces=ns
            )
            has_quotes = has_quotes_elem and has_quotes_elem[0].lower() == "true"
            is_temp = name.startswith("#")
            if has_quotes or (i == 0 and not is_temp):
                pieces.append(f'"{name}"')
            else:
                pieces.append(name)
            index_access = comp.xpath("./st:Access", namespaces=ns)
            if index_access:
                indices_text = [_scl_access_text(idx) for idx in index_access]
                pieces.append(f"[{','.join(indices_text)}]")
        return "".join(pieces)

    if scope == "LiteralConstant":
        constant_elem = access_elem.xpath("./st:Constant", namespaces=ns)
        if not constant_elem:
            return "/*_ERR_NOCONST_*/"
        val_elem = constant_elem[0].xpath("./st:ConstantValue/text()", namespaces=ns)
        # The literal is emitted as stored; no type prefix (T#, LT#, ...) is added.
        return val_elem[0] if val_elem else "_ERR_CONSTVAL_"

    return fallback


def reconstruct_scl_from_tokens(st_node):
    """Rebuild SCL source text from a StructuredText element.

    Walks the direct children of ``st_node`` in the 'st' namespace (Token,
    Blank, NewLine, Access, Comment, LineComment), concatenates their text,
    then drops empty lines and re-indents with a simple keyword heuristic.
    Returns an error comment string when ``st_node`` is None.
    """
    if st_node is None:
        return "// Error: StructuredText node not found.\n"

    scl_parts = []
    for elem in st_node.xpath("./st:*", namespaces=ns):
        tag = etree.QName(elem.tag).localname
        if tag == "Token":
            scl_parts.append(elem.get("Text", ""))
        elif tag == "Blank":
            count = int(elem.get("Num", 1))
            # Avoid doubling a space that the previous part already ends with.
            if not scl_parts or not scl_parts[-1].endswith(" "):
                scl_parts.append(" " * count)
            elif count > 1:
                scl_parts.append(" " * (count - 1))
        elif tag == "NewLine":
            # Strip trailing whitespace before the line break.
            if scl_parts:
                scl_parts[-1] = scl_parts[-1].rstrip()
            scl_parts.append("\n")
        elif tag == "Access":
            # Previously array indices were rendered by recursing into this
            # function with an Access node; its Symbol/Constant children are
            # not token tags, so every index came out empty ("[]").
            scl_parts.append(_scl_access_text(elem))
        elif tag == "Comment" or tag == "LineComment":
            comment_text = "".join(elem.xpath(".//text()")).strip()
            if tag == "Comment":
                scl_parts.append(f"(* {comment_text} *)")
            else:
                scl_parts.append(f"// {comment_text}")
        # Any other node type is ignored.

    full_scl = "".join(scl_parts)

    # Simplified re-indentation driven by block keywords.
    output_lines = []
    indent_level = 0
    for line in full_scl.split("\n"):
        line = line.strip()
        if not line:
            continue  # skip empty lines
        # Closing/alternative keywords dedent before the line is emitted.
        if line.startswith(
            ("END_IF", "END_WHILE", "END_FOR", "END_CASE", "ELSE", "ELSIF")
        ):
            indent_level = max(0, indent_level - 1)
        output_lines.append(" " * indent_level + line)
        # Block openers indent the lines that follow.
        if line.endswith(("THEN", "DO", "OF")) or line == "ELSE":
            indent_level += 1
    # Note: BEGIN/END blocks inside SCL are not handled.
    return "\n".join(output_lines)
# STL (Statement List) Parser
def get_access_text(access_element):
    """Build a simple textual representation of an STL Access element.

    Tries, in order: Symbol (dotted component names with array indices),
    Constant, Label, Indirect (register-indirect address) and Address
    (absolute address). Returns "_ERR_ACCESS_" when ``access_element`` is
    None and "_<scope>_?" when no known child is present.
    """
    if access_element is None:
        return "_ERR_ACCESS_"
    scope = access_element.get("Scope")

    # Symbolic operand.
    symbol_elem = access_element.xpath("./stl:Symbol", namespaces=ns)
    if symbol_elem:
        rendered = []
        for comp in symbol_elem[0].xpath("./stl:Component", namespaces=ns):
            piece = comp.get("Name", "_ERR_COMP_")  # name used as-is
            index_access = comp.xpath("./stl:Access", namespaces=ns)
            if index_access:
                indices = [get_access_text(ia) for ia in index_access]
                # Attach the index to its own component. It used to be a
                # separate list item, so the join produced "Arr.[1]".
                piece += f"[{','.join(indices)}]"
            rendered.append(piece)
        return ".".join(rendered)

    # Constant operand.
    constant_elem = access_element.xpath("./stl:Constant", namespaces=ns)
    if constant_elem:
        val_elem = constant_elem[0].xpath("./stl:ConstantValue/text()", namespaces=ns)
        type_elem = constant_elem[0].xpath("./stl:ConstantType/text()", namespaces=ns)
        const_type = type_elem[0] if type_elem else ""
        const_val = val_elem[0] if val_elem else "_ERR_CONST_"
        # Only Time gets a prefix; every other type is emitted verbatim.
        if const_type == "Time":
            return f"T#{const_val}"
        return const_val

    # Jump label.
    label_elem = access_element.xpath("./stl:Label", namespaces=ns)
    if label_elem:
        return label_elem[0].get("Name", "_ERR_LABEL_")

    # Register-indirect access (simplified).
    indirect_elem = access_element.xpath("./stl:Indirect", namespaces=ns)
    if indirect_elem:
        reg = indirect_elem[0].get("Register", "AR?")
        offset_str = indirect_elem[0].get("BitOffset", "0")
        area = indirect_elem[0].get("Area", "DB")
        width = indirect_elem[0].get("Width", "X")
        # BitOffset is rendered in pointer format P#byte.bit.
        try:
            bit_offset = int(offset_str)
            p_format_offset = f"P#{bit_offset // 8}.{bit_offset % 8}"
        except ValueError:
            p_format_offset = "P#?.?"
        width_map = {"Bit": "X", "Byte": "B", "Word": "W", "Double": "D"}
        # Unmapped widths fall back to their first letter.
        width_char = width_map.get(width, width[0] if width else "?")
        return f"{area}{width_char}[{reg},{p_format_offset}]"

    # Absolute address.
    address_elem = access_element.xpath("./stl:Address", namespaces=ns)
    if address_elem:
        area = address_elem[0].get("Area", "??")
        bit_offset_str = address_elem[0].get("BitOffset", "0")
        addr_type_str = address_elem[0].get("Type", "Bool")
        try:
            bit_offset = int(bit_offset_str)
        except ValueError:
            return f"{area}?{bit_offset_str}?"
        byte_offset = bit_offset // 8
        bit_in_byte = bit_offset % 8

        # Width letter derived from the declared type (simplified); bit by default.
        addr_width = "X"
        if addr_type_str == "Byte":
            addr_width = "B"
        elif addr_type_str == "Word":
            addr_width = "W"
        elif addr_type_str in ["DWord", "DInt"]:
            addr_width = "D"

        area_map = {
            "Input": "I",
            "Output": "Q",
            "Memory": "M",
            "PeripheryInput": "PI",
            "PeripheryOutput": "PQ",
            "DB": "DB",
            "DI": "DI",
            "Local": "L",
            "Timer": "T",
            "Counter": "C",
        }
        stl_area = area_map.get(area, area)

        # NOTE(review): the ".bit" suffix is emitted for byte/word/dword
        # widths too (e.g. "MW10.0") - confirm that is the intended format.
        if stl_area in ["DB", "DI"]:
            block_num = address_elem[0].get("BlockNumber")
            if block_num:
                # Fully qualified, e.g. DB1.DBX0.1
                return f"{stl_area}{block_num}.{stl_area}{addr_width}{byte_offset}.{bit_in_byte}"
            # No block number: relative form, e.g. DBX0.1
            return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}"
        if stl_area in ["T", "C"]:
            # Timers/counters are rendered as area letter + number only.
            return f"{stl_area}{byte_offset}"
        return f"{stl_area}{addr_width}{byte_offset}.{bit_in_byte}"

    return f"_{scope}_?"  # fallback
def get_comment_text(comment_element):
    """Return the stripped text of a Comment or LineComment element.

    Prefers the first multilingual Text item found below the element; when
    there is none, falls back to the element's own direct text nodes.
    Returns "" for None.
    """
    if comment_element is None:
        return ""
    # NOTE(review): the multilingual items are looked up in the Interface/v5
    # namespace; the original marks this as an assumption.
    ml_texts = comment_element.xpath(
        ".//mlt:MultilingualTextItem/mlt:AttributeList/mlt:Text/text()",
        namespaces={'mlt': "http://www.siemens.com/automation/Openness/SW/Interface/v5"},
    )
    if ml_texts:
        return ml_texts[0].strip()
    return "".join(comment_element.xpath("./text()")).strip()


def reconstruct_stl_from_statementlist(statement_list_node):
    """Rebuild STL source text from a StatementList element.

    Each StlStatement becomes one line: optional "label:", a tab, the
    instruction token, a space, the operand, then any comments attached to
    the label, token or operand as trailing "// ..." text. Comments that are
    direct children of the statement are emitted before it as separate
    "// ..." lines. Returns an error comment string for None.
    """
    if statement_list_node is None:
        return "// Error: StatementList node not found.\n"

    def trailing(node, expr):
        # Concatenate the non-empty comments matched by expr as " // text".
        texts = (get_comment_text(c) for c in node.xpath(expr, namespaces=ns))
        return "".join(f" // {text}" for text in texts if text)

    output = []
    for stmt in statement_list_node.xpath("./stl:StlStatement", namespaces=ns):
        # 1. Statement-level comments, one "//" line per text line.
        for comm in stmt.xpath(
            "child::stl:Comment | child::stl:LineComment", namespaces=ns
        ):
            text = get_comment_text(comm)
            if text:
                output.extend(f"// {row}" for row in text.splitlines())

        line_comment = ""

        # 2. Optional label.
        label_str = ""
        label_decl = stmt.xpath("./stl:LabelDeclaration", namespaces=ns)
        if label_decl:
            label_names = label_decl[0].xpath("./stl:Label/@Name", namespaces=ns)
            if label_names:
                label_str = f"{label_names[0]}:"
            line_comment += trailing(label_decl[0], "./stl:Comment | ./stl:LineComment")

        # 3. Instruction token.
        instruction_str = ""
        tokens = stmt.xpath("./stl:StlToken", namespaces=ns)
        if tokens:
            instruction_str = tokens[0].get("Text", "_ERR_TOKEN_")
            line_comment += trailing(tokens[0], "./stl:Comment | ./stl:LineComment")

        # 4. Operand.
        access_str = ""
        accesses = stmt.xpath("./stl:Access", namespaces=ns)
        if accesses:
            access_str = get_access_text(accesses[0])
            line_comment += trailing(
                accesses[0], "child::stl:LineComment | child::stl:Comment"
            )

        # Assemble: label + tab + instruction + space + operand + comments.
        current_line = label_str
        if instruction_str:
            if current_line:
                current_line += "\t"
            current_line += instruction_str
        if access_str:
            if current_line:
                current_line += " "
            current_line += access_str
        if line_comment:
            if current_line.strip():
                current_line += f" {line_comment}"
            else:
                # Comment-only statement: the comment becomes the whole line.
                current_line = line_comment

        if current_line.strip():
            output.append(current_line.rstrip())

    return "\n".join(output)
# --- Main Parsing Function ---
def parse_network(network_element):
    """Parse one network element into a dict of ordered logic steps.

    Extracts the title and comment, parses every Access / Part / Call inside
    the FlgNet, resolves wires (a wire may have several destinations) into
    per-instruction "inputs" and "outputs", infers implicit EN connections
    for functional blocks, and records ENO connections that are not a plain
    ENO->EN chain to the next block.

    Returns a dict with "id", "title", "comment" and "logic" (instruction
    dicts sorted by numeric UID). An "error" key is added when the input is
    None or the network contains no FlgNet.
    """
    if network_element is None:
        return {
            "id": "ERROR",
            "title": "Invalid Network Element",
            "comment": "",
            "logic": [],
            "error": "Input element was None",
        }

    network_id = network_element.get("ID")

    # --- Title / comment ---
    title_element = network_element.xpath(
        ".//*[local-name()='MultilingualText'][@CompositionName='Title']"
    )
    network_title = (
        get_multilingual_text(title_element[0])
        if title_element
        else f"Network {network_id}"
    )
    comment_element = network_element.xpath(
        "./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
    )
    network_comment = (
        get_multilingual_text(comment_element[0]) if comment_element else ""
    )

    flgnet_list = network_element.xpath(".//flg:FlgNet", namespaces=ns)
    if not flgnet_list:
        # The network may be empty or comment-only.
        return {
            "id": network_id,
            "title": network_title,
            "comment": network_comment,
            "logic": [],
            "error": "FlgNet not found",
        }
    flgnet = flgnet_list[0]

    # --- 1. Parse Access, Part and Call elements ---
    access_map = {
        acc_info["uid"]: acc_info
        for acc in flgnet.xpath(".//flg:Access", namespaces=ns)
        if (acc_info := parse_access(acc)) and acc_info["type"] != "unknown"
    }

    parts_and_calls_map = {}
    instruction_elements = flgnet.xpath(".//flg:Part | .//flg:Call", namespaces=ns)
    for element in instruction_elements:
        parsed_info = None
        tag_name = etree.QName(element.tag).localname
        if tag_name == "Part":
            parsed_info = parse_part(element)
        elif tag_name == "Call":
            parsed_info = parse_call(element)
        if parsed_info and "uid" in parsed_info:
            parts_and_calls_map[parsed_info["uid"]] = parsed_info
        else:
            print(
                f"Advertencia: Se ignoró un Part/Call inválido en la red {network_id}"
            )

    # --- 2. Parse wires (one source, one or more destinations) ---
    wire_connections = defaultdict(list)  # (dest_uid, dest_pin) -> [(src_uid, src_pin), ...]
    source_connections = defaultdict(list)  # (src_uid, src_pin) -> [(dest_uid, dest_pin), ...]
    eno_outputs = defaultdict(list)  # src_uid -> [(dest_uid, dest_pin), ...] leaving 'eno'

    flg_ns_uri = ns["flg"]
    qname_powerrail = etree.QName(flg_ns_uri, "Powerrail")
    qname_identcon = etree.QName(flg_ns_uri, "IdentCon")
    qname_namecon = etree.QName(flg_ns_uri, "NameCon")

    for wire in flgnet.xpath(".//flg:Wire", namespaces=ns):
        # list(element) replaces the deprecated Element.getchildren().
        children = list(wire)
        if len(children) < 2:
            continue  # needs a source and at least one destination

        source_elem = children[0]
        source_uid, source_pin = None, None
        if source_elem.tag == qname_powerrail:
            source_uid, source_pin = "POWERRAIL", "out"
        elif source_elem.tag == qname_identcon:
            # Variable / constant access
            source_uid, source_pin = source_elem.get("UId"), "value"
        elif source_elem.tag == qname_namecon:
            # Instruction output pin
            source_uid, source_pin = source_elem.get("UId"), source_elem.get("Name")
        if source_uid is None:
            continue  # source could not be determined
        source_info = (source_uid, source_pin)

        # Every child after the first is a destination.
        for dest_elem in children[1:]:
            dest_uid, dest_pin = None, None
            if dest_elem.tag == qname_identcon:
                # Written operand (coil target, etc.)
                dest_uid, dest_pin = dest_elem.get("UId"), "value"
            elif dest_elem.tag == qname_namecon:
                # Instruction input pin
                dest_uid, dest_pin = dest_elem.get("UId"), dest_elem.get("Name")

            if dest_uid is not None and dest_pin is not None:
                dest_key = (dest_uid, dest_pin)
                if source_info not in wire_connections[dest_key]:
                    wire_connections[dest_key].append(source_info)

                source_key = (source_uid, source_pin)
                dest_info = (dest_uid, dest_pin)
                if dest_info not in source_connections[source_key]:
                    source_connections[source_key].append(dest_info)

                # Track connections that leave an 'eno' pin.
                if source_pin == "eno" and source_uid in parts_and_calls_map:
                    if dest_info not in eno_outputs[source_uid]:
                        eno_outputs[source_uid].append(dest_info)

    # --- 3. Build the initial logic representation ---
    all_logic_steps = {}
    functional_block_types = [
        "Move",
        "Add",
        "Sub",
        "Mul",
        "Div",
        "Mod",
        "Convert",
        "Call",
        "Se",
        "Sd",
        "BLKMOV",
    ]
    rlo_generators = [
        "Contact",
        "O",
        "Eq",
        "Ne",
        "Gt",
        "Lt",
        "Ge",
        "Le",
        "And",
        "Xor",
        "PBox",
        "NBox",
    ]
    # Ordered tuples rather than sets: iterating a set of strings follows the
    # per-process hash seed, which made the key order of "inputs"/"outputs"
    # (and which source wins when two XML pins map to the same JSON pin)
    # vary from run to run.
    possible_input_pins = (
        "en",
        "in",
        "in1",
        "in2",
        "s",
        "r",
        "tv",
        "value",
        "operand",
        "timer",
        "bit",
        "clk",
        "pv",
        "cu",
        "cd",
        "ld",
        "pre",
        "SRCBLK",
    )
    possible_output_pins = (
        "out",
        "out1",
        "Q",
        "q",
        "eno",
        "RET_VAL",
        "DSTBLK",
        "rt",
        "rtbcd",
        "cv",
        "cvbcd",
        "QU",
        "QD",
    )

    for instruction_uid, instruction_info in parts_and_calls_map.items():
        instruction_repr = {"instruction_uid": instruction_uid, **instruction_info}
        instruction_repr["inputs"] = {}
        instruction_repr["outputs"] = {}

        # Special handling for SdCoil and the Se/Sd timers: XML pin -> JSON pin.
        original_type = instruction_info["type"]
        current_type = original_type
        input_pin_mapping = {}
        output_pin_mapping = {}
        if original_type == "SdCoil":
            print(
                f" Advertencia: Reinterpretando 'SdCoil' (UID: {instruction_uid}) como 'Se' (Pulse Timer)."
            )
            current_type = "Se"
            input_pin_mapping = {
                "in": "s",  # start
                "operand": "timer",  # timer instance
                "value": "tv",  # time value
            }
            output_pin_mapping = {"out": "q"}
        elif original_type in ["Se", "Sd"]:
            input_pin_mapping = {"s": "s", "tv": "tv", "r": "r", "timer": "timer"}
            output_pin_mapping = {"q": "q", "rt": "rt"}  # the BCD output is ignored
        instruction_repr["type"] = current_type

        # Inputs: resolve every wired input pin to its source(s).
        for xml_pin_name in possible_input_pins:
            dest_key = (instruction_uid, xml_pin_name)
            if dest_key in wire_connections:
                sources_list = wire_connections[dest_key]
                input_sources_repr = []
                for source_uid, source_pin in sources_list:
                    if source_uid == "POWERRAIL":
                        input_sources_repr.append({"type": "powerrail"})
                    elif source_uid in access_map:
                        input_sources_repr.append(access_map[source_uid])
                    elif source_uid in parts_and_calls_map:
                        source_instr_info = parts_and_calls_map[source_uid]
                        source_original_type = source_instr_info["type"]
                        # Apply the source instruction's output pin mapping.
                        source_output_mapping = {}
                        if source_original_type == "SdCoil":
                            source_output_mapping = {"out": "q"}
                        elif source_original_type in ["Se", "Sd"]:
                            source_output_mapping = {"q": "q", "rt": "rt"}
                        mapped_source_pin = source_output_mapping.get(
                            source_pin, source_pin
                        )
                        input_sources_repr.append(
                            {
                                "type": "connection",
                                "source_instruction_type": source_original_type,
                                "source_instruction_uid": source_uid,
                                "source_pin": mapped_source_pin,
                            }
                        )
                    else:
                        input_sources_repr.append(
                            {"type": "unknown_source", "uid": source_uid}
                        )

                json_pin_name = input_pin_mapping.get(xml_pin_name, xml_pin_name)
                # A single source is stored bare; several are stored as a list.
                if len(input_sources_repr) == 1:
                    instruction_repr["inputs"][json_pin_name] = input_sources_repr[0]
                elif len(input_sources_repr) > 1:
                    instruction_repr["inputs"][json_pin_name] = input_sources_repr

        # Outputs: only destinations that are operands (Access) are recorded.
        for xml_pin_name in possible_output_pins:
            source_key = (instruction_uid, xml_pin_name)
            if source_key in source_connections:
                json_pin_name = output_pin_mapping.get(xml_pin_name, xml_pin_name)
                if json_pin_name not in instruction_repr["outputs"]:
                    instruction_repr["outputs"][json_pin_name] = []
                for dest_uid, dest_pin in source_connections[source_key]:
                    if dest_uid in access_map:
                        if (
                            access_map[dest_uid]
                            not in instruction_repr["outputs"][json_pin_name]
                        ):
                            instruction_repr["outputs"][json_pin_name].append(
                                access_map[dest_uid]
                            )

        all_logic_steps[instruction_uid] = instruction_repr

    # --- 4. Infer implicit EN connections ---
    processed_blocks_en_inference = set()
    something_changed = True
    inference_passes = 0
    max_inference_passes = len(all_logic_steps) + 5
    try:
        # Numeric UIDs sort by value; non-numeric ones sort last.
        sorted_uids_for_en = sorted(
            all_logic_steps.keys(),
            key=lambda x: int(x) if x.isdigit() else float("inf"),
        )
    except ValueError:
        sorted_uids_for_en = sorted(all_logic_steps.keys())
    ordered_logic_list_for_en = [
        all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps
    ]

    while something_changed and inference_passes < max_inference_passes:
        something_changed = False
        inference_passes += 1
        for i, instruction in enumerate(ordered_logic_list_for_en):
            part_uid = instruction["instruction_uid"]
            part_type_original = (
                instruction["type"].replace("_scl", "").replace("_error", "")
            )
            if (
                part_type_original in functional_block_types
                and "en" not in instruction["inputs"]
                and part_uid not in processed_blocks_en_inference
            ):
                inferred_en_source = None
                if i > 0:
                    # Scan backwards for the nearest instruction able to drive EN.
                    for j in range(i - 1, -1, -1):
                        prev_instr = ordered_logic_list_for_en[j]
                        prev_uid = prev_instr["instruction_uid"]
                        prev_type_original = (
                            prev_instr["type"].replace("_scl", "").replace("_error", "")
                        )
                        if prev_type_original in rlo_generators:
                            inferred_en_source = {
                                "type": "connection",
                                "source_instruction_uid": prev_uid,
                                "source_instruction_type": prev_type_original,
                                "source_pin": "out",
                            }
                            break
                        elif prev_type_original in functional_block_types:
                            source_key_eno = (prev_uid, "eno")
                            if source_key_eno in source_connections:
                                inferred_en_source = {
                                    "type": "connection",
                                    "source_instruction_uid": prev_uid,
                                    "source_instruction_type": prev_type_original,
                                    "source_pin": "eno",
                                }
                                break
                            else:
                                continue
                        elif prev_type_original in [
                            "Coil",
                            "SCoil",
                            "RCoil",
                            "SetCoil",
                            "ResetCoil",
                            "SdCoil",
                        ]:
                            # A coil ends the search without a source.
                            break
                if inferred_en_source:
                    all_logic_steps[part_uid]["inputs"]["en"] = inferred_en_source
                    processed_blocks_en_inference.add(part_uid)
                    something_changed = True

    # --- 5. Record ENO connections other than a direct ENO->EN chain ---
    for source_instr_uid, eno_destinations in eno_outputs.items():
        if source_instr_uid not in all_logic_steps:
            continue
        interesting_eno_logic = []
        for dest_uid, dest_pin in eno_destinations:
            is_direct_en_connection = False
            if dest_uid in parts_and_calls_map and dest_pin == "en":
                try:
                    source_idx = sorted_uids_for_en.index(source_instr_uid)
                    dest_idx = sorted_uids_for_en.index(dest_uid)
                    # "Direct" means ENO feeds the EN of the very next
                    # functional block in UID order.
                    if (
                        dest_idx == source_idx + 1
                        and parts_and_calls_map[dest_uid]["type"]
                        in functional_block_types
                    ):
                        is_direct_en_connection = True
                except ValueError:
                    pass
            if not is_direct_en_connection:
                target_info = {"target_pin": dest_pin}
                if dest_uid in parts_and_calls_map:
                    target_info.update(
                        {
                            "target_type": "instruction",
                            "target_uid": dest_uid,
                            "target_name": parts_and_calls_map[dest_uid].get(
                                "name", parts_and_calls_map[dest_uid].get("type")
                            ),
                        }
                    )
                elif dest_uid in access_map:
                    target_info.update(
                        {
                            "target_type": "operand",
                            "target_details": access_map[dest_uid],
                        }
                    )
                else:
                    target_info.update(
                        {"target_type": "unknown", "target_uid": dest_uid}
                    )
                interesting_eno_logic.append(target_info)
        if interesting_eno_logic:
            all_logic_steps[source_instr_uid]["eno_logic"] = interesting_eno_logic

    # --- 6. Order and return ---
    network_logic_final = [
        all_logic_steps[uid] for uid in sorted_uids_for_en if uid in all_logic_steps
    ]
    return {
        "id": network_id,
        "title": network_title,
        "comment": network_comment,
        "logic": network_logic_final,
    }
def _locate_block_element(root):
    """Find the program block to convert.

    Looks for the first ``SW.Blocks.FC`` element anywhere under *root* and
    falls back to the first ``SW.Blocks.FB`` element.

    Returns:
        ``(element, kind)`` where *kind* is ``"FC"`` or ``"FB"``, or
        ``(None, None)`` when neither element exists.
    """
    print("Paso 2: Buscando el bloque SW.Blocks.FC...")
    fc_blocks = root.xpath("//*[local-name()='SW.Blocks.FC']")
    if fc_blocks:
        return fc_blocks[0], "FC"

    # No FC present: fall back to FB.
    fb_blocks = root.xpath("//*[local-name()='SW.Blocks.FB']")
    if not fb_blocks:
        print("Error Crítico: No se encontró ni <SW.Blocks.FC> ni <SW.Blocks.FB>.")
        return None, None
    print("Advertencia: Se encontró <SW.Blocks.FB> en lugar de <SW.Blocks.FC>.")
    return fb_blocks[0], "FB"


def _read_block_attributes(attribute_list_node, block_kind):
    """Read name, number and programming language from the block AttributeList.

    Args:
        attribute_list_node: XPath result list for the block's AttributeList
            (may be empty).
        block_kind: ``"FC"`` or ``"FB"``; only used in the warning message.

    Returns:
        ``(name, number, language)``. Defaults are ``"Unknown"``, ``None`` and
        ``"Unknown"``; a non-integer ``Number`` yields ``None``.
    """
    name, number, language = "Unknown", None, "Unknown"
    if not attribute_list_node:
        print(
            f"Advertencia: No se encontró AttributeList para el bloque {block_kind}."
        )
        return name, number, language

    attr_list = attribute_list_node[0]

    name_texts = attr_list.xpath("./*[local-name()='Name']/text()")
    if name_texts:
        name = name_texts[0].strip()

    number_texts = attr_list.xpath("./*[local-name()='Number']/text()")
    if number_texts:
        try:
            number = int(number_texts[0])
        except ValueError:
            number = None

    language_texts = attr_list.xpath(
        "./*[local-name()='ProgrammingLanguage']/text()"
    )
    if language_texts:
        language = language_texts[0].strip()

    print(
        f"Paso 3: Atributos: Nombre='{name}', Número={number}, Lenguaje='{language}'"
    )
    return name, number, language


def _read_block_interface(attribute_list_node):
    """Collect the interface sections (name/datatype of each member).

    Sections without a ``Name`` and members lacking ``Name`` or ``Datatype``
    are skipped; sections that end up with no members are omitted.

    Returns:
        dict mapping section name to a list of
        ``{"name": ..., "datatype": ...}`` dicts (possibly empty).
    """
    interface = {}
    if attribute_list_node:
        interface_nodes = attribute_list_node[0].xpath(
            ".//*[local-name()='Interface']"
        )
        if interface_nodes:
            print("Paso 4: Nodo Interface encontrado.")
            for section in interface_nodes[0].xpath(
                ".//iface:Section", namespaces=ns
            ):
                section_name = section.get("Name")
                if not section_name:
                    continue
                members = [
                    {"name": member.get("Name"), "datatype": member.get("Datatype")}
                    for member in section.xpath("./iface:Member", namespaces=ns)
                    if member.get("Name") and member.get("Datatype")
                ]
                if members:
                    interface[section_name] = members
            if not interface:
                print("Advertencia: Interface sin secciones iface:Section válidas.")
        else:
            print(
                "Advertencia: No se encontró <Interface> dentro de <AttributeList>."
            )
    if not interface:
        print("Advertencia: No se pudo extraer información de la interfaz.")
    return interface


def _raw_chunk_network(
    network_id, title, comment, language, uid_prefix, chunk_type, text_key, text
):
    """Build a network entry whose logic is a single raw-text instruction."""
    return {
        "id": network_id,
        "title": title,
        "comment": comment,
        "language": language,
        "logic": [
            {
                # Synthetic UID: raw chunks have no UId in the XML.
                "instruction_uid": f"{uid_prefix}_{network_id}",
                "type": chunk_type,
                text_key: text,
            }
        ],
    }


def _convert_compile_unit(network_elem):
    """Convert one ``SW.Blocks.CompileUnit`` element into its JSON dict.

    SCL and STL networks are stored as one reconstructed text chunk, LAD/FBD
    networks are delegated to ``parse_network``, and any other language gets
    an ``UNSUPPORTED_LANG`` placeholder.

    Returns:
        The network dict, or ``None`` when the unit has no ID or
        ``parse_network`` returned nothing.
    """
    network_id = network_elem.get("ID")
    if not network_id:
        print(" Advertencia: Se encontró CompileUnit sin ID. Saltando.")
        return None

    # --- Detect the network language (LAD when not specified) ---
    programming_language = "LAD"
    network_source_node = None  # <NetworkSource> node handed to the text parsers
    attribute_list = network_elem.xpath("./*[local-name()='AttributeList']")
    if attribute_list:
        lang_node = attribute_list[0].xpath(
            "./*[local-name()='ProgrammingLanguage']/text()"
        )
        if lang_node:
            programming_language = lang_node[0].strip()
        network_source_list = attribute_list[0].xpath(
            "./*[local-name()='NetworkSource']"
        )
        if network_source_list:
            network_source_node = network_source_list[0]
    print(f" - Procesando Red ID={network_id}, Lenguaje={programming_language}")

    # --- Title and comment (common to every language) ---
    title_element = network_elem.xpath(
        ".//*[local-name()='MultilingualText'][@CompositionName='Title']"
    )
    network_title = (
        get_multilingual_text(title_element[0])
        if title_element
        else f"Network {network_id}"
    )
    comment_element = network_elem.xpath(
        "./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
    )
    network_comment = (
        get_multilingual_text(comment_element[0]) if comment_element else ""
    )

    # --- Language-specific processing ---
    if programming_language == "SCL":
        structured_text_node = (
            network_source_node.xpath("./st:StructuredText", namespaces=ns)
            if network_source_node is not None
            else None
        )
        reconstructed_scl = f"// SCL extraction failed for Network {network_id}: StructuredText node not found.\n"
        if structured_text_node:
            print(f" Reconstruyendo SCL desde tokens para red {network_id}...")
            reconstructed_scl = reconstruct_scl_from_tokens(structured_text_node[0])
        else:
            print(
                f" Advertencia: No se encontró nodo <StructuredText> para red SCL {network_id}."
            )
        return _raw_chunk_network(
            network_id,
            network_title,
            network_comment,
            "SCL",
            "SCL",
            "RAW_SCL_CHUNK",
            "scl",
            reconstructed_scl,
        )

    if programming_language == "STL":
        statement_list_node = (
            network_source_node.xpath("./stl:StatementList", namespaces=ns)
            if network_source_node is not None
            else None
        )
        reconstructed_stl = f"// STL extraction failed for Network {network_id}: StatementList node not found.\n"
        if statement_list_node:
            print(
                f" Reconstruyendo STL desde StatementList para red {network_id}..."
            )
            reconstructed_stl = reconstruct_stl_from_statementlist(
                statement_list_node[0]
            )
        else:
            print(
                f" Advertencia: No se encontró nodo <StatementList> para red STL {network_id}."
            )
        return _raw_chunk_network(
            network_id,
            network_title,
            network_comment,
            "STL",
            "STL",
            "RAW_STL_CHUNK",
            "stl",
            reconstructed_stl,
        )

    if programming_language in ["LAD", "FBD"]:
        # parse_network receives the CompileUnit element itself, not the
        # NetworkSource node.
        parsed_network_data = parse_network(network_elem)
        if parsed_network_data:
            # Make sure the language is recorded in the output.
            parsed_network_data["language"] = programming_language
            if parsed_network_data.get("error"):
                # Networks with a parse error are reported but still kept.
                print(
                    f" Error al parsear red {programming_language} ID={network_id}: {parsed_network_data['error']}"
                )
        else:
            print(
                f" Error: parse_network devolvió None para red {programming_language} ID={network_id}"
            )
        return parsed_network_data

    # Any other language: keep a placeholder so the network is not lost.
    print(
        f" Advertencia: Lenguaje no soportado '{programming_language}' en red ID={network_id}. Creando placeholder."
    )
    return _raw_chunk_network(
        network_id,
        network_title,
        network_comment,
        programming_language,
        "UNS",
        "UNSUPPORTED_LANG",
        "scl",  # placeholder text is stored under the "scl" key
        f"// Network {network_id} uses unsupported language: {programming_language}\n",
    )


def _dump_result_json(result, json_filepath):
    """Write *result* to *json_filepath* as UTF-8 JSON; return True on success."""
    try:
        with open(json_filepath, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=4, ensure_ascii=False)
    except IOError as e:
        print(
            f"Error Crítico: No se pudo escribir JSON en '{json_filepath}'. Error: {e}"
        )
        return False
    except TypeError as e:
        print(f"Error Crítico: Problema al serializar a JSON. Error: {e}")
        return False
    print("Paso 6: Escritura completada.")
    print(f"Conversión finalizada. JSON guardado en: '{json_filepath}'")
    return True


def convert_xml_to_json(xml_filepath, json_filepath):
    """Convert an exported block XML file into the simplified JSON format.

    The first ``SW.Blocks.FC`` (or, failing that, ``SW.Blocks.FB``) element is
    located, its attributes, comment, interface and networks are extracted,
    and the result is written to *json_filepath*. Progress and problems are
    reported with ``print``; no exception escapes this function.

    Args:
        xml_filepath: Path of the XML file to read.
        json_filepath: Path of the JSON file to write.

    Returns:
        ``True`` when the JSON file was written, ``False`` on any failure
        (missing input, invalid XML, no FC/FB block, write or serialization
        error, unexpected exception). Callers that ignore the return value
        are unaffected.
    """
    print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
    if not os.path.exists(xml_filepath):
        print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
        return False

    try:
        print("Paso 1: Parseando archivo XML...")
        parser = etree.XMLParser(remove_blank_text=True)
        root = etree.parse(xml_filepath, parser).getroot()
        print("Paso 1: Parseo XML completado.")

        the_block, block_type_found = _locate_block_element(root)
        if the_block is None:
            return False
        print(
            f"Paso 2: Bloque SW.Blocks.{block_type_found} encontrado (ID={the_block.get('ID')})."
        )

        print("Paso 3: Extrayendo atributos del bloque...")
        attribute_list_node = the_block.xpath("./*[local-name()='AttributeList']")
        block_name_val, block_number_val, block_lang_val = _read_block_attributes(
            attribute_list_node, block_type_found
        )

        block_comment_val = ""
        comment_node_list = the_block.xpath(
            "./*[local-name()='ObjectList']/*[local-name()='MultilingualText'][@CompositionName='Comment']"
        )
        if comment_node_list:
            block_comment_val = get_multilingual_text(comment_node_list[0])
            print(f"Paso 3b: Comentario bloque: '{block_comment_val[:50]}...'")

        result = {
            "block_name": block_name_val,
            "block_number": block_number_val,
            "language": block_lang_val,
            "block_comment": block_comment_val,
            "interface": {},
            "networks": [],
        }

        print("Paso 4: Extrayendo la interfaz del bloque...")
        result["interface"] = _read_block_interface(attribute_list_node)

        print("Paso 5: Extrayendo y PROCESANDO lógica de redes (CompileUnits)...")
        object_list_node = the_block.xpath("./*[local-name()='ObjectList']")
        if object_list_node:
            compile_units = object_list_node[0].xpath(
                "./*[local-name()='SW.Blocks.CompileUnit']"
            )
            print(
                f"Paso 5: Se encontraron {len(compile_units)} elementos SW.Blocks.CompileUnit."
            )
            for network_elem in compile_units:
                parsed_network_data = _convert_compile_unit(network_elem)
                # Only valid (non-empty) networks are added to the result.
                if parsed_network_data:
                    result["networks"].append(parsed_network_data)
            if not compile_units:
                print(
                    "Advertencia: ObjectList no contenía elementos SW.Blocks.CompileUnit."
                )
        else:
            print("Advertencia: No se encontró ObjectList para el bloque.")

        print("Paso 6: Escribiendo el resultado en el archivo JSON...")
        if not result["interface"]:
            print("ADVERTENCIA FINAL: 'interface' está vacía.")
        if not result["networks"]:
            print("ADVERTENCIA FINAL: 'networks' está vacía.")
        return _dump_result_json(result, json_filepath)

    except etree.XMLSyntaxError as e:
        print(
            f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}"
        )
        return False
    except Exception as e:
        # Top-level boundary: report with traceback and signal failure.
        print(f"Error Crítico: Error inesperado durante la conversión: {e}")
        print("--- Traceback ---")
        traceback.print_exc()
        print("--- Fin Traceback ---")
        return False


if __name__ == "__main__":
    # Needed only when the module is executed as a script.
    import sys

    # The XML path is a mandatory positional argument.
    parser = argparse.ArgumentParser(
        description="Convert Simatic XML (LAD/FBD/SCL/STL) to simplified JSON. Expects XML filepath as argument."
    )
    parser.add_argument(
        "xml_filepath",
        help="Path to the input XML file passed from the main script (x0_main.py).",
    )
    args = parser.parse_args()
    xml_input_file = args.xml_filepath

    # Re-check the input even though the calling script may already have.
    if not os.path.exists(xml_input_file):
        print(f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'")
        sys.exit(1)

    # The JSON file is written next to the input XML.
    xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0]
    output_dir = os.path.dirname(xml_input_file)
    # dirname() is "" for a bare file name, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    json_output_file = os.path.join(output_dir, f"{xml_filename_base}_simplified.json")

    print(
        f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'"
    )

    try:
        converted = convert_xml_to_json(xml_input_file, json_output_file)
    except Exception as e:
        print(f"Error Crítico (x1) durante la conversión de '{xml_input_file}': {e}")
        traceback.print_exc()
        sys.exit(1)

    # convert_xml_to_json reports its own errors and returns False; turn that
    # into a non-zero exit status so the calling script can detect the failure.
    if not converted:
        sys.exit(1)