# backend/core/script_manager.py from pathlib import Path import importlib.util import inspect from typing import Dict, List, Any, Optional import json from .group_settings_manager import GroupSettingsManager # Agregar esta importación class ScriptManager: def __init__(self, script_groups_dir: Path): self.script_groups_dir = script_groups_dir self.group_settings = GroupSettingsManager(script_groups_dir) def get_group_settings(self, group_id: str) -> Dict[str, Any]: """Get settings for a script group""" return self.group_settings.get_group_settings(group_id) def update_group_settings(self, group_id: str, settings: Dict[str, Any]): """Update settings for a script group""" return self.group_settings.update_group_settings(group_id, settings) def get_group_config_schema(self, group_id: str) -> Dict[str, Any]: """Get configuration schema for a script group""" config_file = self.script_groups_dir / group_id / "config.json" print(f"Looking for config file: {config_file}") # Debug if config_file.exists(): try: with open(config_file, "r", encoding="utf-8") as f: schema = json.load(f) print(f"Loaded schema: {schema}") # Debug return schema except Exception as e: print(f"Error loading group config schema: {e}") # Debug else: print(f"Config file not found: {config_file}") # Debug # Retornar un schema vacío si no existe el archivo return {"group_name": group_id, "description": "", "config_schema": {}} def get_available_groups(self) -> List[Dict[str, Any]]: """Get list of available script groups""" groups = [] for group_dir in self.script_groups_dir.iterdir(): if group_dir.is_dir() and not group_dir.name.startswith("_"): groups.append( { "id": group_dir.name, "name": group_dir.name.replace("_", " ").title(), "path": str(group_dir), } ) return groups def get_group_scripts(self, group_id: str) -> List[Dict[str, Any]]: """Get scripts for a specific group""" group_dir = self.script_groups_dir / group_id print(f"Looking for scripts in: {group_dir}") # Debug if not group_dir.exists() or not group_dir.is_dir(): print(f"Directory not found: {group_dir}") # Debug raise ValueError(f"Script group '{group_id}' not found") scripts = [] for script_file in group_dir.glob("x[0-9].py"): print(f"Found script file: {script_file}") # Debug script_info = self._analyze_script(script_file) if script_info: scripts.append(script_info) return sorted(scripts, key=lambda x: x["id"]) def discover_groups(self) -> List[Dict[str, Any]]: """Discover all script groups""" groups = [] for group_dir in self.script_groups_dir.iterdir(): if group_dir.is_dir() and not group_dir.name.startswith("_"): group_info = self._analyze_group(group_dir) if group_info: groups.append(group_info) return groups def _analyze_group(self, group_dir: Path) -> Optional[Dict[str, Any]]: """Analyze a script group directory""" scripts = [] for script_file in group_dir.glob("x[0-9].py"): try: script_info = self._analyze_script(script_file) if script_info: scripts.append(script_info) except Exception as e: print(f"Error analyzing script {script_file}: {e}") if scripts: return { "id": group_dir.name, "name": group_dir.name.replace("_", " ").title(), "scripts": sorted(scripts, key=lambda x: x["id"]), } return None def _analyze_script(self, script_file: Path) -> Optional[Dict[str, Any]]: """Analyze a single script file""" try: # Import script module spec = importlib.util.spec_from_file_location(script_file.stem, script_file) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Find script class script_class = None for name, obj in inspect.getmembers(module): if ( inspect.isclass(obj) and obj.__module__ == module.__name__ and hasattr(obj, "run") ): script_class = obj break if script_class: # Extraer la primera línea del docstring como nombre docstring = inspect.getdoc(script_class) if docstring: name, *description = docstring.split("\n", 1) description = description[0] if description else "" else: name = script_file.stem description = "" return { "id": script_file.stem, "name": name.strip(), "description": description.strip(), "file": str(script_file.relative_to(self.script_groups_dir)), } except Exception as e: print(f"Error loading script {script_file}: {e}") return None def execute_script( self, group_id: str, script_id: str, profile: Dict[str, Any] ) -> Dict[str, Any]: """Execute a specific script""" # Get group settings first group_settings = self.group_settings.get_group_settings(group_id) work_dir = group_settings.get("work_dir") if not work_dir: raise ValueError(f"No work directory configured for group {group_id}") script_file = self.script_groups_dir / group_id / f"{script_id}.py" if not script_file.exists(): raise ValueError(f"Script {script_id} not found in group {group_id}") try: # Import script module spec = importlib.util.spec_from_file_location(script_id, script_file) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Find and instantiate script class script_class = None for name, obj in inspect.getmembers(module): if ( inspect.isclass(obj) and obj.__module__ == module.__name__ and hasattr(obj, "run") ): script_class = obj break if not script_class: raise ValueError(f"No valid script class found in {script_id}") script = script_class() return script.run(work_dir, profile) except Exception as e: return {"status": "error", "error": str(e)}