# Python source - 219 lines, 8.6 KiB (file-viewer header residue, kept as a comment).
import importlib.util
import inspect
import json
import math
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional


class ScriptManager:
    """Manage script groups stored on disk and run their scripts.

    Every sub-directory of ``script_groups_dir`` whose name does not start
    with ``_`` is treated as one group.  A group keeps its configuration in
    ``data.json`` and its scripts in ``x<digit>.py`` files.  The schema used
    to validate group configuration is read once, at construction time, from
    ``config.json`` in the root directory.
    """

    def __init__(self, script_groups_dir: Path):
        """Remember the root directory and load the global schema.

        Args:
            script_groups_dir: Root directory holding ``config.json`` and one
                sub-directory per group.  A plain string is accepted too.
        """
        self.script_groups_dir = Path(script_groups_dir)
        self._global_schema = self._load_global_schema()

    def _load_global_schema(self) -> Dict[str, Any]:
        """Load the global configuration schema from ``config.json``.

        Returns:
            The parsed JSON object, or ``{"config_schema": {}}`` when the file
            cannot be read, cannot be parsed, or does not contain an object.
        """
        schema_file = self.script_groups_dir / "config.json"
        try:
            with open(schema_file, "r", encoding="utf-8") as f:
                schema = json.load(f)
        except Exception as e:
            print(f"Error loading global schema: {e}")
            return {"config_schema": {}}
        if not isinstance(schema, dict):
            # Callers use ``.get`` on the schema, so anything but an object
            # would crash later; fall back to the empty schema instead.
            print(f"Error loading global schema: expected a JSON object in {schema_file}")
            return {"config_schema": {}}
        return schema

    def _load_group_data(self, group_id: str) -> Dict[str, Any]:
        """Load the ``data.json`` of a group.

        Args:
            group_id: Name of the group directory.

        Returns:
            The parsed JSON object, or ``{}`` when the group has no data file,
            the file cannot be read/parsed, or it does not contain an object.
        """
        data_file = self.script_groups_dir / group_id / "data.json"
        try:
            with open(data_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            # A group without data.json simply has no data yet; this is the
            # normal case for new groups, so it is not reported as an error.
            return {}
        except Exception as e:
            print(f"Error loading group data: {e}")
            return {}
        if not isinstance(data, dict):
            print(f"Error loading group data: expected a JSON object in {data_file}")
            return {}
        return data

    def _save_group_data(self, group_id: str, data: Dict[str, Any]):
        """Write ``data`` to the group's ``data.json``.

        The group directory is created if it does not exist.

        Args:
            group_id: Name of the group directory.
            data: JSON-serialisable mapping to store.

        Raises:
            Exception: Any error raised while creating the directory or
                writing the file is printed and then re-raised unchanged.
        """
        data_file = self.script_groups_dir / group_id / "data.json"
        try:
            data_file.parent.mkdir(parents=True, exist_ok=True)
            with open(data_file, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=4)
        except Exception as e:
            print(f"Error saving group data: {e}")
            raise

    def _validate_group_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate ``data`` against the ``config_schema`` of the global schema.

        Only keys declared in the schema are kept; keys present in ``data``
        but not in the schema are dropped.

        Args:
            data: Raw field values keyed by field name.

        Returns:
            A new dict with one entry per schema field: the validated value,
            or the field's ``default`` when the field is absent from ``data``.

        Raises:
            ValueError: A required field is missing or a value is invalid.
        """
        # ``or {}`` also covers a schema file that sets "config_schema" to null.
        schema = self._global_schema.get("config_schema") or {}
        validated = {}

        for key, field_schema in schema.items():
            if key in data:
                validated[key] = self._validate_field(key, data[key], field_schema)
            elif field_schema.get("required", False):
                raise ValueError(f"Required field '{key}' is missing")
            else:
                validated[key] = field_schema.get("default")

        return validated

    @staticmethod
    def _coerce_number(value: Any) -> Any:
        """Convert ``value`` to an ``int`` when possible, otherwise a ``float``.

        Raises:
            ValueError: The value is not numeric or is NaN/infinite.
        """
        if isinstance(value, bool):
            return int(value)
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            number = value
        else:
            text = str(value).strip()
            try:
                return int(text)
            except ValueError:
                # Not an integer literal: accept decimal and exponent forms
                # such as "2.5" or "1e3".
                number = float(text)
        if not math.isfinite(number):
            raise ValueError(f"non-finite number {value!r}")
        return number

    def _validate_field(self, key: str, value: Any, field_schema: Dict[str, Any]) -> Any:
        """Validate and coerce a single field value.

        Supported ``type`` values in ``field_schema`` are ``string``,
        ``number``, ``boolean``, ``directory`` and ``select``; any other type
        returns the value unchanged.

        Args:
            key: Field name, used in error messages.
            value: Raw value to validate.
            field_schema: Schema entry for the field (``type``, ``required``,
                ``default``, ``options``).

        Returns:
            The coerced value, or the field's ``default`` when ``value`` is
            ``None`` or the empty string and the field is not required.

        Raises:
            ValueError: The field is required but empty, or the value cannot
                be coerced to the declared type.
        """
        field_type = field_schema.get("type")

        if value is None or value == "":
            if field_schema.get("required", False):
                raise ValueError(f"Field '{key}' is required")
            return field_schema.get("default")

        try:
            if field_type == "string":
                return str(value)
            elif field_type == "number":
                return self._coerce_number(value)
            elif field_type == "boolean":
                if isinstance(value, str):
                    return value.lower() == "true"
                return bool(value)
            elif field_type == "directory":
                path = Path(value)
                if not path.is_absolute():
                    # Relative directories are resolved against the root.
                    path = self.script_groups_dir / path
                # Validation has a side effect: the directory is created.
                path.mkdir(parents=True, exist_ok=True)
                return str(path)
            elif field_type == "select":
                if value not in field_schema.get("options", []):
                    raise ValueError(f"Invalid option '{value}' for field '{key}'")
                return value
            else:
                return value
        except Exception as e:
            raise ValueError(f"Invalid value for field '{key}': {str(e)}") from e

    def get_available_groups(self) -> List[Dict[str, Any]]:
        """List every group that has non-empty group data.

        Returns:
            One dict per group with the keys ``id``, ``name``,
            ``description``, ``work_dir`` and ``enabled``, ordered by
            directory path.  Empty when the root directory does not exist.
        """
        if not self.script_groups_dir.is_dir():
            return []

        groups = []
        # Sorted so the result does not depend on filesystem listing order.
        for group_dir in sorted(self.script_groups_dir.iterdir()):
            if not group_dir.is_dir() or group_dir.name.startswith("_"):
                continue
            group_data = self._load_group_data(group_dir.name)
            if group_data:
                groups.append({
                    "id": group_dir.name,
                    "name": group_data.get("name", group_dir.name),
                    "description": group_data.get("description", ""),
                    "work_dir": group_data.get("work_dir", ""),
                    "enabled": group_data.get("enabled", True),
                })
        return groups

    def get_group_data(self, group_id: str) -> Dict[str, Any]:
        """Return the stored configuration of a group (``{}`` if none)."""
        return self._load_group_data(group_id)

    def update_group_data(self, group_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate ``data`` and store it as the group's configuration.

        ``updated_at`` is always set to the current local time.  ``created_at``
        is set on the first save and carried over from the stored data on
        later saves.

        Args:
            group_id: Name of the group directory.
            data: Raw field values keyed by field name.

        Returns:
            The validated data, including timestamps, as written to disk.

        Raises:
            ValueError: ``data`` does not satisfy the schema.
            Exception: The data file could not be written.
        """
        # Validate data
        validated_data = self._validate_group_data(data)

        # Update timestamps
        now = datetime.now().isoformat()
        validated_data["updated_at"] = now
        existing = self._load_group_data(group_id)
        if not existing:
            validated_data["created_at"] = now
        elif "created_at" in existing:
            # Keep the original creation time instead of dropping it.
            validated_data["created_at"] = existing["created_at"]

        # Save data
        self._save_group_data(group_id, validated_data)
        return validated_data

    def get_group_scripts(self, group_id: str) -> List[Dict[str, Any]]:
        """Describe the scripts found in a group directory.

        Args:
            group_id: Name of the group directory.

        Returns:
            Script descriptions (see ``_analyze_script``) sorted by ``id``.
            Files that fail to load or contain no script class are skipped.

        Raises:
            ValueError: The group directory does not exist.
        """
        group_dir = self.script_groups_dir / group_id
        if not group_dir.exists() or not group_dir.is_dir():
            raise ValueError(f"Script group '{group_id}' not found")

        # NOTE(review): this pattern matches x0.py .. x9.py only; a script
        # named x10.py would not be listed - confirm that is intended.
        candidates = (
            self._analyze_script(script_file)
            for script_file in group_dir.glob("x[0-9].py")
        )
        scripts = [info for info in candidates if info]
        return sorted(scripts, key=lambda x: x["id"])

    def _load_script_class(self, module_name: str, script_file: Path) -> Optional[type]:
        """Import ``script_file`` and return its script class.

        The script class is the first class (in name order) that is defined
        in the module itself and has a ``run`` attribute.  Importing executes
        the module's top-level code.

        Raises:
            ImportError: No import spec/loader could be created for the file.
        """
        spec = importlib.util.spec_from_file_location(module_name, script_file)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load script module from {script_file}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        for _, obj in inspect.getmembers(module, inspect.isclass):
            # Skip classes that were merely imported into the script.
            if obj.__module__ == module.__name__ and hasattr(obj, "run"):
                return obj
        return None

    def _analyze_script(self, script_file: Path) -> Optional[Dict[str, Any]]:
        """Build the description of a single script file.

        The first line of the script class docstring becomes ``name`` and the
        remainder ``description``; without a docstring the file stem is used
        as the name.

        Returns:
            A dict with ``id``, ``name``, ``description`` and ``file`` (path
            relative to the root directory), or ``None`` when the file fails
            to load or defines no script class.
        """
        try:
            # Import the script module and find the script class
            script_class = self._load_script_class(script_file.stem, script_file)
            if script_class is None:
                return None

            # Extract name and description from the docstring
            docstring = inspect.getdoc(script_class)
            if docstring:
                name, _, description = docstring.partition("\n")
            else:
                name, description = script_file.stem, ""

            return {
                "id": script_file.stem,
                "name": name.strip(),
                "description": description.strip(),
                "file": str(script_file.relative_to(self.script_groups_dir)),
            }
        except Exception as e:
            print(f"Error loading script {script_file}: {e}")
            return None

    def execute_script(self, group_id: str, script_id: str, profile: Dict[str, Any]) -> Dict[str, Any]:
        """Run one script of a group.

        The script class is instantiated without arguments and its ``run``
        method is called with the group's ``work_dir`` and ``profile``.

        Args:
            group_id: Name of the group directory.
            script_id: Script file name without the ``.py`` suffix.
            profile: Passed through to the script's ``run`` method.

        Returns:
            Whatever ``run`` returns, or
            ``{"status": "error", "error": <message>}`` when loading or
            running the script raises.

        Raises:
            ValueError: The group has no ``work_dir`` configured or the script
                file does not exist.
        """
        # Get group data
        group_data = self._load_group_data(group_id)
        work_dir = group_data.get("work_dir")

        if not work_dir:
            raise ValueError(f"No work directory configured for group {group_id}")

        # NOTE(review): group_id and script_id are used as path components
        # without validation; if they can come from untrusted input, values
        # such as "../x" would load code outside the group - confirm callers.
        script_file = self.script_groups_dir / group_id / f"{script_id}.py"
        if not script_file.exists():
            raise ValueError(f"Script {script_id} not found in group {group_id}")

        try:
            # Import the script module, then find and instantiate its class
            script_class = self._load_script_class(script_id, script_file)
            if script_class is None:
                raise ValueError(f"No valid script class found in {script_id}")

            script = script_class()
            return script.run(work_dir, profile)
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def get_global_schema(self) -> Dict[str, Any]:
        """Return the global configuration schema loaded at construction."""
        return self._global_schema