import os import json import subprocess import re from typing import Dict, Any, List import time # Add this import from datetime import datetime # Add this import class ConfigurationManager: def __init__(self): self.base_path = os.path.dirname(os.path.abspath(__file__)) self.data_path = os.path.join(self.base_path, "data") self.script_groups_path = os.path.join( self.base_path, "backend", "script_groups" ) self.working_directory = None self.log_file = os.path.join(self.data_path, "log.txt") self._init_log_file() self.last_execution_time = 0 # Add this attribute self.min_execution_interval = 1 # Minimum seconds between executions def _init_log_file(self): """Initialize log file if it doesn't exist""" if not os.path.exists(self.data_path): os.makedirs(self.data_path) if not os.path.exists(self.log_file): with open(self.log_file, "w", encoding="utf-8") as f: f.write("") def append_log(self, message: str) -> None: """Append a message to the CENTRAL log file with timestamp.""" # This function now primarily logs messages from the app itself, # script output is handled separately in execute_script. try: timestamp = datetime.now().strftime("[%H:%M:%S] ") lines = message.split("\n") lines_with_timestamp = [] for line in lines: if line.strip(): if not line.strip().startswith("["): line = f"{timestamp}{line}" lines_with_timestamp.append(f"{line}\n") if lines_with_timestamp: with open(self.log_file, "a", encoding="utf-8") as f: f.writelines(lines_with_timestamp) except Exception as e: print(f"Error writing to central log file: {e}") def read_last_log_line(self) -> str: """Read the last line from the log file.""" try: with open(self.log_file, "r", encoding="utf-8") as f: # Leer las últimas líneas y encontrar la última no vacía lines = f.readlines() for line in reversed(lines): if line.strip(): return line return "" except Exception as e: print(f"Error reading last log line: {e}") return "" def read_log(self) -> str: """Read the entire log file""" try: with open(self.log_file, "r", encoding="utf-8") as f: return f.read() except Exception as e: print(f"Error reading log file: {e}") return "" def clear_log(self) -> bool: """Clear the log file""" try: with open(self.log_file, "w", encoding="utf-8") as f: f.write("") return True except Exception as e: print(f"Error clearing log file: {e}") return False def set_working_directory(self, path: str) -> Dict[str, str]: """Set and validate working directory.""" if not os.path.exists(path): return {"status": "error", "message": "Directory does not exist"} self.working_directory = path # Create default data.json if it doesn't exist data_path = os.path.join(path, "data.json") if not os.path.exists(data_path): with open(data_path, "w") as f: json.dump({}, f, indent=2) return {"status": "success", "path": path} def get_script_groups(self) -> List[Dict[str, Any]]: """Returns list of available script groups with their descriptions.""" groups = [] for d in os.listdir(self.script_groups_path): group_path = os.path.join(self.script_groups_path, d) if os.path.isdir(group_path): description = self._get_group_description(group_path) groups.append( { "id": d, "name": description.get("name", d), "description": description.get( "description", "Sin descripción" ), "version": description.get("version", "1.0"), "author": description.get("author", "Unknown"), } ) return groups def _get_group_description(self, group_path: str) -> Dict[str, Any]: """Get description for a script group.""" description_file = os.path.join(group_path, "description.json") try: if os.path.exists(description_file): with open(description_file, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: print(f"Error reading group description: {e}") return {} def get_config(self, level: str, group: str = None) -> Dict[str, Any]: """Get configuration for specified level.""" if level == "1": path = os.path.join(self.data_path, "data.json") elif level == "2": path = os.path.join(self.script_groups_path, group, "data.json") elif level == "3": if not self.working_directory: return {} # Return empty config if working directory not set path = os.path.join(self.working_directory, "data.json") try: with open(path, "r") as f: return json.load(f) except FileNotFoundError: return {} # Return empty config if file doesn't exist def get_schema(self, level: str, group: str = None) -> Dict[str, Any]: """Get schema for specified level.""" try: # Clean level parameter level = str(level).split("-")[0] # Determine schema path based on level if level == "1": path = os.path.join(self.data_path, "esquema_general.json") elif level == "2": path = os.path.join( self.script_groups_path, group, "esquema_group.json" ) elif level == "3": if not group: return {"type": "object", "properties": {}} path = os.path.join(self.script_groups_path, group, "esquema_work.json") else: return {"type": "object", "properties": {}} # Read existing schema from whichever file exists if os.path.exists(path): with open(path, "r", encoding="utf-8") as f: schema = json.load(f) return ( schema if isinstance(schema, dict) else {"type": "object", "properties": {}} ) # Create default schema if no file exists default_schema = {"type": "object", "properties": {}} os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(default_schema, f, indent=2) return default_schema except Exception as e: print(f"Error loading schema: {str(e)}") return {"type": "object", "properties": {}} def update_schema( self, level: str, data: Dict[str, Any], group: str = None ) -> Dict[str, str]: """Update schema for specified level and clean corresponding config.""" try: # Determinar rutas de schema y config if level == "1": schema_path = os.path.join(self.data_path, "esquema_general.json") config_path = os.path.join(self.data_path, "data.json") elif level == "2": schema_path = os.path.join( self.script_groups_path, group, "esquema_group.json" ) config_path = os.path.join(self.script_groups_path, group, "data.json") elif level == "3": if not group: return { "status": "error", "message": "Group is required for level 3", } schema_path = os.path.join( self.script_groups_path, group, "esquema_work.json" ) config_path = ( os.path.join(self.working_directory, "data.json") if self.working_directory else None ) else: return {"status": "error", "message": "Invalid level"} # Ensure directory exists os.makedirs(os.path.dirname(schema_path), exist_ok=True) # Validate schema structure if ( not isinstance(data, dict) or "type" not in data or "properties" not in data ): data = { "type": "object", "properties": data if isinstance(data, dict) else {}, } # Write schema with open(schema_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) # Clean corresponding config file self._clean_config_for_schema(config_path, data) return {"status": "success"} except Exception as e: print(f"Error updating schema: {str(e)}") return {"status": "error", "message": str(e)} def _clean_config_for_schema( self, config_path: str, schema: Dict[str, Any] ) -> None: """Clean configuration file to match schema structure.""" if not config_path or not os.path.exists(config_path): return try: # Cargar configuración actual with open(config_path, "r", encoding="utf-8") as f: config = json.load(f) # Limpiar configuración recursivamente cleaned_config = self._clean_object_against_schema(config, schema) # Guardar configuración limpia with open(config_path, "w", encoding="utf-8") as f: json.dump(cleaned_config, f, indent=2, ensure_ascii=False) except Exception as e: print(f"Error cleaning config: {str(e)}") def _clean_object_against_schema( self, data: Dict[str, Any], schema: Dict[str, Any] ) -> Dict[str, Any]: """Recursively clean object to match schema structure.""" if not isinstance(data, dict) or not isinstance(schema, dict): return {} result = {} schema_props = schema.get("properties", {}) for key, value in data.items(): # Solo mantener campos que existen en el schema if key in schema_props: prop_schema = schema_props[key] # Si es un objeto anidado, limpiar recursivamente if prop_schema.get("type") == "object": result[key] = self._clean_object_against_schema(value, prop_schema) # Si es un enum, verificar que el valor sea válido elif "enum" in prop_schema: if value in prop_schema["enum"]: result[key] = value # Para otros tipos, mantener el valor else: result[key] = value return result def update_config( self, level: str, data: Dict[str, Any], group: str = None ) -> Dict[str, str]: """Update configuration for specified level.""" if level == "3" and not self.working_directory: return {"status": "error", "message": "Working directory not set"} if level == "1": path = os.path.join(self.data_path, "data.json") elif level == "2": path = os.path.join(self.script_groups_path, group, "data.json") elif level == "3": path = os.path.join(self.working_directory, "data.json") with open(path, "w") as f: json.dump(data, f, indent=2) def list_scripts(self, group: str) -> List[Dict[str, str]]: """List all scripts in a group with their descriptions.""" try: scripts_dir = os.path.join(self.script_groups_path, group) scripts = [] if not os.path.exists(scripts_dir): print(f"Directory not found: {scripts_dir}") return [] for file in os.listdir(scripts_dir): # Modificar la condición para incluir cualquier archivo .py if file.endswith(".py"): path = os.path.join(scripts_dir, file) description = self._extract_script_description(path) print( f"Found script: {file} with description: {description}" ) # Debug line scripts.append({"name": file, "description": description}) print(f"Total scripts found: {len(scripts)}") # Debug line return scripts except Exception as e: print(f"Error listing scripts: {str(e)}") # Debug line return [] def _extract_script_description(self, script_path: str) -> str: """Extract description from script's docstring or initial comments.""" try: with open(script_path, "r", encoding="utf-8") as f: content = f.read() # Try to find docstring docstring_match = re.search(r'"""(.*?)"""', content, re.DOTALL) if docstring_match: return docstring_match.group(1).strip() # Try to find initial comment comment_match = re.search(r"^#\s*(.*?)$", content, re.MULTILINE) if comment_match: return comment_match.group(1).strip() return "No description available" except Exception as e: print( f"Error extracting description from {script_path}: {str(e)}" ) # Debug line return "Error reading script description" def execute_script( self, group: str, script_name: str, broadcast_fn=None ) -> Dict[str, Any]: """ Execute script, broadcast output in real-time, and save final log to a script-specific file in the script's directory. """ current_time = time.time() time_since_last = current_time - self.last_execution_time if time_since_last < self.min_execution_interval: msg = f"Por favor espere {self.min_execution_interval - time_since_last:.1f} segundo(s) más entre ejecuciones" if broadcast_fn: broadcast_fn(msg) return {"status": "throttled", "error": msg} self.last_execution_time = current_time script_path = os.path.join(self.script_groups_path, group, script_name) script_dir = os.path.dirname(script_path) script_base_name = os.path.splitext(script_name)[0] # Define script-specific log file path script_log_path = os.path.join(script_dir, f"log_{script_base_name}.txt") if not os.path.exists(script_path): msg = f"Error: Script no encontrado en {script_path}" if broadcast_fn: broadcast_fn(msg) return {"status": "error", "error": "Script not found"} working_dir = self.get_work_dir(group) if not working_dir: msg = f"Error: Directorio de trabajo no configurado para el grupo '{group}'" if broadcast_fn: broadcast_fn(msg) return {"status": "error", "error": "Working directory not set"} if not os.path.isdir(working_dir): msg = f"Error: El directorio de trabajo '{working_dir}' no es válido o no existe." if broadcast_fn: broadcast_fn(msg) return {"status": "error", "error": "Invalid working directory"} configs = { "level1": self.get_config("1"), "level2": self.get_config("2", group), "level3": self.get_config("3", group), # get_config now handles working dir lookup "working_directory": working_dir, } config_file_path = os.path.join(script_dir, "script_config.json") try: with open(config_file_path, "w", encoding="utf-8") as f: json.dump(configs, f, indent=2, ensure_ascii=False) # Don't broadcast config saving unless debugging # if broadcast_fn: broadcast_fn(f"Configuraciones guardadas en {config_file_path}") except Exception as e: msg = f"Error guardando configuraciones temporales: {str(e)}" if broadcast_fn: broadcast_fn(msg) # Optionally return error here if config saving is critical stdout_capture = [] stderr_capture = "" process = None start_time = datetime.now() try: if broadcast_fn: broadcast_fn(f"[{start_time.strftime('%H:%M:%S')}] Iniciando ejecución de {script_name} en {working_dir}...") process = subprocess.Popen( ["python", "-u", script_path], # Added -u for unbuffered output cwd=working_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8', errors='replace', bufsize=1, env=dict(os.environ, PYTHONIOENCODING="utf-8"), ) # Real-time stdout reading and broadcasting while True: line = process.stdout.readline() if not line and process.poll() is not None: break if line: cleaned_line = line.rstrip() stdout_capture.append(cleaned_line) # Store line for final log if broadcast_fn: broadcast_fn(cleaned_line) # Broadcast in real-time # Wait for process to finish and get return code return_code = process.wait() end_time = datetime.now() duration = end_time - start_time # Capture any remaining stderr stderr_capture = process.stderr.read() status = "success" if return_code == 0 else "error" completion_msg = f"[{end_time.strftime('%H:%M:%S')}] Ejecución de {script_name} finalizada ({status}). Duración: {duration}." if stderr_capture: # Broadcast stderr only if there was an error potentially if status == "error" and broadcast_fn: broadcast_fn(f"--- ERRORES ---") broadcast_fn(stderr_capture.strip()) broadcast_fn(f"--- FIN ERRORES ---") # Always include stderr in the final log if present completion_msg += f" Se detectaron errores (ver log)." if broadcast_fn: broadcast_fn(completion_msg) # --- Write to script-specific log file --- try: with open(script_log_path, "w", encoding="utf-8") as log_f: log_f.write(f"--- Log de Ejecución: {script_name} ---\n") log_f.write(f"Grupo: {group}\n") log_f.write(f"Directorio de Trabajo: {working_dir}\n") log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n") log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n") log_f.write(f"Duración: {duration}\n") log_f.write(f"Estado: {status.upper()} (Código de Salida: {return_code})\n") log_f.write("\n--- SALIDA ESTÁNDAR (STDOUT) ---\n") log_f.write("\n".join(stdout_capture)) log_f.write("\n\n--- ERRORES (STDERR) ---\n") log_f.write(stderr_capture if stderr_capture else "Ninguno") log_f.write("\n--- FIN DEL LOG ---\n") if broadcast_fn: broadcast_fn(f"Log completo guardado en: {script_log_path}") except Exception as log_e: err_msg = f"Error al guardar el log específico del script en {script_log_path}: {log_e}" print(err_msg) if broadcast_fn: broadcast_fn(err_msg) # ------------------------------------------ return { "status": status, "return_code": return_code, "error": stderr_capture if stderr_capture else None, "log_file": script_log_path # Return path to the specific log } except Exception as e: end_time = datetime.now() duration = end_time - start_time error_msg = f"Error inesperado durante la ejecución de {script_name}: {str(e)}" traceback_info = traceback.format_exc() # Get traceback print(error_msg) # Print to console as well print(traceback_info) if broadcast_fn: broadcast_fn(f"[{end_time.strftime('%H:%M:%S')}] ERROR FATAL: {error_msg}") # Attempt to write error to script-specific log try: with open(script_log_path, "w", encoding="utf-8") as log_f: log_f.write(f"--- Log de Ejecución: {script_name} ---\n") log_f.write(f"Grupo: {group}\n") log_f.write(f"Directorio de Trabajo: {working_dir}\n") log_f.write(f"Inicio: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n") log_f.write(f"Fin: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Interrumpido por error)\n") log_f.write(f"Duración: {duration}\n") log_f.write(f"Estado: FATAL ERROR\n") log_f.write("\n--- ERROR ---\n") log_f.write(error_msg + "\n") log_f.write("\n--- TRACEBACK ---\n") log_f.write(traceback_info) # Include traceback in log log_f.write("\n--- FIN DEL LOG ---\n") except Exception as log_e: print(f"Error adicional al intentar guardar el log de error: {log_e}") return {"status": "error", "error": error_msg, "traceback": traceback_info} finally: # Ensure stderr pipe is closed if process exists if process and process.stderr: process.stderr.close() # Ensure stdout pipe is closed if process exists if process and process.stdout: process.stdout.close() def get_work_dir(self, group: str) -> str: """Get working directory path for a script group.""" work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") try: with open(work_dir_path, "r") as f: data = json.load(f) path = data.get("path", "") # Normalizar separadores de ruta if path: path = os.path.normpath(path) # Actualizar la variable de instancia si hay una ruta válida if path and os.path.exists(path): self.working_directory = path return path except (FileNotFoundError, json.JSONDecodeError): return "" def set_work_dir(self, group: str, path: str) -> Dict[str, str]: """Set working directory path for a script group and update history.""" # Normalizar el path recibido path = os.path.normpath(path) if not os.path.exists(path): return {"status": "error", "message": "Directory does not exist"} work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") try: # Cargar datos existentes o crear nuevos try: with open(work_dir_path, "r") as f: data = json.load(f) # Normalizar paths existentes en el historial if "history" in data: data["history"] = [os.path.normpath(p) for p in data["history"]] except (FileNotFoundError, json.JSONDecodeError): data = {"path": "", "history": []} # Actualizar path actual data["path"] = path # Actualizar historial if "history" not in data: data["history"] = [] # Eliminar la ruta del historial si ya existe (usando path normalizado) data["history"] = [ p for p in data["history"] if os.path.normpath(p) != path ] # Agregar la ruta al principio del historial data["history"].insert(0, path) # Mantener solo los últimos 10 directorios data["history"] = data["history"][:10] # Guardar datos actualizados with open(work_dir_path, "w") as f: json.dump(data, f, indent=2) # Actualizar la variable de instancia self.working_directory = path # Crear data.json en el directorio de trabajo si no existe data_path = os.path.join(path, "data.json") if not os.path.exists(data_path): with open(data_path, "w") as f: json.dump({}, f, indent=2) return {"status": "success", "path": path} except Exception as e: return {"status": "error", "message": str(e)} def get_directory_history(self, group: str) -> List[str]: """Get the directory history for a script group.""" work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") try: with open(work_dir_path, "r") as f: data = json.load(f) # Normalizar todos los paths en el historial history = [os.path.normpath(p) for p in data.get("history", [])] # Filtrar solo directorios que existen return [p for p in history if os.path.exists(p)] except (FileNotFoundError, json.JSONDecodeError): return []