import os import json import subprocess import re from typing import Dict, Any, List import time # Add this import from datetime import datetime # Add this import class ConfigurationManager: def __init__(self): self.base_path = os.path.dirname(os.path.abspath(__file__)) self.data_path = os.path.join(self.base_path, "data") self.script_groups_path = os.path.join( self.base_path, "backend", "script_groups" ) self.working_directory = None self.log_file = os.path.join(self.data_path, "log.txt") self._init_log_file() self.last_execution_time = 0 # Add this attribute self.min_execution_interval = 1 # Minimum seconds between executions def _init_log_file(self): """Initialize log file if it doesn't exist""" if not os.path.exists(self.data_path): os.makedirs(self.data_path) if not os.path.exists(self.log_file): with open(self.log_file, "w", encoding="utf-8") as f: f.write("") def append_log(self, message: str) -> None: """Append a message to the log file with timestamp.""" try: timestamp = datetime.now().strftime("[%H:%M:%S] ") # Filtrar líneas vacías y agregar timestamp solo a líneas con contenido lines = message.split("\n") lines_with_timestamp = [] for line in lines: if line.strip(): # Solo agregar timestamp si la línea no tiene ya uno if not line.strip().startswith("["): line = f"{timestamp}{line}" lines_with_timestamp.append(f"{line}\n") if lines_with_timestamp: with open(self.log_file, "a", encoding="utf-8") as f: f.writelines(lines_with_timestamp) except Exception as e: print(f"Error writing to log file: {e}") def read_last_log_line(self) -> str: """Read the last line from the log file.""" try: with open(self.log_file, "r", encoding="utf-8") as f: # Leer las últimas líneas y encontrar la última no vacía lines = f.readlines() for line in reversed(lines): if line.strip(): return line return "" except Exception as e: print(f"Error reading last log line: {e}") return "" def read_log(self) -> str: """Read the entire log file""" try: with open(self.log_file, "r", encoding="utf-8") as f: return f.read() except Exception as e: print(f"Error reading log file: {e}") return "" def clear_log(self) -> bool: """Clear the log file""" try: with open(self.log_file, "w", encoding="utf-8") as f: f.write("") return True except Exception as e: print(f"Error clearing log file: {e}") return False def set_working_directory(self, path: str) -> Dict[str, str]: """Set and validate working directory.""" if not os.path.exists(path): return {"status": "error", "message": "Directory does not exist"} self.working_directory = path # Create default data.json if it doesn't exist data_path = os.path.join(path, "data.json") if not os.path.exists(data_path): with open(data_path, "w") as f: json.dump({}, f, indent=2) return {"status": "success", "path": path} def get_script_groups(self) -> List[Dict[str, Any]]: """Returns list of available script groups with their descriptions.""" groups = [] for d in os.listdir(self.script_groups_path): group_path = os.path.join(self.script_groups_path, d) if os.path.isdir(group_path): description = self._get_group_description(group_path) groups.append( { "id": d, "name": description.get("name", d), "description": description.get( "description", "Sin descripción" ), "version": description.get("version", "1.0"), "author": description.get("author", "Unknown"), } ) return groups def _get_group_description(self, group_path: str) -> Dict[str, Any]: """Get description for a script group.""" description_file = os.path.join(group_path, "description.json") try: if os.path.exists(description_file): with open(description_file, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: print(f"Error reading group description: {e}") return {} def get_config(self, level: str, group: str = None) -> Dict[str, Any]: """Get configuration for specified level.""" if level == "1": path = os.path.join(self.data_path, "data.json") elif level == "2": path = os.path.join(self.script_groups_path, group, "data.json") elif level == "3": if not self.working_directory: return {} # Return empty config if working directory not set path = os.path.join(self.working_directory, "data.json") try: with open(path, "r") as f: return json.load(f) except FileNotFoundError: return {} # Return empty config if file doesn't exist def get_schema(self, level: str, group: str = None) -> Dict[str, Any]: """Get schema for specified level.""" try: # Clean level parameter level = str(level).split("-")[0] # Determine schema path based on level if level == "1": path = os.path.join(self.data_path, "esquema_general.json") elif level == "2": path = os.path.join( self.script_groups_path, group, "esquema_group.json" ) elif level == "3": if not group: return {"type": "object", "properties": {}} path = os.path.join(self.script_groups_path, group, "esquema_work.json") else: return {"type": "object", "properties": {}} # Read existing schema from whichever file exists if os.path.exists(path): with open(path, "r", encoding="utf-8") as f: schema = json.load(f) return ( schema if isinstance(schema, dict) else {"type": "object", "properties": {}} ) # Create default schema if no file exists default_schema = {"type": "object", "properties": {}} os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(default_schema, f, indent=2) return default_schema except Exception as e: print(f"Error loading schema: {str(e)}") return {"type": "object", "properties": {}} def update_schema( self, level: str, data: Dict[str, Any], group: str = None ) -> Dict[str, str]: """Update schema for specified level and clean corresponding config.""" try: # Determinar rutas de schema y config if level == "1": schema_path = os.path.join(self.data_path, "esquema_general.json") config_path = os.path.join(self.data_path, "data.json") elif level == "2": schema_path = os.path.join( self.script_groups_path, group, "esquema_group.json" ) config_path = os.path.join(self.script_groups_path, group, "data.json") elif level == "3": if not group: return { "status": "error", "message": "Group is required for level 3", } schema_path = os.path.join( self.script_groups_path, group, "esquema_work.json" ) config_path = ( os.path.join(self.working_directory, "data.json") if self.working_directory else None ) else: return {"status": "error", "message": "Invalid level"} # Ensure directory exists os.makedirs(os.path.dirname(schema_path), exist_ok=True) # Validate schema structure if ( not isinstance(data, dict) or "type" not in data or "properties" not in data ): data = { "type": "object", "properties": data if isinstance(data, dict) else {}, } # Write schema with open(schema_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) # Clean corresponding config file self._clean_config_for_schema(config_path, data) return {"status": "success"} except Exception as e: print(f"Error updating schema: {str(e)}") return {"status": "error", "message": str(e)} def _clean_config_for_schema( self, config_path: str, schema: Dict[str, Any] ) -> None: """Clean configuration file to match schema structure.""" if not config_path or not os.path.exists(config_path): return try: # Cargar configuración actual with open(config_path, "r", encoding="utf-8") as f: config = json.load(f) # Limpiar configuración recursivamente cleaned_config = self._clean_object_against_schema(config, schema) # Guardar configuración limpia with open(config_path, "w", encoding="utf-8") as f: json.dump(cleaned_config, f, indent=2, ensure_ascii=False) except Exception as e: print(f"Error cleaning config: {str(e)}") def _clean_object_against_schema( self, data: Dict[str, Any], schema: Dict[str, Any] ) -> Dict[str, Any]: """Recursively clean object to match schema structure.""" if not isinstance(data, dict) or not isinstance(schema, dict): return {} result = {} schema_props = schema.get("properties", {}) for key, value in data.items(): # Solo mantener campos que existen en el schema if key in schema_props: prop_schema = schema_props[key] # Si es un objeto anidado, limpiar recursivamente if prop_schema.get("type") == "object": result[key] = self._clean_object_against_schema(value, prop_schema) # Si es un enum, verificar que el valor sea válido elif "enum" in prop_schema: if value in prop_schema["enum"]: result[key] = value # Para otros tipos, mantener el valor else: result[key] = value return result def update_config( self, level: str, data: Dict[str, Any], group: str = None ) -> Dict[str, str]: """Update configuration for specified level.""" if level == "3" and not self.working_directory: return {"status": "error", "message": "Working directory not set"} if level == "1": path = os.path.join(self.data_path, "data.json") elif level == "2": path = os.path.join(self.script_groups_path, group, "data.json") elif level == "3": path = os.path.join(self.working_directory, "data.json") with open(path, "w") as f: json.dump(data, f, indent=2) def list_scripts(self, group: str) -> List[Dict[str, str]]: """List all scripts in a group with their descriptions.""" try: scripts_dir = os.path.join(self.script_groups_path, group) scripts = [] if not os.path.exists(scripts_dir): print(f"Directory not found: {scripts_dir}") return [] for file in os.listdir(scripts_dir): # Modificar la condición para incluir cualquier archivo .py if file.endswith(".py"): path = os.path.join(scripts_dir, file) description = self._extract_script_description(path) print( f"Found script: {file} with description: {description}" ) # Debug line scripts.append({"name": file, "description": description}) print(f"Total scripts found: {len(scripts)}") # Debug line return scripts except Exception as e: print(f"Error listing scripts: {str(e)}") # Debug line return [] def _extract_script_description(self, script_path: str) -> str: """Extract description from script's docstring or initial comments.""" try: with open(script_path, "r", encoding="utf-8") as f: content = f.read() # Try to find docstring docstring_match = re.search(r'"""(.*?)"""', content, re.DOTALL) if docstring_match: return docstring_match.group(1).strip() # Try to find initial comment comment_match = re.search(r"^#\s*(.*?)$", content, re.MULTILINE) if comment_match: return comment_match.group(1).strip() return "No description available" except Exception as e: print( f"Error extracting description from {script_path}: {str(e)}" ) # Debug line return "Error reading script description" def execute_script( self, group: str, script_name: str, broadcast_fn=None ) -> Dict[str, Any]: """Execute script with real-time logging via WebSocket broadcast function.""" # Check execution throttling current_time = time.time() time_since_last = current_time - self.last_execution_time if time_since_last < self.min_execution_interval: if broadcast_fn: broadcast_fn( f"Por favor espere {self.min_execution_interval} segundo(s) entre ejecuciones" ) return { "status": "throttled", "error": f"Por favor espere {self.min_execution_interval} segundo(s) entre ejecuciones", } self.last_execution_time = current_time script_path = os.path.join(self.script_groups_path, group, script_name) if not os.path.exists(script_path): if broadcast_fn: broadcast_fn("Error: Script no encontrado") return {"status": "error", "error": "Script not found"} # Get working directory working_dir = self.get_work_dir(group) if not working_dir: if broadcast_fn: broadcast_fn("Error: Directorio de trabajo no configurado") return {"status": "error", "error": "Working directory not set"} # Prepare environment configurations configs = { "level1": self.get_config("1"), "level2": self.get_config("2", group), "level3": self.get_config("3", group) if working_dir else {}, "working_directory": working_dir, } try: if broadcast_fn: broadcast_fn(f"Iniciando ejecución de {script_name}") # Execute script with configured environment process = subprocess.Popen( ["python", script_path], cwd=working_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, env=dict( os.environ, SCRIPT_CONFIGS=json.dumps(configs), PYTHONIOENCODING="utf-8", ), ) # Stream output in real-time while True: line = process.stdout.readline() if not line and process.poll() is not None: break if line and broadcast_fn: broadcast_fn(line.rstrip()) # Handle any errors stderr = process.stderr.read() if stderr and broadcast_fn: broadcast_fn(f"ERROR: {stderr.strip()}") # Signal completion if broadcast_fn: broadcast_fn("Ejecución completada") return { "status": "success" if process.returncode == 0 else "error", "error": stderr if stderr else None, } except Exception as e: error_msg = str(e) if broadcast_fn: broadcast_fn(f"Error inesperado: {error_msg}") return {"status": "error", "error": error_msg} def get_work_dir(self, group: str) -> str: """Get working directory path for a script group.""" work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") try: with open(work_dir_path, "r") as f: data = json.load(f) path = data.get("path", "") # Actualizar la variable de instancia si hay una ruta válida if path and os.path.exists(path): self.working_directory = path return path except (FileNotFoundError, json.JSONDecodeError): return "" def set_work_dir(self, group: str, path: str) -> Dict[str, str]: """Set working directory path for a script group.""" if not os.path.exists(path): return {"status": "error", "message": "Directory does not exist"} work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json") try: # Guardar la ruta en work_dir.json with open(work_dir_path, "w") as f: json.dump({"path": path}, f, indent=2) # Actualizar la variable de instancia self.working_directory = path # Crear data.json en el directorio de trabajo si no existe data_path = os.path.join(path, "data.json") if not os.path.exists(data_path): with open(data_path, "w") as f: json.dump({}, f, indent=2) return {"status": "success", "path": path} except Exception as e: return {"status": "error", "message": str(e)}