From 006e2ed7d604b6c5304aababa80c7eeb393b9a66 Mon Sep 17 00:00:00 2001 From: Miguel Date: Sat, 3 May 2025 21:58:06 +0200 Subject: [PATCH] Agregada la opcion de ocultar scripts y de editar las descripcion --- .gitignore | 1 - app.py | 25 +- .../script_groups/example_group/log_x1.txt | 6 +- .../example_group/scripts_description.json | 14 + data/log.txt | 22 ++ lib/claude_file_organizer.py | 171 +++++++++ lib/config_handler.py | 168 +++++++++ lib/config_manager.py | 324 ++++++++++++++++++ lib/directory_manager.py | 97 ++++++ lib/group_manager.py | 45 +++ lib/logger.py | 54 +++ lib/schema_handler.py | 285 +++++++++++++++ lib/script_executor.py | 233 +++++++++++++ static/js/scripts.js | 130 ++++++- templates/index.html | 43 ++- 15 files changed, 1601 insertions(+), 17 deletions(-) create mode 100644 backend/script_groups/example_group/scripts_description.json create mode 100644 lib/claude_file_organizer.py create mode 100644 lib/config_handler.py create mode 100644 lib/config_manager.py create mode 100644 lib/directory_manager.py create mode 100644 lib/group_manager.py create mode 100644 lib/logger.py create mode 100644 lib/schema_handler.py create mode 100644 lib/script_executor.py diff --git a/.gitignore b/.gitignore index 0a19790..f2f3b67 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,6 @@ dist/ downloads/ eggs/ .eggs/ -lib/ lib64/ parts/ sdist/ diff --git a/app.py b/app.py index 6b9280b..a64c82b 100644 --- a/app.py +++ b/app.py @@ -136,7 +136,10 @@ def handle_schema(level): @app.route("/api/scripts/") def get_scripts(group): - return jsonify(config_manager.list_scripts(group)) + # list_scripts ahora devuelve detalles y filtra los ocultos + scripts = config_manager.list_scripts(group) + # El frontend espera 'name' y 'description', mapeamos desde 'display_name' y 'short_description' + return jsonify([{"name": s['display_name'], "description": s['short_description'], "filename": s['filename']} for s in scripts]) @app.route("/api/working-directory", methods=["POST"]) @@ -222,6 +225,26 @@ def handle_group_description(group): return jsonify({"status": "error", "message": str(e)}), 500 +@app.route("/api/script-details//", methods=["GET", "POST"]) +def handle_script_details(group, script_filename): + if request.method == "GET": + try: + details = config_manager.get_script_details(group, script_filename) + return jsonify(details) + except Exception as e: + print(f"Error getting script details for {group}/{script_filename}: {e}") + return jsonify({"status": "error", "message": str(e)}), 500 + else: # POST + try: + data = request.json + print(f"DEBUG: Received data for update_script_details ({group}/{script_filename}): {data}") # <-- Añade esta línea + result = config_manager.update_script_details(group, script_filename, data) + return jsonify(result) + except Exception as e: + print(f"Error updating script details for {group}/{script_filename}: {e}") + return jsonify({"status": "error", "message": str(e)}), 500 + + @app.route("/api/directory-history/") def get_directory_history(group): history = config_manager.get_directory_history(group) diff --git a/backend/script_groups/example_group/log_x1.txt b/backend/script_groups/example_group/log_x1.txt index 193c09e..043b954 100644 --- a/backend/script_groups/example_group/log_x1.txt +++ b/backend/script_groups/example_group/log_x1.txt @@ -1,9 +1,9 @@ --- Log de Ejecución: x1.py --- Grupo: example_group Directorio de Trabajo: C:\Estudio -Inicio: 2025-05-03 20:54:12 -Fin: 2025-05-03 20:54:17 -Duración: 0:00:05.196719 +Inicio: 2025-05-03 21:21:53 +Fin: 2025-05-03 21:21:58 +Duración: 0:00:05.144464 Estado: SUCCESS (Código de Salida: 0) --- SALIDA ESTÁNDAR (STDOUT) --- diff --git a/backend/script_groups/example_group/scripts_description.json b/backend/script_groups/example_group/scripts_description.json new file mode 100644 index 0000000..ea53198 --- /dev/null +++ b/backend/script_groups/example_group/scripts_description.json @@ -0,0 +1,14 @@ +{ + "x1.py": { + "display_name": "x1: Basico muestra los datos de config", + "short_description": "Script de prueba que imprime las configuraciones y realiza una tarea simple.", + "long_description": "Test", + "hidden": false + }, + "x2.py": { + "display_name": "x2 : Simula un proceso", + "short_description": "Script de prueba que simula un proceso de análisis de datos.", + "long_description": "", + "hidden": false + } +} \ No newline at end of file diff --git a/data/log.txt b/data/log.txt index 4514b26..29d640a 100644 --- a/data/log.txt +++ b/data/log.txt @@ -20,3 +20,25 @@ [20:54:17] ¡Proceso completado! [20:54:17] Ejecución de x1.py finalizada (success). Duración: 0:00:05.196719. [20:54:17] Log completo guardado en: d:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\example_group\log_x1.txt +[21:21:53] Iniciando ejecución de x1.py en C:\Estudio... +[21:21:53] === Ejecutando Script de Prueba 1 === +[21:21:53] Configuraciones cargadas: +[21:21:53] Nivel 1: { +[21:21:53] "api_key": "your-api-key-here", +[21:21:53] "model": "gpt-3.5-turbo" +[21:21:53] } +[21:21:53] Nivel 2: { +[21:21:53] "input_dir": "D:/Datos/Entrada", +[21:21:53] "output_dir": "D:/Datos/Salida", +[21:21:53] "batch_size": 50 +[21:21:53] } +[21:21:53] Nivel 3: {} +[21:21:53] Simulando procesamiento... +[21:21:54] Progreso: 20% +[21:21:55] Progreso: 40% +[21:21:56] Progreso: 60% +[21:21:57] Progreso: 80% +[21:21:58] Progreso: 100% +[21:21:58] ¡Proceso completado! +[21:21:58] Ejecución de x1.py finalizada (success). Duración: 0:00:05.144464. +[21:21:58] Log completo guardado en: d:\Proyectos\Scripts\ParamManagerScripts\backend\script_groups\example_group\log_x1.txt diff --git a/lib/claude_file_organizer.py b/lib/claude_file_organizer.py new file mode 100644 index 0000000..2889dae --- /dev/null +++ b/lib/claude_file_organizer.py @@ -0,0 +1,171 @@ +import os +import shutil +from pathlib import Path +import re + +class ClaudeProjectOrganizer: + def __init__(self): + self.source_dir = Path.cwd() + self.claude_dir = self.source_dir / 'claude' + self.file_mapping = {} + + def should_skip_directory(self, dir_name): + skip_dirs = {'.git', '__pycache__', 'venv', 'env', '.pytest_cache', '.vscode', 'claude'} + return dir_name in skip_dirs + + def get_comment_prefix(self, file_extension): + """Determina el prefijo de comentario según la extensión del archivo""" + comment_styles = { + '.py': '#', + '.js': '//', + '.css': '/*', + '.html': '', + } + return comment_suffixes.get(file_extension.lower(), '') + + def normalize_path(self, path_str: str) -> str: + """Normaliza la ruta usando forward slashes""" + return str(path_str).replace('\\', '/') + + def check_existing_path_comment(self, content: str, normalized_path: str, comment_prefix: str) -> bool: + """Verifica si ya existe un comentario con la ruta en el archivo""" + # Escapar caracteres especiales en el prefijo de comentario para regex + escaped_prefix = re.escape(comment_prefix) + + # Crear patrones para buscar tanto forward como backward slashes + forward_pattern = f"{escaped_prefix}\\s*{re.escape(normalized_path)}\\b" + backward_path = normalized_path.replace('/', '\\\\') # Doble backslash para el patrón + backward_pattern = f"{escaped_prefix}\\s*{re.escape(backward_path)}" + + # Buscar en las primeras líneas del archivo + first_lines = content.split('\n')[:5] + for line in first_lines: + if (re.search(forward_pattern, line) or + re.search(backward_pattern, line)): + return True + return False + + def add_path_comment(self, file_path: Path, content: str) -> str: + """Agrega un comentario con la ruta al inicio del archivo si no existe""" + relative_path = file_path.relative_to(self.source_dir) + normalized_path = self.normalize_path(relative_path) + comment_prefix = self.get_comment_prefix(file_path.suffix) + + if comment_prefix is None: + return content + + comment_suffix = self.get_comment_suffix(file_path.suffix) + + # Verificar si ya existe el comentario + if self.check_existing_path_comment(content, normalized_path, comment_prefix): + print(f" - Comentario de ruta ya existe en {file_path}") + return content + + path_comment = f"{comment_prefix} {normalized_path}{comment_suffix}\n" + + # Para archivos HTML, insertar después del doctype si existe + if file_path.suffix.lower() == '.html': + if content.lower().startswith('') + 1 + return content[:doctype_end] + '\n' + path_comment + content[doctype_end:] + + return path_comment + content + + def clean_claude_directory(self): + if self.claude_dir.exists(): + shutil.rmtree(self.claude_dir) + self.claude_dir.mkdir() + print(f"Directorio claude limpiado: {self.claude_dir}") + + def copy_files(self): + self.clean_claude_directory() + + for root, dirs, files in os.walk(self.source_dir): + dirs[:] = [d for d in dirs if not self.should_skip_directory(d)] + current_path = Path(root) + + for file in files: + file_path = current_path / file + + if file.endswith(('.py', '.js', '.css', '.html', '.json', '.yml', '.yaml', + '.tsx', '.ts', '.jsx', '.scss', '.less')): + target_path = self.claude_dir / file + + # Si el archivo ya existe en el directorio claude, agregar un sufijo numérico + if target_path.exists(): + base = target_path.stem + ext = target_path.suffix + counter = 1 + while target_path.exists(): + target_path = self.claude_dir / f"{base}_{counter}{ext}" + counter += 1 + + try: + # Leer el contenido del archivo + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + + # Agregar el comentario con la ruta si no existe + modified_content = self.add_path_comment(file_path, content) + + # Escribir el nuevo contenido + with open(target_path, 'w', encoding='utf-8', newline='\n') as f: + f.write(modified_content) + + self.file_mapping[str(file_path)] = target_path.name + print(f"Copiado: {file_path} -> {target_path}") + + except UnicodeDecodeError: + print(f"Advertencia: No se pudo procesar {file_path} como texto. Copiando sin modificar...") + shutil.copy2(file_path, target_path) + except Exception as e: + print(f"Error procesando {file_path}: {str(e)}") + + def generate_tree_report(self): + """Genera el reporte en formato árbol visual""" + report = ["Estructura del proyecto original:\n"] + + def add_to_report(path, prefix="", is_last=True): + report.append(prefix + ("└── " if is_last else "├── ") + path.name) + + if path.is_dir() and not self.should_skip_directory(path.name): + children = sorted(path.iterdir(), key=lambda x: (x.is_file(), x.name)) + children = [c for c in children if not (c.is_dir() and self.should_skip_directory(c.name))] + + for i, child in enumerate(children): + is_last_child = i == len(children) - 1 + new_prefix = prefix + (" " if is_last else "│ ") + add_to_report(child, new_prefix, is_last_child) + + add_to_report(self.source_dir) + + report_path = self.claude_dir / "project_structure.txt" + with open(report_path, "w", encoding="utf-8") as f: + f.write("\n".join(report)) + print(f"\nReporte generado en: {report_path}") + +def main(): + try: + print("Iniciando organización de archivos para Claude...") + organizer = ClaudeProjectOrganizer() + organizer.copy_files() + organizer.generate_tree_report() + print("\n¡Proceso completado exitosamente!") + except Exception as e: + print(f"\nError durante la ejecución: {str(e)}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/lib/config_handler.py b/lib/config_handler.py new file mode 100644 index 0000000..c2a773f --- /dev/null +++ b/lib/config_handler.py @@ -0,0 +1,168 @@ +import os +import json +from typing import Dict, Any, Optional, Callable +from .schema_handler import SchemaHandler # Import SchemaHandler + + +class ConfigHandler: + def __init__( + self, + data_path: str, + script_groups_path: str, + get_workdir_func: Callable[[], Optional[str]], + schema_handler: SchemaHandler, + ): + self.data_path = data_path + self.script_groups_path = script_groups_path + self._get_working_directory = ( + get_workdir_func # Function to get current workdir + ) + self.schema_handler = schema_handler # Instance of SchemaHandler + + def get_config(self, level: str, group: str = None) -> Dict[str, Any]: + """ + Get configuration for specified level. + Applies default values from the corresponding schema if the config + file doesn't exist or is missing keys with defaults. + """ + config_data = {} + needs_save = False + schema = None + data_path = self._get_config_path(level, group) + schema_path_for_debug = "N/A" # For logging + + if not data_path: + if level == "3": # Level 3 depends on working directory + return {} # Return empty if working dir not set for L3 + else: + return { + "error": f"Could not determine config path for level {level}, group {group}" + } + + # Determine schema path for logging purposes (actual loading done by schema_handler) + if level == "1": + schema_path_for_debug = os.path.join(self.data_path, "esquema_general.json") + elif level == "2" and group: + schema_path_for_debug = os.path.join( + self.script_groups_path, group, "esquema_group.json" + ) + elif level == "3" and group: + schema_path_for_debug = os.path.join( + self.script_groups_path, group, "esquema_work.json" + ) + elif level == "3": + schema_path_for_debug = "N/A (Level 3 without group)" + + # Get schema using SchemaHandler + try: + schema = self.schema_handler.get_schema(level, group) + except Exception as e: + print( + f"Warning: Could not load schema for level {level}, group {group}. Defaults will not be applied. Error: {e}" + ) + schema = None + + # Try to load existing data + data_file_exists = os.path.exists(data_path) + if data_file_exists: + try: + with open(data_path, "r", encoding="utf-8") as f_data: + content = f_data.read() + if content.strip(): + config_data = json.loads(content) + else: + print( + f"Warning: Data file {data_path} is empty. Will initialize with defaults." + ) + needs_save = True + except json.JSONDecodeError: + print( + f"Warning: Could not decode JSON from {data_path}. Will initialize with defaults." + ) + config_data = {} + needs_save = True + except Exception as e: + print( + f"Error reading data from {data_path}: {e}. Will attempt to initialize with defaults." + ) + config_data = {} + needs_save = True + else: # File doesn't exist + print( + f"Info: Data file not found at {data_path}. Will initialize with defaults." + ) + needs_save = True + + # Apply defaults from schema + if schema and isinstance(schema, dict) and "properties" in schema: + schema_properties = schema.get("properties", {}) + if isinstance(schema_properties, dict): + for key, prop_definition in schema_properties.items(): + if ( + isinstance(prop_definition, dict) + and key not in config_data + and "default" in prop_definition + ): + print( + f"Info: Applying default for '{key}' from schema {schema_path_for_debug}" + ) + config_data[key] = prop_definition["default"] + needs_save = True + else: + print( + f"Warning: 'properties' in schema {schema_path_for_debug} is not a dictionary. Cannot apply defaults." + ) + + # Save if needed + if needs_save: + try: + print(f"Info: Saving updated config data to: {data_path}") + os.makedirs(os.path.dirname(data_path), exist_ok=True) + with open(data_path, "w", encoding="utf-8") as f_data: + json.dump(config_data, f_data, indent=2, ensure_ascii=False) + except IOError as e: + print(f"Error: Could not write data file to {data_path}: {e}") + except Exception as e: + print(f"Unexpected error saving data to {data_path}: {e}") + + return config_data + + def update_config( + self, level: str, data: Dict[str, Any], group: str = None + ) -> Dict[str, str]: + """Update configuration for specified level.""" + path = self._get_config_path(level, group) + if not path: + return { + "status": "error", + "message": f"Could not determine config path for level {level}, group {group}", + } + + try: + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + print(f"Info: Config successfully updated at {path}") + return {"status": "success"} + except Exception as e: + print(f"Error updating config at {path}: {str(e)}") + return {"status": "error", "message": str(e)} + + def _get_config_path( + self, level: str, group: Optional[str] = None + ) -> Optional[str]: + """Helper to determine the config file path.""" + if level == "1": + return os.path.join(self.data_path, "data.json") + elif level == "2": + if not group: + return None + return os.path.join(self.script_groups_path, group, "data.json") + elif level == "3": + working_directory = self._get_working_directory() + if working_directory and os.path.isdir(working_directory): + return os.path.join(working_directory, "data.json") + else: + return None # Cannot determine L3 path without valid workdir + else: + return None diff --git a/lib/config_manager.py b/lib/config_manager.py new file mode 100644 index 0000000..df9369d --- /dev/null +++ b/lib/config_manager.py @@ -0,0 +1,324 @@ +import os +import json +from typing import Dict, Any, List, Optional +import re # Necesario para extraer docstring + +# Import the new modules +from .logger import Logger +from .directory_manager import DirectoryManager +from .group_manager import GroupManager +from .schema_handler import SchemaHandler +from .config_handler import ConfigHandler +from .script_executor import ScriptExecutor + +# Keep time for execution throttling state +import time +from datetime import datetime # Needed for append_log timestamp if we keep it here + + +# --- ConfigurationManager Class --- +class ConfigurationManager: + def __init__(self): + # Adjust base_path to point to the project root (one level up from lib) + lib_dir = os.path.dirname(os.path.abspath(__file__)) + self.base_path = os.path.dirname(lib_dir) + self.data_path = os.path.join(self.base_path, "data") + self.script_groups_path = os.path.join( + self.base_path, "backend", "script_groups" + ) + self.working_directory = None + # log_file_path is now managed by the Logger instance + + # State for script execution throttling + self.last_execution_time = 0 + # Minimum seconds between script executions to prevent rapid clicks + self.min_execution_interval = 1 + + # Instantiate handlers/managers + self.logger = Logger(os.path.join(self.data_path, "log.txt")) # Pass log path to Logger + self.dir_manager = DirectoryManager(self.script_groups_path, self._set_working_directory_internal) + self.group_manager = GroupManager(self.script_groups_path) + self.schema_handler = SchemaHandler(self.data_path, self.script_groups_path, self._get_working_directory_internal) + self.config_handler = ConfigHandler(self.data_path, self.script_groups_path, self._get_working_directory_internal, self.schema_handler) + self.script_executor = ScriptExecutor( + self.script_groups_path, + self.dir_manager, + self.config_handler, + self.logger, # Pass the central logger instance + self._get_execution_state_internal, + self._set_last_execution_time_internal + ) + + # --- Internal Callbacks/Getters for Sub-Managers --- + def _set_working_directory_internal(self, path: Optional[str]): + """Callback for DirectoryManager to update the main working directory.""" + if path and os.path.isdir(path): + self.working_directory = path + # Create data.json in the new working directory if it doesn't exist + # This ensures L3 config can be created/read immediately after setting WD + data_json_path = os.path.join(path, "data.json") + if not os.path.exists(data_json_path): + try: + with open(data_json_path, 'w', encoding='utf-8') as f: + json.dump({}, f) + print(f"Info: Created empty data.json in new working directory: {data_json_path}") + except Exception as e: + print(f"Warning: Could not create data.json in {path}: {e}") + else: + self.working_directory = None + + def _get_working_directory_internal(self) -> Optional[str]: + """Provides the current working directory to sub-managers.""" + return self.working_directory + + def _get_execution_state_internal(self) -> Dict[str, Any]: + """Provides execution throttling state to ScriptExecutor.""" + return {"last_time": self.last_execution_time, "interval": self.min_execution_interval} + + def _set_last_execution_time_internal(self, exec_time: float): + """Callback for ScriptExecutor to update the last execution time.""" + self.last_execution_time = exec_time + + # --- Logging Methods (Delegated) --- + def append_log(self, message: str) -> None: + # The Logger class now handles timestamping internally. + # We just need to pass the raw message. + # The broadcast_message in app.py might still add its own timestamp for display, + # but the core logging is handled by the Logger instance. + self.logger.append_log(message) + + def read_log(self) -> str: + return self.logger.read_log() + + def clear_log(self) -> bool: + return self.logger.clear_log() + + # --- Working Directory Methods (Delegated) --- + def set_work_dir(self, group: str, path: str) -> Dict[str, str]: + """Sets the working directory for a group and updates the global working directory.""" + # Note: This now primarily updates the group's work_dir.json and calls the internal setter + return self.dir_manager.set_work_dir_for_group(group, path) + + def get_work_dir(self, group: str) -> Optional[str]: + """Gets the stored working directory for a group and sets it globally if valid.""" + path = self.dir_manager.get_work_dir_for_group(group) + # Ensure the global working directory is updated when fetched successfully + self._set_working_directory_internal(path) + return path + + def get_directory_history(self, group: str) -> List[str]: + return self.dir_manager.get_directory_history(group) + + # --- Script Group Methods (Delegated) --- + def get_script_groups(self) -> List[Dict[str, Any]]: + return self.group_manager.get_script_groups() + + def get_group_details(self, group: str) -> Dict[str, Any]: + """Get details (description, etc.) for a specific group.""" + group_path = os.path.join(self.script_groups_path, group) + if not os.path.isdir(group_path): + return {"error": "Group not found"} + # Use the internal method of GroupManager + details = self.group_manager._get_group_description(group_path) + # Ensure default values if description file is missing/empty + details.setdefault("name", group) + details.setdefault("description", "Sin descripción") + details.setdefault("version", "1.0") + details.setdefault("author", "Unknown") + return details + + def update_group_description(self, group: str, data: Dict[str, Any]) -> Dict[str, str]: + """Update the description file for a specific group.""" + description_path = os.path.join(self.script_groups_path, group, "description.json") + try: + os.makedirs(os.path.dirname(description_path), exist_ok=True) + with open(description_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + return {"status": "success"} + except Exception as e: + print(f"Error updating group description for {group}: {e}") + return {"status": "error", "message": str(e)} + + # --- Configuration (data.json) Methods (Delegated) --- + def get_config(self, level: str, group: str = None) -> Dict[str, Any]: + # ConfigHandler uses the _get_working_directory_internal callback + return self.config_handler.get_config(level, group) + + def update_config( + self, level: str, data: Dict[str, Any], group: str = None + ) -> Dict[str, str]: + return self.config_handler.update_config(level, data, group) + + # --- Schema Methods (Delegated) --- + def get_schema(self, level: str, group: str = None) -> Dict[str, Any]: + # SchemaHandler uses the _get_working_directory_internal callback + return self.schema_handler.get_schema(level, group) + + def update_schema( + self, level: str, data: Dict[str, Any], group: str = None + ) -> Dict[str, str]: + # SchemaHandler uses the _get_working_directory_internal callback + return self.schema_handler.update_schema(level, data, group) + + # --- Script Listing and Execution Methods --- + + # --- Métodos para manejar scripts_description.json --- + + def _get_group_path(self, group_id: str) -> Optional[str]: + """Obtiene la ruta completa a la carpeta de un grupo.""" + path = os.path.join(self.script_groups_path, group_id) + return path if os.path.isdir(path) else None + + def _get_script_descriptions_path(self, group_id: str) -> Optional[str]: + """Obtiene la ruta al archivo scripts_description.json de un grupo.""" + group_path = self._get_group_path(group_id) + if not group_path: + return None + return os.path.join(group_path, 'scripts_description.json') + + def _load_script_descriptions(self, group_id: str) -> Dict[str, Any]: + """Carga las descripciones de scripts desde scripts_description.json.""" + path = self._get_script_descriptions_path(group_id) + if path and os.path.exists(path): + try: + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + except json.JSONDecodeError: + print(f"Error: JSON inválido en {path}") + return {} + except Exception as e: + print(f"Error leyendo {path}: {e}") + return {} + return {} + + def _save_script_descriptions(self, group_id: str, descriptions: Dict[str, Any]) -> bool: + """Guarda las descripciones de scripts en scripts_description.json.""" + path = self._get_script_descriptions_path(group_id) + if path: + try: + os.makedirs(os.path.dirname(path), exist_ok=True) # Asegura que el directorio del grupo existe + with open(path, 'w', encoding='utf-8') as f: + json.dump(descriptions, f, indent=4, ensure_ascii=False) + return True + except Exception as e: + print(f"Error escribiendo en {path}: {e}") + return False + return False + + def _extract_short_description(self, script_path: str) -> str: + """Extrae la primera línea del docstring de un script Python.""" + try: + with open(script_path, 'r', encoding='utf-8') as f: + content = f.read() + # Buscar docstring al inicio del archivo """...""" o '''...''' + match = re.match(r'^\s*("""(.*?)"""|\'\'\'(.*?)\'\'\')', content, re.DOTALL | re.MULTILINE) + if match: + # Obtener el contenido del docstring (grupo 2 o 3) + docstring = match.group(2) or match.group(3) + # Tomar la primera línea no vacía + first_line = next((line.strip() for line in docstring.strip().splitlines() if line.strip()), None) + return first_line if first_line else "Sin descripción corta." + except Exception as e: + print(f"Error extrayendo descripción de {script_path}: {e}") + return "Sin descripción corta." + + def list_scripts(self, group: str) -> List[Dict[str, str]]: + """Lista scripts visibles con sus detalles desde scripts_description.json.""" + group_path = self._get_group_path(group) + if not group_path: + return [] + + descriptions = self._load_script_descriptions(group) + updated = False + scripts_details = [] + + try: + # Listar archivos .py en el directorio del grupo + script_files = [f for f in os.listdir(group_path) if f.endswith('.py') and os.path.isfile(os.path.join(group_path, f))] + + for filename in script_files: + script_path = os.path.join(group_path, filename) + if filename not in descriptions: + print(f"Script '{filename}' no encontrado en descripciones, auto-populando.") + short_desc = self._extract_short_description(script_path) + descriptions[filename] = { + "display_name": filename.replace('.py', ''), # Nombre por defecto + "short_description": short_desc, + "long_description": "", + "hidden": False + } + updated = True + + # Añadir a la lista si no está oculto + details = descriptions[filename] + if not details.get('hidden', False): + scripts_details.append({ + "filename": filename, # Nombre real del archivo + "display_name": details.get("display_name", filename.replace('.py', '')), + "short_description": details.get("short_description", "Sin descripción corta."), + # No necesitamos enviar la descripción larga aquí + }) + + if updated: + self._save_script_descriptions(group, descriptions) + + # Ordenar por display_name para consistencia + scripts_details.sort(key=lambda x: x['display_name']) + return scripts_details + + except FileNotFoundError: + return [] + except Exception as e: + print(f"Error listando scripts para el grupo {group}: {e}") + return [] + + def get_script_details(self, group_id: str, script_filename: str) -> Dict[str, Any]: + """Obtiene los detalles completos de un script específico.""" + descriptions = self._load_script_descriptions(group_id) + # Devolver detalles o un diccionario por defecto si no existe (aunque list_scripts debería crearlo) + return descriptions.get(script_filename, { + "display_name": script_filename.replace('.py', ''), + "short_description": "No encontrado.", + "long_description": "", + "hidden": False + }) + + def update_script_details(self, group_id: str, script_filename: str, details: Dict[str, Any]) -> Dict[str, str]: + """Actualiza los detalles de un script específico.""" + descriptions = self._load_script_descriptions(group_id) + if script_filename in descriptions: + # Asegurarse de que los campos esperados están presentes y actualizar + descriptions[script_filename]["display_name"] = details.get("display_name", descriptions[script_filename].get("display_name", script_filename.replace('.py', ''))) + descriptions[script_filename]["short_description"] = details.get("short_description", descriptions[script_filename].get("short_description", "")) # Actualizar descripción corta + descriptions[script_filename]["long_description"] = details.get("long_description", descriptions[script_filename].get("long_description", "")) + descriptions[script_filename]["hidden"] = details.get("hidden", descriptions[script_filename].get("hidden", False)) + + if self._save_script_descriptions(group_id, descriptions): + return {"status": "success"} + else: + return {"status": "error", "message": "Fallo al guardar las descripciones de los scripts."} + else: + # Intentar crear la entrada si el script existe pero no está en el JSON (caso raro) + group_path = self._get_group_path(group_id) + script_path = os.path.join(group_path, script_filename) if group_path else None + if script_path and os.path.exists(script_path): + print(f"Advertencia: El script '{script_filename}' existe pero no estaba en descriptions.json. Creando entrada.") + short_desc = self._extract_short_description(script_path) + descriptions[script_filename] = { + "display_name": details.get("display_name", script_filename.replace('.py', '')), + "short_description": short_desc, # Usar la extraída + "long_description": details.get("long_description", ""), + "hidden": details.get("hidden", False) + } + if self._save_script_descriptions(group_id, descriptions): + return {"status": "success"} + else: + return {"status": "error", "message": "Fallo al guardar las descripciones de los scripts después de crear la entrada."} + else: + return {"status": "error", "message": f"Script '{script_filename}' no encontrado en las descripciones ni en el sistema de archivos."} + + def execute_script( + self, group: str, script_name: str, broadcast_fn=None + ) -> Dict[str, Any]: + # ScriptExecutor uses callbacks to get/set execution state + return self.script_executor.execute_script(group, script_name, broadcast_fn) diff --git a/lib/directory_manager.py b/lib/directory_manager.py new file mode 100644 index 0000000..939e20d --- /dev/null +++ b/lib/directory_manager.py @@ -0,0 +1,97 @@ +import os +import json +from typing import Dict, List, Optional, Callable + + +class DirectoryManager: + def __init__( + self, + script_groups_path: str, + set_global_workdir_callback: Callable[[Optional[str]], None], + ): + self.script_groups_path = script_groups_path + self._set_global_workdir = ( + set_global_workdir_callback # Callback to update main manager's workdir + ) + + def get_work_dir_for_group(self, group: str) -> Optional[str]: + """Get working directory path for a script group from work_dir.json.""" + work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") + try: + with open(work_dir_path, "r", encoding="utf-8") as f: + data = json.load(f) + path = data.get("path", "") + if path: + path = os.path.normpath(path) + if path and os.path.isdir(path): + return path + elif path: + print( + f"Warning: Stored working directory for group '{group}' is invalid or does not exist: {path}" + ) + return None + else: + return None + except (FileNotFoundError, json.JSONDecodeError): + return None + except Exception as e: + print(f"Error reading work_dir.json for group '{group}': {e}") + return None + + def get_directory_history(self, group: str) -> List[str]: + """Get the directory history for a script group.""" + work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") + try: + with open(work_dir_path, "r", encoding="utf-8") as f: + data = json.load(f) + history = [os.path.normpath(p) for p in data.get("history", [])] + return [p for p in history if os.path.isdir(p)] + except (FileNotFoundError, json.JSONDecodeError): + return [] + except Exception as e: + print(f"Error reading directory history for group '{group}': {e}") + return [] + + def set_work_dir_for_group(self, group: str, path: str) -> Dict[str, str]: + """Set working directory path for a script group and update history.""" + path = os.path.normpath(path) + + if not os.path.isdir(path): # Check if it's a valid directory + return { + "status": "error", + "message": f"Directory does not exist or is not valid: {path}", + } + + work_dir_file = os.path.join(self.script_groups_path, group, "work_dir.json") + work_dir_folder = os.path.dirname(work_dir_file) + + try: + os.makedirs(work_dir_folder, exist_ok=True) # Ensure group folder exists + try: + with open(work_dir_file, "r", encoding="utf-8") as f: + data = json.load(f) + if "history" in data: + data["history"] = [os.path.normpath(p) for p in data["history"]] + except (FileNotFoundError, json.JSONDecodeError): + data = {"path": "", "history": []} + + data["path"] = path + if "history" not in data: + data["history"] = [] + data["history"] = [ + p for p in data["history"] if os.path.normpath(p) != path + ] + data["history"].insert(0, path) + data["history"] = data["history"][:10] + + with open(work_dir_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + self._set_global_workdir( + path + ) # Update the main manager's working directory + + return {"status": "success", "path": path} + except Exception as e: + print(f"Error setting work directory for group {group} at {path}: {e}") + return {"status": "error", "message": str(e)} diff --git a/lib/group_manager.py b/lib/group_manager.py new file mode 100644 index 0000000..39ecd92 --- /dev/null +++ b/lib/group_manager.py @@ -0,0 +1,45 @@ +import os +import json +from typing import Dict, Any, List + + +class GroupManager: + def __init__(self, script_groups_path: str): + self.script_groups_path = script_groups_path + + def get_script_groups(self) -> List[Dict[str, Any]]: + """Returns list of available script groups with their descriptions.""" + groups = [] + if not os.path.isdir(self.script_groups_path): + print( + f"Warning: Script groups directory not found: {self.script_groups_path}" + ) + return [] + + for d in os.listdir(self.script_groups_path): + group_path = os.path.join(self.script_groups_path, d) + if os.path.isdir(group_path): + description = self._get_group_description(group_path) + groups.append( + { + "id": d, + "name": description.get("name", d), + "description": description.get( + "description", "Sin descripción" + ), + "version": description.get("version", "1.0"), + "author": description.get("author", "Unknown"), + } + ) + return groups + + def _get_group_description(self, group_path: str) -> Dict[str, Any]: + """Get description for a script group.""" + description_file = os.path.join(group_path, "description.json") + try: + if os.path.exists(description_file): + with open(description_file, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f"Error reading group description from {description_file}: {e}") + return {} diff --git a/lib/logger.py b/lib/logger.py new file mode 100644 index 0000000..66cc6f6 --- /dev/null +++ b/lib/logger.py @@ -0,0 +1,54 @@ +import os +from datetime import datetime + + +class Logger: + def __init__(self, log_file_path: str): + self.log_file = log_file_path + self._init_log_file() + + def _init_log_file(self): + """Initialize log file if it doesn't exist""" + log_dir = os.path.dirname(self.log_file) + if not os.path.exists(log_dir): + os.makedirs(log_dir) + if not os.path.exists(self.log_file): + try: + with open(self.log_file, "w", encoding="utf-8") as f: + f.write("") + except Exception as e: + print(f"Error initializing log file {self.log_file}: {e}") + + def append_log(self, message: str) -> None: + """Append a message to the log file with timestamp.""" + try: + timestamp = datetime.now().strftime("[%H:%M:%S] ") + lines = message.split("\n") + lines_with_timestamp = [ + f"{timestamp}{line}\n" for line in lines if line.strip() + ] + + if lines_with_timestamp: + with open(self.log_file, "a", encoding="utf-8") as f: + f.writelines(lines_with_timestamp) + except Exception as e: + print(f"Error writing to log file {self.log_file}: {e}") + + def read_log(self) -> str: + """Read the entire log file""" + try: + with open(self.log_file, "r", encoding="utf-8") as f: + return f.read() + except Exception as e: + print(f"Error reading log file {self.log_file}: {e}") + return "" + + def clear_log(self) -> bool: + """Clear the log file""" + try: + with open(self.log_file, "w", encoding="utf-8") as f: + f.write("") + return True + except Exception as e: + print(f"Error clearing log file {self.log_file}: {e}") + return False diff --git a/lib/schema_handler.py b/lib/schema_handler.py new file mode 100644 index 0000000..cbb6ced --- /dev/null +++ b/lib/schema_handler.py @@ -0,0 +1,285 @@ +import os +import json +import traceback +from typing import Dict, Any, Optional, Callable + + +class SchemaHandler: + def __init__( + self, + data_path: str, + script_groups_path: str, + get_workdir_func: Callable[[], Optional[str]], + ): + self.data_path = data_path + self.script_groups_path = script_groups_path + self._get_working_directory = ( + get_workdir_func # Function to get current workdir from main manager + ) + + def get_schema(self, level: str, group: str = None) -> Dict[str, Any]: + """Get schema for specified level.""" + schema_path = self._get_schema_path(level, group) + if not schema_path: + print( + f"Warning: Could not determine schema path for level '{level}', group '{group}'. Returning empty schema." + ) + return {"type": "object", "properties": {}} + + try: + if os.path.exists(schema_path): + try: + with open(schema_path, "r", encoding="utf-8") as f: + schema = json.load(f) + if ( + not isinstance(schema, dict) + or "properties" not in schema + or "type" not in schema + ): + print( + f"Warning: Schema file {schema_path} has invalid structure. Returning default." + ) + return {"type": "object", "properties": {}} + if not isinstance(schema.get("properties"), dict): + print( + f"Warning: 'properties' in schema file {schema_path} is not a dictionary. Normalizing." + ) + schema["properties"] = {} + return schema + except json.JSONDecodeError: + print( + f"Error: Could not decode JSON from schema file: {schema_path}. Returning default." + ) + return {"type": "object", "properties": {}} + except Exception as e: + print( + f"Error reading schema file {schema_path}: {e}. Returning default." + ) + return {"type": "object", "properties": {}} + else: + print( + f"Info: Schema file not found at {schema_path}. Creating default schema." + ) + default_schema = {"type": "object", "properties": {}} + try: + os.makedirs(os.path.dirname(schema_path), exist_ok=True) + with open(schema_path, "w", encoding="utf-8") as f: + json.dump(default_schema, f, indent=2, ensure_ascii=False) + return default_schema + except Exception as e: + print(f"Error creating default schema file at {schema_path}: {e}") + return {"type": "object", "properties": {}} + + except ValueError as ve: + print(f"Error getting schema path: {ve}") + return {"type": "object", "properties": {}} + except Exception as e: + error_path = schema_path if schema_path else f"Level {level}, Group {group}" + print(f"Unexpected error loading schema from {error_path}: {str(e)}") + return {"type": "object", "properties": {}} + + def update_schema( + self, level: str, data: Dict[str, Any], group: str = None + ) -> Dict[str, str]: + """Update schema for specified level and clean corresponding config.""" + schema_path = self._get_schema_path(level, group) + config_path = self._get_config_path_for_schema( + level, group + ) # Get corresponding config path + + if not schema_path: + return { + "status": "error", + "message": f"Could not determine schema path for level '{level}', group '{group}'", + } + + try: + os.makedirs(os.path.dirname(schema_path), exist_ok=True) + + # Basic validation and normalization of the schema data being saved + if not isinstance(data, dict): + data = {"type": "object", "properties": {}} + if "type" not in data: + data["type"] = "object" + if "properties" not in data or not isinstance(data["properties"], dict): + data["properties"] = {} + + with open(schema_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + print(f"Info: Schema successfully updated at {schema_path}") + + if config_path: + self._clean_config_for_schema(config_path, data) + else: + print( + f"Info: Config cleaning skipped for level {level} (no valid config path)." + ) + + return {"status": "success"} + + except Exception as e: + print(f"Error updating schema at {schema_path}: {str(e)}") + print(traceback.format_exc()) + return {"status": "error", "message": str(e)} + + def _get_schema_path( + self, level: str, group: Optional[str] = None + ) -> Optional[str]: + """Helper to determine the schema file path.""" + clean_level = str(level).split("-")[0] + if clean_level == "1": + return os.path.join(self.data_path, "esquema_general.json") + elif clean_level == "2": + if not group: + raise ValueError("Group is required for level 2 schema") + return os.path.join(self.script_groups_path, group, "esquema_group.json") + elif clean_level == "3": + if not group: + print( + "Warning: Group needed to determine level 3 schema (esquema_work.json)." + ) + return None # Cannot determine without group + return os.path.join(self.script_groups_path, group, "esquema_work.json") + else: + print(f"Warning: Invalid level '{level}' for schema path retrieval.") + return None + + def _get_config_path_for_schema( + self, level: str, group: Optional[str] = None + ) -> Optional[str]: + """Helper to determine the config file path corresponding to a schema level.""" + clean_level = str(level).split("-")[0] + if clean_level == "1": + return os.path.join(self.data_path, "data.json") + elif clean_level == "2": + if not group: + return None + return os.path.join(self.script_groups_path, group, "data.json") + elif clean_level == "3": + working_directory = self._get_working_directory() + if working_directory and os.path.isdir(working_directory): + return os.path.join(working_directory, "data.json") + else: + print( + f"Warning: Working directory not set or invalid ('{working_directory}') for level 3 config path." + ) + return None + else: + return None + + def _clean_config_for_schema( + self, config_path: str, schema: Dict[str, Any] + ) -> None: + """Clean configuration file to match schema structure.""" + try: + if not os.path.exists(config_path): + print( + f"Info: Config file {config_path} not found for cleaning. Skipping." + ) + return + + config = {} + content = "" + with open(config_path, "r", encoding="utf-8") as f: + content = f.read() + if content.strip(): + config = json.loads(content) + else: + print( + f"Info: Config file {config_path} is empty. Cleaning will result in an empty object." + ) + + cleaned_config = self._clean_object_against_schema(config, schema) + + try: + original_config_str = json.dumps(config, sort_keys=True) + cleaned_config_str = json.dumps(cleaned_config, sort_keys=True) + except TypeError as te: + print( + f"Warning: Could not serialize config for comparison during clean: {te}. Forcing save." + ) + original_config_str, cleaned_config_str = "", " " # Force inequality + + if original_config_str != cleaned_config_str or not content.strip(): + print(f"Info: Cleaning config file: {config_path}") + with open(config_path, "w", encoding="utf-8") as f: + json.dump(cleaned_config, f, indent=2, ensure_ascii=False) + else: + print( + f"Info: Config file {config_path} already matches schema. No cleaning needed." + ) + + except json.JSONDecodeError: + print( + f"Error: Could not decode JSON from config file {config_path} during cleaning. Skipping clean." + ) + except IOError as e: + print(f"Error accessing config file {config_path} during cleaning: {e}") + except Exception as e: + print(f"Unexpected error cleaning config {config_path}: {str(e)}") + print(traceback.format_exc()) + + def _clean_object_against_schema(self, data: Any, schema: Dict[str, Any]) -> Any: + """Recursively clean data to match schema structure.""" + if not isinstance(schema, dict): + print( + f"Warning: Invalid schema provided to _clean_object_against_schema (not a dict). Returning data as is: {type(schema)}" + ) + return data + + schema_type = schema.get("type") + + if schema_type == "object": + if not isinstance(data, dict): + return {} + result = {} + schema_props = schema.get("properties", {}) + if not isinstance(schema_props, dict): + print( + "Warning: 'properties' in schema is not a dictionary during cleaning. Returning empty object." + ) + return {} + for key, value in data.items(): + if key in schema_props: + prop_schema = schema_props[key] + if isinstance(prop_schema, dict): + result[key] = self._clean_object_against_schema( + value, prop_schema + ) + else: + print( + f"Warning: Schema for property '{key}' is not a dictionary. Omitting from cleaned data." + ) + return result + + elif schema_type == "array": + if not isinstance(data, list): + return [] + items_schema = schema.get("items") + if isinstance(items_schema, dict): + return [ + self._clean_object_against_schema(item, items_schema) + for item in data + ] + else: + return data # Keep array items as they are if no valid 'items' schema defined + + elif "enum" in schema: + enum_values = schema.get("enum") + if isinstance(enum_values, list): + if data in enum_values: + return data + else: + return None # Or consider schema.get('default') + else: + print( + f"Warning: Invalid 'enum' definition in schema (not a list). Returning None for value '{data}'." + ) + return None + + elif schema_type in ["string", "integer", "number", "boolean", "null"]: + return data # Basic types, return as is (could add type checking) + + else: + # print(f"Warning: Unknown or unhandled schema type '{schema_type}' during cleaning. Returning data as is.") + return data diff --git a/lib/script_executor.py b/lib/script_executor.py new file mode 100644 index 0000000..5be4aab --- /dev/null +++ b/lib/script_executor.py @@ -0,0 +1,233 @@ +import os +import json +import subprocess +import re +import traceback +from typing import Dict, Any, List, Optional, Callable +import sys +import time +from datetime import datetime + +# Import necessary handlers/managers +from .directory_manager import DirectoryManager +from .config_handler import ConfigHandler +from .logger import Logger + + +class ScriptExecutor: + def __init__( + self, + script_groups_path: str, + dir_manager: DirectoryManager, + config_handler: ConfigHandler, + app_logger: Logger, + get_exec_state_func: Callable[ + [], Dict[str, Any] + ], # Func to get {last_time, interval} + set_last_exec_time_func: Callable[[float], None], # Func to set last exec time + ): + self.script_groups_path = script_groups_path + self.dir_manager = dir_manager + self.config_handler = config_handler + self.app_logger = app_logger # Central application logger instance + self._get_exec_state = get_exec_state_func + self._set_last_exec_time = set_last_exec_time_func + + def execute_script( + self, + group: str, + script_name: str, + broadcast_fn: Optional[Callable[[str], None]] = None, + ) -> Dict[str, Any]: + """ + Execute script, broadcast output in real-time, and save final log + to a script-specific file in the script's directory. + """ + exec_state = self._get_exec_state() + last_execution_time = exec_state.get("last_time", 0) + min_execution_interval = exec_state.get("interval", 1) + + current_time = time.time() + time_since_last = current_time - last_execution_time + if time_since_last < min_execution_interval: + msg = f"Por favor espere {min_execution_interval - time_since_last:.1f} segundo(s) más entre ejecuciones" + self.app_logger.append_log(f"Warning: {msg}") # Log throttling attempt + if broadcast_fn: + broadcast_fn(msg) + return {"status": "throttled", "error": msg} + + self._set_last_exec_time(current_time) # Update last execution time + + script_path = os.path.join(self.script_groups_path, group, script_name) + script_dir = os.path.dirname(script_path) + script_base_name = os.path.splitext(script_name)[0] + script_log_path = os.path.join(script_dir, f"log_{script_base_name}.txt") + + if not os.path.exists(script_path): + msg = f"Error Fatal: Script no encontrado en {script_path}" + self.app_logger.append_log(msg) + if broadcast_fn: + broadcast_fn(msg) + return {"status": "error", "error": "Script not found"} + + # Get working directory using DirectoryManager + working_dir = self.dir_manager.get_work_dir_for_group(group) + if not working_dir: + msg = f"Error Fatal: Directorio de trabajo no configurado o inválido para el grupo '{group}'" + self.app_logger.append_log(msg) + if broadcast_fn: + broadcast_fn(msg) + return {"status": "error", "error": "Working directory not set"} + if not os.path.isdir(working_dir): # Double check validity + msg = f"Error Fatal: El directorio de trabajo '{working_dir}' no es válido o no existe." + self.app_logger.append_log(msg) + if broadcast_fn: + broadcast_fn(msg) + return {"status": "error", "error": "Invalid working directory"} + + # Aggregate configurations using ConfigHandler + configs = { + "level1": self.config_handler.get_config("1"), + "level2": self.config_handler.get_config("2", group), + "level3": self.config_handler.get_config( + "3", group + ), # Relies on workdir set in main manager + "working_directory": working_dir, + } + print( + f"Debug: Aggregated configs for script execution: {configs}" + ) # Keep for debug + + config_file_path = os.path.join(script_dir, "script_config.json") + try: + with open(config_file_path, "w", encoding="utf-8") as f: + json.dump(configs, f, indent=2, ensure_ascii=False) + except Exception as e: + msg = f"Error Fatal: No se pudieron guardar las configuraciones temporales en {config_file_path}: {str(e)}" + self.app_logger.append_log(msg) + if broadcast_fn: + broadcast_fn(msg) + # Optionally return error here + + stdout_capture = [] + stderr_capture = "" + process = None + start_time = datetime.now() + + try: + if broadcast_fn: + start_msg = f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}..." + broadcast_fn(start_msg) + + creation_flags = ( + subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 + ) + + process = subprocess.Popen( + ["python", "-u", script_path], + cwd=working_dir, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8", + errors="replace", + bufsize=1, + env=dict(os.environ, PYTHONIOENCODING="utf-8"), + creationflags=creation_flags, + ) + + while True: + line = process.stdout.readline() + if not line and process.poll() is not None: + break + if line: + cleaned_line = line.rstrip() + stdout_capture.append(cleaned_line) + if broadcast_fn: + broadcast_fn(cleaned_line) + + return_code = process.wait() + end_time = datetime.now() + duration = end_time - start_time + stderr_capture = process.stderr.read() + status = "success" if return_code == 0 else "error" + completion_msg = f"[{end_time.strftime('%H:%M:%S')}] Ejecución de {script_name} finalizada ({status}). Duración: {duration}." + + if stderr_capture: + if status == "error" and broadcast_fn: + broadcast_fn(f"--- ERRORES ---") + broadcast_fn(stderr_capture.strip()) + broadcast_fn(f"--- FIN ERRORES ---") + completion_msg += f" Se detectaron errores (ver log)." + + if broadcast_fn: + broadcast_fn(completion_msg) + + # Write to script-specific log file + try: + with open(script_log_path, "w", encoding="utf-8") as log_f: + log_f.write( + f"--- Log de Ejecución: {script_name} ---\nGrupo: {group}\nDirectorio de Trabajo: {working_dir}\n" + ) + log_f.write( + f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\nFin: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\nDuración: {duration}\n" + ) + log_f.write( + f"Estado: {status.upper()} (Código de Salida: {return_code})\n\n--- SALIDA ESTÁNDAR (STDOUT) ---\n" + ) + log_f.write("\n".join(stdout_capture)) + log_f.write("\n\n--- ERRORES (STDERR) ---\n") + log_f.write(stderr_capture if stderr_capture else "Ninguno") + log_f.write("\n--- FIN DEL LOG ---\n") + if broadcast_fn: + broadcast_fn(f"Log completo guardado en: {script_log_path}") + print(f"Info: Script log saved to {script_log_path}") + except Exception as log_e: + err_msg = f"Error al guardar el log específico del script en {script_log_path}: {log_e}" + print(err_msg) + self.app_logger.append_log(f"ERROR: {err_msg}") + if broadcast_fn: + broadcast_fn(err_msg) + + return { + "status": status, + "return_code": return_code, + "error": stderr_capture if stderr_capture else None, + "log_file": script_log_path, + } + + except Exception as e: + end_time = datetime.now() + duration = end_time - start_time + error_msg = ( + f"Error inesperado durante la ejecución de {script_name}: {str(e)}" + ) + traceback_info = traceback.format_exc() + print(error_msg) + print(traceback_info) + self.app_logger.append_log(f"ERROR FATAL: {error_msg}\n{traceback_info}") + if broadcast_fn: + broadcast_fn( + f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}" + ) + + try: # Attempt to write error to script-specific log + with open(script_log_path, "w", encoding="utf-8") as log_f: + log_f.write( + f"--- Log de Ejecución: {script_name} ---\nGrupo: {group}\nDirectorio de Trabajo: {working_dir}\n" + ) + log_f.write( + f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\nFin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n" + ) + log_f.write( + f"Duración: {duration}\nEstado: FATAL ERROR\n\n--- ERROR ---\n{error_msg}\n\n--- TRACEBACK ---\n{traceback_info}\n--- FIN DEL LOG ---\n" + ) + except Exception as log_e: + print(f"Error adicional al intentar guardar el log de error: {log_e}") + + return {"status": "error", "error": error_msg, "traceback": traceback_info} + finally: + if process and process.stderr: + process.stderr.close() + if process and process.stdout: + process.stdout.close() diff --git a/static/js/scripts.js b/static/js/scripts.js index 7936e53..88d3cce 100644 --- a/static/js/scripts.js +++ b/static/js/scripts.js @@ -86,21 +86,129 @@ async function loadConfigs() { } } +// --- Funciones para Editar Detalles del Script --- + +async function editScriptDetails(group, scriptFilename) { + console.log(`[1] editScriptDetails called for: group=${group}, script=${scriptFilename}`); // Log inicial + try { + console.log('[2] Fetching script details...'); // Log antes del fetch + const response = await fetch(`/api/script-details/${group}/${scriptFilename}`); + console.log('[3] Fetch response received:', response); // Log después del fetch + if (!response.ok) { + console.error(`[!] Fetch error: ${response.status} ${response.statusText}`); // Log si la respuesta no es OK + throw new Error(`Error fetching script details: ${response.statusText}`); + } + console.log('[4] Parsing JSON response...'); // Log antes de parsear JSON + const details = await response.json(); + console.log('[5] Script details received:', details); // Log con los detalles + + // Poblar el modal + document.getElementById('edit-script-group').value = group; + document.getElementById('edit-script-filename').value = scriptFilename; + document.getElementById('edit-script-filename-display').textContent = scriptFilename; // Mostrar nombre de archivo + document.getElementById('edit-script-display-name').value = details.display_name || ''; + document.getElementById('edit-script-short-description').value = details.short_description || ''; // Poblar descripción corta + document.getElementById('edit-script-long-description').value = details.long_description || ''; + document.getElementById('edit-script-hidden').checked = details.hidden || false; + + console.log('[6] Populated modal fields.'); // Log después de poblar + // Mostrar el modal + document.getElementById('script-editor-modal').classList.remove('hidden'); + console.log('[7] Modal should be visible now.'); // Log final + + } catch (error) { + console.error('[!] Error in editScriptDetails:', error); // Log en el catch + alert(`Error al cargar detalles del script: ${error.message}`); + } +} + +function closeScriptEditorModal() { + document.getElementById('script-editor-modal').classList.add('hidden'); + // Limpiar campos si es necesario (opcional) + // document.getElementById('edit-script-display-name').value = ''; + // document.getElementById('edit-script-long-description').value = ''; + // document.getElementById('edit-script-hidden').checked = false; +} + +async function saveScriptDetails() { + const group = document.getElementById('edit-script-group').value; + const scriptFilename = document.getElementById('edit-script-filename').value; + const updatedDetails = { + display_name: document.getElementById('edit-script-display-name').value, + short_description: document.getElementById('edit-script-short-description').value, // Recoger descripción corta + long_description: document.getElementById('edit-script-long-description').value, + hidden: document.getElementById('edit-script-hidden').checked + }; + + try { + const response = await fetch(`/api/script-details/${group}/${scriptFilename}`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(updatedDetails) + }); + const result = await response.json(); + if (!response.ok || result.status !== 'success') { + throw new Error(result.message || `Error guardando detalles: ${response.statusText}`); + } + closeScriptEditorModal(); + await loadScripts(currentGroup); // Recargar la lista de scripts + showToast('Detalles del script guardados con éxito.'); + + } catch (error) { + console.error('Error saving script details:', error); + alert(`Error al guardar detalles del script: ${error.message}`); + } +} + // Load and display available scripts async function loadScripts(group) { + if (!group) { + console.warn("loadScripts called without group"); + document.getElementById('scripts-list').innerHTML = '

Selecciona un grupo para ver los scripts.

'; + return; + } const response = await fetch(`/api/scripts/${group}`); const scripts = await response.json(); const container = document.getElementById('scripts-list'); - container.innerHTML = scripts.map(script => ` -
-
${script.name}
-
${script.description}
- -
- `).join(''); + container.innerHTML = ''; // Limpiar contenedor antes de añadir nuevos elementos + + scripts.forEach(script => { + const div = document.createElement('div'); + div.className = 'script-item p-4 border rounded bg-white shadow-sm flex justify-between items-start gap-4'; + div.innerHTML = ` +
+
${script.name}
+
${script.description}
+
+
+
+ +
${script.filename}
+
+ +
+ `; + container.appendChild(div); + + // Añadir event listeners a los botones recién creados + const executeButton = div.querySelector('.execute-button'); + executeButton.addEventListener('click', () => { + executeScript(script.filename); + }); + + const editButton = div.querySelector('.edit-button'); + editButton.addEventListener('click', () => { + editScriptDetails(group, script.filename); + }); + }); } // Execute a script @@ -112,7 +220,7 @@ async function executeScript(scriptName) { const response = await fetch('/api/execute_script', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ group: currentGroup, script: scriptName }) + body: JSON.stringify({ group: currentGroup, script: scriptName }) // scriptName aquí es el filename real }); // Check for HTTP errors during the *request* itself diff --git a/templates/index.html b/templates/index.html index 11b8052..405dfea 100644 --- a/templates/index.html +++ b/templates/index.html @@ -146,7 +146,7 @@

Scripts Disponibles

-
+
@@ -209,6 +209,47 @@ + + +