LocalScriptsWeb/backend/core/script_manager.py

176 lines
6.6 KiB
Python

# backend/core/script_manager.py
from pathlib import Path
import importlib.util
import inspect
from typing import Dict, List, Any, Optional
import json
class ScriptManager:
def get_group_config_schema(self, group_id: str) -> Dict[str, Any]:
"""Get configuration schema for a script group"""
config_file = self.script_groups_dir / group_id / "config.json"
print(f"Looking for config file: {config_file}") # Debug
if config_file.exists():
try:
with open(config_file, 'r', encoding='utf-8') as f:
schema = json.load(f)
print(f"Loaded schema: {schema}") # Debug
return schema
except Exception as e:
print(f"Error loading group config schema: {e}") # Debug
else:
print(f"Config file not found: {config_file}") # Debug
# Retornar un schema vacío si no existe el archivo
return {
"group_name": group_id,
"description": "",
"config_schema": {}
}
def get_available_groups(self) -> List[Dict[str, Any]]:
"""Get list of available script groups"""
groups = []
for group_dir in self.script_groups_dir.iterdir():
if group_dir.is_dir() and not group_dir.name.startswith('_'):
groups.append({
"id": group_dir.name,
"name": group_dir.name.replace('_', ' ').title(),
"path": str(group_dir)
})
return groups
def get_group_scripts(self, group_id: str) -> List[Dict[str, Any]]:
"""Get scripts for a specific group"""
group_dir = self.script_groups_dir / group_id
print(f"Looking for scripts in: {group_dir}") # Debug
if not group_dir.exists() or not group_dir.is_dir():
print(f"Directory not found: {group_dir}") # Debug
raise ValueError(f"Script group '{group_id}' not found")
scripts = []
for script_file in group_dir.glob('x[0-9].py'):
print(f"Found script file: {script_file}") # Debug
script_info = self._analyze_script(script_file)
if script_info:
scripts.append(script_info)
return sorted(scripts, key=lambda x: x['id'])
def __init__(self, script_groups_dir: Path):
self.script_groups_dir = script_groups_dir
def discover_groups(self) -> List[Dict[str, Any]]:
"""Discover all script groups"""
groups = []
for group_dir in self.script_groups_dir.iterdir():
if group_dir.is_dir() and not group_dir.name.startswith('_'):
group_info = self._analyze_group(group_dir)
if group_info:
groups.append(group_info)
return groups
def _analyze_group(self, group_dir: Path) -> Optional[Dict[str, Any]]:
"""Analyze a script group directory"""
scripts = []
for script_file in group_dir.glob('x[0-9].py'):
try:
script_info = self._analyze_script(script_file)
if script_info:
scripts.append(script_info)
except Exception as e:
print(f"Error analyzing script {script_file}: {e}")
if scripts:
return {
"id": group_dir.name,
"name": group_dir.name.replace('_', ' ').title(),
"scripts": sorted(scripts, key=lambda x: x['id'])
}
return None
def _analyze_script(self, script_file: Path) -> Optional[Dict[str, Any]]:
"""Analyze a single script file"""
try:
# Import script module
spec = importlib.util.spec_from_file_location(
script_file.stem,
script_file
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Find script class
script_class = None
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
obj.__module__ == module.__name__ and
hasattr(obj, 'run')):
script_class = obj
break
if script_class:
# Extraer la primera línea del docstring como nombre
docstring = inspect.getdoc(script_class)
if docstring:
name, *description = docstring.split('\n', 1)
description = description[0] if description else ''
else:
name = script_file.stem
description = ''
return {
"id": script_file.stem,
"name": name.strip(),
"description": description.strip(),
"file": str(script_file.relative_to(self.script_groups_dir))
}
except Exception as e:
print(f"Error loading script {script_file}: {e}")
return None
def execute_script(self, group_id: str, script_id: str, work_dir: str,
profile: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a specific script"""
script_file = self.script_groups_dir / group_id / f"{script_id}.py"
if not script_file.exists():
raise ValueError(f"Script {script_id} not found in group {group_id}")
try:
# Import script module
spec = importlib.util.spec_from_file_location(script_id, script_file)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Find and instantiate script class
script_class = None
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
obj.__module__ == module.__name__ and
hasattr(obj, 'run')):
script_class = obj
break
if not script_class:
raise ValueError(f"No valid script class found in {script_id}")
script = script_class()
return script.run(work_dir, profile)
except Exception as e:
return {
"status": "error",
"error": str(e)
}