# File listing metadata: 325 lines, 16 KiB, Python
import ast  # Used to read module docstrings from script files
import json
import os
import re  # Needed to extract docstrings
import time  # Keep time for execution throttling state
from datetime import datetime  # Needed for append_log timestamp if we keep it here
from typing import Dict, Any, List, Optional

# Import the new modules
from .config_handler import ConfigHandler
from .directory_manager import DirectoryManager
from .group_manager import GroupManager
from .logger import Logger
from .schema_handler import SchemaHandler
from .script_executor import ScriptExecutor


# --- ConfigurationManager Class ---
class ConfigurationManager:
    """Facade that composes the application's helper managers.

    The instance owns the shared state (base paths, the current working
    directory and the script-execution throttling data) and exposes it to the
    helper objects through small callbacks. Most public methods delegate to
    the matching helper. The script-description methods are implemented here
    and persist their data in ``scripts_description.json`` inside each group
    folder.
    """

    # Placeholder used whenever a script has no usable short description.
    _DEFAULT_SHORT_DESCRIPTION = "Sin descripción corta."

    def __init__(self):
        """Resolve the project paths and instantiate every helper manager."""
        # base_path points to the project root (one level up from this
        # module's directory).
        lib_dir = os.path.dirname(os.path.abspath(__file__))
        self.base_path = os.path.dirname(lib_dir)
        self.data_path = os.path.join(self.base_path, "data")
        self.script_groups_path = os.path.join(
            self.base_path, "backend", "script_groups"
        )
        self.working_directory = None
        # The log file path is managed by the Logger instance.

        # State for script execution throttling.
        self.last_execution_time = 0
        # Minimum seconds between script executions to prevent rapid clicks.
        self.min_execution_interval = 1

        # Instantiate handlers/managers.
        self.logger = Logger(os.path.join(self.data_path, "log.txt"))
        self.dir_manager = DirectoryManager(
            self.script_groups_path, self._set_working_directory_internal
        )
        self.group_manager = GroupManager(self.script_groups_path)
        self.schema_handler = SchemaHandler(
            self.data_path,
            self.script_groups_path,
            self._get_working_directory_internal,
        )
        self.config_handler = ConfigHandler(
            self.data_path,
            self.script_groups_path,
            self._get_working_directory_internal,
            self.schema_handler,
        )
        self.script_executor = ScriptExecutor(
            self.script_groups_path,
            self.dir_manager,
            self.config_handler,
            self.logger,  # Pass the central logger instance
            self._get_execution_state_internal,
            self._set_last_execution_time_internal,
        )

    # --- Internal Callbacks/Getters for Sub-Managers ---
    def _set_working_directory_internal(self, path: Optional[str]):
        """Callback for DirectoryManager to update the main working directory.

        A falsy or non-existent ``path`` resets the working directory to
        ``None``. For a valid directory, an empty ``data.json`` is created in
        it when missing so the level-3 configuration can be read or written
        right after the working directory changes.
        """
        if not path or not os.path.isdir(path):
            self.working_directory = None
            return

        self.working_directory = path
        data_json_path = os.path.join(path, "data.json")
        if os.path.exists(data_json_path):
            return
        try:
            with open(data_json_path, 'w', encoding='utf-8') as f:
                json.dump({}, f)
            print(f"Info: Created empty data.json in new working directory: {data_json_path}")
        except Exception as e:
            # Best effort: the working directory stays set even when the file
            # cannot be created.
            print(f"Warning: Could not create data.json in {path}: {e}")

    def _get_working_directory_internal(self) -> Optional[str]:
        """Provide the current working directory to sub-managers."""
        return self.working_directory

    def _get_execution_state_internal(self) -> Dict[str, Any]:
        """Provide execution throttling state to ScriptExecutor."""
        return {
            "last_time": self.last_execution_time,
            "interval": self.min_execution_interval,
        }

    def _set_last_execution_time_internal(self, exec_time: float):
        """Callback for ScriptExecutor to update the last execution time."""
        self.last_execution_time = exec_time

    # --- Logging Methods (Delegated) ---
    def append_log(self, message: str) -> None:
        """Append ``message`` to the log through the Logger instance.

        The raw message is passed through unchanged; per the original notes,
        timestamping is the Logger's responsibility.
        """
        self.logger.append_log(message)

    def read_log(self) -> str:
        """Return the log contents as provided by the Logger instance."""
        return self.logger.read_log()

    def clear_log(self) -> bool:
        """Clear the log and return the Logger's result flag."""
        return self.logger.clear_log()

    # --- Working Directory Methods (Delegated) ---
    def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
        """Set the working directory for a group via the DirectoryManager.

        The DirectoryManager received ``_set_working_directory_internal`` as
        a callback at construction time and is expected to use it to update
        the global working directory.
        """
        return self.dir_manager.set_work_dir_for_group(group, path)

    def get_work_dir(self, group: str) -> Optional[str]:
        """Return the stored working directory for a group.

        The fetched path is also pushed through
        ``_set_working_directory_internal``, so the global working directory
        becomes that path when it is a valid directory and ``None`` otherwise.
        """
        path = self.dir_manager.get_work_dir_for_group(group)
        self._set_working_directory_internal(path)
        return path

    def get_directory_history(self, group: str) -> List[str]:
        """Return the directory history of a group from the DirectoryManager."""
        return self.dir_manager.get_directory_history(group)

    # --- Script Group Methods (Delegated) ---
    def get_script_groups(self) -> List[Dict[str, Any]]:
        """Return the script groups reported by the GroupManager."""
        return self.group_manager.get_script_groups()

    def get_group_details(self, group: str) -> Dict[str, Any]:
        """Get details (description, etc.) for a specific group.

        Returns ``{"error": "Group not found"}`` when the group folder does
        not exist. Otherwise returns the GroupManager's description with
        defaults filled in for ``name``, ``description``, ``version`` and
        ``author``.
        """
        group_path = os.path.join(self.script_groups_path, group)
        if not os.path.isdir(group_path):
            return {"error": "Group not found"}
        # NOTE(review): relies on a private GroupManager method and assumes it
        # returns a dict - confirm against GroupManager.
        details = self.group_manager._get_group_description(group_path)
        # Ensure default values if the description file is missing/empty.
        details.setdefault("name", group)
        details.setdefault("description", "Sin descripción")
        details.setdefault("version", "1.0")
        details.setdefault("author", "Unknown")
        return details

    def update_group_description(self, group: str, data: Dict[str, Any]) -> Dict[str, str]:
        """Write ``data`` to the group's ``description.json``.

        The group folder is created when missing. Returns
        ``{"status": "success"}`` or ``{"status": "error", "message": ...}``.
        """
        # NOTE(review): ``group`` is joined into the path unvalidated; if it
        # can come from a request, confirm it cannot contain path separators.
        description_path = os.path.join(self.script_groups_path, group, "description.json")
        try:
            os.makedirs(os.path.dirname(description_path), exist_ok=True)
            with open(description_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            return {"status": "success"}
        except Exception as e:
            print(f"Error updating group description for {group}: {e}")
            return {"status": "error", "message": str(e)}

    # --- Configuration (data.json) Methods (Delegated) ---
    def get_config(self, level: str, group: Optional[str] = None) -> Dict[str, Any]:
        """Return the configuration for ``level`` from the ConfigHandler."""
        # ConfigHandler uses the _get_working_directory_internal callback.
        return self.config_handler.get_config(level, group)

    def update_config(
        self, level: str, data: Dict[str, Any], group: Optional[str] = None
    ) -> Dict[str, str]:
        """Store ``data`` as the configuration for ``level`` via ConfigHandler."""
        return self.config_handler.update_config(level, data, group)

    # --- Schema Methods (Delegated) ---
    def get_schema(self, level: str, group: Optional[str] = None) -> Dict[str, Any]:
        """Return the schema for ``level`` from the SchemaHandler."""
        # SchemaHandler uses the _get_working_directory_internal callback.
        return self.schema_handler.get_schema(level, group)

    def update_schema(
        self, level: str, data: Dict[str, Any], group: Optional[str] = None
    ) -> Dict[str, str]:
        """Store ``data`` as the schema for ``level`` via SchemaHandler."""
        # SchemaHandler uses the _get_working_directory_internal callback.
        return self.schema_handler.update_schema(level, data, group)

    # --- Script Listing and Execution Methods ---

    # --- Methods that manage scripts_description.json ---

    @staticmethod
    def _default_display_name(filename: str) -> str:
        """Return ``filename`` without a trailing ``.py`` extension.

        Only the suffix is removed, so names that contain ``.py`` elsewhere
        (for example ``x1.py.helper.py``) keep the rest of their text.
        """
        return filename[:-3] if filename.endswith('.py') else filename

    def _get_group_path(self, group_id: str) -> Optional[str]:
        """Return the full path of a group's folder, or ``None`` if missing."""
        # NOTE(review): ``group_id`` is joined into the path unvalidated; if it
        # can come from a request, confirm it cannot contain path separators.
        path = os.path.join(self.script_groups_path, group_id)
        return path if os.path.isdir(path) else None

    def _get_script_descriptions_path(self, group_id: str) -> Optional[str]:
        """Return the path of a group's ``scripts_description.json`` file.

        Returns ``None`` when the group folder does not exist.
        """
        group_path = self._get_group_path(group_id)
        if not group_path:
            return None
        return os.path.join(group_path, 'scripts_description.json')

    def _load_script_descriptions(self, group_id: str) -> Dict[str, Any]:
        """Load the script descriptions stored in ``scripts_description.json``.

        Always returns a dict. A missing file, unreadable file, invalid JSON
        or a JSON payload that is not an object all yield ``{}``.
        """
        path = self._get_script_descriptions_path(group_id)
        if not path or not os.path.exists(path):
            return {}
        try:
            with open(path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            print(f"Error: JSON inválido en {path}")
            return {}
        except Exception as e:
            print(f"Error leyendo {path}: {e}")
            return {}
        if not isinstance(loaded, dict):
            # Callers index the result by script filename, so anything other
            # than a JSON object is unusable.
            print(f"Error: se esperaba un objeto JSON en {path}")
            return {}
        return loaded

    def _save_script_descriptions(self, group_id: str, descriptions: Dict[str, Any]) -> bool:
        """Write ``descriptions`` to the group's ``scripts_description.json``.

        Returns ``True`` on success and ``False`` when the group does not
        exist or the file cannot be written.
        """
        path = self._get_script_descriptions_path(group_id)
        if not path:
            return False
        try:
            # Make sure the group directory exists before writing.
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(descriptions, f, indent=4, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Error escribiendo en {path}: {e}")
            return False

    def _extract_short_description(self, script_path: str) -> str:
        """Return the first non-empty line of a Python script's docstring.

        The module docstring is read with ``ast`` so that a shebang, encoding
        line or comment header before it does not hide it. When the file does
        not parse (or has no module docstring), the original regex for a
        triple-quoted string at the very start of the file is used as a
        fallback. Returns the default placeholder when nothing is found or
        the file cannot be read.
        """
        try:
            with open(script_path, 'r', encoding='utf-8') as f:
                content = f.read()

            docstring = None
            try:
                docstring = ast.get_docstring(ast.parse(content))
            except (SyntaxError, ValueError, RecursionError, MemoryError):
                # Not parseable by this interpreter; try the textual fallback.
                docstring = None

            if docstring is None:
                # Look for """...""" or '''...''' at the start of the file.
                match = re.match(r'^\s*("""(.*?)"""|\'\'\'(.*?)\'\'\')', content, re.DOTALL | re.MULTILINE)
                if match:
                    # Group 2 or 3 holds the text; both are empty or None for
                    # an empty docstring.
                    docstring = match.group(2) or match.group(3) or ""

            if docstring:
                # Take the first non-empty line.
                for line in docstring.splitlines():
                    stripped = line.strip()
                    if stripped:
                        return stripped
        except Exception as e:
            print(f"Error extrayendo descripción de {script_path}: {e}")
        return self._DEFAULT_SHORT_DESCRIPTION

    def list_scripts(self, group: str) -> List[Dict[str, str]]:
        """List the visible scripts of a group with their stored details.

        Every ``.py`` file in the group folder that has no usable entry in
        ``scripts_description.json`` gets one created (display name from the
        filename, short description from its docstring) and the file is saved
        back. Scripts flagged ``hidden`` are omitted. The result is sorted by
        ``display_name``. Returns ``[]`` when the group does not exist or an
        error occurs.
        """
        group_path = self._get_group_path(group)
        if not group_path:
            return []

        descriptions = self._load_script_descriptions(group)
        updated = False
        scripts_details = []

        try:
            # List the .py files in the group directory.
            script_files = [
                f for f in os.listdir(group_path)
                if f.endswith('.py') and os.path.isfile(os.path.join(group_path, f))
            ]

            for filename in script_files:
                default_name = self._default_display_name(filename)
                # A non-dict entry is unusable, so it is rebuilt like a
                # missing one.
                if not isinstance(descriptions.get(filename), dict):
                    print(f"Script '{filename}' no encontrado en descripciones, auto-populando.")
                    script_path = os.path.join(group_path, filename)
                    descriptions[filename] = {
                        "display_name": default_name,  # Default name
                        "short_description": self._extract_short_description(script_path),
                        "long_description": "",
                        "hidden": False,
                    }
                    updated = True

                # Add to the result unless the script is hidden.
                details = descriptions[filename]
                if details.get('hidden', False):
                    continue
                scripts_details.append({
                    "filename": filename,  # Real file name
                    "display_name": details.get("display_name", default_name),
                    "short_description": details.get(
                        "short_description", self._DEFAULT_SHORT_DESCRIPTION
                    ),
                    "long_description": details.get("long_description", ""),
                })

            if updated:
                self._save_script_descriptions(group, descriptions)

            # Sort by display_name for consistency.
            scripts_details.sort(key=lambda x: x['display_name'])
            return scripts_details

        except FileNotFoundError:
            return []
        except Exception as e:
            print(f"Error listando scripts para el grupo {group}: {e}")
            return []

    def get_script_details(self, group_id: str, script_filename: str) -> Dict[str, Any]:
        """Return the stored details of one script.

        When the script has no entry, a default dict is returned (normally
        ``list_scripts`` has already created the entry).
        """
        descriptions = self._load_script_descriptions(group_id)
        return descriptions.get(script_filename, {
            "display_name": self._default_display_name(script_filename),
            "short_description": "No encontrado.",
            "long_description": "",
            "hidden": False,
        })

    def update_script_details(self, group_id: str, script_filename: str, details: Dict[str, Any]) -> Dict[str, str]:
        """Update the stored details of one script.

        Fields absent from ``details`` keep their stored value. When the
        script has no usable entry but the file exists in the group folder,
        an entry is created; its short description is the supplied one or,
        failing that, the one extracted from the script's docstring.

        Returns ``{"status": "success"}`` or
        ``{"status": "error", "message": ...}``.
        """
        descriptions = self._load_script_descriptions(group_id)
        default_name = self._default_display_name(script_filename)
        entry = descriptions.get(script_filename)

        if isinstance(entry, dict):
            # Make sure the expected fields are present and update them.
            field_defaults = (
                ("display_name", default_name),
                ("short_description", ""),
                ("long_description", ""),
                ("hidden", False),
            )
            for key, fallback in field_defaults:
                entry[key] = details.get(key, entry.get(key, fallback))

            if self._save_script_descriptions(group_id, descriptions):
                return {"status": "success"}
            return {"status": "error", "message": "Fallo al guardar las descripciones de los scripts."}

        # Rare case: the script exists on disk but is missing from the JSON.
        group_path = self._get_group_path(group_id)
        script_path = os.path.join(group_path, script_filename) if group_path else None
        if not script_path or not os.path.exists(script_path):
            return {"status": "error", "message": f"Script '{script_filename}' no encontrado en las descripciones ni en el sistema de archivos."}

        print(f"Advertencia: El script '{script_filename}' existe pero no estaba en descriptions.json. Creando entrada.")
        extracted_desc = self._extract_short_description(script_path)
        descriptions[script_filename] = {
            "display_name": details.get("display_name", default_name),
            # Prefer the caller's value, as the existing-entry path does.
            "short_description": details.get("short_description", extracted_desc),
            "long_description": details.get("long_description", ""),
            "hidden": details.get("hidden", False),
        }
        if self._save_script_descriptions(group_id, descriptions):
            return {"status": "success"}
        return {"status": "error", "message": "Fallo al guardar las descripciones de los scripts después de crear la entrada."}

    def execute_script(
        self, group: str, script_name: str, broadcast_fn=None
    ) -> Dict[str, Any]:
        """Run a script of a group through the ScriptExecutor."""
        # ScriptExecutor uses callbacks to get/set the execution state.
        return self.script_executor.execute_script(group, script_name, broadcast_fn)