""" LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL Este script genera documentacion MD de Cross Reference para Obsidian """ # ToUpload/x4_cross_reference.py # -*- coding: utf-8 -*- import json import os import argparse import sys import traceback import glob import re import urllib.parse import shutil # <-- NUEVO: Para copiar archivos from collections import defaultdict script_root = os.path.dirname( os.path.dirname(os.path.dirname(os.path.dirname(__file__))) ) sys.path.append(script_root) from backend.script_utils import load_configuration # --- Importar format_variable_name (sin cambios) --- try: current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) if parent_dir not in sys.path: sys.path.insert(0, parent_dir) from generators.generator_utils import format_variable_name print("INFO: format_variable_name importado desde generators.generator_utils") except ImportError: print( "ADVERTENCIA: No se pudo importar format_variable_name desde generators. Usando copia local." ) def format_variable_name(name): # Fallback if not name: return "_INVALID_NAME_" if name.startswith('"') and name.endswith('"'): return name prefix = "#" if name.startswith("#") else "" if prefix: name = name[1:] if name and name[0].isdigit(): name = "_" + name name = re.sub(r"[^a-zA-Z0-9_]", "_", name) return prefix + name # --- Constantes --- # SCL_OUTPUT_DIRNAME = "scl_output" # Se leerá de config # XREF_SOURCE_SUBDIR = "source" # Se leerá de config # CALL_XREF_FILENAME = "xref_calls_tree.md" # Se leerá de config # DB_USAGE_XREF_FILENAME = "xref_db_usage_summary.md" # Se leerá de config # PLC_TAG_XREF_FILENAME = "xref_plc_tags_summary.md" # Se leerá de config # MAX_CALL_DEPTH = 5 # Se leerá de config INDENT_STEP = " " # MAX_USERS_LIST = 20 # Se leerá de config # --- Funciones de Análisis (find_calls_in_scl, find_db_tag_usage, find_plc_tag_usage sin cambios) --- # (Se omiten por brevedad, son las mismas de la versión anterior) def find_calls_in_scl(scl_code, block_data): calls = defaultdict(int) known_blocks = set(block_data.keys()) known_instances = set() for name, data in block_data.items(): block_info = data.get("data", {}) if block_info.get("block_type") == "FB": static_vars = block_info.get("interface", {}).get("Static", []) for var in static_vars: var_type = var.get("datatype", "") base_type = var_type.replace('"', "").split("[")[0].strip() if ( base_type in known_blocks and block_data[base_type]["data"].get("block_type") == "FB" ): known_instances.add(f'"{name}"."{var.get("name")}"') known_instances.add(f'"{var.get("name")}"') # Mejorable general_call_pattern = re.compile( r'\b(?])("?([a-zA-Z_#][a-zA-Z0-9_."]*)"?)\s*\(' ) system_funcs = { "IF", "WHILE", "FOR", "CASE", "REPEAT", "RETURN", "EXIT", "TRUE", "FALSE", "AND", "OR", "XOR", "NOT", "MOD", "ABS", "SQRT", "LN", "EXP", "SIN", "COS", "TAN", "ASIN", "ACOS", "ATAN", "CONCAT", "LEN", "LEFT", "RIGHT", "MID", "DELETE", "INSERT", "FIND", "REPLACE", "INT_TO_STRING", "STRING_TO_INT", "TON", "TOF", "TP", "CTU", "CTD", "CTUD", "BLKMOV", "ARRAY", "STRUCT", "VAR", "FUNCTION", "FUNCTION_BLOCK", "DATA_BLOCK", "BOOL", "INT", "DINT", "REAL", "STRING", "TIME", "DATE", "WORD", "BYTE", } for match in general_call_pattern.finditer(scl_code): potential_name_quoted = match.group(1) potential_name_clean = match.group(2) if potential_name_clean.upper() in system_funcs: continue is_instance_call = ( potential_name_clean.startswith("#") or potential_name_quoted in known_instances ) if is_instance_call: pass elif potential_name_clean in known_blocks: callee_type = block_data[potential_name_clean]["data"].get("block_type") if callee_type in ["FC", "FB"]: calls[potential_name_clean] += 1 return calls def find_db_tag_usage(scl_code): usage = defaultdict(lambda: defaultdict(int)) db_tag_pattern = re.compile( r'("([a-zA-Z0-9_ ]+)"|(DB\d+))\."?([a-zA-Z0-9_ ]+)"?(\s*\[.*?\]|\.\w+)*' ) write_pattern = re.compile(r"^\s*(.*?)\s*:=") lines = scl_code.splitlines() for line in lines: line_strip = line.strip() is_write = False match_write = write_pattern.match(line_strip) target_part = "" if match_write: is_write = True target_part = match_write.group(1).strip() for match in db_tag_pattern.finditer(line): db_part = match.group(1) tag_part = match.group(3) full_match = match.group(0) db_name = match.group(2) if match.group(2) else db_part tag_name = match.group(4) if match.group(4) else tag_part db_tag_key = f"{db_name}.{tag_name}" access_type = ( "write" if (is_write and target_part.startswith(full_match)) else "read" ) usage[db_tag_key][access_type] += 1 return usage def find_plc_tag_usage(scl_code, plc_tag_names_set): usage = defaultdict(lambda: defaultdict(int)) identifier_pattern = re.compile( r"""(? def copy_and_prepare_source_files(project_root_dir, xref_output_dir, scl_output_dirname, xref_source_subdir): """ Copia archivos .scl y .md desde scl_output a xref_output/source, convirtiendo .scl a .md con formato de bloque de código. Usa los nombres de directorios pasados como argumentos. """ scl_source_dir = os.path.join(project_root_dir, scl_output_dirname) md_target_dir = os.path.join(xref_output_dir, xref_source_subdir) if not os.path.isdir(scl_source_dir): print( f"Advertencia: Directorio '{scl_source_dir}' no encontrado. No se copiarán archivos fuente.", file=sys.stderr, ) return try: os.makedirs(md_target_dir, exist_ok=True) print( f"Copiando y preparando archivos fuente para Obsidian en: {md_target_dir}" ) except OSError as e: print( f"Error creando directorio de destino '{md_target_dir}': {e}", file=sys.stderr, ) return copied_count = 0 converted_count = 0 errors_count = 0 # Procesar archivos .scl scl_files = glob.glob(os.path.join(scl_source_dir, "*.scl")) for scl_path in scl_files: base_name = os.path.basename(scl_path) md_name = os.path.splitext(base_name)[0] + ".md" md_path = os.path.join(md_target_dir, md_name) try: with open(scl_path, "r", encoding="utf-8") as f_scl: scl_content = f_scl.read() # <-- MODIFICADO: Limpiar contenido SCL antes de envolverlo --> # Quitar posibles bloques de código Markdown anidados o incorrectos dentro del SCL scl_content_cleaned = scl_content.replace("```stl", "").replace("```", "") # Crear contenido Markdown md_content = f"```pascal\n{scl_content_cleaned}\n```\n" # <-- FIN MODIFICADO --> with open(md_path, "w", encoding="utf-8") as f_md: f_md.write(md_content) converted_count += 1 except Exception as e: print(f" Error procesando SCL '{base_name}': {e}", file=sys.stderr) errors_count += 1 # Procesar archivos .md (UDT, TagTable) md_files = glob.glob(os.path.join(scl_source_dir, "*.md")) for md_src_path in md_files: base_name = os.path.basename(md_src_path) md_dest_path = os.path.join(md_target_dir, base_name) try: # Simplemente copiar el archivo .md existente shutil.copy2(md_src_path, md_dest_path) # copy2 preserva metadatos copied_count += 1 except Exception as e: print(f" Error copiando MD '{base_name}': {e}", file=sys.stderr) errors_count += 1 print( f"Archivos fuente preparados: {converted_count} SCL convertidos, {copied_count} MD copiados." ) if errors_count > 0: print( f"ADVERTENCIA: Hubo {errors_count} errores durante la preparación de archivos fuente.", file=sys.stderr, ) # --- Funciones Árbol de Llamadas Modificadas (para apuntar a xref_output/source/*.md) --- # <-- MODIFICADO: get_scl_link --> def get_scl_link( block_name, block_entry, xref_source_subdir ): # Ya no necesita project_root_dir """ Genera un enlace Markdown relativo al archivo .md correspondiente DENTRO de xref_output/source. """ if not block_entry: return f"`{block_name}`" # El nombre del archivo destino siempre será .md md_filename = format_variable_name(block_name) + ".md" # Asegurar que format_variable_name esté disponible # La ruta siempre estará dentro del subdirectorio fuente de xref link_target_path = f"{xref_source_subdir}/{md_filename}" # Codificar para URL/Markdown try: # La ruta relativa desde xref_output_dir a xref_output_dir/source/file.md es solo source/file.md encoded_path = urllib.parse.quote( link_target_path ) # No necesita replace(os.sep, '/') return f"[`{block_name}`]({encoded_path})" except Exception as e: print(f"Error generando enlace para {block_name}: {e}") return f"`{block_name}` (error al generar enlace)" # <-- MODIFICADO: build_call_tree_recursive (ya no necesita project_root_dir) --> def build_call_tree_recursive( # Añadido max_call_depth, xref_source_subdir current_node, call_graph, block_data, output_lines, visited_in_path, base_xref_dir, current_depth=0, max_call_depth=5, xref_source_subdir="source" ): """ Función recursiva para construir el árbol de llamadas indentado CON ENLACES a los archivos .md en xref_output/source. """ indent = INDENT_STEP * current_depth block_entry = block_data.get(current_node) # Llamar a get_scl_link modificado node_link = get_scl_link(current_node, block_entry, xref_source_subdir) output_lines.append(f"{indent}- {node_link}") if current_depth >= max_call_depth: output_lines.append( f"{indent}{INDENT_STEP}[... Profundidad máxima alcanzada ...]" ) return if current_node in visited_in_path: output_lines.append(f"{indent}{INDENT_STEP}[... Recursión detectada ...]") return visited_in_path.add(current_node) if current_node in call_graph: callees = sorted(call_graph[current_node].keys()) for callee in callees: # Llamada recursiva build_call_tree_recursive( callee, call_graph, block_data, output_lines, visited_in_path.copy(), base_xref_dir, # base_xref_dir no se usa en la recursión, podría quitarse current_depth + 1, max_call_depth=max_call_depth, # Pasar parámetro xref_source_subdir=xref_source_subdir # Pasar parámetro ) # <-- MODIFICADO: generate_call_tree_output (ya no necesita project_root_dir) --> def generate_call_tree_output(call_graph, block_data, base_xref_dir, max_call_depth, xref_source_subdir): # Añadido max_call_depth, xref_source_subdir """ Genera las líneas de texto para el archivo de árbol de llamadas CON ENLACES a los archivos .md en xref_output/source. """ output_lines = ["# Árbol de Referencias Cruzadas de Llamadas\n"] output_lines.append(f"(Profundidad máxima: {MAX_CALL_DEPTH})\n") root_nodes = sorted( # Encontrar OBs [ name for name, data in block_data.items() if data.get("data", {}).get("block_type") == "OB" ] ) if not root_nodes: output_lines.append("\nNo se encontraron OBs como puntos de entrada.") else: output_lines.append("\n## Puntos de Entrada (OBs)\n") for ob_name in root_nodes: ob_entry = block_data.get(ob_name) ob_link = get_scl_link( ob_name, ob_entry, xref_source_subdir ) # Llamar a get_scl_link modificado output_lines.append(f"\n### Iniciando desde: {ob_link}\n") build_call_tree_recursive( ob_name, call_graph, block_data, output_lines, set(), base_xref_dir, # No se usa en recursión current_depth=0, max_call_depth=max_call_depth, # Pasar parámetro xref_source_subdir=xref_source_subdir # Pasar parámetro ) all_callers = set(call_graph.keys()) all_callees = set(c for v in call_graph.values() for c in v) all_in_graph = all_callers.union(all_callees) code_blocks = { n for n, d in block_data.items() if d.get("data", {}).get("block_type") in ["FC", "FB"] } unreached = sorted(list(code_blocks - all_in_graph - set(root_nodes))) if unreached: output_lines.append( "\n## Bloques (FC/FB) No Referenciados Directamente desde OBs\n" ) for block_name in unreached: block_entry = block_data.get(block_name) block_link = get_scl_link( block_name, block_entry, xref_source_subdir ) # Llamar a get_scl_link modificado output_lines.append(f"- {block_link}") return output_lines # --- Funciones para Salida Resumida (generate_db_usage_summary_output, generate_plc_tag_summary_output SIN CAMBIOS) --- # (Se omiten por brevedad) def generate_db_usage_summary_output(db_users, max_users_list): # Añadido max_users_list """Genera las líneas para el archivo Markdown de resumen de uso de DBs.""" output_lines = ["# Resumen de Uso de DB Globales por Bloque\n\n"] if not db_users: output_lines.append( "Ningún DB global parece ser utilizado por bloques de código.\n" ) else: for db_name in sorted(db_users.keys()): users_set = db_users[db_name] users_list = sorted(list(users_set)) output_lines.append(f"## DB: `{db_name}`\n") if not users_list: output_lines.append("- No utilizado directamente.\n") else: output_lines.append("Utilizado por:\n") display_users = users_list[:max_users_list] # Usar parámetro remaining_count = len(users_list) - len(display_users) for user_block in display_users: output_lines.append(f"- `{user_block}`") if remaining_count > 0: output_lines.append(f"- ... (y {remaining_count} más)") output_lines.append("") return output_lines def generate_plc_tag_summary_output(plc_tag_users, max_users_list): # Añadido max_users_list """Genera las líneas para el archivo Markdown de resumen de uso de PLC Tags.""" output_lines = ["# Resumen de Uso de PLC Tags Globales por Bloque\n\n"] if not plc_tag_users: output_lines.append( "Ningún PLC Tag global parece ser utilizado por bloques de código.\n" ) else: for tag_name in sorted(plc_tag_users.keys()): users_set = plc_tag_users[tag_name] users_list = sorted(list(users_set)) output_lines.append(f"## PLC Tag: `{tag_name}`\n") if not users_list: output_lines.append("- No utilizado.\n") else: output_lines.append("Utilizado por:\n") display_users = users_list[:max_users_list] # Usar parámetro remaining_count = len(users_list) - len(display_users) for user_block in display_users: output_lines.append(f"- `{user_block}`") if remaining_count > 0: output_lines.append(f"- ... (y {remaining_count} más)") output_lines.append("") return output_lines # --- Función Principal (MODIFICADA para llamar a copy_and_prepare_source_files) --- def generate_cross_references( project_root_dir, output_dir, scl_output_dirname, xref_source_subdir, call_xref_filename, db_usage_xref_filename, plc_tag_xref_filename, max_call_depth, max_users_list ): """ Genera archivos de referencias cruzadas y prepara archivos fuente (.md) para visualización en Obsidian. Utiliza los parámetros de configuración pasados como argumentos. """ print(f"--- Iniciando Generación de Referencias Cruzadas y Fuentes MD (x4) ---") print(f"Buscando archivos JSON procesados en: {project_root_dir}") print(f"Directorio de salida XRef: {output_dir}") print(f"Directorio fuente SCL/MD: {scl_output_dirname}") print(f"Subdirectorio fuentes MD para XRef: {xref_source_subdir}") output_dir_abs = os.path.abspath(output_dir) # <-- NUEVO: Crear directorio y preparar archivos fuente ANTES de generar XRefs --> # Pasar los nombres de directorios leídos de la config copy_and_prepare_source_files(project_root_dir, output_dir_abs, scl_output_dirname, xref_source_subdir) # <-- FIN NUEVO --> json_files = glob.glob( os.path.join(project_root_dir, "**", "*_processed.json"), recursive=True ) if not json_files: print("Error: No se encontraron archivos '*_processed.json'.", file=sys.stderr) return False print(f"Archivos JSON encontrados: {len(json_files)}") # 1. Cargar datos (sin cambios) block_data = {} all_db_names = set() plc_tag_names = set() for f_path in json_files: try: with open(f_path, "r", encoding="utf-8") as f: data = json.load(f) block_name = data.get("block_name") block_type = data.get("block_type") if block_name: block_data[block_name] = {"data": data, "json_path": f_path} if block_type == "GlobalDB": all_db_names.add(block_name) elif block_type == "PlcTagTable": [ plc_tag_names.add(tag["name"]) for tag in data.get("tags", []) if tag.get("name") ] else: print( f"Advertencia: JSON sin 'block_name': {f_path}", file=sys.stderr ) except Exception as e: print(f"Error procesando {f_path}: {e}", file=sys.stderr) traceback.print_exc(file=sys.stderr) if not block_data: print("Error: No se pudieron cargar datos.", file=sys.stderr) return False print( f"Datos cargados para {len(block_data)} bloques ({len(plc_tag_names)} PLC Tags globales)." ) # 2. Analizar datos (sin cambios) call_graph = defaultdict(lambda: defaultdict(int)) db_users = defaultdict(set) plc_tag_users = defaultdict(set) print("Analizando llamadas y uso de DBs/PLC Tags...") for block_name, block_entry in block_data.items(): data = block_entry["data"] block_type = data.get("block_type") if block_type not in ["OB", "FC", "FB"]: continue caller_name = block_name for network in data.get("networks", []): combined_scl = "" network_has_code = False for instruction in network.get("logic", []): if not instruction.get("grouped", False): scl_code = instruction.get("scl", "") edge_update_code = instruction.get("_edge_mem_update_scl", "") if scl_code or edge_update_code: network_has_code = True combined_scl += ( (scl_code or "") + "\n" + (edge_update_code or "") + "\n" ) if not network_has_code: continue calls_found = find_calls_in_scl(combined_scl, block_data) for callee_name, count in calls_found.items(): if callee_name in block_data and block_data[callee_name]["data"].get( "block_type" ) in ["FC", "FB"]: call_graph[caller_name][callee_name] += count db_usage_found = find_db_tag_usage(combined_scl) for db_tag, access_counts in db_usage_found.items(): db_name_part = db_tag.split(".")[0] if db_name_part in all_db_names or ( db_name_part.startswith("DB") and db_name_part[2:].isdigit() ): db_users[db_name_part].add(caller_name) plc_usage_found = find_plc_tag_usage(combined_scl, plc_tag_names) for plc_tag, access_counts in plc_usage_found.items(): plc_tag_users[plc_tag].add(caller_name) # 3. Generar Archivos de Salida XRef (MODIFICADO para usar la nueva función de árbol) os.makedirs(output_dir_abs, exist_ok=True) call_xref_path = os.path.join(output_dir_abs, call_xref_filename) # Usar parámetro db_usage_xref_path = os.path.join(output_dir_abs, db_usage_xref_filename) # Usar parámetro plc_tag_xref_path = os.path.join(output_dir_abs, plc_tag_xref_filename) # Usar parámetro print(f"Generando ÁRBOL XRef de llamadas en: {call_xref_path}") try: # <-- MODIFICADO: Llamar a la nueva función sin project_root_dir --> call_tree_lines = generate_call_tree_output( # Pasar parámetros call_graph, block_data, output_dir_abs ) with open(call_xref_path, "w", encoding="utf-8") as f: [f.write(line + "\n") for line in call_tree_lines] except Exception as e: print( f"Error al generar/escribir el ÁRBOL XRef de llamadas: {e}", file=sys.stderr ) traceback.print_exc(file=sys.stderr) # Generar Resumen de Uso de DB (sin cambios aquí) print(f"Generando RESUMEN XRef de uso de DBs en: {db_usage_xref_path}") try: db_summary_lines = generate_db_usage_summary_output(db_users, max_users_list) # Pasar parámetro with open(db_usage_xref_path, "w", encoding="utf-8") as f: [f.write(line + "\n") for line in db_summary_lines] except Exception as e: print( f"Error al generar/escribir el RESUMEN XRef de uso de DB: {e}", file=sys.stderr, ) traceback.print_exc(file=sys.stderr) # Generar Resumen de Uso de PLC Tags (sin cambios aquí) print(f"Generando RESUMEN XRef de uso de PLC Tags en: {plc_tag_xref_path}") try: plc_tag_lines = generate_plc_tag_summary_output(plc_tag_users, max_users_list) # Pasar parámetro with open(plc_tag_xref_path, "w", encoding="utf-8") as f: [f.write(line + "\n") for line in plc_tag_lines] except Exception as e: print( f"Error al generar/escribir el RESUMEN XRef de uso de PLC Tags: {e}", file=sys.stderr, ) traceback.print_exc(file=sys.stderr) print("--- Generación de Referencias Cruzadas y Fuentes MD (x4) Completada ---") return True # --- Punto de Entrada (sin cambios) --- if __name__ == "__main__": print("(x4 - Standalone) Ejecutando generación de referencias cruzadas...") # Cargar configuración para obtener rutas configs = load_configuration() working_directory = configs.get("working_directory") # Acceder a la configuración específica del grupo group_config = configs.get("level2", {}) # Leer parámetros con valores por defecto (usando los defaults del esquema como guía) # Parámetros necesarios para x4 cfg_scl_output_dirname = group_config.get("scl_output_dir", "scl_output") cfg_xref_output_dirname = group_config.get("xref_output_dir", "xref_output") cfg_xref_source_subdir = group_config.get("xref_source_subdir", "source") cfg_call_xref_filename = group_config.get("call_xref_filename", "xref_calls_tree.md") cfg_db_usage_xref_filename = group_config.get("db_usage_xref_filename", "xref_db_usage_summary.md") cfg_plc_tag_xref_filename = group_config.get("plc_tag_xref_filename", "xref_plc_tags_summary.md") cfg_max_call_depth = group_config.get("max_call_depth", 5) cfg_max_users_list = group_config.get("max_users_list", 20) # Calcular rutas if not working_directory: print("Error: 'working_directory' no encontrado en la configuración.", file=sys.stderr) # No usamos sys.exit(1) else: # Calcular rutas basadas en la configuración plc_subdir_name = "PLC" # Asumir nombre estándar project_root_dir = os.path.join(working_directory, plc_subdir_name) xref_output_dir = os.path.join(project_root_dir, cfg_xref_output_dirname) # Usar nombre de dir leído if not os.path.isdir(project_root_dir): print(f"Error: Directorio del proyecto '{project_root_dir}' no encontrado.", file=sys.stderr) else: # Llamar a la función principal success = generate_cross_references( project_root_dir, xref_output_dir, cfg_scl_output_dirname, cfg_xref_source_subdir, cfg_call_xref_filename, cfg_db_usage_xref_filename, cfg_plc_tag_xref_filename, cfg_max_call_depth, cfg_max_users_list ) if success: print("\n(x4 - Standalone) Proceso completado exitosamente.") else: print("\n(x4 - Standalone) Proceso finalizado con errores.", file=sys.stderr)