Compare commits


No commits in common. "6ffdec7a9a7ee5dde9ab0b540227fa34ba022d6b" and "fceebd1e2d3d2d813b90cbfd1bb4150b06a8b952" have entirely different histories.

30 changed files with 744 additions and 6184 deletions

.gitignore

@ -1,174 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

app.py

@ -1,18 +1,9 @@
from flask import Flask, render_template, request, jsonify, url_for from flask import Flask, render_template, request, jsonify, url_for
from flask_sock import Sock from flask_sock import Sock
from config_manager import ConfigurationManager from config_manager import ConfigurationManager
from datetime import datetime
import os import os
import json # Added import import json # Added import
from datetime import datetime
import time # Added for shutdown delay
# --- Imports for System Tray Icon ---
import threading
import webbrowser
import sys
import requests # To send shutdown request
from PIL import Image
import pystray
app = Flask( app = Flask(
__name__, static_url_path="", static_folder="static", template_folder="templates" __name__, static_url_path="", static_folder="static", template_folder="templates"
@ -23,9 +14,6 @@ config_manager = ConfigurationManager()
# Lista global para mantener las conexiones WebSocket activas # Lista global para mantener las conexiones WebSocket activas
websocket_connections = set() websocket_connections = set()
# --- Globals for Tray Icon ---
tray_icon = None
@sock.route("/ws") @sock.route("/ws")
def handle_websocket(ws): def handle_websocket(ws):
@ -239,99 +227,5 @@ def get_directory_history(group):
return jsonify(history) return jsonify(history)
# --- System Tray Icon Functions ---
def run_flask():
"""Runs the Flask app."""
print("Starting Flask server on http://127.0.0.1:5000/")
try:
# use_reloader=False is important when running in a thread
# For production, consider using waitress or gunicorn instead of app.run
app.run(host='127.0.0.1', port=5000, debug=True, use_reloader=False)
except Exception as e:
print(f"Error running Flask app: {e}")
# Optionally try to stop the tray icon if Flask fails critically
if tray_icon:
print("Attempting to stop tray icon due to Flask error.")
tray_icon.stop()
def open_app_browser(icon, item):
"""Callback function to open the browser."""
print("Opening application in browser...")
webbrowser.open("http://127.0.0.1:5000/")
def shutdown_flask_server():
"""Attempts to gracefully shut down the Werkzeug server."""
try:
# This requires the development server (werkzeug)
# Send a request to a special shutdown route
requests.post("http://127.0.0.1:5000/_shutdown", timeout=1)
except Exception as e:
print(f"Could not send shutdown request to Flask server: {e}")
print("Flask server might need to be closed manually.")
def stop_icon_thread():
"""Helper function to stop the icon after a delay, allowing HTTP response."""
time.sleep(0.1) # Small delay to allow the HTTP response to be sent
if tray_icon:
print("Stopping tray icon from shutdown route...")
tray_icon.stop()
else:
print("Tray icon not available to stop.")
# As a last resort if the icon isn't running for some reason
# print("Attempting os._exit(0) as fallback.")
# os._exit(0) # Force exit - use with caution
@app.route('/_shutdown', methods=['POST'])
def shutdown_route():
"""Internal route to shut down the application via the tray icon."""
print("Shutdown endpoint called.")
# Stop the main application thread by stopping the tray icon.
# Do this in a separate thread to allow the HTTP response to return first.
stopper = threading.Thread(target=stop_icon_thread, daemon=True)
stopper.start()
print("Shutdown signal sent to tray icon thread.")
return jsonify(status="success", message="Application shutdown initiated..."), 200
def exit_application(icon, item):
"""Callback function to exit the application."""
print("Exit requested via tray menu.")
# Just stop the icon. This will end the main thread, and the daemon Flask thread will exit.
print("Stopping tray icon...")
if icon: # pystray passes the icon object
icon.stop()
elif tray_icon: # Fallback just in case
tray_icon.stop()
if __name__ == "__main__": if __name__ == "__main__":
# --- Start Flask in a background thread --- app.run(debug=True)
flask_thread = threading.Thread(target=run_flask, daemon=True)
flask_thread.start()
# --- Setup and run the system tray icon ---
icon_path = r"d:\Proyectos\Scripts\ParamManagerScripts\icon.png" # Use absolute path
try:
image = Image.open(icon_path)
menu = pystray.Menu(
pystray.MenuItem("Abrir ParamManager", open_app_browser, default=True),
pystray.MenuItem("Salir", exit_application)
)
tray_icon = pystray.Icon("ParamManager", image, "ParamManager", menu)
print("Starting system tray icon...")
tray_icon.run() # This blocks the main thread until icon.stop() is called
except FileNotFoundError:
print(f"Error: Icono no encontrado en '{icon_path}'. El icono de notificación no se iniciará.", file=sys.stderr)
print("La aplicación Flask seguirá ejecutándose en segundo plano. Presiona Ctrl+C para detenerla si es necesario.")
# Keep the main thread alive so the Flask thread doesn't exit immediately
# This allows Flask to continue running even without the tray icon.
try:
while flask_thread.is_alive():
flask_thread.join(timeout=1.0) # Wait indefinitely
except KeyboardInterrupt:
print("\nCtrl+C detectado. Intentando detener Flask...")
shutdown_flask_server() # Try to shutdown Flask on Ctrl+C too
print("Saliendo.")
except Exception as e:
print(f"Error al iniciar el icono de notificación: {e}", file=sys.stderr)
print("Aplicación finalizada.")


@ -1,34 +0,0 @@
--- Log de Ejecución: x1.py ---
Grupo: EmailCrono
Directorio de Trabajo: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Inicio: 2025-05-03 17:15:12
Fin: 2025-05-03 17:15:14
Duración: 0:00:01.628641
Estado: SUCCESS (Código de Salida: 0)
--- SALIDA ESTÁNDAR (STDOUT) ---
Working directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Input directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Output directory: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs
Cronologia file: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
Attachments directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\adjuntos
Beautify rules file: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\config\beautify_rules.json
Found 1 .eml files
Loaded 0 existing messages
Processing C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS.eml
Aplicando reglas de prioridad 1
Aplicando reglas de prioridad 2
Aplicando reglas de prioridad 3
Aplicando reglas de prioridad 4
Estadísticas de procesamiento:
- Total mensajes encontrados: 1
- Mensajes únicos añadidos: 1
- Mensajes duplicados ignorados: 0
Writing 1 messages to C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
--- ERRORES (STDERR) ---
Ninguno
--- FIN DEL LOG ---


@ -1,14 +0,0 @@
{
"level1": {
"api_key": "your-api-key-here",
"model": "gpt-3.5-turbo"
},
"level2": {
"attachments_dir": "adjuntos",
"cronologia_file": "cronologia.md"
},
"level3": {
"output_directory": "C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs"
},
"working_directory": "C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS"
}
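
The deleted data.json above layers parameters in level1/level2/level3 plus a working_directory. A hypothetical sketch of flattening such a file; the project's real merge logic lives in backend.script_utils.load_configuration and is not shown in this diff:

# Hypothetical reader for the layered data.json shown above; names and merge order
# are assumptions, not the project's actual load_configuration implementation.
import json

def load_layered_config(path):
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    merged = {}
    for level in ("level1", "level2", "level3"):
        merged.update(data.get(level, {}))  # later levels override earlier ones
    merged["working_directory"] = data.get("working_directory")
    return merged

# Example: cfg = load_layered_config("data.json"); cfg["output_directory"]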


@ -1,8 +1,6 @@
{ {
"path": "C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS", "path": "C:\\Trabajo\\VM\\40 - 93040 - HENKEL - NEXT2 Problem\\Reporte\\EmailTody",
"history": [ "history": [
"C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS",
"C:\\Estudio",
"C:\\Trabajo\\VM\\40 - 93040 - HENKEL - NEXT2 Problem\\Reporte\\EmailTody", "C:\\Trabajo\\VM\\40 - 93040 - HENKEL - NEXT2 Problem\\Reporte\\EmailTody",
"C:\\Trabajo\\VM\\30 - 9.3941- Kosme - Portogallo (Modifica + Linea)\\Reporte\\Emails", "C:\\Trabajo\\VM\\30 - 9.3941- Kosme - Portogallo (Modifica + Linea)\\Reporte\\Emails",
"C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\30 - 9.3941- Kosme - Portogallo (Modifica + Linea)\\Emails", "C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\30 - 9.3941- Kosme - Portogallo (Modifica + Linea)\\Emails",


@ -1,6 +0,0 @@
{
"name": "Exportador de objetos de Tia Portal y procesador de CAx",
"description": "Este conjunto de scripts exporta desde Tia Portal los objetos en fomarto XML y los objetos CAx. Luego se puede generar documentacion desde estos CAx de la periferia IO del PLC exportado.",
"version": "1.0",
"author": "Miguel"
}


@ -1,11 +0,0 @@
{
"scl_output_dir": "scl_output",
"xref_output_dir": "xref_output",
"xref_source_subdir": "source",
"call_xref_filename": "xref_calls_tree.md",
"db_usage_xref_filename": "xref_db_usage_summary.md",
"plc_tag_xref_filename": "xref_plc_tags_summary.md",
"max_call_depth": 5,
"max_users_list": 20,
"aggregated_filename": "full_project_representation.md"
}


@ -1,6 +0,0 @@
{
"name": "Procesador de XML exportado de TIA",
"description": "Conjunto de scripts que procesan archivos XML exportados de TIA, conviertiendo los objetos LAD a SCL y generando documentación en formato Markdown. ",
"version": "1.0",
"author": "Miguel"
}


@ -1,59 +1,4 @@
{ {
"type": "object", "type": "object",
"properties": { "properties": {}
"scl_output_dir": {
"type": "string",
"title": "Directorio Salida SCL/MD (x3)",
"description": "Nombre del directorio (relativo a la raíz del proyecto PLC) donde x3 genera archivos .scl/.md, y x4/x5 leen.",
"default": "scl_output"
},
"xref_output_dir": {
"type": "string",
"title": "Directorio Salida XRef (x4)",
"description": "Nombre del directorio (relativo a la raíz del proyecto PLC) donde x4 genera archivos de referencias cruzadas.",
"default": "xref_output"
},
"xref_source_subdir": {
"type": "string",
"title": "Subdirectorio Fuentes XRef (x4)",
"description": "Nombre del subdirectorio dentro de xref_output_dir donde x4 coloca archivos fuente (.md) preparados para enlaces Obsidian.",
"default": "source"
},
"call_xref_filename": {
"type": "string",
"title": "Nombre Archivo Árbol Llamadas (x4)",
"description": "Nombre del archivo para la salida del árbol de llamadas generado por x4.",
"default": "xref_calls_tree.md"
},
"db_usage_xref_filename": {
"type": "string",
"title": "Nombre Archivo Uso DBs (x4)",
"description": "Nombre del archivo para el resumen de uso de DBs generado por x4.",
"default": "xref_db_usage_summary.md"
},
"plc_tag_xref_filename": {
"type": "string",
"title": "Nombre Archivo Uso PLC Tags (x4)",
"description": "Nombre del archivo para el resumen de uso de PLC Tags generado por x4.",
"default": "xref_plc_tags_summary.md"
},
"max_call_depth": {
"type": "integer",
"title": "Profundidad Máx. Árbol Llamadas (x4)",
"description": "Profundidad máxima de recursión para el árbol de llamadas generado por x4.",
"default": 5
},
"max_users_list": {
"type": "integer",
"title": "Máx. Usuarios Listados (x4)",
"description": "Número máximo de usuarios listados por DB/Tag en los resúmenes generados por x4.",
"default": 20
},
"aggregated_filename": {
"type": "string",
"title": "Nombre Archivo Agregado (x5)",
"description": "Nombre del archivo Markdown agregado final generado por x5 (se guarda en el directorio de trabajo principal).",
"default": "full_project_representation.md"
}
}
} }
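
The left-hand schema above carries a default for each parameter, and the pre-change x0_main.py mirrors the same values as fallbacks in its xml_parser_config.get(...) calls. A hypothetical helper that pulls those defaults straight out of such a schema file (path and usage are illustrative only):

# Hypothetical helper: collect the "default" of each property in a JSON Schema
# like esquema.json above.
import json

def schema_defaults(schema_path):
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = json.load(f)
    return {
        name: prop["default"]
        for name, prop in schema.get("properties", {}).items()
        if "default" in prop
    }

# schema_defaults("esquema.json") -> {"scl_output_dir": "scl_output", ...}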

File diff suppressed because it is too large

File diff suppressed because it is too large


@ -1,19 +0,0 @@
{
"level1": {
"api_key": "your-api-key-here",
"model": "gpt-3.5-turbo"
},
"level2": {
"scl_output_dir": "scl_output",
"xref_output_dir": "xref_output",
"xref_source_subdir": "source",
"call_xref_filename": "xref_calls_tree.md",
"db_usage_xref_filename": "xref_db_usage_summary.md",
"plc_tag_xref_filename": "xref_plc_tags_summary.md",
"max_call_depth": 5,
"max_users_list": 20,
"aggregated_filename": "full_project_representation.md"
},
"level3": {},
"working_directory": "C:\\Trabajo\\SIDEL\\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\\Reporte\\IOExport"
}


@ -1,6 +0,0 @@
{
"path": "C:\\Trabajo\\SIDEL\\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\\Reporte\\IOExport",
"history": [
"C:\\Trabajo\\SIDEL\\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\\Reporte\\IOExport"
]
}


@ -23,16 +23,15 @@ script_root = os.path.dirname(
sys.path.append(script_root) sys.path.append(script_root)
from backend.script_utils import load_configuration from backend.script_utils import load_configuration
# <-- NUEVO: Importar funciones directamente --> # --- Funciones (get_console_encoding - sin cambios) ---
from x1_to_json import convert_xml_to_json def get_console_encoding():
from x2_process import process_json_to_scl try:
from x3_generate_scl import generate_scl_or_markdown return locale.getpreferredencoding(False)
# <-- NUEVO: Importar funciones de x4 y x5 --> except Exception:
from x4_cross_reference import generate_cross_references # Asumiendo que x4_cross_reference.py tiene esta función return "cp1252"
from x5_aggregate import aggregate_outputs
CONSOLE_ENCODING = "utf-8" CONSOLE_ENCODING = get_console_encoding()
# <-- NUEVO: Importar format_variable_name (necesario para predecir nombre de salida) --> # <-- NUEVO: Importar format_variable_name (necesario para predecir nombre de salida) -->
try: try:
@ -86,7 +85,117 @@ def log_message(message, log_file_handle, also_print=True):
# <-- FIN NUEVO --> # <-- FIN NUEVO -->
# <-- run_script ya no es necesaria --> # <-- MODIFICADO: run_script para aceptar log_file_handle -->
def run_script(script_name, xml_arg, log_file_handle, *extra_args):
"""Runs a given script, logs output, and returns success status."""
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), script_name)
python_executable = sys.executable
command = [python_executable, script_path, os.path.abspath(xml_arg)]
command.extend(extra_args)
# Loguear el comando que se va a ejecutar
log_message(
f"--- Running {script_name} with arguments: {[os.path.relpath(arg) if isinstance(arg, str) and os.path.exists(arg) else arg for arg in command[2:]]} ---",
log_file_handle,
)
try:
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
encoding=CONSOLE_ENCODING,
errors="replace",
)
stdout_clean = result.stdout.strip() if result.stdout else ""
stderr_clean = result.stderr.strip() if result.stderr else ""
# Loguear stdout si existe
if stdout_clean:
log_message(
f"--- Stdout ({script_name}) ---", log_file_handle, also_print=False
) # Loguear encabezado
log_message(
stdout_clean, log_file_handle, also_print=True
) # Loguear y mostrar contenido
log_message(
f"--- End Stdout ({script_name}) ---", log_file_handle, also_print=False
) # Loguear fin
# Loguear stderr si existe
if stderr_clean:
# Usar log_message también para stderr, pero imprimir en consola como error
log_message(
f"--- Stderr ({script_name}) ---", log_file_handle, also_print=False
) # Loguear encabezado
log_message(
stderr_clean, log_file_handle, also_print=False
) # Loguear contenido
log_message(
f"--- End Stderr ({script_name}) ---", log_file_handle, also_print=False
) # Loguear fin
# Imprimir stderr en la consola de error estándar
print(f"--- Stderr ({script_name}) ---", file=sys.stderr)
print(stderr_clean, file=sys.stderr)
print("--------------------------", file=sys.stderr)
return True # Éxito
except FileNotFoundError:
error_msg = f"Error: Script '{script_path}' or Python executable '{python_executable}' not found."
log_message(error_msg, log_file_handle, also_print=False) # Loguear error
print(error_msg, file=sys.stderr) # Mostrar error en consola
return False
except subprocess.CalledProcessError as e:
error_msg = f"Error running {script_name}: Script returned non-zero exit code {e.returncode}."
log_message(error_msg, log_file_handle, also_print=False) # Loguear error
print(error_msg, file=sys.stderr) # Mostrar error en consola
stdout_decoded = e.stdout.strip() if e.stdout else ""
stderr_decoded = e.stderr.strip() if e.stderr else ""
if stdout_decoded:
log_message(
f"--- Stdout ({script_name} - Error) ---",
log_file_handle,
also_print=False,
)
log_message(stdout_decoded, log_file_handle, also_print=False)
log_message(
f"--- End Stdout ({script_name} - Error) ---",
log_file_handle,
also_print=False,
)
print(f"--- Stdout ({script_name}) ---", file=sys.stderr)
print(stdout_decoded, file=sys.stderr)
if stderr_decoded:
log_message(
f"--- Stderr ({script_name} - Error) ---",
log_file_handle,
also_print=False,
)
log_message(stderr_decoded, log_file_handle, also_print=False)
log_message(
f"--- End Stderr ({script_name} - Error) ---",
log_file_handle,
also_print=False,
)
print(f"--- Stderr ({script_name}) ---", file=sys.stderr)
print(stderr_decoded, file=sys.stderr)
print("--------------------------", file=sys.stderr)
return False
except Exception as e:
error_msg = f"An unexpected error occurred while running {script_name}: {e}"
log_message(error_msg, log_file_handle, also_print=False) # Loguear error
traceback_str = traceback.format_exc()
log_message(
traceback_str, log_file_handle, also_print=False
) # Loguear traceback
print(error_msg, file=sys.stderr) # Mostrar error en consola
traceback.print_exc(file=sys.stderr) # Mostrar traceback en consola
return False
# --- Función check_skip_status (sin cambios en su lógica interna) --- # --- Función check_skip_status (sin cambios en su lógica interna) ---
@ -157,29 +266,16 @@ def check_skip_status(
return status return status
# --- Constantes ---
AGGREGATED_FILENAME = "full_project_representation.md"
SCL_OUTPUT_DIRNAME = "scl_output"
XREF_OUTPUT_DIRNAME = "xref_output"
# --- Bloque Principal --- # --- Bloque Principal ---
if __name__ == "__main__": if __name__ == "__main__":
configs = load_configuration() configs = load_configuration()
working_directory = configs.get("working_directory") working_directory = configs.get("working_directory")
group_config = configs.get("level2", {})
# <-- NUEVO: Leer parámetros de configuración para x3, x4, x5 -->
xml_parser_config = configs.get("XML Parser to SCL", {})
cfg_scl_output_dirname = xml_parser_config.get("scl_output_dir", "scl_output")
cfg_xref_output_dirname = xml_parser_config.get("xref_output_dir", "xref_output")
cfg_xref_source_subdir = xml_parser_config.get("xref_source_subdir", "source")
cfg_call_xref_filename = xml_parser_config.get("call_xref_filename", "xref_calls_tree.md")
cfg_db_usage_xref_filename = xml_parser_config.get("db_usage_xref_filename", "xref_db_usage_summary.md")
cfg_plc_tag_xref_filename = xml_parser_config.get("plc_tag_xref_filename", "xref_plc_tags_summary.md")
cfg_max_call_depth = xml_parser_config.get("max_call_depth", 5)
cfg_max_users_list = xml_parser_config.get("max_users_list", 20)
cfg_aggregated_filename = xml_parser_config.get("aggregated_filename", "full_project_representation.md")
# <-- FIN NUEVO -->
# Directorio donde se encuentra este script (x0_main.py)
script_dir = os.path.dirname(os.path.abspath(__file__))
# <-- MODIFICADO: Abrir archivo log --> # <-- MODIFICADO: Abrir archivo log -->
log_filepath = os.path.join( log_filepath = os.path.join(
@ -191,28 +287,19 @@ if __name__ == "__main__":
log_message("=" * 40 + " LOG START " + "=" * 40, log_f) log_message("=" * 40 + " LOG START " + "=" * 40, log_f)
# --- PARTE 1: BUSCAR ARCHIVOS --- # --- PARTE 1: BUSCAR ARCHIVOS ---
# <-- MODIFICADO: Apuntar al subdirectorio 'PLC' dentro del working_directory --> xml_project_dir = working_directory
plc_subdir_name = "PLC" # Nombre estándar del subdirectorio de TIA Portal
xml_project_dir = os.path.join(working_directory, plc_subdir_name)
log_message( log_message(
f"Directorio de trabajo base configurado: '{working_directory}'", log_f f"Buscando archivos XML recursivamente en: '{xml_project_dir}'", log_f
) )
log_message(
f"Buscando archivos XML recursivamente en el subdirectorio: '{xml_project_dir}'", log_f
)
# Verificar si el directorio PLC existe
if not os.path.isdir(xml_project_dir): if not os.path.isdir(xml_project_dir):
log_message( log_message(
f"Error: El subdirectorio '{plc_subdir_name}' no existe dentro de '{working_directory}'. " f"Error: El directorio '{xml_project_dir}' no existe.",
f"Se esperaba encontrar la estructura del proyecto TIA Portal en '{xml_project_dir}'.",
log_f, log_f,
also_print=False, also_print=False,
) )
print( print(
f"Error: El subdirectorio '{plc_subdir_name}' no existe dentro de '{working_directory}'. " f"Error: El directorio '{xml_project_dir}' no existe.", file=sys.stderr
f"Asegúrese de que la ruta del directorio de trabajo apunte a la carpeta que *contiene* la carpeta '{plc_subdir_name}'.", file=sys.stderr
) )
sys.exit(1) sys.exit(1)
search_pattern = os.path.join(xml_project_dir, "**", "*.xml") search_pattern = os.path.join(xml_project_dir, "**", "*.xml")
@ -228,37 +315,35 @@ if __name__ == "__main__":
) )
xml_files_found.sort() xml_files_found.sort()
[ [
log_message(f" - {os.path.relpath(xml_file, working_directory)}", log_f) # Mostrar ruta relativa al working_directory original log_message(f" - {os.path.relpath(xml_file, script_dir)}", log_f)
for xml_file in xml_files_found for xml_file in xml_files_found
] ]
# --- Directorios de salida --- # --- Directorios de salida ---
# Estos directorios ahora se crearán DENTRO de xml_project_dir (es decir, dentro de 'PLC') scl_output_dir = os.path.join(xml_project_dir, SCL_OUTPUT_DIRNAME)
scl_output_dir = os.path.join(xml_project_dir, cfg_scl_output_dirname) # Usar valor de config xref_output_dir = os.path.join(xml_project_dir, XREF_OUTPUT_DIRNAME)
xref_output_dir = os.path.join(xml_project_dir, cfg_xref_output_dirname) # Usar valor de config
# --- PARTE 2: PROCESAMIENTO INDIVIDUAL (x1, x2, x3) --- # --- PARTE 2: PROCESAMIENTO INDIVIDUAL (x1, x2, x3) ---
log_message("\n--- Fase 1: Procesamiento Individual (x1, x2, x3) ---", log_f) log_message("\n--- Fase 1: Procesamiento Individual (x1, x2, x3) ---", log_f)
# Los nombres de script ya no se usan directamente para x1, x2, x3 script1 = "x1_to_json.py"
# script1 = "x1_to_json.py" script2 = "x2_process.py"
# script2 = "x2_process.py" script3 = "x3_generate_scl.py"
# script3 = "x3_generate_scl.py" file_status = {}
processed_count = 0 processed_count = 0
skipped_full_count = 0 skipped_full_count = 0
failed_count = 0 failed_count = 0
skipped_partial_count = 0 skipped_partial_count = 0
for i, xml_filepath in enumerate(xml_files_found): for xml_filepath in xml_files_found:
relative_path = os.path.relpath(xml_filepath, working_directory) relative_path = os.path.relpath(xml_filepath, script_dir)
log_message(f"\n--- Procesando archivo: {relative_path} ---", log_f) log_message(f"\n--- Procesando archivo: {relative_path} ---", log_f)
status = {"x1_ok": None, "x2_ok": None, "x3_ok": None}
file_status[relative_path] = status
base_filename = os.path.splitext(os.path.basename(xml_filepath))[0] base_filename = os.path.splitext(os.path.basename(xml_filepath))[0]
parsing_dir = os.path.join(os.path.dirname(xml_filepath), "parsing") parsing_dir = os.path.join(os.path.dirname(xml_filepath), "parsing")
# Crear directorio de parsing si no existe
os.makedirs(parsing_dir, exist_ok=True)
json_output_file = os.path.join(parsing_dir, f"{base_filename}.json")
processed_json_filepath = os.path.join( processed_json_filepath = os.path.join(
parsing_dir, f"{base_filename}_processed.json" # <-- Corregido: nombre correcto parsing_dir, f"{base_filename}_processed.json"
) )
# 1. Comprobar estado de salto # 1. Comprobar estado de salto
@ -268,184 +353,139 @@ if __name__ == "__main__":
skip_x1_x2 = skip_info["skip_x1_x2"] skip_x1_x2 = skip_info["skip_x1_x2"]
skip_x3 = skip_info["skip_x3"] skip_x3 = skip_info["skip_x3"]
# Si se salta todo, registrar y continuar # 2. Ejecutar/Saltar x1
if skip_x1_x2 and skip_x3:
log_message(
f"--- SALTANDO TODO (x1, x2, x3) para: {relative_path} (XML no modificado, salida final actualizada)",
log_f,
)
skipped_full_count += 1
processed_count += 1 # Contar como procesado si se salta todo
continue
# Usar try/except para capturar errores en las llamadas directas
try:
# 2. Ejecutar/Saltar x1 (convert_xml_to_json)
if skip_x1_x2: if skip_x1_x2:
log_message( log_message(
f"--- SALTANDO x1 para: {relative_path} (XML no modificado, JSON procesado existe)", f"--- SALTANDO x1 para: {relative_path} (archivo XML no modificado y JSON procesado existe)",
log_f, log_f,
) )
success_x1 = True # Asumir éxito si se salta status["x1_ok"] = True
else:
if run_script(script1, xml_filepath, log_f): # Pasar log_f
# Mensaje ya logueado por run_script
status["x1_ok"] = True
else: else:
log_message( log_message(
f"--- Ejecutando x1 (convert_xml_to_json) para: {relative_path} ---", log_f f"--- {script1} FALLÓ para: {relative_path} ---",
) log_f,
success_x1 = convert_xml_to_json(xml_filepath, json_output_file) also_print=False,
if not success_x1: ) # Ya impreso por run_script
log_message(f"--- x1 FALLÓ para: {relative_path} ---", log_f, also_print=False) # La función ya imprime el error status["x1_ok"] = False
if not success_x1:
failed_count += 1 failed_count += 1
continue # No continuar si x1 falló continue
# 3. Ejecutar/Saltar x2 (process_json_to_scl) # 3. Ejecutar/Saltar x2
if skip_x1_x2: # Si se saltó x1, también se salta x2 if skip_x1_x2:
log_message( log_message(
f"--- SALTANDO x2 para: {relative_path} (razón anterior)", log_f f"--- SALTANDO x2 para: {relative_path} (razón anterior)", log_f
) )
success_x2 = True # Asumir éxito si se salta status["x2_ok"] = True
else:
if run_script(script2, xml_filepath, log_f): # Pasar log_f
status["x2_ok"] = True
else: else:
log_message( log_message(
f"--- Ejecutando x2 (process_json_to_scl) para: {relative_path} ---", log_f f"--- {script2} FALLÓ para: {relative_path} ---",
log_f,
also_print=False,
) )
success_x2 = process_json_to_scl(json_output_file, processed_json_filepath) status["x2_ok"] = False
if not success_x2:
log_message(f"--- x2 FALLÓ para: {relative_path} ---", log_f, also_print=False)
if not success_x2:
failed_count += 1 failed_count += 1
continue # No continuar si x2 falló continue
# 4. Ejecutar x3 (generate_scl_or_markdown) - skip_x3 ya se manejó al principio # 4. Ejecutar/Saltar x3
# Si llegamos aquí, x3 SIEMPRE debe ejecutarse (porque skip_x3 era False) if skip_x3: # Solo puede ser True si skip_x1_x2 era True
log_message(
f"--- SALTANDO x3 para: {relative_path} (archivo de salida en '{SCL_OUTPUT_DIRNAME}' está actualizado)",
log_f,
)
status["x3_ok"] = True
skipped_full_count += 1
processed_count += 1
else:
if skip_x1_x2: if skip_x1_x2:
skipped_partial_count += 1 # Se saltó x1/x2 pero se ejecuta x3 skipped_partial_count += 1 # Se saltó x1/x2 pero se ejecuta x3
if run_script(
log_message( script3, xml_filepath, log_f, xml_project_dir
f"--- Ejecutando x3 (generate_scl_or_markdown) para: {relative_path} ---", log_f ): # Pasar log_f y project_root_dir
) status["x3_ok"] = True
# Asegurar que el directorio de salida final exista ANTES de llamar a la función
os.makedirs(scl_output_dir, exist_ok=True)
success_x3 = generate_scl_or_markdown(
processed_json_filepath, scl_output_dir, xml_project_dir
)
if not success_x3:
log_message(f"--- x3 FALLÓ para: {relative_path} ---", log_f, also_print=False)
failed_count += 1
continue # No continuar si x3 falló
# Si todo fue bien
processed_count += 1 processed_count += 1
else:
except Exception as e: log_message(
# Capturar cualquier error inesperado durante las llamadas a funciones f"--- {script3} FALLÓ para: {relative_path} ---",
log_message(f"--- ERROR INESPERADO procesando {relative_path}: {e} ---", log_f, also_print=False) log_f,
print(f"--- ERROR INESPERADO procesando {relative_path}: {e} ---", file=sys.stderr) also_print=False,
traceback_str = traceback.format_exc() )
log_message(traceback_str, log_f, also_print=False) # Loguear traceback status["x3_ok"] = False
traceback.print_exc(file=sys.stderr) # Mostrar traceback en consola
failed_count += 1 failed_count += 1
continue # Pasar al siguiente archivo continue
# --- PARTE 3: EJECUTAR x4 (Referencias Cruzadas) --- # --- PARTE 3: EJECUTAR x4 (Referencias Cruzadas) ---
log_message( log_message(
f"\n--- Fase 2: Ejecutando x4_cross_reference.py (salida en '{cfg_xref_output_dirname}/') ---", # Usar valor de config f"\n--- Fase 2: Ejecutando x4_cross_reference.py (salida en '{XREF_OUTPUT_DIRNAME}/') ---",
log_f, log_f,
) )
script4 = "x4_cross_reference.py"
run_x4 = True run_x4 = True
success_x4 = False success_x4 = False
# La condición para ejecutar x4 ahora depende de si *algún* archivo tuvo éxito en x1 y x2 can_run_x4 = any(s["x1_ok"] and s["x2_ok"] for s in file_status.values())
# (Necesitamos una forma de rastrear esto, o simplemente intentarlo si no hubo fallos fatales antes) if not can_run_x4:
# Simplificación: Ejecutar x4 si no todos los archivos fallaron en x1/x2.
# Una mejor comprobación sería ver si existe algún archivo _processed.json
can_run_x4 = failed_count < len(xml_files_found) # Aproximación simple
if not can_run_x4 and len(xml_files_found) > 0:
log_message( log_message(
"Advertencia: Todos los archivos fallaron en x1/x2. Saltando x4.", log_f "Advertencia: Ningún archivo completó x1/x2. Saltando x4.", log_f
)
run_x4 = False
script4_path = os.path.join(script_dir, script4)
if not os.path.exists(script4_path):
log_message(
f"Advertencia: Script '{script4}' no encontrado. Saltando x4.", log_f
) )
run_x4 = False run_x4 = False
elif len(xml_files_found) == 0:
run_x4 = False # No hay archivos, no ejecutar
if run_x4: if run_x4:
log_message( log_message(
f"Ejecutando x4 (generate_cross_references) sobre: {xml_project_dir}, salida en: {xref_output_dir}", f"Ejecutando {script4} sobre: {xml_project_dir}, salida en: {xref_output_dir}",
log_f, log_f,
) )
try: success_x4 = run_script(
# Llamada directa a la función de x4 script4, xml_project_dir, log_f, "-o", xref_output_dir
# <-- MODIFICADO: Pasar todos los parámetros leídos de la config --> ) # Pasar log_f
success_x4 = generate_cross_references(
xml_project_dir,
xref_output_dir,
cfg_scl_output_dirname,
cfg_xref_source_subdir,
cfg_call_xref_filename,
cfg_db_usage_xref_filename,
cfg_plc_tag_xref_filename,
cfg_max_call_depth,
cfg_max_users_list)
if not success_x4: if not success_x4:
# La función interna ya debería haber impreso/logueado el error específico log_message(f"--- {script4} FALLÓ. ---", log_f, also_print=False)
log_message(f"--- x4 (generate_cross_references) FALLÓ. ---", log_f, also_print=False) # Mensaje de éxito ya logueado por run_script
except Exception as e:
# Capturar error inesperado en la llamada a x4
log_message(f"--- ERROR INESPERADO ejecutando x4: {e} ---", log_f, also_print=False)
print(f"--- ERROR INESPERADO ejecutando x4: {e} ---", file=sys.stderr)
traceback_str = traceback.format_exc()
log_message(traceback_str, log_f, also_print=False)
traceback.print_exc(file=sys.stderr)
success_x4 = False # Marcar como fallo
else: else:
log_message("Fase 2 (x4) omitida.", log_f) log_message("Fase 2 (x4) omitida.", log_f)
# --- PARTE 4: EJECUTAR x5 (Agregación) --- # --- PARTE 4: EJECUTAR x5 (Agregación) ---
log_message( log_message(f"\n--- Fase 3: Ejecutando x5_aggregate.py ---", log_f)
f"\n--- Fase 3: Ejecutando x5_aggregate.py (salida en '{cfg_aggregated_filename}') ---", # Usar valor de config script5 = "x5_aggregate.py"
log_f
)
run_x5 = True run_x5 = True
success_x5 = False success_x5 = False
# Condición similar a x4: ejecutar si no todo falló en x1/x2/x3 can_run_x5 = any(s["x3_ok"] for s in file_status.values())
can_run_x5 = failed_count < len(xml_files_found) if not can_run_x5:
if not can_run_x5 and len(xml_files_found) > 0: log_message("Advertencia: Ningún archivo completó x3. Saltando x5.", log_f)
log_message(
"Advertencia: Todos los archivos fallaron en x1/x2/x3. Saltando x5.", log_f
)
run_x5 = False run_x5 = False
elif len(xml_files_found) == 0: script5_path = os.path.join(script_dir, script5)
if not os.path.exists(script5_path):
log_message(
f"Advertencia: Script '{script5}' no encontrado. Saltando x5.", log_f
)
run_x5 = False run_x5 = False
if run_x5: if run_x5:
output_agg_file = os.path.join(working_directory, cfg_aggregated_filename) # Usar valor de config output_agg_file = os.path.join(xml_project_dir, AGGREGATED_FILENAME)
log_message( log_message(
f"Ejecutando x5 (aggregate_outputs) sobre: {xml_project_dir}, salida agregada en: {output_agg_file}", f"Ejecutando {script5} sobre: {xml_project_dir}, salida en: {output_agg_file}",
log_f log_f,
) )
try: success_x5 = run_script(
# Llamada directa a la función de x5 script5, xml_project_dir, log_f, "-o", output_agg_file
# <-- MODIFICADO: Pasar los parámetros necesarios leídos de la config --> ) # Pasar log_f
success_x5 = aggregate_outputs(
xml_project_dir,
output_agg_file,
cfg_scl_output_dirname,
cfg_xref_output_dirname)
if not success_x5: if not success_x5:
# La función interna ya debería haber impreso/logueado el error específico log_message(f"--- {script5} FALLÓ. ---", log_f, also_print=False)
log_message(f"--- x5 (aggregate_outputs) FALLÓ. ---", log_f, also_print=False) # Mensaje de éxito ya logueado por run_script
except Exception as e:
# Capturar error inesperado en la llamada a x5
log_message(f"--- ERROR INESPERADO ejecutando x5: {e} ---", log_f, also_print=False)
print(f"--- ERROR INESPERADO ejecutando x5: {e} ---", file=sys.stderr)
traceback_str = traceback.format_exc()
log_message(traceback_str, log_f, also_print=False)
traceback.print_exc(file=sys.stderr)
success_x5 = False # Marcar como fallo
else: else:
log_message("Fase 3 (x5) omitida.", log_f) log_message("Fase 3 (x5) omitida.", log_f)
# --- PARTE 5: RESUMEN FINAL --- (MOVIDO AQUÍ)
# --- PARTE 5: RESUMEN FINAL --- # --- PARTE 5: RESUMEN FINAL ---
log_message( log_message(
"\n" + "-" * 20 + " Resumen Final del Procesamiento Completo " + "-" * 20, "\n" + "-" * 20 + " Resumen Final del Procesamiento Completo " + "-" * 20,
@ -463,13 +503,21 @@ if __name__ == "__main__":
f"Archivos parcialmente saltados (x1, x2 saltados; x3 ejecutado): {skipped_partial_count}", f"Archivos parcialmente saltados (x1, x2 saltados; x3 ejecutado): {skipped_partial_count}",
log_f, log_f,
) )
log_message(f"Archivos fallidos (en x1, x2, x3 o error inesperado): {failed_count}", log_f) log_message(f"Archivos fallidos (en x1, x2 o x3): {failed_count}", log_f)
# El detalle de archivos fallidos es más difícil de rastrear ahora sin el dict 'file_status' if failed_count > 0:
# Se podría reintroducir si es necesario, actualizándolo en cada paso. log_message("Archivos fallidos:", log_f)
# Por ahora, solo mostramos el conteo. for f, s in file_status.items():
# if failed_count > 0: if not (
# log_message("Archivos fallidos:", log_f) s.get("x1_ok", False)
# ... (lógica para mostrar cuáles fallaron) ... and s.get("x2_ok", False)
and s.get("x3_ok", False)
):
failed_step = (
"x1"
if not s.get("x1_ok", False)
else ("x2" if not s.get("x2_ok", False) else "x3")
)
log_message(f" - {f} (falló en {failed_step})", log_f)
log_message( log_message(
f"Fase 2 (Generación XRef - x4): {'Completada' if run_x4 and success_x4 else ('Fallida' if run_x4 and not success_x4 else 'Omitida')}", f"Fase 2 (Generación XRef - x4): {'Completada' if run_x4 and success_x4 else ('Fallida' if run_x4 and not success_x4 else 'Omitida')}",
log_f, log_f,
@ -507,5 +555,5 @@ if __name__ == "__main__":
print(f"Advertencia: Error durante flush/fsync final del log: {flush_err}", file=sys.stderr) print(f"Advertencia: Error durante flush/fsync final del log: {flush_err}", file=sys.stderr)
# <-- FIN NUEVO --> # <-- FIN NUEVO -->
# Mensaje final ya impreso antes del flush print(f"\n{final_console_message} Consulta '{LOG_FILENAME}' para detalles.")
sys.exit(exit_code) # Salir con el código apropiado sys.exit(exit_code) # Salir con el código apropiado


@ -1,9 +1,3 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script convierte archivos XML de Siemens LAD/FUP a un formato JSON simplificado.
"""
# ToUpload/x1_to_json.py # ToUpload/x1_to_json.py
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json import json
@ -13,15 +7,9 @@ import sys
import traceback import traceback
import importlib import importlib
from lxml import etree from lxml import etree
from lxml.etree import XMLSyntaxError as etree_XMLSyntaxError # Alias para evitar conflicto
from collections import defaultdict from collections import defaultdict
import copy import copy
import time # <-- NUEVO: Para obtener metadatos import time # <-- NUEVO: Para obtener metadatos
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# Importar funciones comunes y namespaces desde el nuevo módulo de utils # Importar funciones comunes y namespaces desde el nuevo módulo de utils
try: try:
@ -221,18 +209,12 @@ def load_parsers(parsers_dir="parsers"):
return parser_map return parser_map
# <-- MODIFICADO: parser_map ya no es un argumento, se carga dentro --> def convert_xml_to_json(xml_filepath, json_filepath, parser_map):
def convert_xml_to_json(xml_filepath, json_filepath):
""" """
Convierte XML a JSON, detectando tipo, añadiendo metadatos del XML Convierte XML a JSON, detectando tipo, añadiendo metadatos del XML
y extrayendo comentarios/títulos de red de forma centralizada. (v3) y extrayendo comentarios/títulos de red de forma centralizada. (v3)
Carga los parsers necesarios internamente.
""" """
print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...") print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
# <-- NUEVO: Cargar parsers aquí -->
print("Cargando parsers de red...")
parser_map = load_parsers()
# <-- FIN NUEVO -->
if not os.path.exists(xml_filepath): if not os.path.exists(xml_filepath):
print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'") print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
return False return False
@ -456,7 +438,7 @@ def convert_xml_to_json(xml_filepath, json_filepath):
print("Error Crítico: No se generó ningún resultado para el archivo XML.") print("Error Crítico: No se generó ningún resultado para el archivo XML.")
return False return False
except etree_XMLSyntaxError as e: # Usar alias except etree.XMLSyntaxError as e:
print(f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}") print(f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}")
return False return False
except Exception as e: except Exception as e:
@ -466,35 +448,29 @@ def convert_xml_to_json(xml_filepath, json_filepath):
# --- Punto de Entrada Principal (__main__) --- # --- Punto de Entrada Principal (__main__) ---
if __name__ == "__main__": if __name__ == "__main__":
# Lógica para ejecución standalone parser = argparse.ArgumentParser(
try: description="Convert Simatic XML (FC/FB/OB/DB/UDT/TagTable) to simplified JSON using dynamic parsers and add XML metadata."
import tkinter as tk
from tkinter import filedialog
except ImportError:
print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr)
# No salimos, podríamos intentar obtener el path de otra forma o fallar más adelante
tk = None # Marcar como no disponible
xml_input_file = ""
if tk:
root = tk.Tk()
root.withdraw() # Ocultar la ventana principal de Tkinter
print("Por favor, selecciona el archivo XML de entrada...")
xml_input_file = filedialog.askopenfilename(
title="Selecciona el archivo XML de entrada",
filetypes=[("XML files", "*.xml"), ("All files", "*.*")]
) )
root.destroy() # Cerrar Tkinter parser.add_argument(
"xml_filepath",
help="Path to the input XML file passed from the main script (x0_main.py).",
)
args = parser.parse_args()
xml_input_file = args.xml_filepath
if not xml_input_file: if not os.path.exists(xml_input_file):
print("No se seleccionó ningún archivo. Saliendo.", file=sys.stderr)
# sys.exit(1) # No usar sys.exit aquí
else:
print( print(
f"Archivo XML seleccionado: {xml_input_file}" f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'",
file=sys.stderr,
)
sys.exit(1)
loaded_parsers = load_parsers()
if not loaded_parsers:
print(
"Advertencia (x1): No se cargaron parsers de red. Se continuará para UDT/TagTable/DB."
) )
# Calcular ruta de salida JSON
xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0] xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0]
base_dir = os.path.dirname(xml_input_file) base_dir = os.path.dirname(xml_input_file)
output_dir = os.path.join(base_dir, "parsing") output_dir = os.path.join(base_dir, "parsing")
@ -502,13 +478,16 @@ if __name__ == "__main__":
json_output_file = os.path.join(output_dir, f"{xml_filename_base}.json") json_output_file = os.path.join(output_dir, f"{xml_filename_base}.json")
print( print(
f"(x1 - Standalone) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'" f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'"
) )
# Llamar a la función principal (que ahora carga los parsers) success = convert_xml_to_json(xml_input_file, json_output_file, loaded_parsers)
success = convert_xml_to_json(xml_input_file, json_output_file)
if success: if success:
print("\nConversión completada exitosamente.") sys.exit(0)
else: else:
print(f"\nError durante la conversión de '{os.path.relpath(xml_input_file)}'.", file=sys.stderr) print(
f"\nError durante la conversión de '{os.path.relpath(xml_input_file)}'.",
file=sys.stderr,
)
sys.exit(1)


@ -1,10 +1,3 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script convierte un archivo JSON simplificado (resultado de un análisis de un XML de Siemens) a un
JSON enriquecido con lógica SCL. Se enfoca en la lógica de programación y la agrupación de instrucciones IF.
"""
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json import json
import argparse import argparse
@ -15,11 +8,6 @@ import re
import importlib import importlib
import sys import sys
import sympy import sympy
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# Import necessary components from processors directory # Import necessary components from processors directory
from processors.processor_utils import format_variable_name, sympy_expr_to_scl from processors.processor_utils import format_variable_name, sympy_expr_to_scl
@ -532,54 +520,57 @@ def process_json_to_scl(json_filepath, output_json_filepath):
# --- Ejecución (MODIFICADO) --- # --- Ejecución (MODIFICADO) ---
if __name__ == "__main__": if __name__ == "__main__":
# Lógica para ejecución standalone parser = argparse.ArgumentParser(
try: description="Process simplified JSON to embed SCL logic, copying XML metadata. Expects original XML filepath."
import tkinter as tk ) # <-- MODIFICADO
from tkinter import filedialog parser.add_argument(
except ImportError: "source_xml_filepath",
print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr) help="Path to the original source XML file (passed from x0_main.py).",
tk = None
input_json_file = ""
if tk:
root = tk.Tk()
root.withdraw()
print("Por favor, selecciona el archivo JSON de entrada (generado por x1)...")
input_json_file = filedialog.askopenfilename(
title="Selecciona el archivo JSON de entrada (.json)",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
) )
root.destroy() args = parser.parse_args()
source_xml_file = args.source_xml_filepath
if not input_json_file: if not os.path.exists(source_xml_file):
print("No se seleccionó ningún archivo. Saliendo.", file=sys.stderr) print(
else: f"Advertencia (x2): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON correspondiente.",
print(f"Archivo JSON de entrada seleccionado: {input_json_file}") file=sys.stderr,
)
# No salir, intentar encontrar el JSON de todas formas
# Calcular ruta de salida JSON procesado xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0]
json_filename_base = os.path.splitext(os.path.basename(input_json_file))[0] base_dir = os.path.dirname(source_xml_file)
# Asumimos que el _processed.json va al mismo directorio 'parsing' parsing_dir = os.path.join(base_dir, "parsing")
parsing_dir = os.path.dirname(input_json_file) # x2 LEE el .json y ESCRIBE el _processed.json
output_json_file = os.path.join(parsing_dir, f"{json_filename_base}_processed.json") input_json_file = os.path.join(parsing_dir, f"{xml_filename_base}.json")
output_json_file = os.path.join(parsing_dir, f"{xml_filename_base}_processed.json")
# Asegurarse de que el directorio de salida exista (aunque debería si el input existe)
os.makedirs(parsing_dir, exist_ok=True) os.makedirs(parsing_dir, exist_ok=True)
print( print(
f"(x2 - Standalone) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'" f"(x2) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'"
) )
if not os.path.exists(input_json_file):
print(
f"Error Fatal (x2): El archivo de entrada JSON no existe: '{input_json_file}'",
file=sys.stderr,
)
print(
f"Asegúrate de que 'x1_to_json.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'.",
file=sys.stderr,
)
sys.exit(1)
else:
try: try:
success = process_json_to_scl(input_json_file, output_json_file) success = process_json_to_scl(input_json_file, output_json_file)
if success: if success:
print("\nProcesamiento completado exitosamente.") sys.exit(0)
else: else:
print(f"\nError durante el procesamiento de '{os.path.relpath(input_json_file)}'.", file=sys.stderr) sys.exit(1)
# sys.exit(1) # No usar sys.exit
except Exception as e: except Exception as e:
print( print(
f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}", f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}",
file=sys.stderr, file=sys.stderr,
) )
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)
# sys.exit(1) # No usar sys.exit sys.exit(1)


@ -1,9 +1,3 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script es parte de un conjunto de herramientas para convertir proyectos de Siemens LAD/FUP a SCL.
"""
# ToUpload/x3_generate_scl.py # ToUpload/x3_generate_scl.py
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json import json
@ -12,11 +6,6 @@ import re
import argparse import argparse
import sys import sys
import traceback import traceback
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Importar Generadores Específicos --- # --- Importar Generadores Específicos ---
try: try:
@ -36,7 +25,7 @@ except ImportError as e:
sys.exit(1) sys.exit(1)
# --- Constantes --- # --- Constantes ---
# SCL_OUTPUT_DIRNAME = "scl_output" # <-- Ya no se usa directamente en __main__, se lee de config SCL_OUTPUT_DIRNAME = "scl_output" # <-- NUEVO: Nombre del directorio de salida final
# --- Modificar generate_scl_or_markdown para usar el nuevo directorio de salida --- # --- Modificar generate_scl_or_markdown para usar el nuevo directorio de salida ---
@ -143,54 +132,42 @@ def generate_scl_or_markdown(
# --- Ejecución (MODIFICADO para usar SCL_OUTPUT_DIRNAME) --- # --- Ejecución (MODIFICADO para usar SCL_OUTPUT_DIRNAME) ---
if __name__ == "__main__": if __name__ == "__main__":
# Lógica para ejecución standalone parser = argparse.ArgumentParser(
try: description=f"Generate final SCL/Markdown file into '{SCL_OUTPUT_DIRNAME}/'."
import tkinter as tk ) # <-- MODIFICADO
from tkinter import filedialog parser.add_argument(
except ImportError: "source_xml_filepath", help="Path to the original source XML file."
print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr)
tk = None
input_json_file = ""
project_root_dir = ""
if tk:
root = tk.Tk()
root.withdraw()
print("Por favor, selecciona el archivo JSON procesado de entrada (generado por x2)...")
input_json_file = filedialog.askopenfilename(
title="Selecciona el archivo JSON procesado de entrada (_processed.json)",
filetypes=[("Processed JSON files", "*_processed.json"), ("JSON files", "*.json"), ("All files", "*.*")]
) )
if input_json_file: parser.add_argument(
print(f"Archivo JSON procesado seleccionado: {input_json_file}") "project_root_dir",
print("Por favor, selecciona el directorio raíz del proyecto XML (ej. la carpeta 'PLC')...") help="Path to the root directory of the XML project structure.",
project_root_dir = filedialog.askdirectory(
title="Selecciona el directorio raíz del proyecto XML"
) )
if project_root_dir: args = parser.parse_args()
print(f"Directorio raíz del proyecto seleccionado: {project_root_dir}") source_xml_file = args.source_xml_filepath
else: project_root_dir = args.project_root_dir
print("No se seleccionó directorio raíz. Saliendo.", file=sys.stderr)
else:
print("No se seleccionó archivo JSON procesado. Saliendo.", file=sys.stderr)
root.destroy()
if input_json_file and project_root_dir: if not os.path.exists(source_xml_file):
# Calcular directorio de salida final print(
# <-- NUEVO: Leer nombre del directorio de salida desde la configuración --> f"Advertencia (x3): Archivo XML original no encontrado: '{source_xml_file}'. Se intentará continuar.",
configs = load_configuration() file=sys.stderr,
xml_parser_config = configs.get("XML Parser to SCL", {}) )
cfg_scl_output_dirname = xml_parser_config.get("scl_output_dir", "scl_output") # Leer con default # No salir necesariamente, podríamos tener el JSON procesado
# <-- FIN NUEVO -->
final_output_dir = os.path.join(project_root_dir, cfg_scl_output_dirname) # Usar valor leído xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0]
xml_dir = os.path.dirname(source_xml_file)
parsing_dir = os.path.join(xml_dir, "parsing")
input_json_file = os.path.join(parsing_dir, f"{xml_filename_base}_processed.json")
print(f"(x3 - Standalone) Generando SCL/MD desde: '{os.path.relpath(input_json_file)}'") # <-- MODIFICADO: Calcular directorio de salida final -->
print(f"(x3 - Standalone) Directorio de salida final: '{os.path.relpath(final_output_dir)}'") # Siempre será 'scl_output' bajo la raíz del proyecto
print(f"(x3 - Standalone) Usando ruta raíz del proyecto: '{project_root_dir}' para buscar UDTs.") final_output_dir = os.path.join(project_root_dir, SCL_OUTPUT_DIRNAME)
# <-- FIN MODIFICADO -->
# Asegurar que el directorio de salida final exista print(f"(x3) Generando SCL/MD desde: '{os.path.relpath(input_json_file)}'")
print(f"(x3) Directorio de salida final: '{os.path.relpath(final_output_dir)}'")
print(f"(x3) Usando ruta raíz del proyecto: '{project_root_dir}' para buscar UDTs.")
# Asegurar que el directorio de salida final exista ANTES de llamar a la función
try: try:
os.makedirs(final_output_dir, exist_ok=True) os.makedirs(final_output_dir, exist_ok=True)
except OSError as e: except OSError as e:
@ -198,27 +175,25 @@ if __name__ == "__main__":
f"Error Crítico (x3): No se pudo crear el directorio de salida '{final_output_dir}': {e}", f"Error Crítico (x3): No se pudo crear el directorio de salida '{final_output_dir}': {e}",
file=sys.stderr, file=sys.stderr,
) )
# sys.exit(1) # No usar sys.exit sys.exit(1)
success = False # Marcar como fallo para evitar la llamada
else:
success = True # Marcar como éxito para proceder
if success: # Solo intentar si se pudo crear el directorio if not os.path.exists(input_json_file):
print(
f"Error Fatal (x3): JSON procesado no encontrado: '{input_json_file}'",
file=sys.stderr,
)
sys.exit(1)
else:
try: try:
# Llamar a la función principal # Pasar el directorio de salida FINAL y la ruta raíz
success = generate_scl_or_markdown( success = generate_scl_or_markdown(
input_json_file, final_output_dir, project_root_dir input_json_file, final_output_dir, project_root_dir
) ) # <-- MODIFICADO
if success: if success:
print("\nGeneración de SCL/MD completada exitosamente.") sys.exit(0)
else: else:
# La función generate_scl_or_markdown ya imprime el error sys.exit(1) # La función ya imprimió el error
print(f"\nError durante la generación desde '{os.path.relpath(input_json_file)}'.", file=sys.stderr)
# sys.exit(1) # No usar sys.exit
except Exception as e: except Exception as e:
print(f"Error Crítico no manejado en x3: {e}", file=sys.stderr) print(f"Error Crítico no manejado en x3: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)
# sys.exit(1) # No usar sys.exit sys.exit(1)
else:
# Mensajes de cancelación ya impresos si aplica
pass


@ -1,9 +1,3 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script genera documentación MD de Cross Reference para Obsidian
"""
# ToUpload/x4_cross_reference.py # ToUpload/x4_cross_reference.py
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json import json
@ -16,11 +10,6 @@ import re
import urllib.parse import urllib.parse
import shutil # <-- NUEVO: Para copiar archivos import shutil # <-- NUEVO: Para copiar archivos
from collections import defaultdict from collections import defaultdict
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Importar format_variable_name (sin cambios) --- # --- Importar format_variable_name (sin cambios) ---
try: try:
@ -51,14 +40,14 @@ except ImportError:
# --- Constantes --- # --- Constantes ---
# SCL_OUTPUT_DIRNAME = "scl_output" # Se leerá de config SCL_OUTPUT_DIRNAME = "scl_output"
# XREF_SOURCE_SUBDIR = "source" # Se leerá de config XREF_SOURCE_SUBDIR = "source" # <-- NUEVO: Subdirectorio para fuentes MD
# CALL_XREF_FILENAME = "xref_calls_tree.md" # Se leerá de config CALL_XREF_FILENAME = "xref_calls_tree.md"
# DB_USAGE_XREF_FILENAME = "xref_db_usage_summary.md" # Se leerá de config DB_USAGE_XREF_FILENAME = "xref_db_usage_summary.md"
# PLC_TAG_XREF_FILENAME = "xref_plc_tags_summary.md" # Se leerá de config PLC_TAG_XREF_FILENAME = "xref_plc_tags_summary.md"
# MAX_CALL_DEPTH = 5 # Se leerá de config MAX_CALL_DEPTH = 5
INDENT_STEP = " " INDENT_STEP = " "
# MAX_USERS_LIST = 20 # Se leerá de config MAX_USERS_LIST = 20
# --- Funciones de Análisis (find_calls_in_scl, find_db_tag_usage, find_plc_tag_usage sin cambios) --- # --- Funciones de Análisis (find_calls_in_scl, find_db_tag_usage, find_plc_tag_usage sin cambios) ---
@ -223,14 +212,13 @@ def find_plc_tag_usage(scl_code, plc_tag_names_set):
# <-- NUEVA FUNCION --> # <-- NUEVA FUNCION -->
def copy_and_prepare_source_files(project_root_dir, xref_output_dir, scl_output_dirname, xref_source_subdir): def copy_and_prepare_source_files(project_root_dir, xref_output_dir):
""" """
Copia archivos .scl y .md desde scl_output a xref_output/source, Copia archivos .scl y .md desde scl_output a xref_output/source,
convirtiendo .scl a .md con formato de bloque de código. convirtiendo .scl a .md con formato de bloque de código.
Usa los nombres de directorios pasados como argumentos.
""" """
scl_source_dir = os.path.join(project_root_dir, scl_output_dirname) scl_source_dir = os.path.join(project_root_dir, SCL_OUTPUT_DIRNAME)
md_target_dir = os.path.join(xref_output_dir, xref_source_subdir) md_target_dir = os.path.join(xref_output_dir, XREF_SOURCE_SUBDIR)
if not os.path.isdir(scl_source_dir): if not os.path.isdir(scl_source_dir):
print( print(
@ -305,7 +293,7 @@ def copy_and_prepare_source_files(project_root_dir, xref_output_dir, scl_output_
# <-- MODIFICADO: get_scl_link --> # <-- MODIFICADO: get_scl_link -->
def get_scl_link( def get_scl_link(
block_name, block_entry, xref_source_subdir block_name, block_entry, base_xref_dir
): # Ya no necesita project_root_dir ): # Ya no necesita project_root_dir
""" """
Genera un enlace Markdown relativo al archivo .md correspondiente DENTRO de xref_output/source. Genera un enlace Markdown relativo al archivo .md correspondiente DENTRO de xref_output/source.
@ -314,10 +302,10 @@ def get_scl_link(
return f"`{block_name}`" return f"`{block_name}`"
# El nombre del archivo destino siempre será .md # El nombre del archivo destino siempre será .md
md_filename = format_variable_name(block_name) + ".md" # Asegurar que format_variable_name esté disponible md_filename = format_variable_name(block_name) + ".md"
# La ruta siempre estará dentro del subdirectorio fuente de xref # La ruta siempre estará dentro del subdirectorio 'source'
link_target_path = f"{xref_source_subdir}/{md_filename}" link_target_path = f"{XREF_SOURCE_SUBDIR}/{md_filename}"
# Codificar para URL/Markdown # Codificar para URL/Markdown
try: try:
@ -332,7 +320,7 @@ def get_scl_link(
# <-- MODIFICADO: build_call_tree_recursive (ya no necesita project_root_dir) --> # <-- MODIFICADO: build_call_tree_recursive (ya no necesita project_root_dir) -->
def build_call_tree_recursive( # Añadido max_call_depth, xref_source_subdir def build_call_tree_recursive(
current_node, current_node,
call_graph, call_graph,
block_data, block_data,
@ -340,8 +328,6 @@ def build_call_tree_recursive( # Añadido max_call_depth, xref_source_subdir
visited_in_path, visited_in_path,
base_xref_dir, base_xref_dir,
current_depth=0, current_depth=0,
max_call_depth=5,
xref_source_subdir="source"
): ):
""" """
Función recursiva para construir el árbol de llamadas indentado CON ENLACES Función recursiva para construir el árbol de llamadas indentado CON ENLACES
@ -350,10 +336,10 @@ def build_call_tree_recursive( # Añadido max_call_depth, xref_source_subdir
indent = INDENT_STEP * current_depth indent = INDENT_STEP * current_depth
block_entry = block_data.get(current_node) block_entry = block_data.get(current_node)
# Llamar a get_scl_link modificado # Llamar a get_scl_link modificado
node_link = get_scl_link(current_node, block_entry, xref_source_subdir) node_link = get_scl_link(current_node, block_entry, base_xref_dir)
output_lines.append(f"{indent}- {node_link}") output_lines.append(f"{indent}- {node_link}")
if current_depth >= max_call_depth: if current_depth >= MAX_CALL_DEPTH:
output_lines.append( output_lines.append(
f"{indent}{INDENT_STEP}[... Profundidad máxima alcanzada ...]" f"{indent}{INDENT_STEP}[... Profundidad máxima alcanzada ...]"
) )
@ -373,22 +359,20 @@ def build_call_tree_recursive( # Añadido max_call_depth, xref_source_subdir
block_data, block_data,
output_lines, output_lines,
visited_in_path.copy(), visited_in_path.copy(),
base_xref_dir, # base_xref_dir no se usa en la recursión, podría quitarse base_xref_dir,
current_depth + 1, current_depth + 1,
max_call_depth=max_call_depth, # Pasar parámetro
xref_source_subdir=xref_source_subdir # Pasar parámetro
) )
# <-- MODIFICADO: generate_call_tree_output (ya no necesita project_root_dir) --> # <-- MODIFICADO: generate_call_tree_output (ya no necesita project_root_dir) -->
def generate_call_tree_output(call_graph, block_data, base_xref_dir, max_call_depth, xref_source_subdir): # Añadido max_call_depth, xref_source_subdir def generate_call_tree_output(call_graph, block_data, base_xref_dir):
""" """
Genera las líneas de texto para el archivo de árbol de llamadas CON ENLACES Genera las líneas de texto para el archivo de árbol de llamadas CON ENLACES
a los archivos .md en xref_output/source. a los archivos .md en xref_output/source.
""" """
output_lines = ["# Árbol de Referencias Cruzadas de Llamadas\n"] output_lines = ["# Árbol de Referencias Cruzadas de Llamadas\n"]
output_lines.append(f"(Profundidad máxima: {MAX_CALL_DEPTH})\n") output_lines.append(f"(Profundidad máxima: {MAX_CALL_DEPTH})\n")
root_nodes = sorted( # Encontrar OBs root_nodes = sorted(
[ [
name name
for name, data in block_data.items() for name, data in block_data.items()
@ -403,7 +387,7 @@ def generate_call_tree_output(call_graph, block_data, base_xref_dir, max_call_de
for ob_name in root_nodes: for ob_name in root_nodes:
ob_entry = block_data.get(ob_name) ob_entry = block_data.get(ob_name)
ob_link = get_scl_link( ob_link = get_scl_link(
ob_name, ob_entry, xref_source_subdir ob_name, ob_entry, base_xref_dir
) # Llamar a get_scl_link modificado ) # Llamar a get_scl_link modificado
output_lines.append(f"\n### Iniciando desde: {ob_link}\n") output_lines.append(f"\n### Iniciando desde: {ob_link}\n")
build_call_tree_recursive( build_call_tree_recursive(
@ -412,10 +396,8 @@ def generate_call_tree_output(call_graph, block_data, base_xref_dir, max_call_de
block_data, block_data,
output_lines, output_lines,
set(), set(),
base_xref_dir, # No se usa en recursión base_xref_dir,
current_depth=0, current_depth=0,
max_call_depth=max_call_depth, # Pasar parámetro
xref_source_subdir=xref_source_subdir # Pasar parámetro
) )
all_callers = set(call_graph.keys()) all_callers = set(call_graph.keys())
@ -434,7 +416,7 @@ def generate_call_tree_output(call_graph, block_data, base_xref_dir, max_call_de
for block_name in unreached: for block_name in unreached:
block_entry = block_data.get(block_name) block_entry = block_data.get(block_name)
block_link = get_scl_link( block_link = get_scl_link(
block_name, block_entry, xref_source_subdir block_name, block_entry, base_xref_dir
) # Llamar a get_scl_link modificado ) # Llamar a get_scl_link modificado
output_lines.append(f"- {block_link}") output_lines.append(f"- {block_link}")
return output_lines return output_lines
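
The indentation and recursion scheme used by build_call_tree_recursive and generate_call_tree_output reduces to a short depth-first walk. A self-contained sketch (illustrative names and signatures, not the project's own), guarding against cycles with a visited set and against runaway depth with a max_depth cap:

def build_call_tree(node, call_graph, lines, visited, depth=0, max_depth=5, indent="    "):
    # One bullet per block, indented by call depth.
    lines.append(f"{indent * depth}- `{node}`")
    if depth >= max_depth:
        lines.append(f"{indent * (depth + 1)}[... max depth reached ...]")
        return
    if node in visited:
        lines.append(f"{indent * (depth + 1)}[... recursive call ...]")
        return
    for callee in sorted(call_graph.get(node, [])):
        build_call_tree(callee, call_graph, lines, visited | {node}, depth + 1, max_depth, indent)

# Example: OB1 calls FB2 and FC1, FB2 calls FC1 again.
lines = []
build_call_tree("OB1", {"OB1": ["FB2", "FC1"], "FB2": ["FC1"]}, lines, set())
print("\n".join(lines))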
@ -442,7 +424,7 @@ def generate_call_tree_output(call_graph, block_data, base_xref_dir, max_call_de
# --- Funciones para Salida Resumida (generate_db_usage_summary_output, generate_plc_tag_summary_output SIN CAMBIOS) --- # --- Funciones para Salida Resumida (generate_db_usage_summary_output, generate_plc_tag_summary_output SIN CAMBIOS) ---
# (Se omiten por brevedad) # (Se omiten por brevedad)
def generate_db_usage_summary_output(db_users, max_users_list): # Añadido max_users_list def generate_db_usage_summary_output(db_users):
"""Genera las líneas para el archivo Markdown de resumen de uso de DBs.""" """Genera las líneas para el archivo Markdown de resumen de uso de DBs."""
output_lines = ["# Resumen de Uso de DB Globales por Bloque\n\n"] output_lines = ["# Resumen de Uso de DB Globales por Bloque\n\n"]
if not db_users: if not db_users:
@ -458,7 +440,7 @@ def generate_db_usage_summary_output(db_users, max_users_list): # Añadido max_u
output_lines.append("- No utilizado directamente.\n") output_lines.append("- No utilizado directamente.\n")
else: else:
output_lines.append("Utilizado por:\n") output_lines.append("Utilizado por:\n")
display_users = users_list[:max_users_list] # Usar parámetro display_users = users_list[:MAX_USERS_LIST]
remaining_count = len(users_list) - len(display_users) remaining_count = len(users_list) - len(display_users)
for user_block in display_users: for user_block in display_users:
output_lines.append(f"- `{user_block}`") output_lines.append(f"- `{user_block}`")
@ -468,7 +450,7 @@ def generate_db_usage_summary_output(db_users, max_users_list): # Añadido max_u
return output_lines return output_lines
def generate_plc_tag_summary_output(plc_tag_users, max_users_list): # Añadido max_users_list def generate_plc_tag_summary_output(plc_tag_users):
"""Genera las líneas para el archivo Markdown de resumen de uso de PLC Tags.""" """Genera las líneas para el archivo Markdown de resumen de uso de PLC Tags."""
output_lines = ["# Resumen de Uso de PLC Tags Globales por Bloque\n\n"] output_lines = ["# Resumen de Uso de PLC Tags Globales por Bloque\n\n"]
if not plc_tag_users: if not plc_tag_users:
@ -484,7 +466,7 @@ def generate_plc_tag_summary_output(plc_tag_users, max_users_list): # Añadido m
output_lines.append("- No utilizado.\n") output_lines.append("- No utilizado.\n")
else: else:
output_lines.append("Utilizado por:\n") output_lines.append("Utilizado por:\n")
display_users = users_list[:max_users_list] # Usar parámetro display_users = users_list[:MAX_USERS_LIST]
remaining_count = len(users_list) - len(display_users) remaining_count = len(users_list) - len(display_users)
for user_block in display_users: for user_block in display_users:
output_lines.append(f"- `{user_block}`") output_lines.append(f"- `{user_block}`")
@ -495,33 +477,20 @@ def generate_plc_tag_summary_output(plc_tag_users, max_users_list): # Añadido m
# --- Función Principal (MODIFICADA para llamar a copy_and_prepare_source_files) --- # --- Función Principal (MODIFICADA para llamar a copy_and_prepare_source_files) ---
def generate_cross_references( def generate_cross_references(project_root_dir, output_dir):
project_root_dir,
output_dir,
scl_output_dirname,
xref_source_subdir,
call_xref_filename,
db_usage_xref_filename,
plc_tag_xref_filename,
max_call_depth,
max_users_list
):
""" """
Genera archivos de referencias cruzadas y prepara archivos fuente (.md) Genera archivos de referencias cruzadas y prepara archivos fuente (.md)
para visualización en Obsidian. para visualización en Obsidian.
Utiliza los parámetros de configuración pasados como argumentos.
""" """
print(f"--- Iniciando Generación de Referencias Cruzadas y Fuentes MD (x4) ---") print(f"--- Iniciando Generación de Referencias Cruzadas y Fuentes MD (x4) ---")
print(f"Buscando archivos JSON procesados en: {project_root_dir}") print(f"Buscando archivos JSON procesados en: {project_root_dir}")
print(f"Directorio de salida XRef: {output_dir}") print(f"Directorio de salida XRef: {output_dir}")
print(f"Directorio fuente SCL/MD: {scl_output_dirname}")
print(f"Subdirectorio fuentes MD para XRef: {xref_source_subdir}")
output_dir_abs = os.path.abspath(output_dir) output_dir_abs = os.path.abspath(output_dir)
# <-- NUEVO: Crear directorio y preparar archivos fuente ANTES de generar XRefs --> # <-- NUEVO: Crear directorio y preparar archivos fuente ANTES de generar XRefs -->
# Pasar los nombres de directorios leídos de la config copy_and_prepare_source_files(project_root_dir, output_dir_abs)
copy_and_prepare_source_files(project_root_dir, output_dir_abs, scl_output_dirname, xref_source_subdir)
# <-- FIN NUEVO --> # <-- FIN NUEVO -->
json_files = glob.glob( json_files = glob.glob(
os.path.join(project_root_dir, "**", "*_processed.json"), recursive=True os.path.join(project_root_dir, "**", "*_processed.json"), recursive=True
) )
@ -608,14 +577,14 @@ def generate_cross_references(
# 3. Generar Archivos de Salida XRef (MODIFICADO para usar la nueva función de árbol) # 3. Generar Archivos de Salida XRef (MODIFICADO para usar la nueva función de árbol)
os.makedirs(output_dir_abs, exist_ok=True) os.makedirs(output_dir_abs, exist_ok=True)
call_xref_path = os.path.join(output_dir_abs, call_xref_filename) # Usar parámetro call_xref_path = os.path.join(output_dir_abs, CALL_XREF_FILENAME)
db_usage_xref_path = os.path.join(output_dir_abs, db_usage_xref_filename) # Usar parámetro db_usage_xref_path = os.path.join(output_dir_abs, DB_USAGE_XREF_FILENAME)
plc_tag_xref_path = os.path.join(output_dir_abs, plc_tag_xref_filename) # Usar parámetro plc_tag_xref_path = os.path.join(output_dir_abs, PLC_TAG_XREF_FILENAME)
print(f"Generando ÁRBOL XRef de llamadas en: {call_xref_path}") print(f"Generando ÁRBOL XRef de llamadas en: {call_xref_path}")
try: try:
# <-- MODIFICADO: Llamar a la nueva función sin project_root_dir --> # <-- MODIFICADO: Llamar a la nueva función sin project_root_dir -->
call_tree_lines = generate_call_tree_output( # Pasar parámetros call_tree_lines = generate_call_tree_output(
call_graph, block_data, output_dir_abs call_graph, block_data, output_dir_abs
) )
with open(call_xref_path, "w", encoding="utf-8") as f: with open(call_xref_path, "w", encoding="utf-8") as f:
@ -629,7 +598,7 @@ def generate_cross_references(
# Generar Resumen de Uso de DB (sin cambios aquí) # Generar Resumen de Uso de DB (sin cambios aquí)
print(f"Generando RESUMEN XRef de uso de DBs en: {db_usage_xref_path}") print(f"Generando RESUMEN XRef de uso de DBs en: {db_usage_xref_path}")
try: try:
db_summary_lines = generate_db_usage_summary_output(db_users, max_users_list) # Pasar parámetro db_summary_lines = generate_db_usage_summary_output(db_users)
with open(db_usage_xref_path, "w", encoding="utf-8") as f: with open(db_usage_xref_path, "w", encoding="utf-8") as f:
[f.write(line + "\n") for line in db_summary_lines] [f.write(line + "\n") for line in db_summary_lines]
except Exception as e: except Exception as e:
@ -642,7 +611,7 @@ def generate_cross_references(
# Generar Resumen de Uso de PLC Tags (sin cambios aquí) # Generar Resumen de Uso de PLC Tags (sin cambios aquí)
print(f"Generando RESUMEN XRef de uso de PLC Tags en: {plc_tag_xref_path}") print(f"Generando RESUMEN XRef de uso de PLC Tags en: {plc_tag_xref_path}")
try: try:
plc_tag_lines = generate_plc_tag_summary_output(plc_tag_users, max_users_list) # Pasar parámetro plc_tag_lines = generate_plc_tag_summary_output(plc_tag_users)
with open(plc_tag_xref_path, "w", encoding="utf-8") as f: with open(plc_tag_xref_path, "w", encoding="utf-8") as f:
[f.write(line + "\n") for line in plc_tag_lines] [f.write(line + "\n") for line in plc_tag_lines]
except Exception as e: except Exception as e:
@ -658,53 +627,35 @@ def generate_cross_references(
# --- Punto de Entrada (sin cambios) --- # --- Punto de Entrada (sin cambios) ---
if __name__ == "__main__": if __name__ == "__main__":
print("(x4 - Standalone) Ejecutando generación de referencias cruzadas...") parser = argparse.ArgumentParser(
description="Genera refs cruzadas y prepara archivos fuente MD para Obsidian."
# Cargar configuración para obtener rutas
configs = load_configuration()
working_directory = configs.get("working_directory")
# Acceder a la configuración específica del grupo
group_config = configs.get("level2", {})
# Leer parámetros con valores por defecto (usando los defaults del esquema como guía)
# Parámetros necesarios para x4
cfg_scl_output_dirname = group_config.get("scl_output_dir", "scl_output")
cfg_xref_output_dirname = group_config.get("xref_output_dir", "xref_output")
cfg_xref_source_subdir = group_config.get("xref_source_subdir", "source")
cfg_call_xref_filename = group_config.get("call_xref_filename", "xref_calls_tree.md")
cfg_db_usage_xref_filename = group_config.get("db_usage_xref_filename", "xref_db_usage_summary.md")
cfg_plc_tag_xref_filename = group_config.get("plc_tag_xref_filename", "xref_plc_tags_summary.md")
cfg_max_call_depth = group_config.get("max_call_depth", 5)
cfg_max_users_list = group_config.get("max_users_list", 20)
# Calcular rutas
if not working_directory:
print("Error: 'working_directory' no encontrado en la configuración.", file=sys.stderr)
# No usamos sys.exit(1)
else:
# Calcular rutas basadas en la configuración
plc_subdir_name = "PLC" # Asumir nombre estándar
project_root_dir = os.path.join(working_directory, plc_subdir_name)
xref_output_dir = os.path.join(project_root_dir, cfg_xref_output_dirname) # Usar nombre de dir leído
if not os.path.isdir(project_root_dir):
print(f"Error: Directorio del proyecto '{project_root_dir}' no encontrado.", file=sys.stderr)
else:
# Llamar a la función principal
success = generate_cross_references(
project_root_dir,
xref_output_dir,
cfg_scl_output_dirname,
cfg_xref_source_subdir,
cfg_call_xref_filename,
cfg_db_usage_xref_filename,
cfg_plc_tag_xref_filename,
cfg_max_call_depth,
cfg_max_users_list
) )
parser.add_argument("project_root_dir", help="Ruta dir raíz proyecto XML.")
parser.add_argument(
"-o",
"--output",
help="Directorio para guardar salida XRef (incluyendo subdir 'source').",
)
args = parser.parse_args()
if not os.path.isdir(args.project_root_dir):
print(
f"Error: Dir proyecto no existe: '{args.project_root_dir}'", file=sys.stderr
)
sys.exit(1)
if not args.output:
print(
"Error: Se requiere el argumento -o/--output para especificar el directorio de salida XRef.",
file=sys.stderr,
)
sys.exit(1)
output_destination = args.output
success = generate_cross_references(args.project_root_dir, output_destination)
if success: if success:
print("\n(x4 - Standalone) Proceso completado exitosamente.") print(
f"Archivos XRef y fuentes MD generados en: {os.path.abspath(output_destination)}"
)
sys.exit(0)
else: else:
print("\n(x4 - Standalone) Proceso finalizado con errores.", file=sys.stderr) print("Hubo errores durante la generación de refs cruzadas.", file=sys.stderr)
sys.exit(1)
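
For context, the configuration-driven entry point reads its settings from a group-level dict and falls back to defaults when a key is missing. A sketch of that lookup (the key names and defaults mirror the ones read above; the wrapper function itself is hypothetical):

def read_x4_settings(configs):
    # configs is expected to look like the dict returned by load_configuration(),
    # e.g. {"working_directory": ..., "level2": {...}}.
    group_config = configs.get("level2", {})
    return {
        "scl_output_dir": group_config.get("scl_output_dir", "scl_output"),
        "xref_output_dir": group_config.get("xref_output_dir", "xref_output"),
        "xref_source_subdir": group_config.get("xref_source_subdir", "source"),
        "call_xref_filename": group_config.get("call_xref_filename", "xref_calls_tree.md"),
        "max_call_depth": group_config.get("max_call_depth", 5),
        "max_users_list": group_config.get("max_users_list", 20),
    }

# With an empty config every value falls back to its default.
print(read_x4_settings({}))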


@ -1,9 +1,3 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script genera documentación en Markdown y SCL a partir de un proyecto XML de Siemens LAD/FUP.
"""
# ToUpload/x5_aggregate.py # ToUpload/x5_aggregate.py
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os import os
@ -11,36 +5,29 @@ import argparse
import sys import sys
import glob import glob
import traceback import traceback
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Constantes --- # --- Constantes ---
# Nombre del archivo de salida por defecto (se creará en el directorio raíz del proyecto) # Nombre del archivo de salida por defecto (se creará en el directorio raíz del proyecto)
# AGGREGATED_FILENAME = "full_project_representation.md" # Se leerá de config AGGREGATED_FILENAME = "full_project_representation.md"
# Directorio donde x4 guarda sus salidas (relativo al directorio raíz del proyecto) # Directorio donde x4 guarda sus salidas (relativo al directorio raíz del proyecto)
# XREF_OUTPUT_SUBDIR = "xref_output" # Se leerá de config XREF_OUTPUT_SUBDIR = "xref_output"
# SCL_OUTPUT_DIRNAME = "scl_output" # Se leerá de config
def aggregate_outputs(project_root_dir, output_filepath, scl_output_dirname, xref_output_dirname): # Añadido scl_output_dirname, xref_output_dirname def aggregate_files(project_root_dir, output_filepath):
""" """
Busca archivos .scl y .md generados y los agrega en un único archivo Markdown. Busca archivos .scl y .md generados y los agrega en un único archivo Markdown.
""" """
print(f"--- Iniciando Agregación de Archivos (x5) ---") print(f"--- Iniciando Agregación de Archivos (x5) ---")
print(f"Leyendo desde directorios: '{scl_output_dirname}' y '{xref_output_dirname}' (relativos a la raíz)")
print(f"Directorio Raíz del Proyecto: {project_root_dir}") print(f"Directorio Raíz del Proyecto: {project_root_dir}")
print(f"Archivo de Salida: {output_filepath}") print(f"Archivo de Salida: {output_filepath}")
# Patrones para buscar archivos generados # Patrones para buscar archivos generados
# Buscamos .scl en cualquier subdirectorio (generados por x3 junto a los XML) # Buscamos .scl en cualquier subdirectorio (generados por x3 junto a los XML)
scl_pattern = os.path.join(project_root_dir, "**", "*.scl") scl_pattern = os.path.join(project_root_dir, "**", "*.scl")
# Buscamos .md en cualquier subdirectorio (UDT/TagTable generados por x3, XRef por x4) # Buscamos .md en cualquier subdirectorio (UDT/TagTable generados por x3)
md_pattern_general = os.path.join(project_root_dir, "**", "*.md") md_pattern_general = os.path.join(project_root_dir, "**", "*.md")
# Directorio de salida de x4 # Buscamos .md específicamente en el directorio de salida de x4
xref_dir_abs = os.path.join(project_root_dir, xref_output_dirname) xref_dir = os.path.join(project_root_dir, XREF_OUTPUT_SUBDIR)
scl_dir_abs = os.path.join(project_root_dir, scl_output_dirname) # xref_pattern = os.path.join(xref_dir, "*.md") # No es necesario, el general los incluye
print(f"Buscando archivos SCL con patrón: {scl_pattern}") print(f"Buscando archivos SCL con patrón: {scl_pattern}")
print(f"Buscando archivos MD con patrón: {md_pattern_general}") print(f"Buscando archivos MD con patrón: {md_pattern_general}")
@ -48,18 +35,16 @@ def aggregate_outputs(project_root_dir, output_filepath, scl_output_dirname, xre
scl_files = glob.glob(scl_pattern, recursive=True) scl_files = glob.glob(scl_pattern, recursive=True)
md_files = glob.glob(md_pattern_general, recursive=True) md_files = glob.glob(md_pattern_general, recursive=True)
# Filtrar los archivos para asegurar que provienen de los directorios esperados # Filtrar los archivos de salida del propio x5 y los XRef para que no se incluyan dos veces
# y excluir el archivo de salida del propio x5. # si el patrón general los captura y están en el directorio raíz
output_filename_base = os.path.basename(output_filepath) output_filename_base = os.path.basename(output_filepath)
scl_files_filtered = [f for f in scl_files if os.path.dirname(f).startswith(scl_dir_abs)]
md_files_filtered = [ md_files_filtered = [
f for f in md_files f for f in md_files
if os.path.basename(f) != output_filename_base # Excluir el archivo de salida if os.path.basename(f) != output_filename_base # Excluir el archivo de salida
and (os.path.dirname(f).startswith(scl_dir_abs) or os.path.dirname(f).startswith(xref_dir_abs)) # Incluir MD de scl_output y xref_output # No es necesario excluir los XRef explícitamente si están en su subdir
# and XREF_OUTPUT_SUBDIR not in os.path.relpath(f, project_root_dir).split(os.sep)
] ]
all_files = sorted(scl_files_filtered + md_files_filtered) # Combinar y ordenar alfabéticamente
all_files = sorted(scl_files + md_files_filtered) # Combinar y ordenar alfabéticamente all_files = sorted(scl_files + md_files_filtered) # Combinar y ordenar alfabéticamente
@ -111,44 +96,42 @@ def aggregate_outputs(project_root_dir, output_filepath, scl_output_dirname, xre
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)
return False return False
# --- Punto de Entrada --- # --- Punto de Entrada ---
if __name__ == "__main__": if __name__ == "__main__":
print("(x5 - Standalone) Ejecutando agregación de salidas...") parser = argparse.ArgumentParser(
description="Agrega archivos .scl y .md generados en un único archivo Markdown."
)
parser.add_argument(
"project_root_dir",
help="Ruta al directorio raíz del proyecto XML (donde se buscarán los archivos generados)."
)
parser.add_argument(
"-o", "--output",
help=f"Ruta completa para el archivo Markdown agregado (por defecto: '{AGGREGATED_FILENAME}' en project_root_dir)."
)
# Cargar configuración para obtener rutas args = parser.parse_args()
configs = load_configuration()
working_directory = configs.get("working_directory")
# Acceder a la configuración específica del grupo # Validar directorio de entrada
group_config = configs.get("level2", {}) if not os.path.isdir(args.project_root_dir):
print(f"Error: El directorio del proyecto no existe: '{args.project_root_dir}'", file=sys.stderr)
sys.exit(1)
# Leer parámetros con valores por defecto (usando los defaults del esquema como guía) # Determinar ruta de salida
# Parámetros necesarios para x5 output_file = args.output
cfg_scl_output_dirname = group_config.get("scl_output_dir", "scl_output") if not output_file:
cfg_xref_output_dirname = group_config.get("xref_output_dir", "xref_output") output_file = os.path.join(args.project_root_dir, AGGREGATED_FILENAME)
cfg_aggregated_filename = group_config.get("aggregated_filename", "full_project_representation.md")
if not working_directory:
print("Error: 'working_directory' no encontrado en la configuración.", file=sys.stderr)
else: else:
# Calcular rutas basadas en la configuración # Asegurarse de que el directorio de salida exista si se especifica una ruta completa
plc_subdir_name = "PLC" # Asumir nombre estándar output_dir = os.path.dirname(output_file)
project_root_dir = os.path.join(working_directory, plc_subdir_name) if output_dir and not os.path.exists(output_dir):
# El archivo agregado va al working_directory original os.makedirs(output_dir)
output_agg_file = os.path.join(working_directory, cfg_aggregated_filename) # Usar nombre de archivo leído
if not os.path.isdir(project_root_dir):
print(f"Error: Directorio del proyecto '{project_root_dir}' no encontrado.", file=sys.stderr)
else:
# Llamar a la función principal # Llamar a la función principal
# Pasar los nombres de directorios leídos success = aggregate_files(args.project_root_dir, output_file)
success = aggregate_outputs(
project_root_dir,
output_agg_file,
cfg_scl_output_dirname,
cfg_xref_output_dirname)
if success: if success:
print("\n(x5 - Standalone) Proceso completado exitosamente.") sys.exit(0)
else: else:
print("\n(x5 - Standalone) Proceso finalizado con errores.", file=sys.stderr) sys.exit(1)


@ -2,13 +2,11 @@ import os
import json import json
import subprocess import subprocess
import re import re
import traceback from typing import Dict, Any, List
from typing import Dict, Any, List, Optional
import time # Add this import import time # Add this import
from datetime import datetime # Add this import from datetime import datetime # Add this import
# --- ConfigurationManager Class ---
class ConfigurationManager: class ConfigurationManager:
def __init__(self): def __init__(self):
self.base_path = os.path.dirname(os.path.abspath(__file__)) self.base_path = os.path.dirname(os.path.abspath(__file__))
@ -20,7 +18,6 @@ class ConfigurationManager:
self.log_file = os.path.join(self.data_path, "log.txt") self.log_file = os.path.join(self.data_path, "log.txt")
self._init_log_file() self._init_log_file()
self.last_execution_time = 0 # Add this attribute self.last_execution_time = 0 # Add this attribute
# Minimum seconds between script executions to prevent rapid clicks
self.min_execution_interval = 1 # Minimum seconds between executions self.min_execution_interval = 1 # Minimum seconds between executions
def _init_log_file(self): def _init_log_file(self):
@ -31,7 +28,6 @@ class ConfigurationManager:
with open(self.log_file, "w", encoding="utf-8") as f: with open(self.log_file, "w", encoding="utf-8") as f:
f.write("") f.write("")
# --- Logging Methods ---
def append_log(self, message: str) -> None: def append_log(self, message: str) -> None:
"""Append a message to the CENTRAL log file with timestamp.""" """Append a message to the CENTRAL log file with timestamp."""
# This function now primarily logs messages from the app itself, # This function now primarily logs messages from the app itself,
@ -42,7 +38,6 @@ class ConfigurationManager:
lines_with_timestamp = [] lines_with_timestamp = []
for line in lines: for line in lines:
if line.strip(): if line.strip():
# Add timestamp only if line doesn't already have one (e.g., from script output)
if not line.strip().startswith("["): if not line.strip().startswith("["):
line = f"{timestamp}{line}" line = f"{timestamp}{line}"
lines_with_timestamp.append(f"{line}\n") lines_with_timestamp.append(f"{line}\n")
@ -86,7 +81,6 @@ class ConfigurationManager:
print(f"Error clearing log file: {e}") print(f"Error clearing log file: {e}")
return False return False
# --- Working Directory Methods ---
def set_working_directory(self, path: str) -> Dict[str, str]: def set_working_directory(self, path: str) -> Dict[str, str]:
"""Set and validate working directory.""" """Set and validate working directory."""
if not os.path.exists(path): if not os.path.exists(path):
@ -95,67 +89,13 @@ class ConfigurationManager:
self.working_directory = path self.working_directory = path
# Create default data.json if it doesn't exist # Create default data.json if it doesn't exist
# This data.json will be populated with defaults by get_config later if needed
data_path = os.path.join(path, "data.json") data_path = os.path.join(path, "data.json")
if not os.path.exists(data_path): if not os.path.exists(data_path):
try: with open(data_path, "w") as f:
with open(data_path, "w", encoding="utf-8") as f:
json.dump({}, f, indent=2) json.dump({}, f, indent=2)
print(
f"Info: Created empty data.json in working directory: {data_path}"
)
except Exception as e:
print(f"Error creating data.json in working directory {path}: {e}")
# Non-fatal, get_config will handle missing file
return {"status": "success", "path": path} return {"status": "success", "path": path}
def get_work_dir(self, group: str) -> Optional[str]:
"""Get working directory path for a script group from work_dir.json."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r", encoding="utf-8") as f:
data = json.load(f)
path = data.get("path", "")
# Normalizar separadores de ruta
if path:
path = os.path.normpath(path)
# Actualizar la variable de instancia si hay una ruta válida y existe
if path and os.path.isdir(path): # Check if it's a directory
self.working_directory = path
return path
elif path:
print(
f"Warning: Stored working directory for group '{group}' is invalid or does not exist: {path}"
)
self.working_directory = None # Reset if invalid
return None
else:
self.working_directory = None # Reset if no path stored
return None
except (FileNotFoundError, json.JSONDecodeError):
self.working_directory = None # Reset if file missing or invalid
return None
except Exception as e:
print(f"Error reading work_dir.json for group '{group}': {e}")
self.working_directory = None
return None
def get_directory_history(self, group: str) -> List[str]:
"""Get the directory history for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Normalizar todos los paths en el historial
history = [os.path.normpath(p) for p in data.get("history", [])]
# Filtrar solo directorios que existen
return [
p for p in history if os.path.isdir(p)
] # Check if directory exists
except (FileNotFoundError, json.JSONDecodeError):
return []
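
The work_dir.json handling above follows a small pattern: normalize the stored paths and keep only directories that still exist. A standalone sketch of that step (hypothetical helper, same file layout as read above):

import json
import os

def load_directory_history(work_dir_json_path):
    try:
        with open(work_dir_json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return []
    # Normalize separators, then drop entries that no longer exist on disk.
    history = [os.path.normpath(p) for p in data.get("history", [])]
    return [p for p in history if os.path.isdir(p)]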
def get_script_groups(self) -> List[Dict[str, Any]]: def get_script_groups(self) -> List[Dict[str, Any]]:
"""Returns list of available script groups with their descriptions.""" """Returns list of available script groups with their descriptions."""
groups = [] groups = []
@ -187,506 +127,189 @@ class ConfigurationManager:
print(f"Error reading group description: {e}") print(f"Error reading group description: {e}")
return {} return {}
# --- Configuration (data.json) Methods ---
def get_config(self, level: str, group: str = None) -> Dict[str, Any]: def get_config(self, level: str, group: str = None) -> Dict[str, Any]:
""" """Get configuration for specified level."""
Get configuration for specified level.
Applies default values from the corresponding schema if the config
file doesn't exist or is missing keys with defaults.
"""
config_data = {}
needs_save = False
schema = None
data_path = None
schema_path_for_debug = "N/A" # For logging
# 1. Determine data path based on level
if level == "1":
data_path = os.path.join(self.data_path, "data.json")
schema_path_for_debug = os.path.join(self.data_path, "esquema_general.json")
elif level == "2":
if not group:
return {"error": "Group required for level 2 config"}
data_path = os.path.join(self.script_groups_path, group, "data.json")
schema_path_for_debug = os.path.join(
self.script_groups_path, group, "esquema_group.json"
)
elif level == "3":
# Level 3 config is always in the current working directory
if not self.working_directory:
return {} # Return empty config if working directory not set
data_path = os.path.join(self.working_directory, "data.json")
# Level 3 config might be based on level 3 schema (esquema_work.json)
if group:
schema_path_for_debug = os.path.join(
self.script_groups_path, group, "esquema_work.json"
)
else:
# If no group, we can't determine the L3 schema for defaults.
schema_path_for_debug = "N/A (Level 3 without group)"
else:
return {"error": f"Invalid level specified for config: {level}"}
# 2. Get the corresponding schema to check for defaults
try:
# Only attempt to load schema if needed (e.g., not L3 without group)
if not (level == "3" and not group):
schema = self.get_schema(
level, group
) # Use the robust get_schema method
else:
schema = None # Cannot determine L3 schema without group
except Exception as e:
print(
f"Warning: Could not load schema for level {level}, group {group}. Defaults will not be applied. Error: {e}"
)
schema = None # Ensure schema is None if loading failed
# 3. Try to load existing data
data_file_exists = os.path.exists(data_path)
if data_file_exists:
try:
with open(data_path, "r", encoding="utf-8") as f_data:
content = f_data.read()
if content.strip():
config_data = json.loads(content)
else:
print(
f"Warning: Data file {data_path} is empty. Will initialize with defaults."
)
needs_save = True # Force save if file was empty
except json.JSONDecodeError:
print(
f"Warning: Could not decode JSON from {data_path}. Will initialize with defaults."
)
config_data = {}
needs_save = True
except Exception as e:
print(
f"Error reading data from {data_path}: {e}. Will attempt to initialize with defaults."
)
config_data = {}
needs_save = True
except FileNotFoundError:
print(
f"Info: Data file not found at {data_path}. Will initialize with defaults."
)
needs_save = True # Mark for saving as it's a new file
# 4. Apply defaults from schema if schema was loaded successfully
if schema and isinstance(schema, dict) and "properties" in schema:
schema_properties = schema.get("properties", {})
if isinstance(schema_properties, dict): # Ensure properties is a dict
for key, prop_definition in schema_properties.items():
# Ensure prop_definition is a dictionary before checking 'default'
if (
isinstance(prop_definition, dict)
and key not in config_data
and "default" in prop_definition
):
print(
f"Info: Applying default for '{key}' from schema {schema_path_for_debug}"
)
config_data[key] = prop_definition["default"]
needs_save = (
True # Mark for saving because a default was applied
)
else:
print(
f"Warning: 'properties' in schema {schema_path_for_debug} is not a dictionary. Cannot apply defaults."
)
# 5. Save the file if it was created or updated with defaults
if needs_save and data_path:
try:
print(f"Info: Saving updated config data to: {data_path}")
os.makedirs(os.path.dirname(data_path), exist_ok=True)
with open(data_path, "w", encoding="utf-8") as f_data:
json.dump(config_data, f_data, indent=2, ensure_ascii=False)
except IOError as e:
print(f"Error: Could not write data file to {data_path}: {e}")
except Exception as e:
print(f"Unexpected error saving data to {data_path}: {e}")
# 6. Return the final configuration
return config_data
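
The default-application step in the longer get_config variant amounts to one pass over the schema properties. A sketch of that helper (not the class's actual API), returning True when a default was filled in so the caller knows the file needs to be re-saved:

def apply_schema_defaults(config_data, schema):
    changed = False
    for key, prop in (schema or {}).get("properties", {}).items():
        # Only fill keys that are missing and that declare a "default" in the schema.
        if isinstance(prop, dict) and "default" in prop and key not in config_data:
            config_data[key] = prop["default"]
            changed = True
    return changed

# Example: the missing max_call_depth is filled from its schema default.
cfg = {"scl_output_dir": "scl_output"}
schema = {"properties": {"scl_output_dir": {"type": "string"},
                         "max_call_depth": {"type": "integer", "default": 5}}}
if apply_schema_defaults(cfg, schema):
    print(cfg)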
def update_config(
self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update configuration for specified level."""
path = None
if level == "1": if level == "1":
path = os.path.join(self.data_path, "data.json") path = os.path.join(self.data_path, "data.json")
elif level == "2": elif level == "2":
if not group:
return {
"status": "error",
"message": "Group required for level 2 config update",
}
path = os.path.join(self.script_groups_path, group, "data.json") path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3": elif level == "3":
if not self.working_directory: if not self.working_directory:
return { return {} # Return empty config if working directory not set
"status": "error",
"message": "Working directory not set for level 3 config update",
}
path = os.path.join(self.working_directory, "data.json") path = os.path.join(self.working_directory, "data.json")
else:
return {
"status": "error",
"message": f"Invalid level for config update: {level}",
}
try: try:
# Ensure directory exists with open(path, "r") as f:
os.makedirs(os.path.dirname(path), exist_ok=True) return json.load(f)
with open(path, "w", encoding="utf-8") as f: except FileNotFoundError:
json.dump(data, f, indent=2, ensure_ascii=False) return {} # Return empty config if file doesn't exist
print(f"Info: Config successfully updated at {path}")
return {"status": "success"}
except Exception as e:
print(f"Error updating config at {path}: {str(e)}")
return {"status": "error", "message": str(e)}
def get_schema(self, level: str, group: str = None) -> Dict[str, Any]: def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
"""Get schema for specified level.""" """Get schema for specified level."""
schema_path = None
try: try:
# Clean level parameter # Clean level parameter
clean_level = str(level).split("-")[0] level = str(level).split("-")[0]
# Determine schema path based on level # Determine schema path based on level
if clean_level == "1": if level == "1":
schema_path = os.path.join(self.data_path, "esquema_general.json") path = os.path.join(self.data_path, "esquema_general.json")
elif clean_level == "2": elif level == "2":
if not group: path = os.path.join(
raise ValueError("Group is required for level 2 schema")
schema_path = os.path.join(
self.script_groups_path, group, "esquema_group.json" self.script_groups_path, group, "esquema_group.json"
) )
elif clean_level == "3": elif level == "3":
if not group: if not group:
# Level 3 schema (esquema_work) is tied to a group.
# If no group, we can't know which schema to load.
print(
"Warning: Group needed to determine level 3 schema (esquema_work.json). Returning empty schema."
)
return {"type": "object", "properties": {}} return {"type": "object", "properties": {}}
schema_path = os.path.join( path = os.path.join(self.script_groups_path, group, "esquema_work.json")
self.script_groups_path, group, "esquema_work.json"
)
else: else:
print(
f"Warning: Invalid level '{level}' for schema retrieval. Returning empty schema."
)
return {"type": "object", "properties": {}} return {"type": "object", "properties": {}}
# Read existing schema or create default if it doesn't exist # Read existing schema from whichever file exists
if os.path.exists(schema_path): if os.path.exists(path):
try: with open(path, "r", encoding="utf-8") as f:
with open(schema_path, "r", encoding="utf-8") as f:
schema = json.load(f) schema = json.load(f)
# Basic validation return (
if ( schema
not isinstance(schema, dict) if isinstance(schema, dict)
or "properties" not in schema else {"type": "object", "properties": {}}
or "type" not in schema
):
print(
f"Warning: Schema file {schema_path} has invalid structure. Returning default."
) )
return {"type": "object", "properties": {}}
# Ensure properties is a dict
if not isinstance(schema.get("properties"), dict):
print(
f"Warning: 'properties' in schema file {schema_path} is not a dictionary. Normalizing."
)
schema["properties"] = {}
return schema
except json.JSONDecodeError:
print(
f"Error: Could not decode JSON from schema file: {schema_path}. Returning default."
)
return {"type": "object", "properties": {}}
except Exception as e:
print(
f"Error reading schema file {schema_path}: {e}. Returning default."
)
return {"type": "object", "properties": {}}
else:
print(
f"Info: Schema file not found at {schema_path}. Creating default schema."
)
default_schema = {"type": "object", "properties": {}}
try:
# Ensure directory exists before writing
os.makedirs(os.path.dirname(schema_path), exist_ok=True)
with open(schema_path, "w", encoding="utf-8") as f:
json.dump(default_schema, f, indent=2, ensure_ascii=False)
return default_schema
except Exception as e:
print(f"Error creating default schema file at {schema_path}: {e}")
return {
"type": "object",
"properties": {},
} # Return empty if creation fails
except ValueError as ve: # Catch specific errors like missing group # Create default schema if no file exists
print(f"Error getting schema path: {ve}") default_schema = {"type": "object", "properties": {}}
return {"type": "object", "properties": {}} os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(default_schema, f, indent=2)
return default_schema
except Exception as e: except Exception as e:
# Log the full path in case of unexpected errors print(f"Error loading schema: {str(e)}")
error_path = schema_path if schema_path else f"Level {level}, Group {group}"
print(f"Unexpected error loading schema from {error_path}: {str(e)}")
return {"type": "object", "properties": {}} return {"type": "object", "properties": {}}
def update_schema( def update_schema(
self, level: str, data: Dict[str, Any], group: str = None self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]: ) -> Dict[str, str]:
"""Update schema for specified level and clean corresponding config.""" """Update schema for specified level and clean corresponding config."""
schema_path = None
config_path = None
try: try:
# Clean level parameter if it contains extra info like '-edit'
clean_level = str(level).split("-")[0]
# Determinar rutas de schema y config # Determinar rutas de schema y config
if clean_level == "1": if level == "1":
schema_path = os.path.join(self.data_path, "esquema_general.json") schema_path = os.path.join(self.data_path, "esquema_general.json")
config_path = os.path.join(self.data_path, "data.json") config_path = os.path.join(self.data_path, "data.json")
elif clean_level == "2": elif level == "2":
if not group:
return {
"status": "error",
"message": "Group is required for level 2 schema update",
}
schema_path = os.path.join( schema_path = os.path.join(
self.script_groups_path, group, "esquema_group.json" self.script_groups_path, group, "esquema_group.json"
) )
config_path = os.path.join(self.script_groups_path, group, "data.json") config_path = os.path.join(self.script_groups_path, group, "data.json")
elif clean_level == "3": elif level == "3":
if not group: if not group:
return { return {
"status": "error", "status": "error",
"message": "Group is required for level 3 schema update", "message": "Group is required for level 3",
} }
schema_path = os.path.join( schema_path = os.path.join(
self.script_groups_path, group, "esquema_work.json" self.script_groups_path, group, "esquema_work.json"
) )
# Config path depends on whether working_directory is set and valid
config_path = ( config_path = (
os.path.join(self.working_directory, "data.json") os.path.join(self.working_directory, "data.json")
if self.working_directory if self.working_directory
and os.path.isdir(self.working_directory) # Check it's a directory
else None else None
) )
if not config_path:
print(
f"Warning: Working directory not set or invalid ('{self.working_directory}'). Level 3 config file will not be cleaned."
)
else: else:
return {"status": "error", "message": "Invalid level"} return {"status": "error", "message": "Invalid level"}
# Ensure directory exists # Ensure directory exists
os.makedirs(os.path.dirname(schema_path), exist_ok=True) os.makedirs(os.path.dirname(schema_path), exist_ok=True)
# Basic validation and normalization of the schema data being saved # Validate schema structure
if not isinstance(data, dict): if (
print( not isinstance(data, dict)
f"Warning: Invalid schema data received (not a dict). Wrapping in default structure." or "type" not in data
) or "properties" not in data
data = {"type": "object", "properties": {}} # Reset to default empty ):
if "type" not in data: data = {
data["type"] = "object" # Ensure type exists "type": "object",
if "properties" not in data or not isinstance(data["properties"], dict): "properties": data if isinstance(data, dict) else {},
print( }
f"Warning: Invalid or missing 'properties' in schema data. Resetting properties."
)
data["properties"] = {} # Ensure properties exists and is a dict
# Write schema # Write schema
with open(schema_path, "w", encoding="utf-8") as f: with open(schema_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False) json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Info: Schema successfully updated at {schema_path}")
# Clean the corresponding config file *if* its path is valid # Clean corresponding config file
if config_path:
self._clean_config_for_schema(config_path, data) self._clean_config_for_schema(config_path, data)
else:
print(
f"Info: Config cleaning skipped for level {level} (no valid config path)."
)
return {"status": "success"} return {"status": "success"}
except Exception as e: except Exception as e:
error_path = schema_path if schema_path else f"Level {level}, Group {group}" print(f"Error updating schema: {str(e)}")
print(f"Error updating schema at {error_path}: {str(e)}")
# Consider adding traceback here for debugging
print(traceback.format_exc())
return {"status": "error", "message": str(e)} return {"status": "error", "message": str(e)}
def _clean_config_for_schema( def _clean_config_for_schema(
self, config_path: str, schema: Dict[str, Any] self, config_path: str, schema: Dict[str, Any]
) -> None: ) -> None:
"""Clean configuration file to match schema structure.""" """Clean configuration file to match schema structure."""
# Check existence *before* trying to open if not config_path or not os.path.exists(config_path):
try:
if not os.path.exists(config_path):
print(
f"Info: Config file {config_path} not found for cleaning. Skipping."
)
return return
try:
# Cargar configuración actual # Cargar configuración actual
config = {}
content = "" # Store original content for comparison
with open(config_path, "r", encoding="utf-8") as f: with open(config_path, "r", encoding="utf-8") as f:
content = f.read() config = json.load(f)
if content.strip(): # Avoid error on empty file
config = json.loads(content)
else:
print(
f"Info: Config file {config_path} is empty. Cleaning will result in an empty object."
)
# Limpiar configuración recursivamente # Limpiar configuración recursivamente
cleaned_config = self._clean_object_against_schema(config, schema) cleaned_config = self._clean_object_against_schema(config, schema)
# Guardar configuración limpia solo si cambió o si el original estaba vacío # Guardar configuración limpia
# (para evitar escrituras innecesarias)
# Use dumps for reliable comparison, handle potential errors during dumps
try:
original_config_str = json.dumps(config, sort_keys=True)
cleaned_config_str = json.dumps(cleaned_config, sort_keys=True)
except TypeError as te:
print(
f"Warning: Could not serialize config for comparison during clean: {te}. Forcing save."
)
original_config_str = "" # Force inequality
cleaned_config_str = " " # Force inequality
if original_config_str != cleaned_config_str or not content.strip():
print(f"Info: Cleaning config file: {config_path}")
with open(config_path, "w", encoding="utf-8") as f: with open(config_path, "w", encoding="utf-8") as f:
json.dump(cleaned_config, f, indent=2, ensure_ascii=False) json.dump(cleaned_config, f, indent=2, ensure_ascii=False)
else:
print(
f"Info: Config file {config_path} already matches schema. No cleaning needed."
)
except json.JSONDecodeError:
print(
f"Error: Could not decode JSON from config file {config_path} during cleaning. Skipping clean."
)
except IOError as e:
print(f"Error accessing config file {config_path} during cleaning: {e}")
except Exception as e: except Exception as e:
print(f"Unexpected error cleaning config {config_path}: {str(e)}") print(f"Error cleaning config: {str(e)}")
# Consider adding traceback here
print(traceback.format_exc())
def _clean_object_against_schema(self, data: Any, schema: Dict[str, Any]) -> Any: def _clean_object_against_schema(
"""Recursively clean data to match schema structure.""" self, data: Dict[str, Any], schema: Dict[str, Any]
# Ensure schema is a dictionary, otherwise cannot proceed ) -> Dict[str, Any]:
if not isinstance(schema, dict): """Recursively clean object to match schema structure."""
print( if not isinstance(data, dict) or not isinstance(schema, dict):
f"Warning: Invalid schema provided to _clean_object_against_schema (not a dict). Returning data as is: {type(schema)}"
)
return data
schema_type = schema.get("type")
if schema_type == "object":
if not isinstance(data, dict):
# If data is not a dict, but schema expects object, return empty dict
return {} return {}
# This 'result' and the loop should be inside the 'if schema_type == "object":' block
result = {} result = {}
schema_props = schema.get("properties", {}) schema_props = schema.get("properties", {})
# Ensure schema_props is a dictionary
if not isinstance(schema_props, dict):
print(
f"Warning: 'properties' in schema is not a dictionary during cleaning. Returning empty object."
)
return {}
for key, value in data.items(): for key, value in data.items():
# Solo mantener campos que existen en el schema # Solo mantener campos que existen en el schema
if key in schema_props: if key in schema_props:
# Recursively clean the value based on the property's schema
# Ensure the property schema itself is a dict before recursing
prop_schema = schema_props[key] prop_schema = schema_props[key]
if isinstance(prop_schema, dict):
result[key] = self._clean_object_against_schema( # Si es un objeto anidado, limpiar recursivamente
value, prop_schema if prop_schema.get("type") == "object":
) result[key] = self._clean_object_against_schema(value, prop_schema)
# Si es un enum, verificar que el valor sea válido
elif "enum" in prop_schema:
if value in prop_schema["enum"]:
result[key] = value
# Para otros tipos, mantener el valor
else: else:
# If property schema is invalid, maybe keep original value or omit? Let's omit. result[key] = value
print(
f"Warning: Schema for property '{key}' is not a dictionary. Omitting from cleaned data."
)
# Return result should be OUTSIDE the loop, but INSIDE the 'if object' block
return result return result
elif schema_type == "array": def update_config(
if not isinstance(data, list): self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update configuration for specified level."""
if level == "3" and not self.working_directory:
return {"status": "error", "message": "Working directory not set"}
# If data is not a list, but schema expects array, return empty list if level == "1":
return [] path = os.path.join(self.data_path, "data.json")
# If schema defines items structure, clean each item elif level == "2":
items_schema = schema.get("items") path = os.path.join(self.script_groups_path, group, "data.json")
if isinstance( elif level == "3":
items_schema, dict path = os.path.join(self.working_directory, "data.json")
): # Check if 'items' schema is a valid dict
return [
self._clean_object_against_schema(item, items_schema)
for item in data
]
else:
# If no valid item schema, return list as is (or potentially filter based on basic types if needed)
# Let's return as is for now.
return data # Keep array items as they are if no valid 'items' schema defined
elif "enum" in schema: with open(path, "w") as f:
# Ensure enum values are defined as a list json.dump(data, f, indent=2)
enum_values = schema.get("enum")
if isinstance(enum_values, list):
# If schema has enum, keep data only if it's one of the allowed values
if data in enum_values:
return data
else:
# If value not in enum, return None or potentially the default value if specified?
# For cleaning, returning None or omitting might be safer. Let's return None.
return None # Or consider returning schema.get('default') if cleaning should apply defaults too
else:
# Invalid enum definition, return original data or None? Let's return None.
print(
f"Warning: Invalid 'enum' definition in schema (not a list). Returning None for value '{data}'."
)
return None
# For basic types (string, integer, number, boolean, null), just return the data
# We could add type checking here if strict cleaning is needed,
# e.g., return None if type(data) doesn't match schema_type
elif schema_type in ["string", "integer", "number", "boolean", "null"]:
# Optional: Add stricter type check if needed
# expected_type_map = { "string": str, "integer": int, "number": (int, float), "boolean": bool, "null": type(None) }
# expected_types = expected_type_map.get(schema_type)
# if expected_types and not isinstance(data, expected_types):
# print(f"Warning: Type mismatch during cleaning. Expected {schema_type}, got {type(data)}. Returning None.")
# return None # Or schema.get('default')
return data
# If schema type is unknown or not handled, return data as is
else:
# This case might indicate an issue with the schema definition itself
# print(f"Warning: Unknown or unhandled schema type '{schema_type}' during cleaning. Returning data as is.")
return data
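
The cleaning logic, in its simpler form, follows one core rule: keep only keys declared under the schema's properties, recurse into nested objects, and validate enum values. A standalone sketch of that rule with a small usage example (illustrative function, not the class method):

def clean_against_schema(data, schema):
    if not isinstance(data, dict) or not isinstance(schema, dict):
        return {}
    props = schema.get("properties", {})
    result = {}
    for key, value in data.items():
        if key not in props or not isinstance(props[key], dict):
            continue
        prop = props[key]
        if prop.get("type") == "object":
            result[key] = clean_against_schema(value, prop)   # recurse into nested objects
        elif "enum" in prop:
            if value in prop["enum"]:                          # drop values outside the enum
                result[key] = value
        else:
            result[key] = value
    return result

# Unknown keys disappear, known keys survive.
print(clean_against_schema(
    {"scl_output_dir": "scl_output", "obsolete": 1},
    {"type": "object", "properties": {"scl_output_dir": {"type": "string"}}},
))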
# --- Script Listing and Execution Methods ---
def list_scripts(self, group: str) -> List[Dict[str, str]]: def list_scripts(self, group: str) -> List[Dict[str, str]]:
"""List all scripts in a group with their descriptions.""" """List all scripts in a group with their descriptions."""
try: try:
@ -695,7 +318,7 @@ class ConfigurationManager:
if not os.path.exists(scripts_dir): if not os.path.exists(scripts_dir):
print(f"Directory not found: {scripts_dir}") print(f"Directory not found: {scripts_dir}")
return [] # Return empty list if group directory doesn't exist return []
for file in os.listdir(scripts_dir): for file in os.listdir(scripts_dir):
# Modificar la condición para incluir cualquier archivo .py # Modificar la condición para incluir cualquier archivo .py
@ -703,15 +326,15 @@ class ConfigurationManager:
path = os.path.join(scripts_dir, file) path = os.path.join(scripts_dir, file)
description = self._extract_script_description(path) description = self._extract_script_description(path)
print( print(
f"Debug: Found script: {file} with description: {description}" f"Found script: {file} with description: {description}"
) # Debug line ) # Debug line
scripts.append({"name": file, "description": description}) scripts.append({"name": file, "description": description})
print(f"Debug: Total scripts found in group '{group}': {len(scripts)}") print(f"Total scripts found: {len(scripts)}") # Debug line
return scripts return scripts
except Exception as e: except Exception as e:
print(f"Error listing scripts for group '{group}': {str(e)}") print(f"Error listing scripts: {str(e)}") # Debug line
return [] # Return empty list on error return []
def _extract_script_description(self, script_path: str) -> str: def _extract_script_description(self, script_path: str) -> str:
"""Extract description from script's docstring or initial comments.""" """Extract description from script's docstring or initial comments."""
@ -731,7 +354,9 @@ class ConfigurationManager:
return "No description available" return "No description available"
except Exception as e: except Exception as e:
print(f"Error extracting description from {script_path}: {str(e)}") print(
f"Error extracting description from {script_path}: {str(e)}"
) # Debug line
return "Error reading script description" return "Error reading script description"
def execute_script(
@@ -745,9 +370,7 @@ class ConfigurationManager:
time_since_last = current_time - self.last_execution_time
if time_since_last < self.min_execution_interval:
msg = f"Por favor espere {self.min_execution_interval - time_since_last:.1f} segundo(s) más entre ejecuciones"
- self.append_log(f"Warning: {msg}")  # Log throttling attempt
- if broadcast_fn:
- broadcast_fn(msg)
+ if broadcast_fn: broadcast_fn(msg)
return {"status": "throttled", "error": msg}
self.last_execution_time = current_time
@@ -758,38 +381,27 @@ class ConfigurationManager:
script_log_path = os.path.join(script_dir, f"log_{script_base_name}.txt")
if not os.path.exists(script_path):
- msg = f"Error Fatal: Script no encontrado en {script_path}"
- self.append_log(msg)
- if broadcast_fn:
- broadcast_fn(msg)
+ msg = f"Error: Script no encontrado en {script_path}"
+ if broadcast_fn: broadcast_fn(msg)
return {"status": "error", "error": "Script not found"}
- # Get working directory specific to the group
working_dir = self.get_work_dir(group)
if not working_dir:
- msg = f"Error Fatal: Directorio de trabajo no configurado o inválido para el grupo '{group}'"
- self.append_log(msg)
- if broadcast_fn:
- broadcast_fn(msg)
+ msg = f"Error: Directorio de trabajo no configurado para el grupo '{group}'"
+ if broadcast_fn: broadcast_fn(msg)
return {"status": "error", "error": "Working directory not set"}
- # Double check validity (get_work_dir should already do this)
if not os.path.isdir(working_dir):
- msg = f"Error Fatal: El directorio de trabajo '{working_dir}' no es válido o no existe."
- self.append_log(msg)
- if broadcast_fn:
- broadcast_fn(msg)
+ msg = f"Error: El directorio de trabajo '{working_dir}' no es válido o no existe."
+ if broadcast_fn: broadcast_fn(msg)
return {"status": "error", "error": "Invalid working directory"}
- # Aggregate configurations using the updated get_config
configs = {
"level1": self.get_config("1"),
"level2": self.get_config("2", group),
- "level3": self.get_config(
- "3", group
- ),  # get_config uses self.working_directory
+ "level3": self.get_config("3", group),  # get_config now handles working dir lookup
"working_directory": working_dir,
}
- print(f"Debug: Aggregated configs for script execution: {configs}")
config_file_path = os.path.join(script_dir, "script_config.json")
try:
@@ -798,10 +410,8 @@ class ConfigurationManager:
# Don't broadcast config saving unless debugging
# if broadcast_fn: broadcast_fn(f"Configuraciones guardadas en {config_file_path}")
except Exception as e:
- msg = f"Error Fatal: No se pudieron guardar las configuraciones temporales en {config_file_path}: {str(e)}"
- self.append_log(msg)
- if broadcast_fn:
- broadcast_fn(msg)
+ msg = f"Error guardando configuraciones temporales: {str(e)}"
+ if broadcast_fn: broadcast_fn(msg)
# Optionally return error here if config saving is critical
stdout_capture = []
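Both versions write the aggregated settings to script_config.json in the script's directory before launching it. Purely as an illustration (the key names come from the configs dict above; the reading side is not shown in this diff), a child script could pick them up like this:

import json
import os

# Sketch: load the configuration bundle written by execute_script.
# Assumes script_config.json sits next to the running script, as above.
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "script_config.json")
with open(config_path, "r", encoding="utf-8") as f:
    cfg = json.load(f)

level1 = cfg.get("level1", {})            # general settings
level2 = cfg.get("level2", {})            # script-group settings
level3 = cfg.get("level3", {})            # working-directory settings
working_directory = cfg.get("working_directory", "")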
@ -811,18 +421,16 @@ class ConfigurationManager:
try: try:
if broadcast_fn: if broadcast_fn:
start_msg = f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}..." broadcast_fn(f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}...")
broadcast_fn(start_msg)
# Execute the script
process = subprocess.Popen( process = subprocess.Popen(
["python", "-u", script_path], # Added -u for unbuffered output ["python", "-u", script_path], # Added -u for unbuffered output
cwd=working_dir, cwd=working_dir,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, text=True,
encoding="utf-8", encoding='utf-8',
errors="replace", errors='replace',
bufsize=1, bufsize=1,
env=dict(os.environ, PYTHONIOENCODING="utf-8"), env=dict(os.environ, PYTHONIOENCODING="utf-8"),
) )
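The loop that drains the pipes falls outside this hunk. Given the -u flag, bufsize=1 and text=True above, the intent is line-buffered streaming; a self-contained sketch of such a reader (stream_process is a hypothetical helper, not code from this repository) would be:

import subprocess

def stream_process(cmd, cwd, broadcast_fn=None):
    # Sketch: run a command unbuffered and forward stdout line by line,
    # mirroring the Popen arguments used above.
    proc = subprocess.Popen(
        cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        text=True, encoding="utf-8", errors="replace", bufsize=1,
    )
    captured = []
    for line in proc.stdout:
        line = line.rstrip("\n")
        captured.append(line)
        if broadcast_fn:
            broadcast_fn(line)
    stderr_text = proc.stderr.read()
    return proc.wait(), captured, stderr_text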
@@ -858,6 +466,7 @@ class ConfigurationManager:
# Always include stderr in the final log if present
completion_msg += f" Se detectaron errores (ver log)."
if broadcast_fn:
broadcast_fn(completion_msg)
@@ -870,9 +479,7 @@ class ConfigurationManager:
log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_f.write(f"Duración: {duration}\n")
- log_f.write(
- f"Estado: {status.upper()} (Código de Salida: {return_code})\n"
- )
+ log_f.write(f"Estado: {status.upper()} (Código de Salida: {return_code})\n")
log_f.write("\n--- SALIDA ESTÁNDAR (STDOUT) ---\n")
log_f.write("\n".join(stdout_capture))
log_f.write("\n\n--- ERRORES (STDERR) ---\n")
@@ -880,39 +487,29 @@ class ConfigurationManager:
log_f.write("\n--- FIN DEL LOG ---\n")
if broadcast_fn:
broadcast_fn(f"Log completo guardado en: {script_log_path}")
- print(f"Info: Script log saved to {script_log_path}")
except Exception as log_e:
err_msg = f"Error al guardar el log específico del script en {script_log_path}: {log_e}"
print(err_msg)
- if broadcast_fn:
- broadcast_fn(err_msg)
+ if broadcast_fn: broadcast_fn(err_msg)
# ------------------------------------------
return {
"status": status,
"return_code": return_code,
"error": stderr_capture if stderr_capture else None,
- "log_file": script_log_path,  # Return path to the specific log
+ "log_file": script_log_path  # Return path to the specific log
}
except Exception as e:
end_time = datetime.now()
duration = end_time - start_time
- error_msg = (
- f"Error inesperado durante la ejecución de {script_name}: {str(e)}"
- )
- traceback_info = traceback.format_exc()  # Get full traceback
+ error_msg = f"Error inesperado durante la ejecución de {script_name}: {str(e)}"
+ traceback_info = traceback.format_exc()  # Get traceback
print(error_msg)  # Print to console as well
print(traceback_info)
- self.append_log(
- f"ERROR FATAL: {error_msg}\n{traceback_info}"
- )  # Log centrally
if broadcast_fn:
- # Ensure fatal errors are clearly marked in UI
- broadcast_fn(
- f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}"
- )
+ broadcast_fn(f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}")
# Attempt to write error to script-specific log
try:
@@ -921,9 +518,7 @@ class ConfigurationManager:
log_f.write(f"Grupo: {group}\n")
log_f.write(f"Directorio de Trabajo: {working_dir}\n")
log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
- log_f.write(
- f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n"
- )
+ log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n")
log_f.write(f"Duración: {duration}\n")
log_f.write(f"Estado: FATAL ERROR\n")
log_f.write("\n--- ERROR ---\n")
@@ -932,10 +527,8 @@ class ConfigurationManager:
log_f.write(traceback_info)  # Include traceback in log
log_f.write("\n--- FIN DEL LOG ---\n")
except Exception as log_e:
- err_msg_log = (
- f"Error adicional al intentar guardar el log de error: {log_e}"
- )
- print(err_msg_log)
+ print(f"Error adicional al intentar guardar el log de error: {log_e}")
return {"status": "error", "error": error_msg, "traceback": traceback_info}
finally:
@@ -946,6 +539,23 @@ class ConfigurationManager:
if process and process.stdout:
process.stdout.close()
def get_work_dir(self, group: str) -> str:
"""Get working directory path for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r") as f:
data = json.load(f)
path = data.get("path", "")
# Normalize path separators
if path:
path = os.path.normpath(path)
# Update the instance variable if there is a valid path
if path and os.path.exists(path):
self.working_directory = path
return path
except (FileNotFoundError, json.JSONDecodeError):
return ""
def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
"""Set working directory path for a script group and update history."""
# Normalize the incoming path
@@ -959,7 +569,7 @@ class ConfigurationManager:
try:
# Load existing data or create new data
try:
- with open(work_dir_path, "r", encoding="utf-8") as f:
+ with open(work_dir_path, "r") as f:
data = json.load(f)
# Normalize existing paths in the history
if "history" in data:
@@ -986,7 +596,7 @@ class ConfigurationManager:
data["history"] = data["history"][:10]
# Save the updated data
- with open(work_dir_path, "w", encoding="utf-8") as f:
+ with open(work_dir_path, "w") as f:
json.dump(data, f, indent=2)
# Update the instance variable
@@ -995,9 +605,22 @@ class ConfigurationManager:
# Create data.json in the working directory if it does not exist
data_path = os.path.join(path, "data.json")
if not os.path.exists(data_path):
- with open(data_path, "w", encoding="utf-8") as f:
+ with open(data_path, "w") as f:
json.dump({}, f, indent=2)
return {"status": "success", "path": path}
except Exception as e:
return {"status": "error", "message": str(e)}
def get_directory_history(self, group: str) -> List[str]:
"""Get the directory history for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r") as f:
data = json.load(f)
# Normalize all paths in the history
history = [os.path.normpath(p) for p in data.get("history", [])]
# Keep only directories that still exist
return [p for p in history if os.path.exists(p)]
except (FileNotFoundError, json.JSONDecodeError):
return []
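Taken together, get_work_dir, set_work_dir and get_directory_history imply a small per-group work_dir.json holding the current path plus a history capped at 10 entries. Purely as an illustration (the paths below are made up), such a file would look like:

{
  "path": "C:\\Trabajo\\ProyectoX",
  "history": [
    "C:\\Trabajo\\ProyectoX",
    "C:\\Trabajo\\ProyectoY"
  ]
}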

View File
@@ -1,21 +1,35 @@
- [17:15:12] Iniciando ejecución de x1.py en C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS...
- [17:15:14] Working directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
- [17:15:14] Input directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
- [17:15:14] Output directory: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs
- [17:15:14] Cronologia file: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
- [17:15:14] Attachments directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\adjuntos
- [17:15:14] Beautify rules file: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\config\beautify_rules.json
- [17:15:14] Found 1 .eml files
- [17:15:14] Loaded 0 existing messages
- [17:15:14] Processing C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS.eml
- [17:15:14] Aplicando reglas de prioridad 1
- [17:15:14] Aplicando reglas de prioridad 2
- [17:15:14] Aplicando reglas de prioridad 3
- [17:15:14] Aplicando reglas de prioridad 4
- [17:15:14] Estadísticas de procesamiento:
- [17:15:14] - Total mensajes encontrados: 1
- [17:15:14] - Mensajes únicos añadidos: 1
- [17:15:14] - Mensajes duplicados ignorados: 0
- [17:15:14] Writing 1 messages to C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
- [17:15:14] Ejecución de x1.py finalizada (success). Duración: 0:00:01.628641.
- [17:15:14] Log completo guardado en: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\log_x1.txt
+ [23:43:07] Iniciando ejecución de x3.py en C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport...
+ [23:43:07] --- AML (CAx Export) to Hierarchical JSON and Obsidian MD Converter (v28 - Working Directory Integration) ---
+ [23:43:07] Using Working Directory for Output: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport
+ [23:43:11] Input AML: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export.aml
+ [23:43:11] Output Directory: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport
+ [23:43:11] Output JSON: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export.hierarchical.json
+ [23:43:11] Output Main Tree MD: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export_Hardware_Tree.md
+ [23:43:11] Output IO Debug Tree MD: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export_IO_Upward_Debug.md
+ [23:43:11] Processing AML file: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export.aml
+ [23:43:11] Pass 1: Found 203 InternalElement(s). Populating device dictionary...
+ [23:43:11] Pass 2: Identifying PLCs and Networks (Refined v2)...
+ [23:43:11] Identified Network: PROFIBUS_1 (bcc6f2bd-3d71-4407-90f2-bccff6064051) Type: Profibus
+ [23:43:11] Identified Network: ETHERNET_1 (c6d49787-a076-4592-994d-876eea123dfd) Type: Ethernet/Profinet
+ [23:43:11] Identified PLC: PLC (a48e038f-0bcc-4b48-8373-033da316c62b) - Type: CPU 1516F-3 PN/DP OrderNo: 6ES7 516-3FP03-0AB0
+ [23:43:11] Pass 3: Processing InternalLinks (Robust Network Mapping & IO)...
+ [23:43:11] Found 118 InternalLink(s).
+ [23:43:11] Mapping Device/Node 'E1' (NodeID:1643b51f-7067-4565-8f8e-109a1a775fed, Addr:10.1.33.11) to Network 'ETHERNET_1'
+ [23:43:11] --> Associating Network 'ETHERNET_1' with PLC 'PLC' (via Node 'E1' Addr: 10.1.33.11)
+ [23:43:11] Mapping Device/Node 'P1' (NodeID:5aff409b-2573-485f-82bf-0e08c9200086, Addr:1) to Network 'PROFIBUS_1'
+ [23:43:11] --> Associating Network 'PROFIBUS_1' with PLC 'PLC' (via Node 'P1' Addr: 1)
+ [23:43:11] Mapping Device/Node 'PB1' (NodeID:c796e175-c770-43f0-8191-fc91996c0147, Addr:12) to Network 'PROFIBUS_1'
+ [23:43:11] Mapping Device/Node 'PB1' (NodeID:0b44f55a-63c1-49e8-beea-24dc5d3226e3, Addr:20) to Network 'PROFIBUS_1'
+ [23:43:11] Mapping Device/Node 'PB1' (NodeID:25cfc251-f946-40c5-992d-ad6387677acb, Addr:21) to Network 'PROFIBUS_1'
+ [23:43:11] Mapping Device/Node 'PB1' (NodeID:57999375-ec72-46ef-8ec2-6c3178e8acf8, Addr:22) to Network 'PROFIBUS_1'
+ [23:43:11] Mapping Device/Node 'PB1' (NodeID:54e8db6a-9443-41a4-a85b-cf0722c1d299, Addr:10) to Network 'PROFIBUS_1'
+ [23:43:11] Mapping Device/Node 'PB1' (NodeID:4786bab6-4097-4651-ac19-6cadfc7ea735, Addr:8) to Network 'PROFIBUS_1'
+ [23:43:11] Mapping Device/Node 'PB1' (NodeID:1f08afcb-111f-428f-915e-69363af1b09a, Addr:40) to Network 'PROFIBUS_1'
+ [23:43:11] Data extraction and structuring complete.
+ [23:43:11] Generating JSON output: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export.hierarchical.json
+ [23:43:11] JSON data written successfully.
+ [23:43:11] Markdown summary written to: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export_Hardware_Tree.md
+ [23:43:11] IO upward debug tree written to: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export_IO_Upward_Debug.md
+ [23:43:11] Script finished.
+ [23:43:12] Ejecución de x3.py finalizada (success). Duración: 0:00:05.235415.
+ [23:43:12] Log completo guardado en: d:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\ObtainIOFromProjectTia\log_x3.txt
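The new log above comes from an AML (AutomationML / CAx export) converter. Purely to illustrate what the "Pass 1" count of InternalElement entries involves, and not as the project's actual code, a minimal namespace-agnostic sketch using the standard library could be:

import xml.etree.ElementTree as ET

# Sketch: count InternalElement nodes in an AutomationML (CAEX) export file.
# The file name is only an example taken from the log above.
tree = ET.parse("SAE196_c0.2_CAx_Export.aml")
devices = [el for el in tree.iter() if el.tag.split("}")[-1] == "InternalElement"]
print(f"Pass 1: Found {len(devices)} InternalElement(s).")
for el in devices[:5]:
    print(el.get("Name"), el.get("ID"))  # CAEX InternalElements carry Name/ID attributes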

BIN  icon.png
Binary file not shown. (Before: 36 KiB)
View File

@@ -1,16 +1,15 @@
- beautifulsoup4==4.13.4
- Flask==3.1.0
- flask_sock==0.7.0
- html2text==2025.4.15
- langid==1.1.6
- lxml==5.4.0
- mammoth==1.9.0
- ollama==0.4.8
- openai==1.77.0
- openpyxl==3.1.5
- pandas==2.2.3
- protobuf==6.30.2
- pypandoc==1.15
- Requests==2.32.3
- siemens_tia_scripting==1.0.7
- sympy==1.13.3
+ flask
+ flask-sock
+ lxml
+ pandas
+ google-cloud-translate
+ openai
+ ollama
+ langid
+ openpyxl
+ beautifulsoup4
+ requests
+ mammoth
+ html2text
+ pypandoc
+ # siemens-tia-scripting  # Requires a special TIA Portal Openness installation

View File
@@ -418,12 +418,6 @@ function createFieldEditor(key, field) {
class="w-full p-2 border rounded"
onchange="updateVisualSchema()">
</div>
- <div>
- <label class="block text-sm font-bold mb-2">Valor por Defecto</label>
- <input type="text" value="${field.default !== undefined ? field.default : ''}"
- class="w-full p-2 border rounded"
- onchange="updateVisualSchema()">
- </div>
</div>
${field.enum ? `
<div class="enum-container mt-4">
@@ -500,55 +494,28 @@ function updateVisualSchema() {
const inputs = field.getElementsByTagName('input');
const select = field.getElementsByTagName('select')[0];
const key = inputs[0].value;
- const fieldType = select.value; // string, directory, number, boolean, enum
- const title = inputs[1].value;
- const description = inputs[2].value;
- const defaultValueInput = inputs[3]; // The new default-value input
- const defaultValueString = defaultValueInput.value;
- let propertyDefinition = {
- type: fieldType === 'directory' || fieldType === 'enum' ? 'string' : fieldType, // The base type
- title: title,
- description: description
- };
- // Add the specific format when the field is a directory
- if (select.value === 'directory') {
- propertyDefinition.format = 'directory';
- }
- // Add the enum values when the field is an enum
- if (select.value === 'enum') {
- propertyDefinition.enum = field.querySelector('textarea').value.split('\n').filter(v => v.trim());
- }
- // Parse and attach the default value if one was provided
- if (defaultValueString !== null && defaultValueString.trim() !== '') {
- let typedDefaultValue = defaultValueString;
- try {
- if (propertyDefinition.type === 'number' || propertyDefinition.type === 'integer') {
- typedDefaultValue = Number(defaultValueString);
- if (isNaN(typedDefaultValue)) {
- console.warn(`Valor por defecto inválido para número en campo '${key}': ${defaultValueString}. Se omitirá.`);
- // Do not attach a default if it is not a valid number
- } else {
- // Optional: truncate when the type is integer
- if (propertyDefinition.type === 'integer' && !Number.isInteger(typedDefaultValue)) {
- typedDefaultValue = Math.trunc(typedDefaultValue);
- }
- propertyDefinition.default = typedDefaultValue;
- }
- } else if (propertyDefinition.type === 'boolean') {
- typedDefaultValue = ['true', '1', 'yes', 'on'].includes(defaultValueString.toLowerCase());
- propertyDefinition.default = typedDefaultValue;
- } else { // string, enum, directory
- propertyDefinition.default = typedDefaultValue; // Already a string
- }
- } catch (e) {
- console.error(`Error procesando valor por defecto para campo '${key}':`, e);
- }
- }
- schema.properties[key] = propertyDefinition;
+ if (select.value === 'directory') {
+ schema.properties[key] = {
+ type: 'string',
+ format: 'directory',
+ title: inputs[1].value,
+ description: inputs[2].value
+ };
+ } else if (select.value === 'enum') {
+ schema.properties[key] = {
+ type: 'string',
+ title: inputs[1].value,
+ description: inputs[2].value,
+ enum: field.querySelector('textarea').value.split('\n').filter(v => v.trim())
+ };
+ } else {
+ schema.properties[key] = {
+ type: select.value,
+ title: inputs[1].value,
+ description: inputs[2].value
+ };
+ }
});
const jsonEditor = document.getElementById('json-editor');
@@ -993,81 +960,6 @@ function collectFormData(level) {
return data;
}
// Add this function at the end of your static/js/script.js file
function shutdownServer() {
if (confirm("¿Estás seguro de que quieres detener el servidor? La aplicación se cerrará.")) {
fetch('/_shutdown', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
}
})
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
alert("El servidor se está deteniendo. Puede que necesites cerrar esta pestaña manualmente.");
// Optionally, you can try to close the window/tab
// window.close(); // This may not work in every browser for security reasons
document.body.innerHTML = '<div class="alert alert-info">El servidor se ha detenido. Cierra esta ventana.</div>';
} else {
alert("Error al intentar detener el servidor: " + data.message);
}
})
.catch(error => {
// A network error here is normal because the server is shutting down
console.warn("Error esperado al detener el servidor (puede que ya se haya detenido):", error);
alert("Solicitud de detención enviada. El servidor debería detenerse. Cierra esta ventana.");
document.body.innerHTML = '<div class="alert alert-info">El servidor se está deteniendo. Cierra esta ventana.</div>';
});
}
}
// Make sure the fetchLogs and clearLogs functions are also defined in this file if you use them.
// Example fetchLogs and clearLogs (in case you do not have them already):
function fetchLogs() {
fetch('/api/logs')
.then(response => response.json())
.then(data => {
const logOutput = document.getElementById('log-output');
logOutput.textContent = data.logs || 'No hay logs.';
logOutput.scrollTop = logOutput.scrollHeight; // Scroll to bottom
})
.catch(error => console.error('Error fetching logs:', error));
}
function clearLogs() {
if (confirm("¿Estás seguro de que quieres borrar los logs?")) {
fetch('/api/logs', { method: 'DELETE' })
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
fetchLogs(); // Refresh logs after clearing
showToast('Logs borrados correctamente.');
} else {
showToast('Error al borrar los logs.', 'error');
}
})
.catch(error => {
console.error('Error clearing logs:', error);
showToast('Error de red al borrar los logs.', 'error');
});
}
}
// You will need a showToast function (or similar) if you use it
function showToast(message, type = 'success') {
// Implement your toast logic here
console.log(`Toast (${type}): ${message}`);
alert(`Toast (${type}): ${message}`); // Simple alert as a placeholder
}
// Call fetchLogs on page load if needed
// document.addEventListener('DOMContentLoaded', fetchLogs);
// Add function to save configuration
async function saveConfig(level) {
const saveButton = document.getElementById(`save-config-${level}`);
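The block removed above coerced the "Valor por Defecto" string to the declared schema type before storing it as the field's default. A rough Python equivalent of that rule, shown only for illustration (coerce_default is hypothetical, not part of this codebase):

def coerce_default(value: str, schema_type: str):
    # Sketch: mirror the removed client-side coercion of a default-value string.
    value = value.strip()
    if not value:
        return None
    if schema_type in ("number", "integer"):
        try:
            number = float(value)
        except ValueError:
            return None  # invalid number: omit the default, as the JS warning did
        return int(number) if schema_type == "integer" else number
    if schema_type == "boolean":
        return value.lower() in ("true", "1", "yes", "on")
    return value  # string, enum and directory defaults stay as plain text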

View File
@@ -68,13 +68,6 @@
</div>
</div>
</div>
- <!-- Button to stop the server -->
- <div class="mt-8 pt-4 border-t border-gray-300">
- <button class="w-full bg-red-600 hover:bg-red-700 text-white px-4 py-2 rounded shadow" onclick="shutdownServer()">
- Detener Servidor
- </button>
- </div>
</div>
</div>
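The removed button posted to /_shutdown and expected a JSON {status, message} reply, matching the shutdownServer() client code deleted above. A minimal Flask handler compatible with that contract, offered only as a sketch (not the application's actual route):

import os
import threading
from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/_shutdown", methods=["POST"])
def shutdown():
    # Sketch: answer first, then terminate shortly afterwards so the JSON
    # response can still reach the client before the process exits.
    threading.Timer(0.5, lambda: os._exit(0)).start()
    return jsonify({"status": "success", "message": "Server shutting down"})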