# Source: ParamManagerScripts/backend/script_groups/XML Parser to SCL/x4_cross_reference.py
# (675 lines, 24 KiB, Python)

"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script genera documentacion MD de Cross Reference para Obsidian
"""
# ToUpload/x4_cross_reference.py
# -*- coding: utf-8 -*-
import json
import os
import argparse
import sys
import traceback
import glob
import re
import urllib.parse
import shutil # <-- NUEVO: Para copiar archivos
from collections import defaultdict
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Importar format_variable_name (sin cambios) ---
try:
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from generators.generator_utils import format_variable_name
print("INFO: format_variable_name importado desde generators.generator_utils")
except ImportError:
print(
"ADVERTENCIA: No se pudo importar format_variable_name desde generators. Usando copia local."
)
def format_variable_name(name): # Fallback
if not name:
return "_INVALID_NAME_"
if name.startswith('"') and name.endswith('"'):
return name
prefix = "#" if name.startswith("#") else ""
if prefix:
name = name[1:]
if name and name[0].isdigit():
name = "_" + name
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
return prefix + name
# --- Constantes ---
SCL_OUTPUT_DIRNAME = "scl_output"
XREF_SOURCE_SUBDIR = "source" # <-- NUEVO: Subdirectorio para fuentes MD
CALL_XREF_FILENAME = "xref_calls_tree.md"
DB_USAGE_XREF_FILENAME = "xref_db_usage_summary.md"
PLC_TAG_XREF_FILENAME = "xref_plc_tags_summary.md"
MAX_CALL_DEPTH = 5
INDENT_STEP = " "
MAX_USERS_LIST = 20
# --- Funciones de Análisis (find_calls_in_scl, find_db_tag_usage, find_plc_tag_usage sin cambios) ---
# (Se omiten por brevedad, son las mismas de la versión anterior)
def find_calls_in_scl(scl_code, block_data):
calls = defaultdict(int)
known_blocks = set(block_data.keys())
known_instances = set()
for name, data in block_data.items():
block_info = data.get("data", {})
if block_info.get("block_type") == "FB":
static_vars = block_info.get("interface", {}).get("Static", [])
for var in static_vars:
var_type = var.get("datatype", "")
base_type = var_type.replace('"', "").split("[")[0].strip()
if (
base_type in known_blocks
and block_data[base_type]["data"].get("block_type") == "FB"
):
known_instances.add(f'"{name}"."{var.get("name")}"')
known_instances.add(f'"{var.get("name")}"') # Mejorable
general_call_pattern = re.compile(
r'\b(?<![:=<>])("?([a-zA-Z_#][a-zA-Z0-9_."]*)"?)\s*\('
)
system_funcs = {
"IF",
"WHILE",
"FOR",
"CASE",
"REPEAT",
"RETURN",
"EXIT",
"TRUE",
"FALSE",
"AND",
"OR",
"XOR",
"NOT",
"MOD",
"ABS",
"SQRT",
"LN",
"EXP",
"SIN",
"COS",
"TAN",
"ASIN",
"ACOS",
"ATAN",
"CONCAT",
"LEN",
"LEFT",
"RIGHT",
"MID",
"DELETE",
"INSERT",
"FIND",
"REPLACE",
"INT_TO_STRING",
"STRING_TO_INT",
"TON",
"TOF",
"TP",
"CTU",
"CTD",
"CTUD",
"BLKMOV",
"ARRAY",
"STRUCT",
"VAR",
"FUNCTION",
"FUNCTION_BLOCK",
"DATA_BLOCK",
"BOOL",
"INT",
"DINT",
"REAL",
"STRING",
"TIME",
"DATE",
"WORD",
"BYTE",
}
for match in general_call_pattern.finditer(scl_code):
potential_name_quoted = match.group(1)
potential_name_clean = match.group(2)
if potential_name_clean.upper() in system_funcs:
continue
is_instance_call = (
potential_name_clean.startswith("#")
or potential_name_quoted in known_instances
)
if is_instance_call:
pass
elif potential_name_clean in known_blocks:
callee_type = block_data[potential_name_clean]["data"].get("block_type")
if callee_type in ["FC", "FB"]:
calls[potential_name_clean] += 1
return calls
def find_db_tag_usage(scl_code):
usage = defaultdict(lambda: defaultdict(int))
db_tag_pattern = re.compile(
r'("([a-zA-Z0-9_ ]+)"|(DB\d+))\."?([a-zA-Z0-9_ ]+)"?(\s*\[.*?\]|\.\w+)*'
)
write_pattern = re.compile(r"^\s*(.*?)\s*:=")
lines = scl_code.splitlines()
for line in lines:
line_strip = line.strip()
is_write = False
match_write = write_pattern.match(line_strip)
target_part = ""
if match_write:
is_write = True
target_part = match_write.group(1).strip()
for match in db_tag_pattern.finditer(line):
db_part = match.group(1)
tag_part = match.group(3)
full_match = match.group(0)
db_name = match.group(2) if match.group(2) else db_part
tag_name = match.group(4) if match.group(4) else tag_part
db_tag_key = f"{db_name}.{tag_name}"
access_type = (
"write" if (is_write and target_part.startswith(full_match)) else "read"
)
usage[db_tag_key][access_type] += 1
return usage
def find_plc_tag_usage(scl_code, plc_tag_names_set):
usage = defaultdict(lambda: defaultdict(int))
identifier_pattern = re.compile(
r"""(?<![."#\d])("([a-zA-Z_][a-zA-Z0-9_ .]*)"|([a-zA-Z_][a-zA-Z0-9_.]*))(?![(\[])""",
re.VERBOSE,
)
write_pattern = re.compile(r"^\s*(.*?)\s*:=")
lines = scl_code.splitlines()
for line in lines:
line_strip = line.strip()
is_write = False
match_write = write_pattern.match(line_strip)
target_part = ""
if match_write:
is_write = True
target_part = match_write.group(1).strip()
for match in identifier_pattern.finditer(line):
full_match = match.group(1)
tag_name_candidate = match.group(2) if match.group(2) else match.group(3)
if (
tag_name_candidate in plc_tag_names_set
or full_match in plc_tag_names_set
):
tag_key = full_match
access_type = (
"write"
if (is_write and target_part.startswith(full_match))
else "read"
)
usage[tag_key][access_type] += 1
return usage
# <-- NUEVA FUNCION -->
def copy_and_prepare_source_files(project_root_dir, xref_output_dir):
"""
Copia archivos .scl y .md desde scl_output a xref_output/source,
convirtiendo .scl a .md con formato de bloque de código.
"""
scl_source_dir = os.path.join(project_root_dir, SCL_OUTPUT_DIRNAME)
md_target_dir = os.path.join(xref_output_dir, XREF_SOURCE_SUBDIR)
if not os.path.isdir(scl_source_dir):
print(
f"Advertencia: Directorio '{scl_source_dir}' no encontrado. No se copiarán archivos fuente.",
file=sys.stderr,
)
return
try:
os.makedirs(md_target_dir, exist_ok=True)
print(
f"Copiando y preparando archivos fuente para Obsidian en: {md_target_dir}"
)
except OSError as e:
print(
f"Error creando directorio de destino '{md_target_dir}': {e}",
file=sys.stderr,
)
return
copied_count = 0
converted_count = 0
errors_count = 0
# Procesar archivos .scl
scl_files = glob.glob(os.path.join(scl_source_dir, "*.scl"))
for scl_path in scl_files:
base_name = os.path.basename(scl_path)
md_name = os.path.splitext(base_name)[0] + ".md"
md_path = os.path.join(md_target_dir, md_name)
try:
with open(scl_path, "r", encoding="utf-8") as f_scl:
scl_content = f_scl.read()
# <-- MODIFICADO: Limpiar contenido SCL antes de envolverlo -->
# Quitar posibles bloques de código Markdown anidados o incorrectos dentro del SCL
scl_content_cleaned = scl_content.replace("```stl", "").replace("```", "")
# Crear contenido Markdown
md_content = f"```pascal\n{scl_content_cleaned}\n```\n"
# <-- FIN MODIFICADO -->
with open(md_path, "w", encoding="utf-8") as f_md:
f_md.write(md_content)
converted_count += 1
except Exception as e:
print(f" Error procesando SCL '{base_name}': {e}", file=sys.stderr)
errors_count += 1
# Procesar archivos .md (UDT, TagTable)
md_files = glob.glob(os.path.join(scl_source_dir, "*.md"))
for md_src_path in md_files:
base_name = os.path.basename(md_src_path)
md_dest_path = os.path.join(md_target_dir, base_name)
try:
# Simplemente copiar el archivo .md existente
shutil.copy2(md_src_path, md_dest_path) # copy2 preserva metadatos
copied_count += 1
except Exception as e:
print(f" Error copiando MD '{base_name}': {e}", file=sys.stderr)
errors_count += 1
print(
f"Archivos fuente preparados: {converted_count} SCL convertidos, {copied_count} MD copiados."
)
if errors_count > 0:
print(
f"ADVERTENCIA: Hubo {errors_count} errores durante la preparación de archivos fuente.",
file=sys.stderr,
)
# --- Funciones Árbol de Llamadas Modificadas (para apuntar a xref_output/source/*.md) ---
# <-- MODIFICADO: get_scl_link -->
def get_scl_link(
block_name, block_entry, base_xref_dir
): # Ya no necesita project_root_dir
"""
Genera un enlace Markdown relativo al archivo .md correspondiente DENTRO de xref_output/source.
"""
if not block_entry:
return f"`{block_name}`"
# El nombre del archivo destino siempre será .md
md_filename = format_variable_name(block_name) + ".md"
# La ruta siempre estará dentro del subdirectorio 'source'
link_target_path = f"{XREF_SOURCE_SUBDIR}/{md_filename}"
# Codificar para URL/Markdown
try:
# La ruta relativa desde xref_output_dir a xref_output_dir/source/file.md es solo source/file.md
encoded_path = urllib.parse.quote(
link_target_path
) # No necesita replace(os.sep, '/')
return f"[`{block_name}`]({encoded_path})"
except Exception as e:
print(f"Error generando enlace para {block_name}: {e}")
return f"`{block_name}` (error al generar enlace)"
# <-- MODIFICADO: build_call_tree_recursive (ya no necesita project_root_dir) -->
def build_call_tree_recursive(
current_node,
call_graph,
block_data,
output_lines,
visited_in_path,
base_xref_dir,
current_depth=0,
):
"""
Función recursiva para construir el árbol de llamadas indentado CON ENLACES
a los archivos .md en xref_output/source.
"""
indent = INDENT_STEP * current_depth
block_entry = block_data.get(current_node)
# Llamar a get_scl_link modificado
node_link = get_scl_link(current_node, block_entry, base_xref_dir)
output_lines.append(f"{indent}- {node_link}")
if current_depth >= MAX_CALL_DEPTH:
output_lines.append(
f"{indent}{INDENT_STEP}[... Profundidad máxima alcanzada ...]"
)
return
if current_node in visited_in_path:
output_lines.append(f"{indent}{INDENT_STEP}[... Recursión detectada ...]")
return
visited_in_path.add(current_node)
if current_node in call_graph:
callees = sorted(call_graph[current_node].keys())
for callee in callees:
# Llamada recursiva
build_call_tree_recursive(
callee,
call_graph,
block_data,
output_lines,
visited_in_path.copy(),
base_xref_dir,
current_depth + 1,
)
# <-- MODIFICADO: generate_call_tree_output (ya no necesita project_root_dir) -->
def generate_call_tree_output(call_graph, block_data, base_xref_dir):
"""
Genera las líneas de texto para el archivo de árbol de llamadas CON ENLACES
a los archivos .md en xref_output/source.
"""
output_lines = ["# Árbol de Referencias Cruzadas de Llamadas\n"]
output_lines.append(f"(Profundidad máxima: {MAX_CALL_DEPTH})\n")
root_nodes = sorted(
[
name
for name, data in block_data.items()
if data.get("data", {}).get("block_type") == "OB"
]
)
if not root_nodes:
output_lines.append("\nNo se encontraron OBs como puntos de entrada.")
else:
output_lines.append("\n## Puntos de Entrada (OBs)\n")
for ob_name in root_nodes:
ob_entry = block_data.get(ob_name)
ob_link = get_scl_link(
ob_name, ob_entry, base_xref_dir
) # Llamar a get_scl_link modificado
output_lines.append(f"\n### Iniciando desde: {ob_link}\n")
build_call_tree_recursive(
ob_name,
call_graph,
block_data,
output_lines,
set(),
base_xref_dir,
current_depth=0,
)
all_callers = set(call_graph.keys())
all_callees = set(c for v in call_graph.values() for c in v)
all_in_graph = all_callers.union(all_callees)
code_blocks = {
n
for n, d in block_data.items()
if d.get("data", {}).get("block_type") in ["FC", "FB"]
}
unreached = sorted(list(code_blocks - all_in_graph - set(root_nodes)))
if unreached:
output_lines.append(
"\n## Bloques (FC/FB) No Referenciados Directamente desde OBs\n"
)
for block_name in unreached:
block_entry = block_data.get(block_name)
block_link = get_scl_link(
block_name, block_entry, base_xref_dir
) # Llamar a get_scl_link modificado
output_lines.append(f"- {block_link}")
return output_lines
# --- Funciones para Salida Resumida (generate_db_usage_summary_output, generate_plc_tag_summary_output SIN CAMBIOS) ---
# (Se omiten por brevedad)
def generate_db_usage_summary_output(db_users):
"""Genera las líneas para el archivo Markdown de resumen de uso de DBs."""
output_lines = ["# Resumen de Uso de DB Globales por Bloque\n\n"]
if not db_users:
output_lines.append(
"Ningún DB global parece ser utilizado por bloques de código.\n"
)
else:
for db_name in sorted(db_users.keys()):
users_set = db_users[db_name]
users_list = sorted(list(users_set))
output_lines.append(f"## DB: `{db_name}`\n")
if not users_list:
output_lines.append("- No utilizado directamente.\n")
else:
output_lines.append("Utilizado por:\n")
display_users = users_list[:MAX_USERS_LIST]
remaining_count = len(users_list) - len(display_users)
for user_block in display_users:
output_lines.append(f"- `{user_block}`")
if remaining_count > 0:
output_lines.append(f"- ... (y {remaining_count} más)")
output_lines.append("")
return output_lines
def generate_plc_tag_summary_output(plc_tag_users):
"""Genera las líneas para el archivo Markdown de resumen de uso de PLC Tags."""
output_lines = ["# Resumen de Uso de PLC Tags Globales por Bloque\n\n"]
if not plc_tag_users:
output_lines.append(
"Ningún PLC Tag global parece ser utilizado por bloques de código.\n"
)
else:
for tag_name in sorted(plc_tag_users.keys()):
users_set = plc_tag_users[tag_name]
users_list = sorted(list(users_set))
output_lines.append(f"## PLC Tag: `{tag_name}`\n")
if not users_list:
output_lines.append("- No utilizado.\n")
else:
output_lines.append("Utilizado por:\n")
display_users = users_list[:MAX_USERS_LIST]
remaining_count = len(users_list) - len(display_users)
for user_block in display_users:
output_lines.append(f"- `{user_block}`")
if remaining_count > 0:
output_lines.append(f"- ... (y {remaining_count} más)")
output_lines.append("")
return output_lines
# --- Función Principal (MODIFICADA para llamar a copy_and_prepare_source_files) ---
def generate_cross_references(project_root_dir, output_dir):
"""
Genera archivos de referencias cruzadas y prepara archivos fuente (.md)
para visualización en Obsidian.
"""
print(f"--- Iniciando Generación de Referencias Cruzadas y Fuentes MD (x4) ---")
print(f"Buscando archivos JSON procesados en: {project_root_dir}")
print(f"Directorio de salida XRef: {output_dir}")
output_dir_abs = os.path.abspath(output_dir)
# <-- NUEVO: Crear directorio y preparar archivos fuente ANTES de generar XRefs -->
copy_and_prepare_source_files(project_root_dir, output_dir_abs)
# <-- FIN NUEVO -->
json_files = glob.glob(
os.path.join(project_root_dir, "**", "*_processed.json"), recursive=True
)
if not json_files:
print("Error: No se encontraron archivos '*_processed.json'.", file=sys.stderr)
return False
print(f"Archivos JSON encontrados: {len(json_files)}")
# 1. Cargar datos (sin cambios)
block_data = {}
all_db_names = set()
plc_tag_names = set()
for f_path in json_files:
try:
with open(f_path, "r", encoding="utf-8") as f:
data = json.load(f)
block_name = data.get("block_name")
block_type = data.get("block_type")
if block_name:
block_data[block_name] = {"data": data, "json_path": f_path}
if block_type == "GlobalDB":
all_db_names.add(block_name)
elif block_type == "PlcTagTable":
[
plc_tag_names.add(tag["name"])
for tag in data.get("tags", [])
if tag.get("name")
]
else:
print(
f"Advertencia: JSON sin 'block_name': {f_path}", file=sys.stderr
)
except Exception as e:
print(f"Error procesando {f_path}: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
if not block_data:
print("Error: No se pudieron cargar datos.", file=sys.stderr)
return False
print(
f"Datos cargados para {len(block_data)} bloques ({len(plc_tag_names)} PLC Tags globales)."
)
# 2. Analizar datos (sin cambios)
call_graph = defaultdict(lambda: defaultdict(int))
db_users = defaultdict(set)
plc_tag_users = defaultdict(set)
print("Analizando llamadas y uso de DBs/PLC Tags...")
for block_name, block_entry in block_data.items():
data = block_entry["data"]
block_type = data.get("block_type")
if block_type not in ["OB", "FC", "FB"]:
continue
caller_name = block_name
for network in data.get("networks", []):
combined_scl = ""
network_has_code = False
for instruction in network.get("logic", []):
if not instruction.get("grouped", False):
scl_code = instruction.get("scl", "")
edge_update_code = instruction.get("_edge_mem_update_scl", "")
if scl_code or edge_update_code:
network_has_code = True
combined_scl += (
(scl_code or "") + "\n" + (edge_update_code or "") + "\n"
)
if not network_has_code:
continue
calls_found = find_calls_in_scl(combined_scl, block_data)
for callee_name, count in calls_found.items():
if callee_name in block_data and block_data[callee_name]["data"].get(
"block_type"
) in ["FC", "FB"]:
call_graph[caller_name][callee_name] += count
db_usage_found = find_db_tag_usage(combined_scl)
for db_tag, access_counts in db_usage_found.items():
db_name_part = db_tag.split(".")[0]
if db_name_part in all_db_names or (
db_name_part.startswith("DB") and db_name_part[2:].isdigit()
):
db_users[db_name_part].add(caller_name)
plc_usage_found = find_plc_tag_usage(combined_scl, plc_tag_names)
for plc_tag, access_counts in plc_usage_found.items():
plc_tag_users[plc_tag].add(caller_name)
# 3. Generar Archivos de Salida XRef (MODIFICADO para usar la nueva función de árbol)
os.makedirs(output_dir_abs, exist_ok=True)
call_xref_path = os.path.join(output_dir_abs, CALL_XREF_FILENAME)
db_usage_xref_path = os.path.join(output_dir_abs, DB_USAGE_XREF_FILENAME)
plc_tag_xref_path = os.path.join(output_dir_abs, PLC_TAG_XREF_FILENAME)
print(f"Generando ÁRBOL XRef de llamadas en: {call_xref_path}")
try:
# <-- MODIFICADO: Llamar a la nueva función sin project_root_dir -->
call_tree_lines = generate_call_tree_output(
call_graph, block_data, output_dir_abs
)
with open(call_xref_path, "w", encoding="utf-8") as f:
[f.write(line + "\n") for line in call_tree_lines]
except Exception as e:
print(
f"Error al generar/escribir el ÁRBOL XRef de llamadas: {e}", file=sys.stderr
)
traceback.print_exc(file=sys.stderr)
# Generar Resumen de Uso de DB (sin cambios aquí)
print(f"Generando RESUMEN XRef de uso de DBs en: {db_usage_xref_path}")
try:
db_summary_lines = generate_db_usage_summary_output(db_users)
with open(db_usage_xref_path, "w", encoding="utf-8") as f:
[f.write(line + "\n") for line in db_summary_lines]
except Exception as e:
print(
f"Error al generar/escribir el RESUMEN XRef de uso de DB: {e}",
file=sys.stderr,
)
traceback.print_exc(file=sys.stderr)
# Generar Resumen de Uso de PLC Tags (sin cambios aquí)
print(f"Generando RESUMEN XRef de uso de PLC Tags en: {plc_tag_xref_path}")
try:
plc_tag_lines = generate_plc_tag_summary_output(plc_tag_users)
with open(plc_tag_xref_path, "w", encoding="utf-8") as f:
[f.write(line + "\n") for line in plc_tag_lines]
except Exception as e:
print(
f"Error al generar/escribir el RESUMEN XRef de uso de PLC Tags: {e}",
file=sys.stderr,
)
traceback.print_exc(file=sys.stderr)
print("--- Generación de Referencias Cruzadas y Fuentes MD (x4) Completada ---")
return True
# --- Punto de Entrada (sin cambios) ---
if __name__ == "__main__":
configs = load_configuration()
working_directory = configs.get("working_directory")
parser = argparse.ArgumentParser(
description="Genera refs cruzadas y prepara archivos fuente MD para Obsidian."
)
parser.add_argument("project_root_dir", help="Ruta dir raíz proyecto XML.")
parser.add_argument(
"-o",
"--output",
help="Directorio para guardar salida XRef (incluyendo subdir 'source').",
)
args = parser.parse_args()
if not os.path.isdir(args.project_root_dir):
print(
f"Error: Dir proyecto no existe: '{args.project_root_dir}'", file=sys.stderr
)
sys.exit(1)
if not args.output:
print(
"Error: Se requiere el argumento -o/--output para especificar el directorio de salida XRef.",
file=sys.stderr,
)
sys.exit(1)
output_destination = args.output
success = generate_cross_references(args.project_root_dir, output_destination)
if success:
print(
f"Archivos XRef y fuentes MD generados en: {os.path.abspath(output_destination)}"
)
sys.exit(0)
else:
print("Hubo errores durante la generación de refs cruzadas.", file=sys.stderr)
sys.exit(1)