ParamManagerScripts/backend/script_groups/XML Parser to SCL/x0_main.py

512 lines
22 KiB
Python

"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script convierte archivos XML de Siemens TIA Portal (LAD/FUP) a código SCL equivalente.
Utiliza una arquitectura modular para facilitar el mantenimiento y la extensión.
"""
# ToUpload/x0_main.py
import argparse
import subprocess
import os
import sys
import locale
import glob
import time
import traceback
import json
import datetime # <-- NUEVO: Para timestamps
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# <-- NUEVO: Importar funciones directamente -->
from x1_to_json import convert_xml_to_json
from x2_process import process_json_to_scl
from x3_generate_scl import generate_scl_or_markdown
# <-- NUEVO: Importar funciones de x4 y x5 -->
from x4_cross_reference import generate_cross_references # Asumiendo que x4_cross_reference.py tiene esta función
from x5_aggregate import aggregate_outputs
CONSOLE_ENCODING = "utf-8"
# <-- NUEVO: Importar format_variable_name (necesario para predecir nombre de salida) -->
try:
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.insert(0, current_dir)
from generators.generator_utils import format_variable_name
print("INFO: format_variable_name importado desde generators.generator_utils")
except ImportError:
print(
"ADVERTENCIA: No se pudo importar format_variable_name desde generators. Usando copia local."
)
import re
def format_variable_name(name):
if not name:
return "_INVALID_NAME_"
if name.startswith('"') and name.endswith('"'):
return name
prefix = "#" if name.startswith("#") else ""
if prefix:
name = name[1:]
if name and name[0].isdigit():
name = "_" + name
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
return prefix + name
# <-- NUEVO: Función de Logging -->
LOG_FILENAME = "log.txt"
def log_message(message, log_file_handle, also_print=True):
"""Escribe un mensaje en el archivo log y opcionalmente en la consola."""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[
:-3
] # Incluye milisegundos
log_line = f"{timestamp} - {message}"
try:
log_file_handle.write(log_line + "\n")
log_file_handle.flush() # Asegurar escritura inmediata
except Exception as e:
# Fallback si falla escritura en log
print(f"{timestamp} - LOGGING ERROR: {e}", file=sys.stderr)
print(f"{timestamp} - ORIGINAL MSG: {message}", file=sys.stderr)
if also_print:
print(message) # Imprimir mensaje original en consola
# <-- FIN NUEVO -->
# <-- run_script ya no es necesaria -->
# --- Función check_skip_status (sin cambios en su lógica interna) ---
def check_skip_status(
xml_filepath, processed_json_filepath, final_output_dir, log_f
): # Añadido log_f
status = {"skip_x1_x2": False, "skip_x3": False}
can_check_x3 = False
if not os.path.exists(processed_json_filepath):
return status
stored_mtime = None
stored_size = None
block_name = None
block_type = None
processed_json_mtime = None
try:
processed_json_mtime = os.path.getmtime(processed_json_filepath)
with open(processed_json_filepath, "r", encoding="utf-8") as f:
data = json.load(f)
stored_mtime = data.get("source_xml_mod_time")
stored_size = data.get("source_xml_size")
block_name = data.get("block_name")
block_type = data.get("block_type")
except Exception as e:
log_message(
f"Advertencia: Error leyendo JSON procesado {processed_json_filepath}: {e}. No se saltará.",
log_f,
also_print=False,
)
return status
if stored_mtime is None or stored_size is None:
can_check_x3 = block_name is not None and block_type is not None
else:
try:
current_xml_mtime = os.path.getmtime(xml_filepath)
current_xml_size = os.path.getsize(xml_filepath)
time_match = abs(stored_mtime - current_xml_mtime) < 0.001
size_match = stored_size == current_xml_size
if time_match and size_match:
status["skip_x1_x2"] = True
can_check_x3 = True
except OSError as e:
log_message(
f"Advertencia: Error obteniendo metadatos XML para {xml_filepath}: {e}. No se saltará x1/x2.",
log_f,
also_print=False,
)
can_check_x3 = block_name is not None and block_type is not None
if status["skip_x1_x2"] and can_check_x3:
try:
expected_extension = (
".md" if block_type in ["PlcUDT", "PlcTagTable"] else ".scl"
)
final_filename = format_variable_name(block_name) + expected_extension
final_output_path = os.path.join(final_output_dir, final_filename)
if os.path.exists(final_output_path):
final_output_mtime = os.path.getmtime(final_output_path)
if final_output_mtime >= processed_json_mtime:
status["skip_x3"] = True
except Exception as e:
log_message(
f"Advertencia: Error determinando estado de salto x3 para {block_name or 'desconocido'}: {e}. No se saltará x3.",
log_f,
also_print=False,
)
return status
# --- Bloque Principal ---
if __name__ == "__main__":
configs = load_configuration()
working_directory = configs.get("working_directory")
group_config = configs.get("level2", {})
# <-- NUEVO: Leer parámetros de configuración para x3, x4, x5 -->
xml_parser_config = configs.get("XML Parser to SCL", {})
cfg_scl_output_dirname = xml_parser_config.get("scl_output_dir", "scl_output")
cfg_xref_output_dirname = xml_parser_config.get("xref_output_dir", "xref_output")
cfg_xref_source_subdir = xml_parser_config.get("xref_source_subdir", "source")
cfg_call_xref_filename = xml_parser_config.get("call_xref_filename", "xref_calls_tree.md")
cfg_db_usage_xref_filename = xml_parser_config.get("db_usage_xref_filename", "xref_db_usage_summary.md")
cfg_plc_tag_xref_filename = xml_parser_config.get("plc_tag_xref_filename", "xref_plc_tags_summary.md")
cfg_max_call_depth = xml_parser_config.get("max_call_depth", 5)
cfg_max_users_list = xml_parser_config.get("max_users_list", 20)
cfg_aggregated_filename = xml_parser_config.get("aggregated_filename", "full_project_representation.md")
# <-- FIN NUEVO -->
# Directorio donde se encuentra este script (x0_main.py)
script_dir = os.path.dirname(os.path.abspath(__file__))
# <-- MODIFICADO: Abrir archivo log -->
log_filepath = os.path.join(
os.path.dirname(os.path.abspath(__file__)), LOG_FILENAME
)
with open(
log_filepath, "w", encoding="utf-8"
) as log_f: # Usar 'a' para añadir al log
log_message("=" * 40 + " LOG START " + "=" * 40, log_f)
# --- PARTE 1: BUSCAR ARCHIVOS ---
# <-- MODIFICADO: Apuntar al subdirectorio 'PLC' dentro del working_directory -->
plc_subdir_name = "PLC" # Nombre estándar del subdirectorio de TIA Portal
xml_project_dir = os.path.join(working_directory, plc_subdir_name)
log_message(
f"Directorio de trabajo base configurado: '{working_directory}'", log_f
)
log_message(
f"Buscando archivos XML recursivamente en el subdirectorio: '{xml_project_dir}'", log_f
)
# Verificar si el directorio PLC existe
if not os.path.isdir(xml_project_dir):
log_message(
f"Error: El subdirectorio '{plc_subdir_name}' no existe dentro de '{working_directory}'. "
f"Se esperaba encontrar la estructura del proyecto TIA Portal en '{xml_project_dir}'.",
log_f,
also_print=False,
)
print(
f"Error: El subdirectorio '{plc_subdir_name}' no existe dentro de '{working_directory}'. "
f"Asegúrese de que la ruta del directorio de trabajo apunte a la carpeta que *contiene* la carpeta '{plc_subdir_name}'.", file=sys.stderr
)
sys.exit(1)
search_pattern = os.path.join(xml_project_dir, "**", "*.xml")
xml_files_found = glob.glob(search_pattern, recursive=True)
if not xml_files_found:
log_message(
f"No se encontraron archivos XML en '{xml_project_dir}' o sus subdirectorios.",
log_f,
)
sys.exit(0)
log_message(
f"Se encontraron {len(xml_files_found)} archivos XML para procesar:", log_f
)
xml_files_found.sort()
[
log_message(f" - {os.path.relpath(xml_file, working_directory)}", log_f) # Mostrar ruta relativa al working_directory original
for xml_file in xml_files_found
]
# --- Directorios de salida ---
# Estos directorios ahora se crearán DENTRO de xml_project_dir (es decir, dentro de 'PLC')
scl_output_dir = os.path.join(xml_project_dir, cfg_scl_output_dirname) # Usar valor de config
xref_output_dir = os.path.join(xml_project_dir, cfg_xref_output_dirname) # Usar valor de config
# --- PARTE 2: PROCESAMIENTO INDIVIDUAL (x1, x2, x3) ---
log_message("\n--- Fase 1: Procesamiento Individual (x1, x2, x3) ---", log_f)
# Los nombres de script ya no se usan directamente para x1, x2, x3
# script1 = "x1_to_json.py"
# script2 = "x2_process.py"
# script3 = "x3_generate_scl.py"
processed_count = 0
skipped_full_count = 0
failed_count = 0
skipped_partial_count = 0
for i, xml_filepath in enumerate(xml_files_found):
relative_path = os.path.relpath(xml_filepath, working_directory)
log_message(f"\n--- Procesando archivo: {relative_path} ---", log_f)
base_filename = os.path.splitext(os.path.basename(xml_filepath))[0]
parsing_dir = os.path.join(os.path.dirname(xml_filepath), "parsing")
# Crear directorio de parsing si no existe
os.makedirs(parsing_dir, exist_ok=True)
json_output_file = os.path.join(parsing_dir, f"{base_filename}.json")
processed_json_filepath = os.path.join(
parsing_dir, f"{base_filename}_processed.json" # <-- Corregido: nombre correcto
)
# 1. Comprobar estado de salto
skip_info = check_skip_status(
xml_filepath, processed_json_filepath, scl_output_dir, log_f
) # Pasar log_f
skip_x1_x2 = skip_info["skip_x1_x2"]
skip_x3 = skip_info["skip_x3"]
# Si se salta todo, registrar y continuar
if skip_x1_x2 and skip_x3:
log_message(
f"--- SALTANDO TODO (x1, x2, x3) para: {relative_path} (XML no modificado, salida final actualizada)",
log_f,
)
skipped_full_count += 1
processed_count += 1 # Contar como procesado si se salta todo
continue
# Usar try/except para capturar errores en las llamadas directas
try:
# 2. Ejecutar/Saltar x1 (convert_xml_to_json)
if skip_x1_x2:
log_message(
f"--- SALTANDO x1 para: {relative_path} (XML no modificado, JSON procesado existe)",
log_f,
)
success_x1 = True # Asumir éxito si se salta
else:
log_message(
f"--- Ejecutando x1 (convert_xml_to_json) para: {relative_path} ---", log_f
)
success_x1 = convert_xml_to_json(xml_filepath, json_output_file)
if not success_x1:
log_message(f"--- x1 FALLÓ para: {relative_path} ---", log_f, also_print=False) # La función ya imprime el error
if not success_x1:
failed_count += 1
continue # No continuar si x1 falló
# 3. Ejecutar/Saltar x2 (process_json_to_scl)
if skip_x1_x2: # Si se saltó x1, también se salta x2
log_message(
f"--- SALTANDO x2 para: {relative_path} (razón anterior)", log_f
)
success_x2 = True # Asumir éxito si se salta
else:
log_message(
f"--- Ejecutando x2 (process_json_to_scl) para: {relative_path} ---", log_f
)
success_x2 = process_json_to_scl(json_output_file, processed_json_filepath)
if not success_x2:
log_message(f"--- x2 FALLÓ para: {relative_path} ---", log_f, also_print=False)
if not success_x2:
failed_count += 1
continue # No continuar si x2 falló
# 4. Ejecutar x3 (generate_scl_or_markdown) - skip_x3 ya se manejó al principio
# Si llegamos aquí, x3 SIEMPRE debe ejecutarse (porque skip_x3 era False)
if skip_x1_x2:
skipped_partial_count += 1 # Se saltó x1/x2 pero se ejecuta x3
log_message(
f"--- Ejecutando x3 (generate_scl_or_markdown) para: {relative_path} ---", log_f
)
# Asegurar que el directorio de salida final exista ANTES de llamar a la función
os.makedirs(scl_output_dir, exist_ok=True)
success_x3 = generate_scl_or_markdown(
processed_json_filepath, scl_output_dir, xml_project_dir
)
if not success_x3:
log_message(f"--- x3 FALLÓ para: {relative_path} ---", log_f, also_print=False)
failed_count += 1
continue # No continuar si x3 falló
# Si todo fue bien
processed_count += 1
except Exception as e:
# Capturar cualquier error inesperado durante las llamadas a funciones
log_message(f"--- ERROR INESPERADO procesando {relative_path}: {e} ---", log_f, also_print=False)
print(f"--- ERROR INESPERADO procesando {relative_path}: {e} ---", file=sys.stderr)
traceback_str = traceback.format_exc()
log_message(traceback_str, log_f, also_print=False) # Loguear traceback
traceback.print_exc(file=sys.stderr) # Mostrar traceback en consola
failed_count += 1
continue # Pasar al siguiente archivo
# --- PARTE 3: EJECUTAR x4 (Referencias Cruzadas) ---
log_message(
f"\n--- Fase 2: Ejecutando x4_cross_reference.py (salida en '{cfg_xref_output_dirname}/') ---", # Usar valor de config
log_f,
)
run_x4 = True
success_x4 = False
# La condición para ejecutar x4 ahora depende de si *algún* archivo tuvo éxito en x1 y x2
# (Necesitamos una forma de rastrear esto, o simplemente intentarlo si no hubo fallos fatales antes)
# Simplificación: Ejecutar x4 si no todos los archivos fallaron en x1/x2.
# Una mejor comprobación sería ver si existe algún archivo _processed.json
can_run_x4 = failed_count < len(xml_files_found) # Aproximación simple
if not can_run_x4 and len(xml_files_found) > 0:
log_message(
"Advertencia: Todos los archivos fallaron en x1/x2. Saltando x4.", log_f
)
run_x4 = False
elif len(xml_files_found) == 0:
run_x4 = False # No hay archivos, no ejecutar
if run_x4:
log_message(
f"Ejecutando x4 (generate_cross_references) sobre: {xml_project_dir}, salida en: {xref_output_dir}",
log_f,
)
try:
# Llamada directa a la función de x4
# <-- MODIFICADO: Pasar todos los parámetros leídos de la config -->
success_x4 = generate_cross_references(
xml_project_dir,
xref_output_dir,
cfg_scl_output_dirname,
cfg_xref_source_subdir,
cfg_call_xref_filename,
cfg_db_usage_xref_filename,
cfg_plc_tag_xref_filename,
cfg_max_call_depth,
cfg_max_users_list)
if not success_x4:
# La función interna ya debería haber impreso/logueado el error específico
log_message(f"--- x4 (generate_cross_references) FALLÓ. ---", log_f, also_print=False)
except Exception as e:
# Capturar error inesperado en la llamada a x4
log_message(f"--- ERROR INESPERADO ejecutando x4: {e} ---", log_f, also_print=False)
print(f"--- ERROR INESPERADO ejecutando x4: {e} ---", file=sys.stderr)
traceback_str = traceback.format_exc()
log_message(traceback_str, log_f, also_print=False)
traceback.print_exc(file=sys.stderr)
success_x4 = False # Marcar como fallo
else:
log_message("Fase 2 (x4) omitida.", log_f)
# --- PARTE 4: EJECUTAR x5 (Agregación) ---
log_message(
f"\n--- Fase 3: Ejecutando x5_aggregate.py (salida en '{cfg_aggregated_filename}') ---", # Usar valor de config
log_f
)
run_x5 = True
success_x5 = False
# Condición similar a x4: ejecutar si no todo falló en x1/x2/x3
can_run_x5 = failed_count < len(xml_files_found)
if not can_run_x5 and len(xml_files_found) > 0:
log_message(
"Advertencia: Todos los archivos fallaron en x1/x2/x3. Saltando x5.", log_f
)
run_x5 = False
elif len(xml_files_found) == 0:
run_x5 = False
if run_x5:
output_agg_file = os.path.join(working_directory, cfg_aggregated_filename) # Usar valor de config
log_message(
f"Ejecutando x5 (aggregate_outputs) sobre: {xml_project_dir}, salida agregada en: {output_agg_file}",
log_f
)
try:
# Llamada directa a la función de x5
# <-- MODIFICADO: Pasar los parámetros necesarios leídos de la config -->
success_x5 = aggregate_outputs(
xml_project_dir,
output_agg_file,
cfg_scl_output_dirname,
cfg_xref_output_dirname)
if not success_x5:
# La función interna ya debería haber impreso/logueado el error específico
log_message(f"--- x5 (aggregate_outputs) FALLÓ. ---", log_f, also_print=False)
except Exception as e:
# Capturar error inesperado en la llamada a x5
log_message(f"--- ERROR INESPERADO ejecutando x5: {e} ---", log_f, also_print=False)
print(f"--- ERROR INESPERADO ejecutando x5: {e} ---", file=sys.stderr)
traceback_str = traceback.format_exc()
log_message(traceback_str, log_f, also_print=False)
traceback.print_exc(file=sys.stderr)
success_x5 = False # Marcar como fallo
else:
log_message("Fase 3 (x5) omitida.", log_f)
# --- PARTE 5: RESUMEN FINAL --- (MOVIDO AQUÍ)
# --- PARTE 5: RESUMEN FINAL ---
log_message(
"\n" + "-" * 20 + " Resumen Final del Procesamiento Completo " + "-" * 20,
log_f,
)
log_message(f"Total de archivos XML encontrados: {len(xml_files_found)}", log_f)
log_message(
f"Archivos procesados/actualizados exitosamente (x1-x3): {processed_count}",
log_f,
)
log_message(
f"Archivos completamente saltados (x1, x2, x3): {skipped_full_count}", log_f
)
log_message(
f"Archivos parcialmente saltados (x1, x2 saltados; x3 ejecutado): {skipped_partial_count}",
log_f,
)
log_message(f"Archivos fallidos (en x1, x2, x3 o error inesperado): {failed_count}", log_f)
# El detalle de archivos fallidos es más difícil de rastrear ahora sin el dict 'file_status'
# Se podría reintroducir si es necesario, actualizándolo en cada paso.
# Por ahora, solo mostramos el conteo.
# if failed_count > 0:
# log_message("Archivos fallidos:", log_f)
# ... (lógica para mostrar cuáles fallaron) ...
log_message(
f"Fase 2 (Generación XRef - x4): {'Completada' if run_x4 and success_x4 else ('Fallida' if run_x4 and not success_x4 else 'Omitida')}",
log_f,
)
log_message(
f"Fase 3 (Agregación - x5): {'Completada' if run_x5 and success_x5 else ('Fallida' if run_x5 and not success_x5 else 'Omitida')}",
log_f,
)
log_message("-" * (80), log_f)
has_errors = (
failed_count > 0
or (run_x4 and not success_x4)
or (run_x5 and not success_x5)
)
# Mensaje final en consola
final_console_message = "Proceso finalizado exitosamente."
exit_code = 0
if has_errors:
final_console_message = "Proceso finalizado con errores."
exit_code = 1
log_message(final_console_message, log_f) # Loguear mensaje final
print(
f"\n{final_console_message} Consulta '{LOG_FILENAME}' para detalles."
) # Mostrar mensaje en consola
log_message("="*41 + " LOG END " + "="*42, log_f)
# <-- NUEVO: Flush explícito antes de salir -->
try:
log_f.flush()
os.fsync(log_f.fileno()) # Intenta forzar escritura a disco (puede no funcionar en todos los OS)
except Exception as flush_err:
print(f"Advertencia: Error durante flush/fsync final del log: {flush_err}", file=sys.stderr)
# <-- FIN NUEVO -->
# Mensaje final ya impreso antes del flush
sys.exit(exit_code) # Salir con el código apropiado