Compare commits

...

4 Commits

30 changed files with 6191 additions and 751 deletions

174
.gitignore vendored Normal file

@ -0,0 +1,174 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

110
app.py

@ -1,9 +1,18 @@
from flask import Flask, render_template, request, jsonify, url_for
from flask_sock import Sock
from config_manager import ConfigurationManager
from datetime import datetime
import os
import json # Added import
from datetime import datetime
import time # Added for shutdown delay
# --- Imports for System Tray Icon ---
import threading
import webbrowser
import sys
import requests # To send shutdown request
from PIL import Image
import pystray
app = Flask(
__name__, static_url_path="", static_folder="static", template_folder="templates"
@ -14,6 +23,9 @@ config_manager = ConfigurationManager()
# Lista global para mantener las conexiones WebSocket activas
websocket_connections = set()
# --- Globals for Tray Icon ---
tray_icon = None
@sock.route("/ws")
def handle_websocket(ws):
@ -227,5 +239,99 @@ def get_directory_history(group):
return jsonify(history)
# --- System Tray Icon Functions ---
def run_flask():
"""Runs the Flask app."""
print("Starting Flask server on http://127.0.0.1:5000/")
try:
# use_reloader=False is important when running in a thread
# For production, consider using waitress or gunicorn instead of app.run
app.run(host='127.0.0.1', port=5000, debug=True, use_reloader=False)
except Exception as e:
print(f"Error running Flask app: {e}")
# Optionally try to stop the tray icon if Flask fails critically
if tray_icon:
print("Attempting to stop tray icon due to Flask error.")
tray_icon.stop()
def open_app_browser(icon, item):
"""Callback function to open the browser."""
print("Opening application in browser...")
webbrowser.open("http://127.0.0.1:5000/")
def shutdown_flask_server():
"""Attempts to gracefully shut down the Werkzeug server."""
try:
# This requires the development server (werkzeug)
# Send a request to a special shutdown route
requests.post("http://127.0.0.1:5000/_shutdown", timeout=1)
except Exception as e:
print(f"Could not send shutdown request to Flask server: {e}")
print("Flask server might need to be closed manually.")
def stop_icon_thread():
"""Helper function to stop the icon after a delay, allowing HTTP response."""
time.sleep(0.1) # Small delay to allow the HTTP response to be sent
if tray_icon:
print("Stopping tray icon from shutdown route...")
tray_icon.stop()
else:
print("Tray icon not available to stop.")
# As a last resort if the icon isn't running for some reason
# print("Attempting os._exit(0) as fallback.")
# os._exit(0) # Force exit - use with caution
@app.route('/_shutdown', methods=['POST'])
def shutdown_route():
"""Internal route to shut down the application via the tray icon."""
print("Shutdown endpoint called.")
# Stop the main application thread by stopping the tray icon.
# Do this in a separate thread to allow the HTTP response to return first.
stopper = threading.Thread(target=stop_icon_thread, daemon=True)
stopper.start()
print("Shutdown signal sent to tray icon thread.")
return jsonify(status="success", message="Application shutdown initiated..."), 200
def exit_application(icon, item):
"""Callback function to exit the application."""
print("Exit requested via tray menu.")
# Just stop the icon. This will end the main thread, and the daemon Flask thread will exit.
print("Stopping tray icon...")
if icon: # pystray passes the icon object
icon.stop()
elif tray_icon: # Fallback just in case
tray_icon.stop()
if __name__ == "__main__":
app.run(debug=True)
# --- Start Flask in a background thread ---
flask_thread = threading.Thread(target=run_flask, daemon=True)
flask_thread.start()
# --- Setup and run the system tray icon ---
icon_path = r"d:\Proyectos\Scripts\ParamManagerScripts\icon.png" # Use absolute path
try:
image = Image.open(icon_path)
menu = pystray.Menu(
pystray.MenuItem("Abrir ParamManager", open_app_browser, default=True),
pystray.MenuItem("Salir", exit_application)
)
tray_icon = pystray.Icon("ParamManager", image, "ParamManager", menu)
print("Starting system tray icon...")
tray_icon.run() # This blocks the main thread until icon.stop() is called
except FileNotFoundError:
print(f"Error: Icono no encontrado en '{icon_path}'. El icono de notificación no se iniciará.", file=sys.stderr)
print("La aplicación Flask seguirá ejecutándose en segundo plano. Presiona Ctrl+C para detenerla si es necesario.")
# Keep the main thread alive so the Flask thread doesn't exit immediately
# This allows Flask to continue running even without the tray icon.
try:
while flask_thread.is_alive():
flask_thread.join(timeout=1.0) # Wait indefinitely
except KeyboardInterrupt:
print("\nCtrl+C detectado. Intentando detener Flask...")
shutdown_flask_server() # Try to shutdown Flask on Ctrl+C too
print("Saliendo.")
except Exception as e:
print(f"Error al iniciar el icono de notificación: {e}", file=sys.stderr)
print("Aplicación finalizada.")


@ -0,0 +1,34 @@
--- Log de Ejecución: x1.py ---
Grupo: EmailCrono
Directorio de Trabajo: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Inicio: 2025-05-03 17:15:12
Fin: 2025-05-03 17:15:14
Duración: 0:00:01.628641
Estado: SUCCESS (Código de Salida: 0)
--- SALIDA ESTÁNDAR (STDOUT) ---
Working directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Input directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
Output directory: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs
Cronologia file: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
Attachments directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\adjuntos
Beautify rules file: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\config\beautify_rules.json
Found 1 .eml files
Loaded 0 existing messages
Processing C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS.eml
Aplicando reglas de prioridad 1
Aplicando reglas de prioridad 2
Aplicando reglas de prioridad 3
Aplicando reglas de prioridad 4
Estadísticas de procesamiento:
- Total mensajes encontrados: 1
- Mensajes únicos añadidos: 1
- Mensajes duplicados ignorados: 0
Writing 1 messages to C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
--- ERRORES (STDERR) ---
Ninguno
--- FIN DEL LOG ---


@ -0,0 +1,14 @@
{
"level1": {
"api_key": "your-api-key-here",
"model": "gpt-3.5-turbo"
},
"level2": {
"attachments_dir": "adjuntos",
"cronologia_file": "cronologia.md"
},
"level3": {
"output_directory": "C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs"
},
"working_directory": "C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS"
}
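As a side note, a configuration file with this level1/level2/level3 layout can be read with plain json loading; the helper below only illustrates pulling nested keys with defaults and is not part of the repository (the scripts obtain their settings through backend.script_utils.load_configuration).

# Illustrative reader for a level1/level2/level3 configuration file (not repository code).
import json

def read_group_config(path):
    with open(path, encoding="utf-8") as fh:
        cfg = json.load(fh)
    return {
        "attachments_dir": cfg.get("level2", {}).get("attachments_dir", "adjuntos"),
        "cronologia_file": cfg.get("level2", {}).get("cronologia_file", "cronologia.md"),
        "output_directory": cfg.get("level3", {}).get("output_directory", "."),
        "working_directory": cfg.get("working_directory", "."),
    }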


@ -1,6 +1,8 @@
{
"path": "C:\\Trabajo\\VM\\40 - 93040 - HENKEL - NEXT2 Problem\\Reporte\\EmailTody",
"path": "C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS",
"history": [
"C:\\Trabajo\\SIDEL\\EMAILs\\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS",
"C:\\Estudio",
"C:\\Trabajo\\VM\\40 - 93040 - HENKEL - NEXT2 Problem\\Reporte\\EmailTody",
"C:\\Trabajo\\VM\\30 - 9.3941- Kosme - Portogallo (Modifica + Linea)\\Reporte\\Emails",
"C:\\Users\\migue\\OneDrive\\Miguel\\Obsidean\\Trabajo\\VM\\30 - 9.3941- Kosme - Portogallo (Modifica + Linea)\\Emails",


@ -0,0 +1,6 @@
{
"name": "Exportador de objetos de Tia Portal y procesador de CAx",
"description": "Este conjunto de scripts exporta desde Tia Portal los objetos en fomarto XML y los objetos CAx. Luego se puede generar documentacion desde estos CAx de la periferia IO del PLC exportado.",
"version": "1.0",
"author": "Miguel"
}


@ -0,0 +1,11 @@
{
"scl_output_dir": "scl_output",
"xref_output_dir": "xref_output",
"xref_source_subdir": "source",
"call_xref_filename": "xref_calls_tree.md",
"db_usage_xref_filename": "xref_db_usage_summary.md",
"plc_tag_xref_filename": "xref_plc_tags_summary.md",
"max_call_depth": 5,
"max_users_list": 20,
"aggregated_filename": "full_project_representation.md"
}


@ -0,0 +1,6 @@
{
"name": "Procesador de XML exportado de TIA",
"description": "Conjunto de scripts que procesan archivos XML exportados de TIA, conviertiendo los objetos LAD a SCL y generando documentación en formato Markdown. ",
"version": "1.0",
"author": "Miguel"
}


@ -1,4 +1,59 @@
{
"type": "object",
"properties": {}
"properties": {
"scl_output_dir": {
"type": "string",
"title": "Directorio Salida SCL/MD (x3)",
"description": "Nombre del directorio (relativo a la raíz del proyecto PLC) donde x3 genera archivos .scl/.md, y x4/x5 leen.",
"default": "scl_output"
},
"xref_output_dir": {
"type": "string",
"title": "Directorio Salida XRef (x4)",
"description": "Nombre del directorio (relativo a la raíz del proyecto PLC) donde x4 genera archivos de referencias cruzadas.",
"default": "xref_output"
},
"xref_source_subdir": {
"type": "string",
"title": "Subdirectorio Fuentes XRef (x4)",
"description": "Nombre del subdirectorio dentro de xref_output_dir donde x4 coloca archivos fuente (.md) preparados para enlaces Obsidian.",
"default": "source"
},
"call_xref_filename": {
"type": "string",
"title": "Nombre Archivo Árbol Llamadas (x4)",
"description": "Nombre del archivo para la salida del árbol de llamadas generado por x4.",
"default": "xref_calls_tree.md"
},
"db_usage_xref_filename": {
"type": "string",
"title": "Nombre Archivo Uso DBs (x4)",
"description": "Nombre del archivo para el resumen de uso de DBs generado por x4.",
"default": "xref_db_usage_summary.md"
},
"plc_tag_xref_filename": {
"type": "string",
"title": "Nombre Archivo Uso PLC Tags (x4)",
"description": "Nombre del archivo para el resumen de uso de PLC Tags generado por x4.",
"default": "xref_plc_tags_summary.md"
},
"max_call_depth": {
"type": "integer",
"title": "Profundidad Máx. Árbol Llamadas (x4)",
"description": "Profundidad máxima de recursión para el árbol de llamadas generado por x4.",
"default": 5
},
"max_users_list": {
"type": "integer",
"title": "Máx. Usuarios Listados (x4)",
"description": "Número máximo de usuarios listados por DB/Tag en los resúmenes generados por x4.",
"default": 20
},
"aggregated_filename": {
"type": "string",
"title": "Nombre Archivo Agregado (x5)",
"description": "Nombre del archivo Markdown agregado final generado por x5 (se guarda en el directorio de trabajo principal).",
"default": "full_project_representation.md"
}
}
}
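Since every property in this schema carries a default, a complete fallback configuration can be derived from the schema itself. The sketch below only illustrates that idea (the schema file name is hypothetical); the pipeline scripts instead repeat the same defaults in their .get() calls, as x0_main.py does further down.

# Illustrative: build a defaults dict from a JSON-Schema-style file with "default" values.
import json

def defaults_from_schema(schema_path):
    with open(schema_path, encoding="utf-8") as fh:
        schema = json.load(fh)
    return {
        name: spec["default"]
        for name, spec in schema.get("properties", {}).items()
        if "default" in spec
    }

# Hypothetical usage: defaults_from_schema("esquema_group.json")["scl_output_dir"] == "scl_output"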

File diff suppressed because it is too large.

File diff suppressed because it is too large.


@ -0,0 +1,19 @@
{
"level1": {
"api_key": "your-api-key-here",
"model": "gpt-3.5-turbo"
},
"level2": {
"scl_output_dir": "scl_output",
"xref_output_dir": "xref_output",
"xref_source_subdir": "source",
"call_xref_filename": "xref_calls_tree.md",
"db_usage_xref_filename": "xref_db_usage_summary.md",
"plc_tag_xref_filename": "xref_plc_tags_summary.md",
"max_call_depth": 5,
"max_users_list": 20,
"aggregated_filename": "full_project_representation.md"
},
"level3": {},
"working_directory": "C:\\Trabajo\\SIDEL\\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\\Reporte\\IOExport"
}


@ -0,0 +1,6 @@
{
"path": "C:\\Trabajo\\SIDEL\\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\\Reporte\\IOExport",
"history": [
"C:\\Trabajo\\SIDEL\\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\\Reporte\\IOExport"
]
}


@ -23,15 +23,16 @@ script_root = os.path.dirname(
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Funciones (get_console_encoding - sin cambios) ---
def get_console_encoding():
try:
return locale.getpreferredencoding(False)
except Exception:
return "cp1252"
# <-- NUEVO: Importar funciones directamente -->
from x1_to_json import convert_xml_to_json
from x2_process import process_json_to_scl
from x3_generate_scl import generate_scl_or_markdown
# <-- NUEVO: Importar funciones de x4 y x5 -->
from x4_cross_reference import generate_cross_references # Asumiendo que x4_cross_reference.py tiene esta función
from x5_aggregate import aggregate_outputs
CONSOLE_ENCODING = get_console_encoding()
CONSOLE_ENCODING = "utf-8"
# <-- NUEVO: Importar format_variable_name (necesario para predecir nombre de salida) -->
try:
@ -85,117 +86,7 @@ def log_message(message, log_file_handle, also_print=True):
# <-- FIN NUEVO -->
# <-- MODIFICADO: run_script para aceptar log_file_handle -->
def run_script(script_name, xml_arg, log_file_handle, *extra_args):
"""Runs a given script, logs output, and returns success status."""
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), script_name)
python_executable = sys.executable
command = [python_executable, script_path, os.path.abspath(xml_arg)]
command.extend(extra_args)
# Loguear el comando que se va a ejecutar
log_message(
f"--- Running {script_name} with arguments: {[os.path.relpath(arg) if isinstance(arg, str) and os.path.exists(arg) else arg for arg in command[2:]]} ---",
log_file_handle,
)
try:
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
encoding=CONSOLE_ENCODING,
errors="replace",
)
stdout_clean = result.stdout.strip() if result.stdout else ""
stderr_clean = result.stderr.strip() if result.stderr else ""
# Loguear stdout si existe
if stdout_clean:
log_message(
f"--- Stdout ({script_name}) ---", log_file_handle, also_print=False
) # Loguear encabezado
log_message(
stdout_clean, log_file_handle, also_print=True
) # Loguear y mostrar contenido
log_message(
f"--- End Stdout ({script_name}) ---", log_file_handle, also_print=False
) # Loguear fin
# Loguear stderr si existe
if stderr_clean:
# Usar log_message también para stderr, pero imprimir en consola como error
log_message(
f"--- Stderr ({script_name}) ---", log_file_handle, also_print=False
) # Loguear encabezado
log_message(
stderr_clean, log_file_handle, also_print=False
) # Loguear contenido
log_message(
f"--- End Stderr ({script_name}) ---", log_file_handle, also_print=False
) # Loguear fin
# Imprimir stderr en la consola de error estándar
print(f"--- Stderr ({script_name}) ---", file=sys.stderr)
print(stderr_clean, file=sys.stderr)
print("--------------------------", file=sys.stderr)
return True # Éxito
except FileNotFoundError:
error_msg = f"Error: Script '{script_path}' or Python executable '{python_executable}' not found."
log_message(error_msg, log_file_handle, also_print=False) # Loguear error
print(error_msg, file=sys.stderr) # Mostrar error en consola
return False
except subprocess.CalledProcessError as e:
error_msg = f"Error running {script_name}: Script returned non-zero exit code {e.returncode}."
log_message(error_msg, log_file_handle, also_print=False) # Loguear error
print(error_msg, file=sys.stderr) # Mostrar error en consola
stdout_decoded = e.stdout.strip() if e.stdout else ""
stderr_decoded = e.stderr.strip() if e.stderr else ""
if stdout_decoded:
log_message(
f"--- Stdout ({script_name} - Error) ---",
log_file_handle,
also_print=False,
)
log_message(stdout_decoded, log_file_handle, also_print=False)
log_message(
f"--- End Stdout ({script_name} - Error) ---",
log_file_handle,
also_print=False,
)
print(f"--- Stdout ({script_name}) ---", file=sys.stderr)
print(stdout_decoded, file=sys.stderr)
if stderr_decoded:
log_message(
f"--- Stderr ({script_name} - Error) ---",
log_file_handle,
also_print=False,
)
log_message(stderr_decoded, log_file_handle, also_print=False)
log_message(
f"--- End Stderr ({script_name} - Error) ---",
log_file_handle,
also_print=False,
)
print(f"--- Stderr ({script_name}) ---", file=sys.stderr)
print(stderr_decoded, file=sys.stderr)
print("--------------------------", file=sys.stderr)
return False
except Exception as e:
error_msg = f"An unexpected error occurred while running {script_name}: {e}"
log_message(error_msg, log_file_handle, also_print=False) # Loguear error
traceback_str = traceback.format_exc()
log_message(
traceback_str, log_file_handle, also_print=False
) # Loguear traceback
print(error_msg, file=sys.stderr) # Mostrar error en consola
traceback.print_exc(file=sys.stderr) # Mostrar traceback en consola
return False
# <-- run_script ya no es necesaria -->
# --- Función check_skip_status (sin cambios en su lógica interna) ---
@ -266,16 +157,29 @@ def check_skip_status(
return status
# --- Constantes ---
AGGREGATED_FILENAME = "full_project_representation.md"
SCL_OUTPUT_DIRNAME = "scl_output"
XREF_OUTPUT_DIRNAME = "xref_output"
# --- Bloque Principal ---
if __name__ == "__main__":
configs = load_configuration()
working_directory = configs.get("working_directory")
group_config = configs.get("level2", {})
# <-- NUEVO: Leer parámetros de configuración para x3, x4, x5 -->
xml_parser_config = configs.get("XML Parser to SCL", {})
cfg_scl_output_dirname = xml_parser_config.get("scl_output_dir", "scl_output")
cfg_xref_output_dirname = xml_parser_config.get("xref_output_dir", "xref_output")
cfg_xref_source_subdir = xml_parser_config.get("xref_source_subdir", "source")
cfg_call_xref_filename = xml_parser_config.get("call_xref_filename", "xref_calls_tree.md")
cfg_db_usage_xref_filename = xml_parser_config.get("db_usage_xref_filename", "xref_db_usage_summary.md")
cfg_plc_tag_xref_filename = xml_parser_config.get("plc_tag_xref_filename", "xref_plc_tags_summary.md")
cfg_max_call_depth = xml_parser_config.get("max_call_depth", 5)
cfg_max_users_list = xml_parser_config.get("max_users_list", 20)
cfg_aggregated_filename = xml_parser_config.get("aggregated_filename", "full_project_representation.md")
# <-- FIN NUEVO -->
# Directorio donde se encuentra este script (x0_main.py)
script_dir = os.path.dirname(os.path.abspath(__file__))
# <-- MODIFICADO: Abrir archivo log -->
log_filepath = os.path.join(
@ -287,19 +191,28 @@ if __name__ == "__main__":
log_message("=" * 40 + " LOG START " + "=" * 40, log_f)
# --- PARTE 1: BUSCAR ARCHIVOS ---
xml_project_dir = working_directory
# <-- MODIFICADO: Apuntar al subdirectorio 'PLC' dentro del working_directory -->
plc_subdir_name = "PLC" # Nombre estándar del subdirectorio de TIA Portal
xml_project_dir = os.path.join(working_directory, plc_subdir_name)
log_message(
f"Buscando archivos XML recursivamente en: '{xml_project_dir}'", log_f
f"Directorio de trabajo base configurado: '{working_directory}'", log_f
)
log_message(
f"Buscando archivos XML recursivamente en el subdirectorio: '{xml_project_dir}'", log_f
)
# Verificar si el directorio PLC existe
if not os.path.isdir(xml_project_dir):
log_message(
f"Error: El directorio '{xml_project_dir}' no existe.",
f"Error: El subdirectorio '{plc_subdir_name}' no existe dentro de '{working_directory}'. "
f"Se esperaba encontrar la estructura del proyecto TIA Portal en '{xml_project_dir}'.",
log_f,
also_print=False,
)
print(
f"Error: El directorio '{xml_project_dir}' no existe.", file=sys.stderr
f"Error: El subdirectorio '{plc_subdir_name}' no existe dentro de '{working_directory}'. "
f"Asegúrese de que la ruta del directorio de trabajo apunte a la carpeta que *contiene* la carpeta '{plc_subdir_name}'.", file=sys.stderr
)
sys.exit(1)
search_pattern = os.path.join(xml_project_dir, "**", "*.xml")
@ -315,35 +228,37 @@ if __name__ == "__main__":
)
xml_files_found.sort()
[
log_message(f" - {os.path.relpath(xml_file, script_dir)}", log_f)
log_message(f" - {os.path.relpath(xml_file, working_directory)}", log_f) # Mostrar ruta relativa al working_directory original
for xml_file in xml_files_found
]
# --- Directorios de salida ---
scl_output_dir = os.path.join(xml_project_dir, SCL_OUTPUT_DIRNAME)
xref_output_dir = os.path.join(xml_project_dir, XREF_OUTPUT_DIRNAME)
# Estos directorios ahora se crearán DENTRO de xml_project_dir (es decir, dentro de 'PLC')
scl_output_dir = os.path.join(xml_project_dir, cfg_scl_output_dirname) # Usar valor de config
xref_output_dir = os.path.join(xml_project_dir, cfg_xref_output_dirname) # Usar valor de config
# --- PARTE 2: PROCESAMIENTO INDIVIDUAL (x1, x2, x3) ---
log_message("\n--- Fase 1: Procesamiento Individual (x1, x2, x3) ---", log_f)
script1 = "x1_to_json.py"
script2 = "x2_process.py"
script3 = "x3_generate_scl.py"
file_status = {}
# Los nombres de script ya no se usan directamente para x1, x2, x3
# script1 = "x1_to_json.py"
# script2 = "x2_process.py"
# script3 = "x3_generate_scl.py"
processed_count = 0
skipped_full_count = 0
failed_count = 0
skipped_partial_count = 0
for xml_filepath in xml_files_found:
relative_path = os.path.relpath(xml_filepath, script_dir)
for i, xml_filepath in enumerate(xml_files_found):
relative_path = os.path.relpath(xml_filepath, working_directory)
log_message(f"\n--- Procesando archivo: {relative_path} ---", log_f)
status = {"x1_ok": None, "x2_ok": None, "x3_ok": None}
file_status[relative_path] = status
base_filename = os.path.splitext(os.path.basename(xml_filepath))[0]
parsing_dir = os.path.join(os.path.dirname(xml_filepath), "parsing")
# Crear directorio de parsing si no existe
os.makedirs(parsing_dir, exist_ok=True)
json_output_file = os.path.join(parsing_dir, f"{base_filename}.json")
processed_json_filepath = os.path.join(
parsing_dir, f"{base_filename}_processed.json"
parsing_dir, f"{base_filename}_processed.json" # <-- Corregido: nombre correcto
)
# 1. Comprobar estado de salto
@ -353,139 +268,184 @@ if __name__ == "__main__":
skip_x1_x2 = skip_info["skip_x1_x2"]
skip_x3 = skip_info["skip_x3"]
# 2. Ejecutar/Saltar x1
if skip_x1_x2:
# Si se salta todo, registrar y continuar
if skip_x1_x2 and skip_x3:
log_message(
f"--- SALTANDO x1 para: {relative_path} (archivo XML no modificado y JSON procesado existe)",
f"--- SALTANDO TODO (x1, x2, x3) para: {relative_path} (XML no modificado, salida final actualizada)",
log_f,
)
status["x1_ok"] = True
else:
if run_script(script1, xml_filepath, log_f): # Pasar log_f
# Mensaje ya logueado por run_script
status["x1_ok"] = True
else:
log_message(
f"--- {script1} FALLÓ para: {relative_path} ---",
log_f,
also_print=False,
) # Ya impreso por run_script
status["x1_ok"] = False
failed_count += 1
skipped_full_count += 1
processed_count += 1 # Contar como procesado si se salta todo
continue
# 3. Ejecutar/Saltar x2
# Usar try/except para capturar errores en las llamadas directas
try:
# 2. Ejecutar/Saltar x1 (convert_xml_to_json)
if skip_x1_x2:
log_message(
f"--- SALTANDO x1 para: {relative_path} (XML no modificado, JSON procesado existe)",
log_f,
)
success_x1 = True # Asumir éxito si se salta
else:
log_message(
f"--- Ejecutando x1 (convert_xml_to_json) para: {relative_path} ---", log_f
)
success_x1 = convert_xml_to_json(xml_filepath, json_output_file)
if not success_x1:
log_message(f"--- x1 FALLÓ para: {relative_path} ---", log_f, also_print=False) # La función ya imprime el error
if not success_x1:
failed_count += 1
continue # No continuar si x1 falló
# 3. Ejecutar/Saltar x2 (process_json_to_scl)
if skip_x1_x2: # Si se saltó x1, también se salta x2
log_message(
f"--- SALTANDO x2 para: {relative_path} (razón anterior)", log_f
)
status["x2_ok"] = True
else:
if run_script(script2, xml_filepath, log_f): # Pasar log_f
status["x2_ok"] = True
success_x2 = True # Asumir éxito si se salta
else:
log_message(
f"--- {script2} FALLÓ para: {relative_path} ---",
log_f,
also_print=False,
f"--- Ejecutando x2 (process_json_to_scl) para: {relative_path} ---", log_f
)
status["x2_ok"] = False
failed_count += 1
continue
success_x2 = process_json_to_scl(json_output_file, processed_json_filepath)
if not success_x2:
log_message(f"--- x2 FALLÓ para: {relative_path} ---", log_f, also_print=False)
# 4. Ejecutar/Saltar x3
if skip_x3: # Solo puede ser True si skip_x1_x2 era True
log_message(
f"--- SALTANDO x3 para: {relative_path} (archivo de salida en '{SCL_OUTPUT_DIRNAME}' está actualizado)",
log_f,
)
status["x3_ok"] = True
skipped_full_count += 1
processed_count += 1
else:
if not success_x2:
failed_count += 1
continue # No continuar si x2 falló
# 4. Ejecutar x3 (generate_scl_or_markdown) - skip_x3 ya se manejó al principio
# Si llegamos aquí, x3 SIEMPRE debe ejecutarse (porque skip_x3 era False)
if skip_x1_x2:
skipped_partial_count += 1 # Se saltó x1/x2 pero se ejecuta x3
if run_script(
script3, xml_filepath, log_f, xml_project_dir
): # Pasar log_f y project_root_dir
status["x3_ok"] = True
processed_count += 1
else:
log_message(
f"--- {script3} FALLÓ para: {relative_path} ---",
log_f,
also_print=False,
f"--- Ejecutando x3 (generate_scl_or_markdown) para: {relative_path} ---", log_f
)
status["x3_ok"] = False
# Asegurar que el directorio de salida final exista ANTES de llamar a la función
os.makedirs(scl_output_dir, exist_ok=True)
success_x3 = generate_scl_or_markdown(
processed_json_filepath, scl_output_dir, xml_project_dir
)
if not success_x3:
log_message(f"--- x3 FALLÓ para: {relative_path} ---", log_f, also_print=False)
failed_count += 1
continue
continue # No continuar si x3 falló
# Si todo fue bien
processed_count += 1
except Exception as e:
# Capturar cualquier error inesperado durante las llamadas a funciones
log_message(f"--- ERROR INESPERADO procesando {relative_path}: {e} ---", log_f, also_print=False)
print(f"--- ERROR INESPERADO procesando {relative_path}: {e} ---", file=sys.stderr)
traceback_str = traceback.format_exc()
log_message(traceback_str, log_f, also_print=False) # Loguear traceback
traceback.print_exc(file=sys.stderr) # Mostrar traceback en consola
failed_count += 1
continue # Pasar al siguiente archivo
# --- PARTE 3: EJECUTAR x4 (Referencias Cruzadas) ---
log_message(
f"\n--- Fase 2: Ejecutando x4_cross_reference.py (salida en '{XREF_OUTPUT_DIRNAME}/') ---",
f"\n--- Fase 2: Ejecutando x4_cross_reference.py (salida en '{cfg_xref_output_dirname}/') ---", # Usar valor de config
log_f,
)
script4 = "x4_cross_reference.py"
run_x4 = True
success_x4 = False
can_run_x4 = any(s["x1_ok"] and s["x2_ok"] for s in file_status.values())
if not can_run_x4:
# La condición para ejecutar x4 ahora depende de si *algún* archivo tuvo éxito en x1 y x2
# (Necesitamos una forma de rastrear esto, o simplemente intentarlo si no hubo fallos fatales antes)
# Simplificación: Ejecutar x4 si no todos los archivos fallaron en x1/x2.
# Una mejor comprobación sería ver si existe algún archivo _processed.json
can_run_x4 = failed_count < len(xml_files_found) # Aproximación simple
if not can_run_x4 and len(xml_files_found) > 0:
log_message(
"Advertencia: Ningún archivo completó x1/x2. Saltando x4.", log_f
)
run_x4 = False
script4_path = os.path.join(script_dir, script4)
if not os.path.exists(script4_path):
log_message(
f"Advertencia: Script '{script4}' no encontrado. Saltando x4.", log_f
"Advertencia: Todos los archivos fallaron en x1/x2. Saltando x4.", log_f
)
run_x4 = False
elif len(xml_files_found) == 0:
run_x4 = False # No hay archivos, no ejecutar
if run_x4:
log_message(
f"Ejecutando {script4} sobre: {xml_project_dir}, salida en: {xref_output_dir}",
f"Ejecutando x4 (generate_cross_references) sobre: {xml_project_dir}, salida en: {xref_output_dir}",
log_f,
)
success_x4 = run_script(
script4, xml_project_dir, log_f, "-o", xref_output_dir
) # Pasar log_f
try:
# Llamada directa a la función de x4
# <-- MODIFICADO: Pasar todos los parámetros leídos de la config -->
success_x4 = generate_cross_references(
xml_project_dir,
xref_output_dir,
cfg_scl_output_dirname,
cfg_xref_source_subdir,
cfg_call_xref_filename,
cfg_db_usage_xref_filename,
cfg_plc_tag_xref_filename,
cfg_max_call_depth,
cfg_max_users_list)
if not success_x4:
log_message(f"--- {script4} FALLÓ. ---", log_f, also_print=False)
# Mensaje de éxito ya logueado por run_script
# La función interna ya debería haber impreso/logueado el error específico
log_message(f"--- x4 (generate_cross_references) FALLÓ. ---", log_f, also_print=False)
except Exception as e:
# Capturar error inesperado en la llamada a x4
log_message(f"--- ERROR INESPERADO ejecutando x4: {e} ---", log_f, also_print=False)
print(f"--- ERROR INESPERADO ejecutando x4: {e} ---", file=sys.stderr)
traceback_str = traceback.format_exc()
log_message(traceback_str, log_f, also_print=False)
traceback.print_exc(file=sys.stderr)
success_x4 = False # Marcar como fallo
else:
log_message("Fase 2 (x4) omitida.", log_f)
# --- PARTE 4: EJECUTAR x5 (Agregación) ---
log_message(f"\n--- Fase 3: Ejecutando x5_aggregate.py ---", log_f)
script5 = "x5_aggregate.py"
log_message(
f"\n--- Fase 3: Ejecutando x5_aggregate.py (salida en '{cfg_aggregated_filename}') ---", # Usar valor de config
log_f
)
run_x5 = True
success_x5 = False
can_run_x5 = any(s["x3_ok"] for s in file_status.values())
if not can_run_x5:
log_message("Advertencia: Ningún archivo completó x3. Saltando x5.", log_f)
run_x5 = False
script5_path = os.path.join(script_dir, script5)
if not os.path.exists(script5_path):
# Condición similar a x4: ejecutar si no todo falló en x1/x2/x3
can_run_x5 = failed_count < len(xml_files_found)
if not can_run_x5 and len(xml_files_found) > 0:
log_message(
f"Advertencia: Script '{script5}' no encontrado. Saltando x5.", log_f
"Advertencia: Todos los archivos fallaron en x1/x2/x3. Saltando x5.", log_f
)
run_x5 = False
elif len(xml_files_found) == 0:
run_x5 = False
if run_x5:
output_agg_file = os.path.join(xml_project_dir, AGGREGATED_FILENAME)
output_agg_file = os.path.join(working_directory, cfg_aggregated_filename) # Usar valor de config
log_message(
f"Ejecutando {script5} sobre: {xml_project_dir}, salida en: {output_agg_file}",
log_f,
f"Ejecutando x5 (aggregate_outputs) sobre: {xml_project_dir}, salida agregada en: {output_agg_file}",
log_f
)
success_x5 = run_script(
script5, xml_project_dir, log_f, "-o", output_agg_file
) # Pasar log_f
try:
# Llamada directa a la función de x5
# <-- MODIFICADO: Pasar los parámetros necesarios leídos de la config -->
success_x5 = aggregate_outputs(
xml_project_dir,
output_agg_file,
cfg_scl_output_dirname,
cfg_xref_output_dirname)
if not success_x5:
log_message(f"--- {script5} FALLÓ. ---", log_f, also_print=False)
# Mensaje de éxito ya logueado por run_script
# La función interna ya debería haber impreso/logueado el error específico
log_message(f"--- x5 (aggregate_outputs) FALLÓ. ---", log_f, also_print=False)
except Exception as e:
# Capturar error inesperado en la llamada a x5
log_message(f"--- ERROR INESPERADO ejecutando x5: {e} ---", log_f, also_print=False)
print(f"--- ERROR INESPERADO ejecutando x5: {e} ---", file=sys.stderr)
traceback_str = traceback.format_exc()
log_message(traceback_str, log_f, also_print=False)
traceback.print_exc(file=sys.stderr)
success_x5 = False # Marcar como fallo
else:
log_message("Fase 3 (x5) omitida.", log_f)
# --- PARTE 5: RESUMEN FINAL --- (MOVIDO AQUÍ)
# --- PARTE 5: RESUMEN FINAL ---
log_message(
"\n" + "-" * 20 + " Resumen Final del Procesamiento Completo " + "-" * 20,
@ -503,21 +463,13 @@ if __name__ == "__main__":
f"Archivos parcialmente saltados (x1, x2 saltados; x3 ejecutado): {skipped_partial_count}",
log_f,
)
log_message(f"Archivos fallidos (en x1, x2 o x3): {failed_count}", log_f)
if failed_count > 0:
log_message("Archivos fallidos:", log_f)
for f, s in file_status.items():
if not (
s.get("x1_ok", False)
and s.get("x2_ok", False)
and s.get("x3_ok", False)
):
failed_step = (
"x1"
if not s.get("x1_ok", False)
else ("x2" if not s.get("x2_ok", False) else "x3")
)
log_message(f" - {f} (falló en {failed_step})", log_f)
log_message(f"Archivos fallidos (en x1, x2, x3 o error inesperado): {failed_count}", log_f)
# El detalle de archivos fallidos es más difícil de rastrear ahora sin el dict 'file_status'
# Se podría reintroducir si es necesario, actualizándolo en cada paso.
# Por ahora, solo mostramos el conteo.
# if failed_count > 0:
# log_message("Archivos fallidos:", log_f)
# ... (lógica para mostrar cuáles fallaron) ...
log_message(
f"Fase 2 (Generación XRef - x4): {'Completada' if run_x4 and success_x4 else ('Fallida' if run_x4 and not success_x4 else 'Omitida')}",
log_f,
@ -555,5 +507,5 @@ if __name__ == "__main__":
print(f"Advertencia: Error durante flush/fsync final del log: {flush_err}", file=sys.stderr)
# <-- FIN NUEVO -->
print(f"\n{final_console_message} Consulta '{LOG_FILENAME}' para detalles.")
# Mensaje final ya impreso antes del flush
sys.exit(exit_code) # Salir con el código apropiado
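The central change in x0_main.py is that the per-file stages are now direct function calls (convert_xml_to_json, process_json_to_scl, generate_scl_or_markdown) instead of subprocess invocations through run_script(). Stripped of logging and skip handling, the resulting control flow is roughly the sketch below; it is illustrative only and assumes the sibling modules imported at the top of the script.

# Condensed, illustrative view of the per-file x1 -> x2 -> x3 flow (not the actual script).
import glob
import os
import traceback

from x1_to_json import convert_xml_to_json
from x2_process import process_json_to_scl
from x3_generate_scl import generate_scl_or_markdown

def process_plc_dir(xml_project_dir, scl_output_dir):
    failed = 0
    xml_files = sorted(glob.glob(os.path.join(xml_project_dir, "**", "*.xml"), recursive=True))
    for xml_path in xml_files:
        base = os.path.splitext(os.path.basename(xml_path))[0]
        parsing_dir = os.path.join(os.path.dirname(xml_path), "parsing")
        os.makedirs(parsing_dir, exist_ok=True)
        json_file = os.path.join(parsing_dir, f"{base}.json")
        processed_file = os.path.join(parsing_dir, f"{base}_processed.json")
        try:
            # Each stage returns True/False directly, instead of a subprocess exit code.
            if not convert_xml_to_json(xml_path, json_file):
                failed += 1
                continue
            if not process_json_to_scl(json_file, processed_file):
                failed += 1
                continue
            os.makedirs(scl_output_dir, exist_ok=True)
            if not generate_scl_or_markdown(processed_file, scl_output_dir, xml_project_dir):
                failed += 1
        except Exception:
            traceback.print_exc()
            failed += 1
    return failed

Calling the functions in-process surfaces errors as ordinary exceptions (hence the broad try/except and traceback logging in the real script) and avoids the console-encoding handling the old subprocess capture needed.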


@ -1,3 +1,9 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script convierte archivos XML de Siemens LAD/FUP a un formato JSON simplificado.
"""
# ToUpload/x1_to_json.py
# -*- coding: utf-8 -*-
import json
@ -7,9 +13,15 @@ import sys
import traceback
import importlib
from lxml import etree
from lxml.etree import XMLSyntaxError as etree_XMLSyntaxError # Alias para evitar conflicto
from collections import defaultdict
import copy
import time # <-- NUEVO: Para obtener metadatos
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# Importar funciones comunes y namespaces desde el nuevo módulo de utils
try:
@ -209,12 +221,18 @@ def load_parsers(parsers_dir="parsers"):
return parser_map
def convert_xml_to_json(xml_filepath, json_filepath, parser_map):
# <-- MODIFICADO: parser_map ya no es un argumento, se carga dentro -->
def convert_xml_to_json(xml_filepath, json_filepath):
"""
Convierte XML a JSON, detectando tipo, añadiendo metadatos del XML
y extrayendo comentarios/títulos de red de forma centralizada. (v3)
Carga los parsers necesarios internamente.
"""
print(f"Iniciando conversión de '{xml_filepath}' a '{json_filepath}'...")
# <-- NUEVO: Cargar parsers aquí -->
print("Cargando parsers de red...")
parser_map = load_parsers()
# <-- FIN NUEVO -->
if not os.path.exists(xml_filepath):
print(f"Error Crítico: Archivo XML no encontrado: '{xml_filepath}'")
return False
@ -438,7 +456,7 @@ def convert_xml_to_json(xml_filepath, json_filepath, parser_map):
print("Error Crítico: No se generó ningún resultado para el archivo XML.")
return False
except etree.XMLSyntaxError as e:
except etree_XMLSyntaxError as e: # Usar alias
print(f"Error Crítico: Sintaxis XML inválida en '{xml_filepath}'. Detalles: {e}")
return False
except Exception as e:
@ -448,29 +466,35 @@ def convert_xml_to_json(xml_filepath, json_filepath, parser_map):
# --- Punto de Entrada Principal (__main__) ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Convert Simatic XML (FC/FB/OB/DB/UDT/TagTable) to simplified JSON using dynamic parsers and add XML metadata."
)
parser.add_argument(
"xml_filepath",
help="Path to the input XML file passed from the main script (x0_main.py).",
)
args = parser.parse_args()
xml_input_file = args.xml_filepath
# Lógica para ejecución standalone
try:
import tkinter as tk
from tkinter import filedialog
except ImportError:
print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr)
# No salimos, podríamos intentar obtener el path de otra forma o fallar más adelante
tk = None # Marcar como no disponible
if not os.path.exists(xml_input_file):
xml_input_file = ""
if tk:
root = tk.Tk()
root.withdraw() # Ocultar la ventana principal de Tkinter
print("Por favor, selecciona el archivo XML de entrada...")
xml_input_file = filedialog.askopenfilename(
title="Selecciona el archivo XML de entrada",
filetypes=[("XML files", "*.xml"), ("All files", "*.*")]
)
root.destroy() # Cerrar Tkinter
if not xml_input_file:
print("No se seleccionó ningún archivo. Saliendo.", file=sys.stderr)
# sys.exit(1) # No usar sys.exit aquí
else:
print(
f"Error Crítico (x1): Archivo XML no encontrado: '{xml_input_file}'",
file=sys.stderr,
)
sys.exit(1)
loaded_parsers = load_parsers()
if not loaded_parsers:
print(
"Advertencia (x1): No se cargaron parsers de red. Se continuará para UDT/TagTable/DB."
f"Archivo XML seleccionado: {xml_input_file}"
)
# Calcular ruta de salida JSON
xml_filename_base = os.path.splitext(os.path.basename(xml_input_file))[0]
base_dir = os.path.dirname(xml_input_file)
output_dir = os.path.join(base_dir, "parsing")
@ -478,16 +502,13 @@ if __name__ == "__main__":
json_output_file = os.path.join(output_dir, f"{xml_filename_base}.json")
print(
f"(x1) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'"
f"(x1 - Standalone) Convirtiendo: '{os.path.relpath(xml_input_file)}' -> '{os.path.relpath(json_output_file)}'"
)
success = convert_xml_to_json(xml_input_file, json_output_file, loaded_parsers)
# Llamar a la función principal (que ahora carga los parsers)
success = convert_xml_to_json(xml_input_file, json_output_file)
if success:
sys.exit(0)
print("\nConversión completada exitosamente.")
else:
print(
f"\nError durante la conversión de '{os.path.relpath(xml_input_file)}'.",
file=sys.stderr,
)
sys.exit(1)
print(f"\nError durante la conversión de '{os.path.relpath(xml_input_file)}'.", file=sys.stderr)


@ -1,3 +1,10 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script convierte un archivo JSON simplificado (resultado de un análisis de un XML de Siemens) a un
JSON enriquecido con lógica SCL. Se enfoca en la lógica de programación y la agrupación de instrucciones IF.
"""
# -*- coding: utf-8 -*-
import json
import argparse
@ -8,6 +15,11 @@ import re
import importlib
import sys
import sympy
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# Import necessary components from processors directory
from processors.processor_utils import format_variable_name, sympy_expr_to_scl
@ -520,57 +532,54 @@ def process_json_to_scl(json_filepath, output_json_filepath):
# --- Ejecución (MODIFICADO) ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Process simplified JSON to embed SCL logic, copying XML metadata. Expects original XML filepath."
) # <-- MODIFICADO
parser.add_argument(
"source_xml_filepath",
help="Path to the original source XML file (passed from x0_main.py).",
# Lógica para ejecución standalone
try:
import tkinter as tk
from tkinter import filedialog
except ImportError:
print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr)
tk = None
input_json_file = ""
if tk:
root = tk.Tk()
root.withdraw()
print("Por favor, selecciona el archivo JSON de entrada (generado por x1)...")
input_json_file = filedialog.askopenfilename(
title="Selecciona el archivo JSON de entrada (.json)",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
)
args = parser.parse_args()
source_xml_file = args.source_xml_filepath
root.destroy()
if not os.path.exists(source_xml_file):
print(
f"Advertencia (x2): Archivo XML original no encontrado: '{source_xml_file}', pero se intentará encontrar el JSON correspondiente.",
file=sys.stderr,
)
# No salir, intentar encontrar el JSON de todas formas
if not input_json_file:
print("No se seleccionó ningún archivo. Saliendo.", file=sys.stderr)
else:
print(f"Archivo JSON de entrada seleccionado: {input_json_file}")
xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0]
base_dir = os.path.dirname(source_xml_file)
parsing_dir = os.path.join(base_dir, "parsing")
# x2 LEE el .json y ESCRIBE el _processed.json
input_json_file = os.path.join(parsing_dir, f"{xml_filename_base}.json")
output_json_file = os.path.join(parsing_dir, f"{xml_filename_base}_processed.json")
# Calcular ruta de salida JSON procesado
json_filename_base = os.path.splitext(os.path.basename(input_json_file))[0]
# Asumimos que el _processed.json va al mismo directorio 'parsing'
parsing_dir = os.path.dirname(input_json_file)
output_json_file = os.path.join(parsing_dir, f"{json_filename_base}_processed.json")
# Asegurarse de que el directorio de salida exista (aunque debería si el input existe)
os.makedirs(parsing_dir, exist_ok=True)
print(
f"(x2) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'"
f"(x2 - Standalone) Procesando: '{os.path.relpath(input_json_file)}' -> '{os.path.relpath(output_json_file)}'"
)
if not os.path.exists(input_json_file):
print(
f"Error Fatal (x2): El archivo de entrada JSON no existe: '{input_json_file}'",
file=sys.stderr,
)
print(
f"Asegúrate de que 'x1_to_json.py' se ejecutó correctamente para '{os.path.relpath(source_xml_file)}'.",
file=sys.stderr,
)
sys.exit(1)
else:
try:
success = process_json_to_scl(input_json_file, output_json_file)
if success:
sys.exit(0)
print("\nProcesamiento completado exitosamente.")
else:
sys.exit(1)
print(f"\nError durante el procesamiento de '{os.path.relpath(input_json_file)}'.", file=sys.stderr)
# sys.exit(1) # No usar sys.exit
except Exception as e:
print(
f"Error Crítico (x2) durante el procesamiento de '{input_json_file}': {e}",
file=sys.stderr,
)
traceback.print_exc(file=sys.stderr)
sys.exit(1)
# sys.exit(1) # No usar sys.exit
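The only path handling this standalone branch needs is deriving the _processed.json sibling of the selected input file; a generic version of that derivation is sketched below (illustrative only, with a hypothetical file name in the comment).

# Illustrative: derive the "<name>_processed.json" path next to an input JSON file.
import os

def processed_path_for(input_json):
    base = os.path.splitext(os.path.basename(input_json))[0]
    return os.path.join(os.path.dirname(input_json), f"{base}_processed.json")

# Hypothetical example: "parsing/OB_Main.json" becomes "parsing/OB_Main_processed.json".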


@ -1,3 +1,9 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script es parte de un conjunto de herramientas para convertir proyectos de Siemens LAD/FUP a SCL.
"""
# ToUpload/x3_generate_scl.py
# -*- coding: utf-8 -*-
import json
@ -6,6 +12,11 @@ import re
import argparse
import sys
import traceback
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Importar Generadores Específicos ---
try:
@ -25,7 +36,7 @@ except ImportError as e:
sys.exit(1)
# --- Constantes ---
SCL_OUTPUT_DIRNAME = "scl_output" # <-- NUEVO: Nombre del directorio de salida final
# SCL_OUTPUT_DIRNAME = "scl_output" # <-- Ya no se usa directamente en __main__, se lee de config
# --- Modificar generate_scl_or_markdown para usar el nuevo directorio de salida ---
@ -132,42 +143,54 @@ def generate_scl_or_markdown(
# --- Ejecución (MODIFICADO para usar SCL_OUTPUT_DIRNAME) ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=f"Generate final SCL/Markdown file into '{SCL_OUTPUT_DIRNAME}/'."
) # <-- MODIFICADO
parser.add_argument(
"source_xml_filepath", help="Path to the original source XML file."
# Lógica para ejecución standalone
try:
import tkinter as tk
from tkinter import filedialog
except ImportError:
print("Error: Tkinter no está instalado. No se puede mostrar el diálogo de archivo.", file=sys.stderr)
tk = None
input_json_file = ""
project_root_dir = ""
if tk:
root = tk.Tk()
root.withdraw()
print("Por favor, selecciona el archivo JSON procesado de entrada (generado por x2)...")
input_json_file = filedialog.askopenfilename(
title="Selecciona el archivo JSON procesado de entrada (_processed.json)",
filetypes=[("Processed JSON files", "*_processed.json"), ("JSON files", "*.json"), ("All files", "*.*")]
)
parser.add_argument(
"project_root_dir",
help="Path to the root directory of the XML project structure.",
if input_json_file:
print(f"Archivo JSON procesado seleccionado: {input_json_file}")
print("Por favor, selecciona el directorio raíz del proyecto XML (ej. la carpeta 'PLC')...")
project_root_dir = filedialog.askdirectory(
title="Selecciona el directorio raíz del proyecto XML"
)
args = parser.parse_args()
source_xml_file = args.source_xml_filepath
project_root_dir = args.project_root_dir
if project_root_dir:
print(f"Directorio raíz del proyecto seleccionado: {project_root_dir}")
else:
print("No se seleccionó directorio raíz. Saliendo.", file=sys.stderr)
else:
print("No se seleccionó archivo JSON procesado. Saliendo.", file=sys.stderr)
root.destroy()
if not os.path.exists(source_xml_file):
print(
f"Advertencia (x3): Archivo XML original no encontrado: '{source_xml_file}'. Se intentará continuar.",
file=sys.stderr,
)
# No salir necesariamente, podríamos tener el JSON procesado
if input_json_file and project_root_dir:
# Calcular directorio de salida final
# <-- NUEVO: Leer nombre del directorio de salida desde la configuración -->
configs = load_configuration()
xml_parser_config = configs.get("XML Parser to SCL", {})
cfg_scl_output_dirname = xml_parser_config.get("scl_output_dir", "scl_output") # Leer con default
# <-- FIN NUEVO -->
xml_filename_base = os.path.splitext(os.path.basename(source_xml_file))[0]
xml_dir = os.path.dirname(source_xml_file)
parsing_dir = os.path.join(xml_dir, "parsing")
input_json_file = os.path.join(parsing_dir, f"{xml_filename_base}_processed.json")
final_output_dir = os.path.join(project_root_dir, cfg_scl_output_dirname) # Usar valor leído
# <-- MODIFICADO: Calcular directorio de salida final -->
# Siempre será 'scl_output' bajo la raíz del proyecto
final_output_dir = os.path.join(project_root_dir, SCL_OUTPUT_DIRNAME)
# <-- FIN MODIFICADO -->
print(f"(x3 - Standalone) Generando SCL/MD desde: '{os.path.relpath(input_json_file)}'")
print(f"(x3 - Standalone) Directorio de salida final: '{os.path.relpath(final_output_dir)}'")
print(f"(x3 - Standalone) Usando ruta raíz del proyecto: '{project_root_dir}' para buscar UDTs.")
print(f"(x3) Generando SCL/MD desde: '{os.path.relpath(input_json_file)}'")
print(f"(x3) Directorio de salida final: '{os.path.relpath(final_output_dir)}'")
print(f"(x3) Usando ruta raíz del proyecto: '{project_root_dir}' para buscar UDTs.")
# Asegurar que el directorio de salida final exista ANTES de llamar a la función
# Asegurar que el directorio de salida final exista
try:
os.makedirs(final_output_dir, exist_ok=True)
except OSError as e:
@ -175,25 +198,27 @@ if __name__ == "__main__":
f"Error Crítico (x3): No se pudo crear el directorio de salida '{final_output_dir}': {e}",
file=sys.stderr,
)
sys.exit(1)
if not os.path.exists(input_json_file):
print(
f"Error Fatal (x3): JSON procesado no encontrado: '{input_json_file}'",
file=sys.stderr,
)
sys.exit(1)
# sys.exit(1) # No usar sys.exit
success = False # Marcar como fallo para evitar la llamada
else:
success = True # Marcar como éxito para proceder
if success: # Solo intentar si se pudo crear el directorio
try:
# Pasar el directorio de salida FINAL y la ruta raíz
# Llamar a la función principal
success = generate_scl_or_markdown(
input_json_file, final_output_dir, project_root_dir
) # <-- MODIFICADO
)
if success:
sys.exit(0)
print("\nGeneración de SCL/MD completada exitosamente.")
else:
sys.exit(1) # La función ya imprimió el error
# La función generate_scl_or_markdown ya imprime el error
print(f"\nError durante la generación desde '{os.path.relpath(input_json_file)}'.", file=sys.stderr)
# sys.exit(1) # No usar sys.exit
except Exception as e:
print(f"Error Crítico no manejado en x3: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
sys.exit(1)
# sys.exit(1) # No usar sys.exit
else:
# Mensajes de cancelación ya impresos si aplica
pass


@ -1,3 +1,9 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script genera documentacion MD de Cross Reference para Obsidian
"""
# ToUpload/x4_cross_reference.py
# -*- coding: utf-8 -*-
import json
@ -10,6 +16,11 @@ import re
import urllib.parse
import shutil # <-- NUEVO: Para copiar archivos
from collections import defaultdict
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Importar format_variable_name (sin cambios) ---
try:
@ -40,14 +51,14 @@ except ImportError:
# --- Constantes ---
SCL_OUTPUT_DIRNAME = "scl_output"
XREF_SOURCE_SUBDIR = "source" # <-- NUEVO: Subdirectorio para fuentes MD
CALL_XREF_FILENAME = "xref_calls_tree.md"
DB_USAGE_XREF_FILENAME = "xref_db_usage_summary.md"
PLC_TAG_XREF_FILENAME = "xref_plc_tags_summary.md"
MAX_CALL_DEPTH = 5
# SCL_OUTPUT_DIRNAME = "scl_output" # Se leerá de config
# XREF_SOURCE_SUBDIR = "source" # Se leerá de config
# CALL_XREF_FILENAME = "xref_calls_tree.md" # Se leerá de config
# DB_USAGE_XREF_FILENAME = "xref_db_usage_summary.md" # Se leerá de config
# PLC_TAG_XREF_FILENAME = "xref_plc_tags_summary.md" # Se leerá de config
# MAX_CALL_DEPTH = 5 # Se leerá de config
INDENT_STEP = " "
MAX_USERS_LIST = 20
# MAX_USERS_LIST = 20 # Se leerá de config
# --- Funciones de Análisis (find_calls_in_scl, find_db_tag_usage, find_plc_tag_usage sin cambios) ---
@ -212,13 +223,14 @@ def find_plc_tag_usage(scl_code, plc_tag_names_set):
# <-- NUEVA FUNCION -->
def copy_and_prepare_source_files(project_root_dir, xref_output_dir):
def copy_and_prepare_source_files(project_root_dir, xref_output_dir, scl_output_dirname, xref_source_subdir):
"""
Copia archivos .scl y .md desde scl_output a xref_output/source,
convirtiendo .scl a .md con formato de bloque de código.
Usa los nombres de directorios pasados como argumentos.
"""
scl_source_dir = os.path.join(project_root_dir, SCL_OUTPUT_DIRNAME)
md_target_dir = os.path.join(xref_output_dir, XREF_SOURCE_SUBDIR)
scl_source_dir = os.path.join(project_root_dir, scl_output_dirname)
md_target_dir = os.path.join(xref_output_dir, xref_source_subdir)
if not os.path.isdir(scl_source_dir):
print(
@ -293,7 +305,7 @@ def copy_and_prepare_source_files(project_root_dir, xref_output_dir):
# <-- MODIFICADO: get_scl_link -->
def get_scl_link(
block_name, block_entry, base_xref_dir
block_name, block_entry, xref_source_subdir
): # Ya no necesita project_root_dir
"""
Genera un enlace Markdown relativo al archivo .md correspondiente DENTRO de xref_output/source.
@ -302,10 +314,10 @@ def get_scl_link(
return f"`{block_name}`"
# El nombre del archivo destino siempre será .md
md_filename = format_variable_name(block_name) + ".md"
md_filename = format_variable_name(block_name) + ".md" # Asegurar que format_variable_name esté disponible
# La ruta siempre estará dentro del subdirectorio 'source'
link_target_path = f"{XREF_SOURCE_SUBDIR}/{md_filename}"
# La ruta siempre estará dentro del subdirectorio fuente de xref
link_target_path = f"{xref_source_subdir}/{md_filename}"
# Codificar para URL/Markdown
try:
@ -320,7 +332,7 @@ def get_scl_link(
# <-- MODIFICADO: build_call_tree_recursive (ya no necesita project_root_dir) -->
def build_call_tree_recursive(
def build_call_tree_recursive( # Añadido max_call_depth, xref_source_subdir
current_node,
call_graph,
block_data,
@ -328,6 +340,8 @@ def build_call_tree_recursive(
visited_in_path,
base_xref_dir,
current_depth=0,
max_call_depth=5,
xref_source_subdir="source"
):
"""
Función recursiva para construir el árbol de llamadas indentado CON ENLACES
@ -336,10 +350,10 @@ def build_call_tree_recursive(
indent = INDENT_STEP * current_depth
block_entry = block_data.get(current_node)
# Llamar a get_scl_link modificado
node_link = get_scl_link(current_node, block_entry, base_xref_dir)
node_link = get_scl_link(current_node, block_entry, xref_source_subdir)
output_lines.append(f"{indent}- {node_link}")
if current_depth >= MAX_CALL_DEPTH:
if current_depth >= max_call_depth:
output_lines.append(
f"{indent}{INDENT_STEP}[... Profundidad máxima alcanzada ...]"
)
@ -359,20 +373,22 @@ def build_call_tree_recursive(
block_data,
output_lines,
visited_in_path.copy(),
base_xref_dir,
base_xref_dir, # base_xref_dir no se usa en la recursión, podría quitarse
current_depth + 1,
max_call_depth=max_call_depth, # Pasar parámetro
xref_source_subdir=xref_source_subdir # Pasar parámetro
)
# <-- MODIFICADO: generate_call_tree_output (ya no necesita project_root_dir) -->
def generate_call_tree_output(call_graph, block_data, base_xref_dir):
def generate_call_tree_output(call_graph, block_data, base_xref_dir, max_call_depth, xref_source_subdir): # Añadido max_call_depth, xref_source_subdir
"""
Genera las líneas de texto para el archivo de árbol de llamadas CON ENLACES
a los archivos .md en xref_output/source.
"""
output_lines = ["# Árbol de Referencias Cruzadas de Llamadas\n"]
output_lines.append(f"(Profundidad máxima: {MAX_CALL_DEPTH})\n")
root_nodes = sorted(
root_nodes = sorted( # Encontrar OBs
[
name
for name, data in block_data.items()
@ -387,7 +403,7 @@ def generate_call_tree_output(call_graph, block_data, base_xref_dir):
for ob_name in root_nodes:
ob_entry = block_data.get(ob_name)
ob_link = get_scl_link(
ob_name, ob_entry, base_xref_dir
ob_name, ob_entry, xref_source_subdir
) # Llamar a get_scl_link modificado
output_lines.append(f"\n### Iniciando desde: {ob_link}\n")
build_call_tree_recursive(
@ -396,8 +412,10 @@ def generate_call_tree_output(call_graph, block_data, base_xref_dir):
block_data,
output_lines,
set(),
base_xref_dir,
base_xref_dir, # No se usa en recursión
current_depth=0,
max_call_depth=max_call_depth, # Pasar parámetro
xref_source_subdir=xref_source_subdir # Pasar parámetro
)
all_callers = set(call_graph.keys())
@ -416,7 +434,7 @@ def generate_call_tree_output(call_graph, block_data, base_xref_dir):
for block_name in unreached:
block_entry = block_data.get(block_name)
block_link = get_scl_link(
block_name, block_entry, base_xref_dir
block_name, block_entry, xref_source_subdir
) # Llamar a get_scl_link modificado
output_lines.append(f"- {block_link}")
return output_lines
@ -424,7 +442,7 @@ def generate_call_tree_output(call_graph, block_data, base_xref_dir):
# --- Funciones para Salida Resumida (generate_db_usage_summary_output, generate_plc_tag_summary_output SIN CAMBIOS) ---
# (Se omiten por brevedad)
def generate_db_usage_summary_output(db_users):
def generate_db_usage_summary_output(db_users, max_users_list): # Añadido max_users_list
"""Genera las líneas para el archivo Markdown de resumen de uso de DBs."""
output_lines = ["# Resumen de Uso de DB Globales por Bloque\n\n"]
if not db_users:
@ -440,7 +458,7 @@ def generate_db_usage_summary_output(db_users):
output_lines.append("- No utilizado directamente.\n")
else:
output_lines.append("Utilizado por:\n")
display_users = users_list[:MAX_USERS_LIST]
display_users = users_list[:max_users_list] # Usar parámetro
remaining_count = len(users_list) - len(display_users)
for user_block in display_users:
output_lines.append(f"- `{user_block}`")
@ -450,7 +468,7 @@ def generate_db_usage_summary_output(db_users):
return output_lines
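The same truncation idea appears in both summary generators (DB usage above, PLC tags below); a small stand-alone sketch, with made-up block names:

def format_users(users_list, max_users_list=20):
    # Render at most max_users_list entries and report how many were omitted.
    lines = ["Utilizado por:"]
    display_users = users_list[:max_users_list]
    remaining = len(users_list) - len(display_users)
    lines += [f"- `{u}`" for u in display_users]
    if remaining > 0:
        lines.append(f"- ... y {remaining} más.")
    return lines

print("\n".join(format_users([f"FC{i}" for i in range(1, 30)], max_users_list=5)))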
def generate_plc_tag_summary_output(plc_tag_users):
def generate_plc_tag_summary_output(plc_tag_users, max_users_list): # Añadido max_users_list
"""Genera las líneas para el archivo Markdown de resumen de uso de PLC Tags."""
output_lines = ["# Resumen de Uso de PLC Tags Globales por Bloque\n\n"]
if not plc_tag_users:
@ -466,7 +484,7 @@ def generate_plc_tag_summary_output(plc_tag_users):
output_lines.append("- No utilizado.\n")
else:
output_lines.append("Utilizado por:\n")
display_users = users_list[:MAX_USERS_LIST]
display_users = users_list[:max_users_list] # Usar parámetro
remaining_count = len(users_list) - len(display_users)
for user_block in display_users:
output_lines.append(f"- `{user_block}`")
@ -477,20 +495,33 @@ def generate_plc_tag_summary_output(plc_tag_users):
# --- Función Principal (MODIFICADA para llamar a copy_and_prepare_source_files) ---
def generate_cross_references(project_root_dir, output_dir):
def generate_cross_references(
project_root_dir,
output_dir,
scl_output_dirname,
xref_source_subdir,
call_xref_filename,
db_usage_xref_filename,
plc_tag_xref_filename,
max_call_depth,
max_users_list
):
"""
Genera archivos de referencias cruzadas y prepara archivos fuente (.md)
para visualización en Obsidian.
Utiliza los parámetros de configuración pasados como argumentos.
"""
print(f"--- Iniciando Generación de Referencias Cruzadas y Fuentes MD (x4) ---")
print(f"Buscando archivos JSON procesados en: {project_root_dir}")
print(f"Directorio de salida XRef: {output_dir}")
print(f"Directorio fuente SCL/MD: {scl_output_dirname}")
print(f"Subdirectorio fuentes MD para XRef: {xref_source_subdir}")
output_dir_abs = os.path.abspath(output_dir)
# <-- NUEVO: Crear directorio y preparar archivos fuente ANTES de generar XRefs -->
copy_and_prepare_source_files(project_root_dir, output_dir_abs)
# Pasar los nombres de directorios leídos de la config
copy_and_prepare_source_files(project_root_dir, output_dir_abs, scl_output_dirname, xref_source_subdir)
# <-- FIN NUEVO -->
json_files = glob.glob(
os.path.join(project_root_dir, "**", "*_processed.json"), recursive=True
)
@ -577,14 +608,14 @@ def generate_cross_references(project_root_dir, output_dir):
# 3. Generar Archivos de Salida XRef (MODIFICADO para usar la nueva función de árbol)
os.makedirs(output_dir_abs, exist_ok=True)
call_xref_path = os.path.join(output_dir_abs, CALL_XREF_FILENAME)
db_usage_xref_path = os.path.join(output_dir_abs, DB_USAGE_XREF_FILENAME)
plc_tag_xref_path = os.path.join(output_dir_abs, PLC_TAG_XREF_FILENAME)
call_xref_path = os.path.join(output_dir_abs, call_xref_filename) # Usar parámetro
db_usage_xref_path = os.path.join(output_dir_abs, db_usage_xref_filename) # Usar parámetro
plc_tag_xref_path = os.path.join(output_dir_abs, plc_tag_xref_filename) # Usar parámetro
print(f"Generando ÁRBOL XRef de llamadas en: {call_xref_path}")
try:
# <-- MODIFICADO: Llamar a la nueva función sin project_root_dir -->
call_tree_lines = generate_call_tree_output(
call_tree_lines = generate_call_tree_output( # Pasar parámetros
call_graph, block_data, output_dir_abs, max_call_depth, xref_source_subdir
)
with open(call_xref_path, "w", encoding="utf-8") as f:
@ -598,7 +629,7 @@ def generate_cross_references(project_root_dir, output_dir):
# Generar Resumen de Uso de DB (sin cambios aquí)
print(f"Generando RESUMEN XRef de uso de DBs en: {db_usage_xref_path}")
try:
db_summary_lines = generate_db_usage_summary_output(db_users)
db_summary_lines = generate_db_usage_summary_output(db_users, max_users_list) # Pasar parámetro
with open(db_usage_xref_path, "w", encoding="utf-8") as f:
[f.write(line + "\n") for line in db_summary_lines]
except Exception as e:
@ -611,7 +642,7 @@ def generate_cross_references(project_root_dir, output_dir):
# Generar Resumen de Uso de PLC Tags (sin cambios aquí)
print(f"Generando RESUMEN XRef de uso de PLC Tags en: {plc_tag_xref_path}")
try:
plc_tag_lines = generate_plc_tag_summary_output(plc_tag_users)
plc_tag_lines = generate_plc_tag_summary_output(plc_tag_users, max_users_list) # Pasar parámetro
with open(plc_tag_xref_path, "w", encoding="utf-8") as f:
[f.write(line + "\n") for line in plc_tag_lines]
except Exception as e:
@ -627,35 +658,53 @@ def generate_cross_references(project_root_dir, output_dir):
# --- Punto de Entrada (modificado para leer la configuración) ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Genera refs cruzadas y prepara archivos fuente MD para Obsidian."
)
parser.add_argument("project_root_dir", help="Ruta dir raíz proyecto XML.")
parser.add_argument(
"-o",
"--output",
help="Directorio para guardar salida XRef (incluyendo subdir 'source').",
)
args = parser.parse_args()
if not os.path.isdir(args.project_root_dir):
print(
f"Error: Dir proyecto no existe: '{args.project_root_dir}'", file=sys.stderr
)
sys.exit(1)
if not args.output:
print(
"Error: Se requiere el argumento -o/--output para especificar el directorio de salida XRef.",
file=sys.stderr,
)
sys.exit(1)
print("(x4 - Standalone) Ejecutando generación de referencias cruzadas...")
output_destination = args.output
success = generate_cross_references(args.project_root_dir, output_destination)
if success:
print(
f"Archivos XRef y fuentes MD generados en: {os.path.abspath(output_destination)}"
)
sys.exit(0)
# Cargar configuración para obtener rutas
configs = load_configuration()
working_directory = configs.get("working_directory")
# Acceder a la configuración específica del grupo
group_config = configs.get("level2", {})
# Leer parámetros con valores por defecto (usando los defaults del esquema como guía)
# Parámetros necesarios para x4
cfg_scl_output_dirname = group_config.get("scl_output_dir", "scl_output")
cfg_xref_output_dirname = group_config.get("xref_output_dir", "xref_output")
cfg_xref_source_subdir = group_config.get("xref_source_subdir", "source")
cfg_call_xref_filename = group_config.get("call_xref_filename", "xref_calls_tree.md")
cfg_db_usage_xref_filename = group_config.get("db_usage_xref_filename", "xref_db_usage_summary.md")
cfg_plc_tag_xref_filename = group_config.get("plc_tag_xref_filename", "xref_plc_tags_summary.md")
cfg_max_call_depth = group_config.get("max_call_depth", 5)
cfg_max_users_list = group_config.get("max_users_list", 20)
# Calcular rutas
if not working_directory:
print("Error: 'working_directory' no encontrado en la configuración.", file=sys.stderr)
# No usamos sys.exit(1)
else:
print("Hubo errores durante la generación de refs cruzadas.", file=sys.stderr)
sys.exit(1)
# Calcular rutas basadas en la configuración
plc_subdir_name = "PLC" # Asumir nombre estándar
project_root_dir = os.path.join(working_directory, plc_subdir_name)
xref_output_dir = os.path.join(project_root_dir, cfg_xref_output_dirname) # Usar nombre de dir leído
if not os.path.isdir(project_root_dir):
print(f"Error: Directorio del proyecto '{project_root_dir}' no encontrado.", file=sys.stderr)
else:
# Llamar a la función principal
success = generate_cross_references(
project_root_dir,
xref_output_dir,
cfg_scl_output_dirname,
cfg_xref_source_subdir,
cfg_call_xref_filename,
cfg_db_usage_xref_filename,
cfg_plc_tag_xref_filename,
cfg_max_call_depth,
cfg_max_users_list
)
if success:
print("\n(x4 - Standalone) Proceso completado exitosamente.")
else:
print("\n(x4 - Standalone) Proceso finalizado con errores.", file=sys.stderr)

View File

@ -1,3 +1,9 @@
"""
LadderToSCL - Conversor de Siemens LAD/FUP XML a SCL
Este script genera documentación en Markdown y SCL a partir de un proyecto XML de Siemens LAD/FUP.
"""
# ToUpload/x5_aggregate.py
# -*- coding: utf-8 -*-
import os
@ -5,29 +11,36 @@ import argparse
import sys
import glob
import traceback
script_root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
sys.path.append(script_root)
from backend.script_utils import load_configuration
# --- Constantes ---
# Nombre del archivo de salida por defecto (se creará en el directorio raíz del proyecto)
AGGREGATED_FILENAME = "full_project_representation.md"
# AGGREGATED_FILENAME = "full_project_representation.md" # Se leerá de config
# Directorio donde x4 guarda sus salidas (relativo al directorio raíz del proyecto)
XREF_OUTPUT_SUBDIR = "xref_output"
# XREF_OUTPUT_SUBDIR = "xref_output" # Se leerá de config
# SCL_OUTPUT_DIRNAME = "scl_output" # Se leerá de config
def aggregate_files(project_root_dir, output_filepath):
def aggregate_outputs(project_root_dir, output_filepath, scl_output_dirname, xref_output_dirname): # Añadido scl_output_dirname, xref_output_dirname
"""
Busca archivos .scl y .md generados y los agrega en un único archivo Markdown.
"""
print(f"--- Iniciando Agregación de Archivos (x5) ---")
print(f"Leyendo desde directorios: '{scl_output_dirname}' y '{xref_output_dirname}' (relativos a la raíz)")
print(f"Directorio Raíz del Proyecto: {project_root_dir}")
print(f"Archivo de Salida: {output_filepath}")
# Patrones para buscar archivos generados
# Buscamos .scl en cualquier subdirectorio (generados por x3 junto a los XML)
scl_pattern = os.path.join(project_root_dir, "**", "*.scl")
# Buscamos .md en cualquier subdirectorio (UDT/TagTable generados por x3)
# Buscamos .md en cualquier subdirectorio (UDT/TagTable generados por x3, XRef por x4)
md_pattern_general = os.path.join(project_root_dir, "**", "*.md")
# Buscamos .md específicamente en el directorio de salida de x4
xref_dir = os.path.join(project_root_dir, XREF_OUTPUT_SUBDIR)
# xref_pattern = os.path.join(xref_dir, "*.md") # No es necesario, el general los incluye
# Directorio de salida de x4
xref_dir_abs = os.path.join(project_root_dir, xref_output_dirname)
scl_dir_abs = os.path.join(project_root_dir, scl_output_dirname)
print(f"Buscando archivos SCL con patrón: {scl_pattern}")
print(f"Buscando archivos MD con patrón: {md_pattern_general}")
@ -35,16 +48,18 @@ def aggregate_files(project_root_dir, output_filepath):
scl_files = glob.glob(scl_pattern, recursive=True)
md_files = glob.glob(md_pattern_general, recursive=True)
# Filtrar los archivos de salida del propio x5 y los XRef para que no se incluyan dos veces
# si el patrón general los captura y están en el directorio raíz
# Filtrar los archivos para asegurar que provienen de los directorios esperados
# y excluir el archivo de salida del propio x5.
output_filename_base = os.path.basename(output_filepath)
scl_files_filtered = [f for f in scl_files if os.path.dirname(f).startswith(scl_dir_abs)]
md_files_filtered = [
f for f in md_files
if os.path.basename(f) != output_filename_base # Excluir el archivo de salida
# No es necesario excluir los XRef explícitamente si están en su subdir
# and XREF_OUTPUT_SUBDIR not in os.path.relpath(f, project_root_dir).split(os.sep)
and (os.path.dirname(f).startswith(scl_dir_abs) or os.path.dirname(f).startswith(xref_dir_abs)) # Incluir MD de scl_output y xref_output
]
all_files = sorted(scl_files_filtered + md_files_filtered) # Combinar y ordenar alfabéticamente
all_files = sorted(scl_files + md_files_filtered) # Combinar y ordenar alfabéticamente
@ -96,42 +111,44 @@ def aggregate_files(project_root_dir, output_filepath):
traceback.print_exc(file=sys.stderr)
return False
# --- Punto de Entrada ---
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Agrega archivos .scl y .md generados en un único archivo Markdown."
)
parser.add_argument(
"project_root_dir",
help="Ruta al directorio raíz del proyecto XML (donde se buscarán los archivos generados)."
)
parser.add_argument(
"-o", "--output",
help=f"Ruta completa para el archivo Markdown agregado (por defecto: '{AGGREGATED_FILENAME}' en project_root_dir)."
)
print("(x5 - Standalone) Ejecutando agregación de salidas...")
args = parser.parse_args()
# Cargar configuración para obtener rutas
configs = load_configuration()
working_directory = configs.get("working_directory")
# Validar directorio de entrada
if not os.path.isdir(args.project_root_dir):
print(f"Error: El directorio del proyecto no existe: '{args.project_root_dir}'", file=sys.stderr)
sys.exit(1)
# Acceder a la configuración específica del grupo
group_config = configs.get("level2", {})
# Determinar ruta de salida
output_file = args.output
if not output_file:
output_file = os.path.join(args.project_root_dir, AGGREGATED_FILENAME)
# Leer parámetros con valores por defecto (usando los defaults del esquema como guía)
# Parámetros necesarios para x5
cfg_scl_output_dirname = group_config.get("scl_output_dir", "scl_output")
cfg_xref_output_dirname = group_config.get("xref_output_dir", "xref_output")
cfg_aggregated_filename = group_config.get("aggregated_filename", "full_project_representation.md")
if not working_directory:
print("Error: 'working_directory' no encontrado en la configuración.", file=sys.stderr)
else:
# Asegurarse de que el directorio de salida exista si se especifica una ruta completa
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# Calcular rutas basadas en la configuración
plc_subdir_name = "PLC" # Asumir nombre estándar
project_root_dir = os.path.join(working_directory, plc_subdir_name)
# El archivo agregado va al working_directory original
output_agg_file = os.path.join(working_directory, cfg_aggregated_filename) # Usar nombre de archivo leído
if not os.path.isdir(project_root_dir):
print(f"Error: Directorio del proyecto '{project_root_dir}' no encontrado.", file=sys.stderr)
else:
# Llamar a la función principal
success = aggregate_files(args.project_root_dir, output_file)
# Pasar los nombres de directorios leídos
success = aggregate_outputs(
project_root_dir,
output_agg_file,
cfg_scl_output_dirname,
cfg_xref_output_dirname)
if success:
sys.exit(0)
print("\n(x5 - Standalone) Proceso completado exitosamente.")
else:
sys.exit(1)
print("\n(x5 - Standalone) Proceso finalizado con errores.", file=sys.stderr)

View File

@ -2,11 +2,13 @@ import os
import json
import subprocess
import re
from typing import Dict, Any, List
import traceback
from typing import Dict, Any, List, Optional
import time # Add this import
from datetime import datetime # Add this import
# --- ConfigurationManager Class ---
class ConfigurationManager:
def __init__(self):
self.base_path = os.path.dirname(os.path.abspath(__file__))
@ -18,6 +20,7 @@ class ConfigurationManager:
self.log_file = os.path.join(self.data_path, "log.txt")
self._init_log_file()
self.last_execution_time = 0 # Add this attribute
# Minimum seconds between script executions to prevent rapid clicks
self.min_execution_interval = 1 # Minimum seconds between executions
def _init_log_file(self):
@ -28,6 +31,7 @@ class ConfigurationManager:
with open(self.log_file, "w", encoding="utf-8") as f:
f.write("")
# --- Logging Methods ---
def append_log(self, message: str) -> None:
"""Append a message to the CENTRAL log file with timestamp."""
# This function now primarily logs messages from the app itself,
@ -38,6 +42,7 @@ class ConfigurationManager:
lines_with_timestamp = []
for line in lines:
if line.strip():
# Add timestamp only if line doesn't already have one (e.g., from script output)
if not line.strip().startswith("["):
line = f"{timestamp}{line}"
lines_with_timestamp.append(f"{line}\n")
@ -81,6 +86,7 @@ class ConfigurationManager:
print(f"Error clearing log file: {e}")
return False
# --- Working Directory Methods ---
def set_working_directory(self, path: str) -> Dict[str, str]:
"""Set and validate working directory."""
if not os.path.exists(path):
@ -89,13 +95,67 @@ class ConfigurationManager:
self.working_directory = path
# Create default data.json if it doesn't exist
# This data.json will be populated with defaults by get_config later if needed
data_path = os.path.join(path, "data.json")
if not os.path.exists(data_path):
with open(data_path, "w") as f:
try:
with open(data_path, "w", encoding="utf-8") as f:
json.dump({}, f, indent=2)
print(
f"Info: Created empty data.json in working directory: {data_path}"
)
except Exception as e:
print(f"Error creating data.json in working directory {path}: {e}")
# Non-fatal, get_config will handle missing file
return {"status": "success", "path": path}
def get_work_dir(self, group: str) -> Optional[str]:
"""Get working directory path for a script group from work_dir.json."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r", encoding="utf-8") as f:
data = json.load(f)
path = data.get("path", "")
# Normalizar separadores de ruta
if path:
path = os.path.normpath(path)
# Actualizar la variable de instancia si hay una ruta válida y existe
if path and os.path.isdir(path): # Check if it's a directory
self.working_directory = path
return path
elif path:
print(
f"Warning: Stored working directory for group '{group}' is invalid or does not exist: {path}"
)
self.working_directory = None # Reset if invalid
return None
else:
self.working_directory = None # Reset if no path stored
return None
except (FileNotFoundError, json.JSONDecodeError):
self.working_directory = None # Reset if file missing or invalid
return None
except Exception as e:
print(f"Error reading work_dir.json for group '{group}': {e}")
self.working_directory = None
return None
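For reference, a hypothetical example of the work_dir.json layout that get_work_dir() and get_directory_history() read ("path" and "history" are the keys the accessors use; the values are made up):

import json

work_dir_record = {
    "path": r"C:\Trabajo\Project\IOExport",   # current working directory for the group
    "history": [                              # previously used directories, most recent first
        r"C:\Trabajo\Project\IOExport",
        r"D:\OldProject",
    ],
}
print(json.dumps(work_dir_record, indent=2))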
def get_directory_history(self, group: str) -> List[str]:
"""Get the directory history for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Normalizar todos los paths en el historial
history = [os.path.normpath(p) for p in data.get("history", [])]
# Filtrar solo directorios que existen
return [
p for p in history if os.path.isdir(p)
] # Check if directory exists
except (FileNotFoundError, json.JSONDecodeError):
return []
def get_script_groups(self) -> List[Dict[str, Any]]:
"""Returns list of available script groups with their descriptions."""
groups = []
@ -127,189 +187,506 @@ class ConfigurationManager:
print(f"Error reading group description: {e}")
return {}
# --- Configuration (data.json) Methods ---
def get_config(self, level: str, group: str = None) -> Dict[str, Any]:
"""Get configuration for specified level."""
"""
Get configuration for specified level.
Applies default values from the corresponding schema if the config
file doesn't exist or is missing keys with defaults.
"""
config_data = {}
needs_save = False
schema = None
data_path = None
schema_path_for_debug = "N/A" # For logging
# 1. Determine data path based on level
if level == "1":
path = os.path.join(self.data_path, "data.json")
data_path = os.path.join(self.data_path, "data.json")
schema_path_for_debug = os.path.join(self.data_path, "esquema_general.json")
elif level == "2":
path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3":
if not self.working_directory:
return {} # Return empty config if working directory not set
path = os.path.join(self.working_directory, "data.json")
try:
with open(path, "r") as f:
return json.load(f)
except FileNotFoundError:
return {} # Return empty config if file doesn't exist
def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
"""Get schema for specified level."""
try:
# Clean level parameter
level = str(level).split("-")[0]
# Determine schema path based on level
if level == "1":
path = os.path.join(self.data_path, "esquema_general.json")
elif level == "2":
path = os.path.join(
if not group:
return {"error": "Group required for level 2 config"}
data_path = os.path.join(self.script_groups_path, group, "data.json")
schema_path_for_debug = os.path.join(
self.script_groups_path, group, "esquema_group.json"
)
elif level == "3":
if not group:
return {"type": "object", "properties": {}}
path = os.path.join(self.script_groups_path, group, "esquema_work.json")
# Level 3 config is always in the current working directory
if not self.working_directory:
return {} # Return empty config if working directory not set
data_path = os.path.join(self.working_directory, "data.json")
# Level 3 config might be based on level 3 schema (esquema_work.json)
if group:
schema_path_for_debug = os.path.join(
self.script_groups_path, group, "esquema_work.json"
)
else:
return {"type": "object", "properties": {}}
# If no group, we can't determine the L3 schema for defaults.
schema_path_for_debug = "N/A (Level 3 without group)"
else:
return {"error": f"Invalid level specified for config: {level}"}
# Read existing schema from whichever file exists
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
schema = json.load(f)
return (
schema
if isinstance(schema, dict)
else {"type": "object", "properties": {}}
# 2. Get the corresponding schema to check for defaults
try:
# Only attempt to load schema if needed (e.g., not L3 without group)
if not (level == "3" and not group):
schema = self.get_schema(
level, group
) # Use the robust get_schema method
else:
schema = None # Cannot determine L3 schema without group
except Exception as e:
print(
f"Warning: Could not load schema for level {level}, group {group}. Defaults will not be applied. Error: {e}"
)
schema = None # Ensure schema is None if loading failed
# 3. Try to load existing data
data_file_exists = os.path.exists(data_path)
if data_file_exists:
try:
with open(data_path, "r", encoding="utf-8") as f_data:
content = f_data.read()
if content.strip():
config_data = json.loads(content)
else:
print(
f"Warning: Data file {data_path} is empty. Will initialize with defaults."
)
needs_save = True # Force save if file was empty
except json.JSONDecodeError:
print(
f"Warning: Could not decode JSON from {data_path}. Will initialize with defaults."
)
config_data = {}
needs_save = True
except Exception as e:
print(
f"Error reading data from {data_path}: {e}. Will attempt to initialize with defaults."
)
config_data = {}
needs_save = True
except FileNotFoundError:
print(
f"Info: Data file not found at {data_path}. Will initialize with defaults."
)
needs_save = True # Mark for saving as it's a new file
# 4. Apply defaults from schema if schema was loaded successfully
if schema and isinstance(schema, dict) and "properties" in schema:
schema_properties = schema.get("properties", {})
if isinstance(schema_properties, dict): # Ensure properties is a dict
for key, prop_definition in schema_properties.items():
# Ensure prop_definition is a dictionary before checking 'default'
if (
isinstance(prop_definition, dict)
and key not in config_data
and "default" in prop_definition
):
print(
f"Info: Applying default for '{key}' from schema {schema_path_for_debug}"
)
config_data[key] = prop_definition["default"]
needs_save = (
True # Mark for saving because a default was applied
)
else:
print(
f"Warning: 'properties' in schema {schema_path_for_debug} is not a dictionary. Cannot apply defaults."
)
# Create default schema if no file exists
default_schema = {"type": "object", "properties": {}}
# 5. Save the file if it was created or updated with defaults
if needs_save and data_path:
try:
print(f"Info: Saving updated config data to: {data_path}")
os.makedirs(os.path.dirname(data_path), exist_ok=True)
with open(data_path, "w", encoding="utf-8") as f_data:
json.dump(config_data, f_data, indent=2, ensure_ascii=False)
except IOError as e:
print(f"Error: Could not write data file to {data_path}: {e}")
except Exception as e:
print(f"Unexpected error saving data to {data_path}: {e}")
# 6. Return the final configuration
return config_data
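The defaults pass above reduces to a few lines; a condensed, runnable sketch (the schema and key names are illustrative):

def apply_schema_defaults(config_data, schema):
    # Fill in any schema property that declares a "default" and is missing from the stored data.
    needs_save = False
    for key, prop in schema.get("properties", {}).items():
        if isinstance(prop, dict) and "default" in prop and key not in config_data:
            config_data[key] = prop["default"]
            needs_save = True
    return config_data, needs_save

schema = {"type": "object", "properties": {"max_call_depth": {"type": "integer", "default": 5}}}
print(apply_schema_defaults({}, schema))  # ({'max_call_depth': 5}, True)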
def update_config(
self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update configuration for specified level."""
path = None
if level == "1":
path = os.path.join(self.data_path, "data.json")
elif level == "2":
if not group:
return {
"status": "error",
"message": "Group required for level 2 config update",
}
path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3":
if not self.working_directory:
return {
"status": "error",
"message": "Working directory not set for level 3 config update",
}
path = os.path.join(self.working_directory, "data.json")
else:
return {
"status": "error",
"message": f"Invalid level for config update: {level}",
}
try:
# Ensure directory exists
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(default_schema, f, indent=2)
return default_schema
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Info: Config successfully updated at {path}")
return {"status": "success"}
except Exception as e:
print(f"Error loading schema: {str(e)}")
print(f"Error updating config at {path}: {str(e)}")
return {"status": "error", "message": str(e)}
def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
"""Get schema for specified level."""
schema_path = None
try:
# Clean level parameter
clean_level = str(level).split("-")[0]
# Determine schema path based on level
if clean_level == "1":
schema_path = os.path.join(self.data_path, "esquema_general.json")
elif clean_level == "2":
if not group:
raise ValueError("Group is required for level 2 schema")
schema_path = os.path.join(
self.script_groups_path, group, "esquema_group.json"
)
elif clean_level == "3":
if not group:
# Level 3 schema (esquema_work) is tied to a group.
# If no group, we can't know which schema to load.
print(
"Warning: Group needed to determine level 3 schema (esquema_work.json). Returning empty schema."
)
return {"type": "object", "properties": {}}
schema_path = os.path.join(
self.script_groups_path, group, "esquema_work.json"
)
else:
print(
f"Warning: Invalid level '{level}' for schema retrieval. Returning empty schema."
)
return {"type": "object", "properties": {}}
# Read existing schema or create default if it doesn't exist
if os.path.exists(schema_path):
try:
with open(schema_path, "r", encoding="utf-8") as f:
schema = json.load(f)
# Basic validation
if (
not isinstance(schema, dict)
or "properties" not in schema
or "type" not in schema
):
print(
f"Warning: Schema file {schema_path} has invalid structure. Returning default."
)
return {"type": "object", "properties": {}}
# Ensure properties is a dict
if not isinstance(schema.get("properties"), dict):
print(
f"Warning: 'properties' in schema file {schema_path} is not a dictionary. Normalizing."
)
schema["properties"] = {}
return schema
except json.JSONDecodeError:
print(
f"Error: Could not decode JSON from schema file: {schema_path}. Returning default."
)
return {"type": "object", "properties": {}}
except Exception as e:
print(
f"Error reading schema file {schema_path}: {e}. Returning default."
)
return {"type": "object", "properties": {}}
else:
print(
f"Info: Schema file not found at {schema_path}. Creating default schema."
)
default_schema = {"type": "object", "properties": {}}
try:
# Ensure directory exists before writing
os.makedirs(os.path.dirname(schema_path), exist_ok=True)
with open(schema_path, "w", encoding="utf-8") as f:
json.dump(default_schema, f, indent=2, ensure_ascii=False)
return default_schema
except Exception as e:
print(f"Error creating default schema file at {schema_path}: {e}")
return {
"type": "object",
"properties": {},
} # Return empty if creation fails
except ValueError as ve: # Catch specific errors like missing group
print(f"Error getting schema path: {ve}")
return {"type": "object", "properties": {}}
except Exception as e:
# Log the full path in case of unexpected errors
error_path = schema_path if schema_path else f"Level {level}, Group {group}"
print(f"Unexpected error loading schema from {error_path}: {str(e)}")
return {"type": "object", "properties": {}}
def update_schema(
self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update schema for specified level and clean corresponding config."""
schema_path = None
config_path = None
try:
# Clean level parameter if it contains extra info like '-edit'
clean_level = str(level).split("-")[0]
# Determinar rutas de schema y config
if level == "1":
if clean_level == "1":
schema_path = os.path.join(self.data_path, "esquema_general.json")
config_path = os.path.join(self.data_path, "data.json")
elif level == "2":
elif clean_level == "2":
if not group:
return {
"status": "error",
"message": "Group is required for level 2 schema update",
}
schema_path = os.path.join(
self.script_groups_path, group, "esquema_group.json"
)
config_path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3":
elif clean_level == "3":
if not group:
return {
"status": "error",
"message": "Group is required for level 3",
"message": "Group is required for level 3 schema update",
}
schema_path = os.path.join(
self.script_groups_path, group, "esquema_work.json"
)
# Config path depends on whether working_directory is set and valid
config_path = (
os.path.join(self.working_directory, "data.json")
if self.working_directory
and os.path.isdir(self.working_directory) # Check it's a directory
else None
)
if not config_path:
print(
f"Warning: Working directory not set or invalid ('{self.working_directory}'). Level 3 config file will not be cleaned."
)
else:
return {"status": "error", "message": "Invalid level"}
# Ensure directory exists
os.makedirs(os.path.dirname(schema_path), exist_ok=True)
# Validate schema structure
if (
not isinstance(data, dict)
or "type" not in data
or "properties" not in data
):
data = {
"type": "object",
"properties": data if isinstance(data, dict) else {},
}
# Basic validation and normalization of the schema data being saved
if not isinstance(data, dict):
print(
f"Warning: Invalid schema data received (not a dict). Wrapping in default structure."
)
data = {"type": "object", "properties": {}} # Reset to default empty
if "type" not in data:
data["type"] = "object" # Ensure type exists
if "properties" not in data or not isinstance(data["properties"], dict):
print(
f"Warning: Invalid or missing 'properties' in schema data. Resetting properties."
)
data["properties"] = {} # Ensure properties exists and is a dict
# Write schema
with open(schema_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Info: Schema successfully updated at {schema_path}")
# Clean corresponding config file
# Clean the corresponding config file *if* its path is valid
if config_path:
self._clean_config_for_schema(config_path, data)
else:
print(
f"Info: Config cleaning skipped for level {level} (no valid config path)."
)
return {"status": "success"}
except Exception as e:
print(f"Error updating schema: {str(e)}")
error_path = schema_path if schema_path else f"Level {level}, Group {group}"
print(f"Error updating schema at {error_path}: {str(e)}")
# Consider adding traceback here for debugging
print(traceback.format_exc())
return {"status": "error", "message": str(e)}
def _clean_config_for_schema(
self, config_path: str, schema: Dict[str, Any]
) -> None:
"""Clean configuration file to match schema structure."""
if not config_path or not os.path.exists(config_path):
# Check existence *before* trying to open
try:
if not os.path.exists(config_path):
print(
f"Info: Config file {config_path} not found for cleaning. Skipping."
)
return
try:
# Cargar configuración actual
config = {}
content = "" # Store original content for comparison
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
content = f.read()
if content.strip(): # Avoid error on empty file
config = json.loads(content)
else:
print(
f"Info: Config file {config_path} is empty. Cleaning will result in an empty object."
)
# Limpiar configuración recursivamente
cleaned_config = self._clean_object_against_schema(config, schema)
# Guardar configuración limpia
# Guardar configuración limpia solo si cambió o si el original estaba vacío
# (para evitar escrituras innecesarias)
# Use dumps for reliable comparison, handle potential errors during dumps
try:
original_config_str = json.dumps(config, sort_keys=True)
cleaned_config_str = json.dumps(cleaned_config, sort_keys=True)
except TypeError as te:
print(
f"Warning: Could not serialize config for comparison during clean: {te}. Forcing save."
)
original_config_str = "" # Force inequality
cleaned_config_str = " " # Force inequality
if original_config_str != cleaned_config_str or not content.strip():
print(f"Info: Cleaning config file: {config_path}")
with open(config_path, "w", encoding="utf-8") as f:
json.dump(cleaned_config, f, indent=2, ensure_ascii=False)
else:
print(
f"Info: Config file {config_path} already matches schema. No cleaning needed."
)
except json.JSONDecodeError:
print(
f"Error: Could not decode JSON from config file {config_path} during cleaning. Skipping clean."
)
except IOError as e:
print(f"Error accessing config file {config_path} during cleaning: {e}")
except Exception as e:
print(f"Error cleaning config: {str(e)}")
print(f"Unexpected error cleaning config {config_path}: {str(e)}")
# Consider adding traceback here
print(traceback.format_exc())
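The "only rewrite when something changed" check above compares canonical JSON dumps; a minimal illustration:

import json

old = {"b": 2, "a": 1}
new = {"a": 1, "b": 2}
# sort_keys makes key order irrelevant, so these compare equal and no rewrite is needed.
print(json.dumps(old, sort_keys=True) == json.dumps(new, sort_keys=True))  # True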
def _clean_object_against_schema(
self, data: Dict[str, Any], schema: Dict[str, Any]
) -> Dict[str, Any]:
"""Recursively clean object to match schema structure."""
if not isinstance(data, dict) or not isinstance(schema, dict):
def _clean_object_against_schema(self, data: Any, schema: Dict[str, Any]) -> Any:
"""Recursively clean data to match schema structure."""
# Ensure schema is a dictionary, otherwise cannot proceed
if not isinstance(schema, dict):
print(
f"Warning: Invalid schema provided to _clean_object_against_schema (not a dict). Returning data as is: {type(schema)}"
)
return data
schema_type = schema.get("type")
if schema_type == "object":
if not isinstance(data, dict):
# If data is not a dict, but schema expects object, return empty dict
return {}
# Build the cleaned object, keeping only keys that exist in the schema
result = {}
schema_props = schema.get("properties", {})
# Ensure schema_props is a dictionary
if not isinstance(schema_props, dict):
print(
f"Warning: 'properties' in schema is not a dictionary during cleaning. Returning empty object."
)
return {}
for key, value in data.items():
# Solo mantener campos que existen en el schema
if key in schema_props:
# Recursively clean the value based on the property's schema
# Ensure the property schema itself is a dict before recursing
prop_schema = schema_props[key]
# Si es un objeto anidado, limpiar recursivamente
if prop_schema.get("type") == "object":
result[key] = self._clean_object_against_schema(value, prop_schema)
# Si es un enum, verificar que el valor sea válido
elif "enum" in prop_schema:
if value in prop_schema["enum"]:
result[key] = value
# Para otros tipos, mantener el valor
if isinstance(prop_schema, dict):
result[key] = self._clean_object_against_schema(
value, prop_schema
)
else:
result[key] = value
# If property schema is invalid, maybe keep original value or omit? Let's omit.
print(
f"Warning: Schema for property '{key}' is not a dictionary. Omitting from cleaned data."
)
# Return the cleaned object once all keys have been processed
return result
def update_config(
self, level: str, data: Dict[str, Any], group: str = None
) -> Dict[str, str]:
"""Update configuration for specified level."""
if level == "3" and not self.working_directory:
return {"status": "error", "message": "Working directory not set"}
elif schema_type == "array":
if not isinstance(data, list):
if level == "1":
path = os.path.join(self.data_path, "data.json")
elif level == "2":
path = os.path.join(self.script_groups_path, group, "data.json")
elif level == "3":
path = os.path.join(self.working_directory, "data.json")
# If data is not a list, but schema expects array, return empty list
return []
# If schema defines items structure, clean each item
items_schema = schema.get("items")
if isinstance(
items_schema, dict
): # Check if 'items' schema is a valid dict
return [
self._clean_object_against_schema(item, items_schema)
for item in data
]
else:
# If no valid item schema, return list as is (or potentially filter based on basic types if needed)
# Let's return as is for now.
return data # Keep array items as they are if no valid 'items' schema defined
with open(path, "w") as f:
json.dump(data, f, indent=2)
elif "enum" in schema:
# Ensure enum values are defined as a list
enum_values = schema.get("enum")
if isinstance(enum_values, list):
# If schema has enum, keep data only if it's one of the allowed values
if data in enum_values:
return data
else:
# If value not in enum, return None or potentially the default value if specified?
# For cleaning, returning None or omitting might be safer. Let's return None.
return None # Or consider returning schema.get('default') if cleaning should apply defaults too
else:
# Invalid enum definition, return original data or None? Let's return None.
print(
f"Warning: Invalid 'enum' definition in schema (not a list). Returning None for value '{data}'."
)
return None
# For basic types (string, integer, number, boolean, null), just return the data
# We could add type checking here if strict cleaning is needed,
# e.g., return None if type(data) doesn't match schema_type
elif schema_type in ["string", "integer", "number", "boolean", "null"]:
# Optional: Add stricter type check if needed
# expected_type_map = { "string": str, "integer": int, "number": (int, float), "boolean": bool, "null": type(None) }
# expected_types = expected_type_map.get(schema_type)
# if expected_types and not isinstance(data, expected_types):
# print(f"Warning: Type mismatch during cleaning. Expected {schema_type}, got {type(data)}. Returning None.")
# return None # Or schema.get('default')
return data
# If schema type is unknown or not handled, return data as is
else:
# This case might indicate an issue with the schema definition itself
# print(f"Warning: Unknown or unhandled schema type '{schema_type}' during cleaning. Returning data as is.")
return data
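A tiny stand-alone illustration of the cleaning contract described above (not the class method itself): unknown keys are dropped, enum violations become None, nested objects recurse. The sample schema and data are made up.

def clean(data, schema):
    # Keep only schema-known keys; validate enums; recurse into nested objects.
    if schema.get("type") == "object":
        if not isinstance(data, dict):
            return {}
        props = schema.get("properties", {})
        return {k: clean(v, props[k]) for k, v in data.items() if k in props}
    if "enum" in schema:
        return data if data in schema["enum"] else None
    return data

schema = {"type": "object", "properties": {
    "scl_output_dir": {"type": "string"},
    "mode": {"type": "string", "enum": ["full", "summary"]},
}}
print(clean({"scl_output_dir": "scl_output", "mode": "bogus", "old_key": 1}, schema))
# -> {'scl_output_dir': 'scl_output', 'mode': None}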
# --- Script Listing and Execution Methods ---
def list_scripts(self, group: str) -> List[Dict[str, str]]:
"""List all scripts in a group with their descriptions."""
try:
@ -318,7 +695,7 @@ class ConfigurationManager:
if not os.path.exists(scripts_dir):
print(f"Directory not found: {scripts_dir}")
return []
return [] # Return empty list if group directory doesn't exist
for file in os.listdir(scripts_dir):
# Modificar la condición para incluir cualquier archivo .py
@ -326,15 +703,15 @@ class ConfigurationManager:
path = os.path.join(scripts_dir, file)
description = self._extract_script_description(path)
print(
f"Found script: {file} with description: {description}"
f"Debug: Found script: {file} with description: {description}"
) # Debug line
scripts.append({"name": file, "description": description})
print(f"Total scripts found: {len(scripts)}") # Debug line
print(f"Debug: Total scripts found in group '{group}': {len(scripts)}")
return scripts
except Exception as e:
print(f"Error listing scripts: {str(e)}") # Debug line
return []
print(f"Error listing scripts for group '{group}': {str(e)}")
return [] # Return empty list on error
def _extract_script_description(self, script_path: str) -> str:
"""Extract description from script's docstring or initial comments."""
@ -354,9 +731,7 @@ class ConfigurationManager:
return "No description available"
except Exception as e:
print(
f"Error extracting description from {script_path}: {str(e)}"
) # Debug line
print(f"Error extracting description from {script_path}: {str(e)}")
return "Error reading script description"
def execute_script(
@ -370,7 +745,9 @@ class ConfigurationManager:
time_since_last = current_time - self.last_execution_time
if time_since_last < self.min_execution_interval:
msg = f"Por favor espere {self.min_execution_interval - time_since_last:.1f} segundo(s) más entre ejecuciones"
if broadcast_fn: broadcast_fn(msg)
self.append_log(f"Warning: {msg}") # Log throttling attempt
if broadcast_fn:
broadcast_fn(msg)
return {"status": "throttled", "error": msg}
self.last_execution_time = current_time
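The throttle check above is a simple time-since-last-run debounce; an isolated sketch of the same idea (the interval value is illustrative):

import time

class Throttle:
    def __init__(self, min_interval=1.0):
        self.min_interval = min_interval
        self.last = 0.0

    def allow(self):
        # Reject the call if the previous accepted call was less than min_interval seconds ago.
        now = time.time()
        if now - self.last < self.min_interval:
            return False
        self.last = now
        return True

t = Throttle(1.0)
print(t.allow(), t.allow())  # True False -> the second call arrives too soon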
@ -381,27 +758,38 @@ class ConfigurationManager:
script_log_path = os.path.join(script_dir, f"log_{script_base_name}.txt")
if not os.path.exists(script_path):
msg = f"Error: Script no encontrado en {script_path}"
if broadcast_fn: broadcast_fn(msg)
msg = f"Error Fatal: Script no encontrado en {script_path}"
self.append_log(msg)
if broadcast_fn:
broadcast_fn(msg)
return {"status": "error", "error": "Script not found"}
# Get working directory specific to the group
working_dir = self.get_work_dir(group)
if not working_dir:
msg = f"Error: Directorio de trabajo no configurado para el grupo '{group}'"
if broadcast_fn: broadcast_fn(msg)
msg = f"Error Fatal: Directorio de trabajo no configurado o inválido para el grupo '{group}'"
self.append_log(msg)
if broadcast_fn:
broadcast_fn(msg)
return {"status": "error", "error": "Working directory not set"}
# Double check validity (get_work_dir should already do this)
if not os.path.isdir(working_dir):
msg = f"Error: El directorio de trabajo '{working_dir}' no es válido o no existe."
if broadcast_fn: broadcast_fn(msg)
msg = f"Error Fatal: El directorio de trabajo '{working_dir}' no es válido o no existe."
self.append_log(msg)
if broadcast_fn:
broadcast_fn(msg)
return {"status": "error", "error": "Invalid working directory"}
# Aggregate configurations using the updated get_config
configs = {
"level1": self.get_config("1"),
"level2": self.get_config("2", group),
"level3": self.get_config("3", group), # get_config now handles working dir lookup
"level3": self.get_config(
"3", group
), # get_config uses self.working_directory
"working_directory": working_dir,
}
print(f"Debug: Aggregated configs for script execution: {configs}")
config_file_path = os.path.join(script_dir, "script_config.json")
try:
@ -410,8 +798,10 @@ class ConfigurationManager:
# Don't broadcast config saving unless debugging
# if broadcast_fn: broadcast_fn(f"Configuraciones guardadas en {config_file_path}")
except Exception as e:
msg = f"Error guardando configuraciones temporales: {str(e)}"
if broadcast_fn: broadcast_fn(msg)
msg = f"Error Fatal: No se pudieron guardar las configuraciones temporales en {config_file_path}: {str(e)}"
self.append_log(msg)
if broadcast_fn:
broadcast_fn(msg)
# Optionally return error here if config saving is critical
stdout_capture = []
@ -421,16 +811,18 @@ class ConfigurationManager:
try:
if broadcast_fn:
broadcast_fn(f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}...")
start_msg = f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}..."
broadcast_fn(start_msg)
# Execute the script
process = subprocess.Popen(
["python", "-u", script_path], # Added -u for unbuffered output
cwd=working_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding='utf-8',
errors='replace',
encoding="utf-8",
errors="replace",
bufsize=1,
env=dict(os.environ, PYTHONIOENCODING="utf-8"),
)
@ -466,7 +858,6 @@ class ConfigurationManager:
# Always include stderr in the final log if present
completion_msg += f" Se detectaron errores (ver log)."
if broadcast_fn:
broadcast_fn(completion_msg)
@ -479,7 +870,9 @@ class ConfigurationManager:
log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_f.write(f"Duración: {duration}\n")
log_f.write(f"Estado: {status.upper()} (Código de Salida: {return_code})\n")
log_f.write(
f"Estado: {status.upper()} (Código de Salida: {return_code})\n"
)
log_f.write("\n--- SALIDA ESTÁNDAR (STDOUT) ---\n")
log_f.write("\n".join(stdout_capture))
log_f.write("\n\n--- ERRORES (STDERR) ---\n")
@ -487,29 +880,39 @@ class ConfigurationManager:
log_f.write("\n--- FIN DEL LOG ---\n")
if broadcast_fn:
broadcast_fn(f"Log completo guardado en: {script_log_path}")
print(f"Info: Script log saved to {script_log_path}")
except Exception as log_e:
err_msg = f"Error al guardar el log específico del script en {script_log_path}: {log_e}"
print(err_msg)
if broadcast_fn: broadcast_fn(err_msg)
if broadcast_fn:
broadcast_fn(err_msg)
# ------------------------------------------
return {
"status": status,
"return_code": return_code,
"error": stderr_capture if stderr_capture else None,
"log_file": script_log_path # Return path to the specific log
"log_file": script_log_path, # Return path to the specific log
}
except Exception as e:
end_time = datetime.now()
duration = end_time - start_time
error_msg = f"Error inesperado durante la ejecución de {script_name}: {str(e)}"
traceback_info = traceback.format_exc() # Get traceback
error_msg = (
f"Error inesperado durante la ejecución de {script_name}: {str(e)}"
)
traceback_info = traceback.format_exc() # Get full traceback
print(error_msg) # Print to console as well
print(traceback_info)
self.append_log(
f"ERROR FATAL: {error_msg}\n{traceback_info}"
) # Log centrally
if broadcast_fn:
broadcast_fn(f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}")
# Ensure fatal errors are clearly marked in UI
broadcast_fn(
f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}"
)
# Attempt to write error to script-specific log
try:
@ -518,7 +921,9 @@ class ConfigurationManager:
log_f.write(f"Grupo: {group}\n")
log_f.write(f"Directorio de Trabajo: {working_dir}\n")
log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n")
log_f.write(
f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n"
)
log_f.write(f"Duración: {duration}\n")
log_f.write(f"Estado: FATAL ERROR\n")
log_f.write("\n--- ERROR ---\n")
@ -527,8 +932,10 @@ class ConfigurationManager:
log_f.write(traceback_info) # Include traceback in log
log_f.write("\n--- FIN DEL LOG ---\n")
except Exception as log_e:
print(f"Error adicional al intentar guardar el log de error: {log_e}")
err_msg_log = (
f"Error adicional al intentar guardar el log de error: {log_e}"
)
print(err_msg_log)
return {"status": "error", "error": error_msg, "traceback": traceback_info}
finally:
@ -539,23 +946,6 @@ class ConfigurationManager:
if process and process.stdout:
process.stdout.close()
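A simplified sketch of the streaming pattern execute_script() is built around: run the script unbuffered ("-u"), forward each stdout line to a broadcast callback as it arrives, and keep a copy for the per-script log. This is an assumption-level reduction of the method, not its full error handling.

import subprocess

def run_streaming(script_path, working_dir, broadcast_fn=print):
    captured = []
    proc = subprocess.Popen(
        ["python", "-u", script_path],
        cwd=working_dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True, encoding="utf-8", errors="replace", bufsize=1,
    )
    for line in proc.stdout:              # yields lines as the child flushes them
        line = line.rstrip("\n")
        captured.append(line)
        broadcast_fn(line)
    stderr_output = proc.stderr.read()    # simplified: the real code also surfaces stderr in the log
    return proc.wait(), captured, stderr_output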
def get_work_dir(self, group: str) -> str:
"""Get working directory path for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r") as f:
data = json.load(f)
path = data.get("path", "")
# Normalizar separadores de ruta
if path:
path = os.path.normpath(path)
# Actualizar la variable de instancia si hay una ruta válida
if path and os.path.exists(path):
self.working_directory = path
return path
except (FileNotFoundError, json.JSONDecodeError):
return ""
def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
"""Set working directory path for a script group and update history."""
# Normalizar el path recibido
@ -569,7 +959,7 @@ class ConfigurationManager:
try:
# Cargar datos existentes o crear nuevos
try:
with open(work_dir_path, "r") as f:
with open(work_dir_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Normalizar paths existentes en el historial
if "history" in data:
@ -596,7 +986,7 @@ class ConfigurationManager:
data["history"] = data["history"][:10]
# Guardar datos actualizados
with open(work_dir_path, "w") as f:
with open(work_dir_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
# Actualizar la variable de instancia
@ -605,22 +995,9 @@ class ConfigurationManager:
# Crear data.json en el directorio de trabajo si no existe
data_path = os.path.join(path, "data.json")
if not os.path.exists(data_path):
with open(data_path, "w") as f:
with open(data_path, "w", encoding="utf-8") as f:
json.dump({}, f, indent=2)
return {"status": "success", "path": path}
except Exception as e:
return {"status": "error", "message": str(e)}
def get_directory_history(self, group: str) -> List[str]:
"""Get the directory history for a script group."""
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
try:
with open(work_dir_path, "r") as f:
data = json.load(f)
# Normalizar todos los paths en el historial
history = [os.path.normpath(p) for p in data.get("history", [])]
# Filtrar solo directorios que existen
return [p for p in history if os.path.exists(p)]
except (FileNotFoundError, json.JSONDecodeError):
return []

View File

@ -1,35 +1,21 @@
[23:43:07] Iniciando ejecución de x3.py en C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport...
[23:43:07] --- AML (CAx Export) to Hierarchical JSON and Obsidian MD Converter (v28 - Working Directory Integration) ---
[23:43:07] Using Working Directory for Output: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport
[23:43:11] Input AML: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export.aml
[23:43:11] Output Directory: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport
[23:43:11] Output JSON: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export.hierarchical.json
[23:43:11] Output Main Tree MD: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export_Hardware_Tree.md
[23:43:11] Output IO Debug Tree MD: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export_IO_Upward_Debug.md
[23:43:11] Processing AML file: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export.aml
[23:43:11] Pass 1: Found 203 InternalElement(s). Populating device dictionary...
[23:43:11] Pass 2: Identifying PLCs and Networks (Refined v2)...
[23:43:11] Identified Network: PROFIBUS_1 (bcc6f2bd-3d71-4407-90f2-bccff6064051) Type: Profibus
[23:43:11] Identified Network: ETHERNET_1 (c6d49787-a076-4592-994d-876eea123dfd) Type: Ethernet/Profinet
[23:43:11] Identified PLC: PLC (a48e038f-0bcc-4b48-8373-033da316c62b) - Type: CPU 1516F-3 PN/DP OrderNo: 6ES7 516-3FP03-0AB0
[23:43:11] Pass 3: Processing InternalLinks (Robust Network Mapping & IO)...
[23:43:11] Found 118 InternalLink(s).
[23:43:11] Mapping Device/Node 'E1' (NodeID:1643b51f-7067-4565-8f8e-109a1a775fed, Addr:10.1.33.11) to Network 'ETHERNET_1'
[23:43:11] --> Associating Network 'ETHERNET_1' with PLC 'PLC' (via Node 'E1' Addr: 10.1.33.11)
[23:43:11] Mapping Device/Node 'P1' (NodeID:5aff409b-2573-485f-82bf-0e08c9200086, Addr:1) to Network 'PROFIBUS_1'
[23:43:11] --> Associating Network 'PROFIBUS_1' with PLC 'PLC' (via Node 'P1' Addr: 1)
[23:43:11] Mapping Device/Node 'PB1' (NodeID:c796e175-c770-43f0-8191-fc91996c0147, Addr:12) to Network 'PROFIBUS_1'
[23:43:11] Mapping Device/Node 'PB1' (NodeID:0b44f55a-63c1-49e8-beea-24dc5d3226e3, Addr:20) to Network 'PROFIBUS_1'
[23:43:11] Mapping Device/Node 'PB1' (NodeID:25cfc251-f946-40c5-992d-ad6387677acb, Addr:21) to Network 'PROFIBUS_1'
[23:43:11] Mapping Device/Node 'PB1' (NodeID:57999375-ec72-46ef-8ec2-6c3178e8acf8, Addr:22) to Network 'PROFIBUS_1'
[23:43:11] Mapping Device/Node 'PB1' (NodeID:54e8db6a-9443-41a4-a85b-cf0722c1d299, Addr:10) to Network 'PROFIBUS_1'
[23:43:11] Mapping Device/Node 'PB1' (NodeID:4786bab6-4097-4651-ac19-6cadfc7ea735, Addr:8) to Network 'PROFIBUS_1'
[23:43:11] Mapping Device/Node 'PB1' (NodeID:1f08afcb-111f-428f-915e-69363af1b09a, Addr:40) to Network 'PROFIBUS_1'
[23:43:11] Data extraction and structuring complete.
[23:43:11] Generating JSON output: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export.hierarchical.json
[23:43:11] JSON data written successfully.
[23:43:11] Markdown summary written to: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export_Hardware_Tree.md
[23:43:11] IO upward debug tree written to: C:\Trabajo\SIDEL\06 - E5.007363 - Modifica O&U - SAE196 (cip integrato)\Reporte\IOExport\SAE196_c0.2_CAx_Export_IO_Upward_Debug.md
[23:43:11] Script finished.
[23:43:12] Ejecución de x3.py finalizada (success). Duración: 0:00:05.235415.
[23:43:12] Log completo guardado en: d:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\ObtainIOFromProjectTia\log_x3.txt
[17:15:12] Iniciando ejecución de x1.py en C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS...
[17:15:14] Working directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
[17:15:14] Input directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS
[17:15:14] Output directory: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs
[17:15:14] Cronologia file: C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
[17:15:14] Attachments directory: C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\adjuntos
[17:15:14] Beautify rules file: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\config\beautify_rules.json
[17:15:14] Found 1 .eml files
[17:15:14] Loaded 0 existing messages
[17:15:14] Processing C:\Trabajo\SIDEL\EMAILs\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS\I_ E5.007727 _ Evo On - SFSRFH300172 + SFSRFH300109 - ANDIA LACTEOS.eml
[17:15:14] Aplicando reglas de prioridad 1
[17:15:14] Aplicando reglas de prioridad 2
[17:15:14] Aplicando reglas de prioridad 3
[17:15:14] Aplicando reglas de prioridad 4
[17:15:14] Estadísticas de procesamiento:
[17:15:14] - Total mensajes encontrados: 1
[17:15:14] - Mensajes únicos añadidos: 1
[17:15:14] - Mensajes duplicados ignorados: 0
[17:15:14] Writing 1 messages to C:/Users/migue/OneDrive/Miguel/Obsidean/Trabajo/VM/04-SIDEL/00 - MASTER/EMAILs\cronologia.md
[17:15:14] Ejecución de x1.py finalizada (success). Duración: 0:00:01.628641.
[17:15:14] Log completo guardado en: D:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\EmailCrono\log_x1.txt

BIN
icon.png Normal file

Binary file not shown.


View File

@ -1,15 +1,16 @@
flask
flask-sock
lxml
pandas
google-cloud-translate
openai
ollama
langid
openpyxl
beautifulsoup4
requests
mammoth
html2text
pypandoc
# siemens-tia-scripting # Requiere instalación especial de TIA Portal Openness
beautifulsoup4==4.13.4
Flask==3.1.0
flask_sock==0.7.0
html2text==2025.4.15
langid==1.1.6
lxml==5.4.0
mammoth==1.9.0
ollama==0.4.8
openai==1.77.0
openpyxl==3.1.5
pandas==2.2.3
protobuf==6.30.2
pypandoc==1.15
Requests==2.32.3
siemens_tia_scripting==1.0.7
sympy==1.13.3

View File

@ -418,6 +418,12 @@ function createFieldEditor(key, field) {
class="w-full p-2 border rounded"
onchange="updateVisualSchema()">
</div>
<div>
<label class="block text-sm font-bold mb-2">Valor por Defecto</label>
<input type="text" value="${field.default !== undefined ? field.default : ''}"
class="w-full p-2 border rounded"
onchange="updateVisualSchema()">
</div>
</div>
${field.enum ? `
<div class="enum-container mt-4">
@ -494,28 +500,55 @@ function updateVisualSchema() {
const inputs = field.getElementsByTagName('input');
const select = field.getElementsByTagName('select')[0];
const key = inputs[0].value;
const fieldType = select.value; // string, directory, number, boolean, enum
const title = inputs[1].value;
const description = inputs[2].value;
const defaultValueInput = inputs[3]; // El nuevo input de valor por defecto
const defaultValueString = defaultValueInput.value;
let propertyDefinition = {
type: fieldType === 'directory' || fieldType === 'enum' ? 'string' : fieldType, // El tipo base
title: title,
description: description
};
// Añadir formato específico si es directorio
if (select.value === 'directory') {
schema.properties[key] = {
type: 'string',
format: 'directory',
title: inputs[1].value,
description: inputs[2].value
};
} else if (select.value === 'enum') {
schema.properties[key] = {
type: 'string',
title: inputs[1].value,
description: inputs[2].value,
enum: field.querySelector('textarea').value.split('\n').filter(v => v.trim())
};
} else {
schema.properties[key] = {
type: select.value,
title: inputs[1].value,
description: inputs[2].value
};
propertyDefinition.format = 'directory';
}
// Añadir enum si es de tipo enum
if (select.value === 'enum') {
propertyDefinition.enum = field.querySelector('textarea').value.split('\n').filter(v => v.trim());
}
// Procesar y añadir el valor por defecto si se proporcionó
if (defaultValueString !== null && defaultValueString.trim() !== '') {
let typedDefaultValue = defaultValueString;
try {
if (propertyDefinition.type === 'number' || propertyDefinition.type === 'integer') {
typedDefaultValue = Number(defaultValueString);
if (isNaN(typedDefaultValue)) {
console.warn(`Valor por defecto inválido para número en campo '${key}': ${defaultValueString}. Se omitirá.`);
// Do not add a default if it is not a valid number
} else {
// Optional: truncate when the type is integer
if (propertyDefinition.type === 'integer' && !Number.isInteger(typedDefaultValue)) {
typedDefaultValue = Math.trunc(typedDefaultValue);
}
propertyDefinition.default = typedDefaultValue;
}
} else if (propertyDefinition.type === 'boolean') {
typedDefaultValue = ['true', '1', 'yes', 'on'].includes(defaultValueString.toLowerCase());
propertyDefinition.default = typedDefaultValue;
} else { // string, enum, directory
propertyDefinition.default = typedDefaultValue; // already a string
}
} catch (e) {
console.error(`Error procesando valor por defecto para campo '${key}':`, e);
}
}
schema.properties[key] = propertyDefinition;
});
const jsonEditor = document.getElementById('json-editor');
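The block above builds one propertyDefinition per field and now coerces the raw default-value text into the declared type before storing it: numbers are parsed (and truncated for integers), booleans match true/1/yes/on, anything else stays a string, and an unparsable number is simply omitted. For illustration only, a rough Python equivalent of that coercion rule:

# Illustrative sketch, not part of this commit: the same typed-default coercion in Python.
import math

def coerce_default(field_type, raw):
    raw = raw.strip()
    if raw == "":
        return None  # no default provided
    if field_type in ("number", "integer"):
        try:
            value = float(raw)
        except ValueError:
            return None  # invalid number: omit the default, as the JS does
        return math.trunc(value) if field_type == "integer" else value
    if field_type == "boolean":
        return raw.lower() in ("true", "1", "yes", "on")
    return raw  # string, enum and directory keep the raw text

# Example: coerce_default("integer", "3.7") returns 3, coerce_default("boolean", "yes") returns True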
@ -960,6 +993,81 @@ function collectFormData(level) {
return data;
}
// Add this function at the end of static/js/script.js
function shutdownServer() {
if (confirm("¿Estás seguro de que quieres detener el servidor? La aplicación se cerrará.")) {
fetch('/_shutdown', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
}
})
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
alert("El servidor se está deteniendo. Puede que necesites cerrar esta pestaña manualmente.");
// Optionally, you can try to close the window/tab
// window.close(); // This may not work in all browsers for security reasons
document.body.innerHTML = '<div class="alert alert-info">El servidor se ha detenido. Cierra esta ventana.</div>';
} else {
alert("Error al intentar detener el servidor: " + data.message);
}
})
.catch(error => {
// A network error is expected here because the server is shutting down
console.warn("Error esperado al detener el servidor (puede que ya se haya detenido):", error);
alert("Solicitud de detención enviada. El servidor debería detenerse. Cierra esta ventana.");
document.body.innerHTML = '<div class="alert alert-info">El servidor se está deteniendo. Cierra esta ventana.</div>';
});
}
}
// Make sure the fetchLogs and clearLogs functions are also defined in this file if you use them.
// Example fetchLogs and clearLogs (in case you do not have them already):
function fetchLogs() {
fetch('/api/logs')
.then(response => response.json())
.then(data => {
const logOutput = document.getElementById('log-output');
logOutput.textContent = data.logs || 'No hay logs.';
logOutput.scrollTop = logOutput.scrollHeight; // Scroll to bottom
})
.catch(error => console.error('Error fetching logs:', error));
}
function clearLogs() {
if (confirm("¿Estás seguro de que quieres borrar los logs?")) {
fetch('/api/logs', { method: 'DELETE' })
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
fetchLogs(); // Refresh logs after clearing
showToast('Logs borrados correctamente.');
} else {
showToast('Error al borrar los logs.', 'error');
}
})
.catch(error => {
console.error('Error clearing logs:', error);
showToast('Error de red al borrar los logs.', 'error');
});
}
}
// You will need a showToast function (or similar) if you use it
function showToast(message, type = 'success') {
// Implement your toast logic here
console.log(`Toast (${type}): ${message}`);
alert(`Toast (${type}): ${message}`); // Simple alert as a placeholder
}
// Call fetchLogs on page load if needed
// document.addEventListener('DOMContentLoaded', fetchLogs);
// Add a function to save the configuration
async function saveConfig(level) {
const saveButton = document.getElementById(`save-config-${level}`);
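shutdownServer, fetchLogs and clearLogs assume backend routes (POST /_shutdown, GET and DELETE /api/logs) that are not visible in this part of the diff. A minimal Flask sketch of the contract these functions expect, with an assumed log-file location and an assumed hard-exit shutdown strategy, could be:

# Illustrative sketch, not part of this commit: a possible Flask backend for the
# POST /_shutdown and GET/DELETE /api/logs calls made by the functions above.
import os
import threading
from pathlib import Path
from flask import Flask, jsonify

app = Flask(__name__)
LOG_FILE = Path("log.txt")  # assumed log location; the real path is not shown here

@app.route("/_shutdown", methods=["POST"])
def shutdown():
    # Reply first so the client receives the JSON, then hard-exit shortly afterwards.
    threading.Timer(0.5, lambda: os._exit(0)).start()
    return jsonify(status="success", message="Server shutting down")

@app.route("/api/logs", methods=["GET"])
def get_logs():
    text = LOG_FILE.read_text(encoding="utf-8") if LOG_FILE.exists() else ""
    return jsonify(logs=text)

@app.route("/api/logs", methods=["DELETE"])
def clear_logs():
    LOG_FILE.write_text("", encoding="utf-8")
    return jsonify(status="success")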

View File

@ -68,6 +68,13 @@
</div>
</div>
</div>
<!-- Button to stop the server -->
<div class="mt-8 pt-4 border-t border-gray-300">
<button class="w-full bg-red-600 hover:bg-red-700 text-white px-4 py-2 rounded shadow" onclick="shutdownServer()">
Detener Servidor
</button>
</div>
</div>
</div>