417 lines
18 KiB
Python
417 lines
18 KiB
Python
import os
|
|
import json
|
|
from typing import Dict, Any, List, Optional
|
|
import re # Necesario para extraer docstring
|
|
|
|
# Import the new modules
|
|
from .logger import Logger
|
|
from .directory_manager import DirectoryManager
|
|
from .group_manager import GroupManager
|
|
from .schema_handler import SchemaHandler
|
|
from .config_handler import ConfigHandler
|
|
from .script_executor import ScriptExecutor
|
|
|
|
# Keep time for execution throttling state
|
|
import time
|
|
from datetime import datetime # Needed for append_log timestamp if we keep it here
|
|
|
|
|
|
# --- ConfigurationManager Class ---
|
|
class ConfigurationManager:
    """Single entry point that wires together the logging, directory, group,
    schema, configuration and script-execution helpers.

    Most public methods simply delegate to one of those helpers.  The class
    itself owns two pieces of shared state -- the current working directory
    and the script-execution throttling values -- which the helpers read and
    update through the ``_*_internal`` callbacks handed to them in
    ``__init__``.  It also maintains each group's ``scripts_description.json``
    file (display name, descriptions and hidden flag per script).
    """

    # Short description reported when a script has no usable docstring.
    _NO_SHORT_DESCRIPTION = "Sin descripción corta."

    # Matches a triple-quoted string at the start of a file, optionally
    # preceded by blank lines and "#" comment lines (shebang, coding cookie).
    # Group 1 holds the body of a """...""" string, group 2 of a '''...''' one.
    _LEADING_DOCSTRING_RE = re.compile(
        r'\s*(?:#[^\n]*\n\s*)*(?:"""(.*?)"""|\'\'\'(.*?)\'\'\')',
        re.DOTALL,
    )

    def __init__(self):
        """Resolve the project paths and instantiate every helper/manager."""
        # Adjust base_path to point to the project root (one level up from lib).
        lib_dir = os.path.dirname(os.path.abspath(__file__))
        self.base_path = os.path.dirname(lib_dir)
        self.data_path = os.path.join(self.base_path, "data")
        self.script_groups_path = os.path.join(
            self.base_path, "backend", "script_groups"
        )
        self.working_directory: Optional[str] = None
        # log_file_path is now managed by the Logger instance.

        # State for script execution throttling.
        self.last_execution_time = 0
        # Minimum seconds between script executions to prevent rapid clicks.
        self.min_execution_interval = 1

        # Instantiate handlers/managers.
        self.logger = Logger(
            os.path.join(self.data_path, "log.txt")
        )  # Pass log path to Logger
        self.dir_manager = DirectoryManager(
            self.script_groups_path, self._set_working_directory_internal
        )
        self.group_manager = GroupManager(self.script_groups_path)
        self.schema_handler = SchemaHandler(
            self.data_path,
            self.script_groups_path,
            self._get_working_directory_internal,
        )
        self.config_handler = ConfigHandler(
            self.data_path,
            self.script_groups_path,
            self._get_working_directory_internal,
            self.schema_handler,
        )
        self.script_executor = ScriptExecutor(
            self.script_groups_path,
            self.dir_manager,
            self.config_handler,
            self.logger,  # Pass the central logger instance
            self._get_execution_state_internal,
            self._set_last_execution_time_internal,
        )

    # --- Internal Callbacks/Getters for Sub-Managers ---
    def _set_working_directory_internal(self, path: Optional[str]):
        """Callback for DirectoryManager to update the main working directory.

        A path that is empty or not an existing directory resets the working
        directory to ``None``.  For a valid directory, an empty ``data.json``
        is created inside it when missing so the working-directory level
        configuration can be read right after the directory is set.
        """
        if not (path and os.path.isdir(path)):
            self.working_directory = None
            return

        self.working_directory = path
        data_json_path = os.path.join(path, "data.json")
        if os.path.exists(data_json_path):
            return
        try:
            with open(data_json_path, "w", encoding="utf-8") as f:
                json.dump({}, f)
            print(
                f"Info: Created empty data.json in new working directory: {data_json_path}"
            )
        except Exception as e:
            # Best effort: the directory remains selected even if it is read-only.
            print(f"Warning: Could not create data.json in {path}: {e}")

    def _get_working_directory_internal(self) -> Optional[str]:
        """Provides the current working directory to sub-managers."""
        return self.working_directory

    def _get_execution_state_internal(self) -> Dict[str, Any]:
        """Provides execution throttling state to ScriptExecutor.

        Returns:
            A dict with ``"last_time"`` (last execution time) and
            ``"interval"`` (minimum seconds between executions).
        """
        return {
            "last_time": self.last_execution_time,
            "interval": self.min_execution_interval,
        }

    def _set_last_execution_time_internal(self, exec_time: float):
        """Callback for ScriptExecutor to update the last execution time."""
        self.last_execution_time = exec_time

    # --- Logging Methods (Delegated) ---
    def append_log(self, message: str) -> None:
        """Append ``message`` to the log through the Logger instance.

        The Logger class handles timestamping internally, so only the raw
        message is passed along.
        """
        self.logger.append_log(message)

    def read_log(self) -> str:
        """Return whatever ``Logger.read_log`` returns."""
        return self.logger.read_log()

    def clear_log(self) -> bool:
        """Return whatever ``Logger.clear_log`` returns."""
        return self.logger.clear_log()

    # --- Working Directory Methods (Delegated) ---
    def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
        """Sets the working directory for a group.

        Delegates to ``DirectoryManager.set_work_dir_for_group``; the global
        working directory is updated by that manager through the internal
        setter callback it was given at construction time.
        """
        return self.dir_manager.set_work_dir_for_group(group, path)

    def get_work_dir(self, group: str) -> Optional[str]:
        """Gets the stored working directory for a group and applies it globally.

        The fetched path is always passed to the internal setter, so an
        invalid or missing path resets the global working directory to
        ``None``.  The fetched path itself is returned unchanged.
        """
        path = self.dir_manager.get_work_dir_for_group(group)
        self._set_working_directory_internal(path)
        return path

    def get_directory_history(self, group: str) -> List[str]:
        """Delegate to ``DirectoryManager.get_directory_history``."""
        return self.dir_manager.get_directory_history(group)

    # --- Script Group Methods (Delegated) ---
    def get_script_groups(self) -> List[Dict[str, Any]]:
        """Delegate to ``GroupManager.get_script_groups``."""
        return self.group_manager.get_script_groups()

    def get_group_details(self, group: str) -> Dict[str, Any]:
        """Get details (description, etc.) for a specific group.

        Returns:
            The group's description dict with defaults filled in for
            ``name``, ``description``, ``version`` and ``author``, or
            ``{"error": "Group not found"}`` when the group directory does not
            exist or lies outside the script-groups root.
        """
        group_path = self._get_group_path(group)
        if not group_path:
            return {"error": "Group not found"}
        # NOTE(review): relies on a private GroupManager method.
        details = self.group_manager._get_group_description(group_path)
        # Ensure default values if the description file is missing/empty.
        details.setdefault("name", group)
        details.setdefault("description", "Sin descripción")
        details.setdefault("version", "1.0")
        details.setdefault("author", "Unknown")
        return details

    def update_group_description(
        self, group: str, data: Dict[str, Any]
    ) -> Dict[str, str]:
        """Update the description file for a specific group.

        Writes ``data`` as JSON to ``<script_groups>/<group>/description.json``,
        creating the group directory if needed.

        Returns:
            ``{"status": "success"}`` or ``{"status": "error", "message": ...}``.
            A group name that resolves outside the script-groups root is
            rejected with an error and nothing is written.
        """
        description_path = os.path.join(
            self.script_groups_path, group, "description.json"
        )
        # Refuse names such as "../x" that would write outside the groups root.
        if not self._is_inside_groups_root(description_path):
            print(f"Error updating group description for {group}: invalid group name")
            return {"status": "error", "message": "Invalid group name"}
        try:
            os.makedirs(os.path.dirname(description_path), exist_ok=True)
            with open(description_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            return {"status": "success"}
        except Exception as e:
            print(f"Error updating group description for {group}: {e}")
            return {"status": "error", "message": str(e)}

    # --- Configuration (data.json) Methods (Delegated) ---
    def get_config(self, level: str, group: Optional[str] = None) -> Dict[str, Any]:
        """Delegate to ``ConfigHandler.get_config``.

        ConfigHandler obtains the working directory through the
        ``_get_working_directory_internal`` callback.
        """
        return self.config_handler.get_config(level, group)

    def update_config(
        self, level: str, data: Dict[str, Any], group: Optional[str] = None
    ) -> Dict[str, str]:
        """Delegate to ``ConfigHandler.update_config``."""
        return self.config_handler.update_config(level, data, group)

    # --- Schema Methods (Delegated) ---
    def get_schema(self, level: str, group: Optional[str] = None) -> Dict[str, Any]:
        """Delegate to ``SchemaHandler.get_schema``.

        SchemaHandler obtains the working directory through the
        ``_get_working_directory_internal`` callback.
        """
        return self.schema_handler.get_schema(level, group)

    def update_schema(
        self, level: str, data: Dict[str, Any], group: Optional[str] = None
    ) -> Dict[str, str]:
        """Delegate to ``SchemaHandler.update_schema``."""
        return self.schema_handler.update_schema(level, data, group)

    # --- Script Listing and Execution Methods ---

    # --- Helpers for scripts_description.json ---
    @staticmethod
    def _default_display_name(script_filename: str) -> str:
        """Return ``script_filename`` without its trailing ``.py`` extension.

        Only the suffix is removed; a ".py" appearing elsewhere in the name
        is kept (``"my.pyramid.py"`` -> ``"my.pyramid"``).
        """
        suffix = ".py"
        if script_filename.endswith(suffix):
            return script_filename[: -len(suffix)]
        return script_filename

    def _is_inside_groups_root(self, path: str) -> bool:
        """Tell whether ``path`` resolves to the script-groups root or below it.

        The comparison is purely lexical (``os.path.abspath``); symlinks are
        not resolved.
        """
        root = os.path.abspath(self.script_groups_path)
        target = os.path.abspath(path)
        try:
            return os.path.commonpath([root, target]) == root
        except ValueError:
            # commonpath raises when the paths cannot be compared
            # (e.g. different drives on Windows).
            return False

    def _get_group_path(self, group_id: str) -> Optional[str]:
        """Return the full path of a group's folder.

        Returns ``None`` when the folder does not exist or when ``group_id``
        resolves outside the script-groups root.
        """
        path = os.path.join(self.script_groups_path, group_id)
        if not self._is_inside_groups_root(path):
            return None
        return path if os.path.isdir(path) else None

    def _get_script_descriptions_path(self, group_id: str) -> Optional[str]:
        """Return the path of a group's ``scripts_description.json`` file.

        Returns ``None`` when the group folder cannot be resolved.
        """
        group_path = self._get_group_path(group_id)
        if not group_path:
            return None
        return os.path.join(group_path, "scripts_description.json")

    def _load_script_descriptions(self, group_id: str) -> Dict[str, Any]:
        """Load the script descriptions from ``scripts_description.json``.

        Returns:
            The parsed JSON object, or an empty dict when the file is
            missing, unreadable, not valid JSON, or valid JSON that is not
            an object (callers index the result by script file name).
        """
        path = self._get_script_descriptions_path(group_id)
        if not path or not os.path.exists(path):
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            print(f"Error: JSON inválido en {path}")
            return {}
        except Exception as e:
            print(f"Error leyendo {path}: {e}")
            return {}
        if not isinstance(loaded, dict):
            print(f"Error: se esperaba un objeto JSON en {path}")
            return {}
        return loaded

    def _save_script_descriptions(
        self, group_id: str, descriptions: Dict[str, Any]
    ) -> bool:
        """Save the script descriptions to ``scripts_description.json``.

        Returns:
            ``True`` on success, ``False`` when the group cannot be resolved
            or the file cannot be written.
        """
        path = self._get_script_descriptions_path(group_id)
        if not path:
            return False
        try:
            # Make sure the group directory exists.
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                json.dump(descriptions, f, indent=4, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Error escribiendo en {path}: {e}")
            return False

    def _extract_short_description(self, script_path: str) -> str:
        """Extract the first non-empty line of a Python script's docstring.

        The docstring must be a triple-quoted string at the top of the file;
        blank lines and ``#`` comment lines (shebang, coding cookie) before
        it are skipped.

        Returns:
            The stripped first non-empty docstring line, or
            ``"Sin descripción corta."`` when there is no docstring, the
            docstring is blank, or the file cannot be read.
        """
        try:
            with open(script_path, "r", encoding="utf-8") as f:
                content = f.read()
        except Exception as e:
            print(f"Error extrayendo descripción de {script_path}: {e}")
            return self._NO_SHORT_DESCRIPTION

        match = self._LEADING_DOCSTRING_RE.match(content)
        if not match:
            return self._NO_SHORT_DESCRIPTION
        # Exactly one of the groups is set; it may be the empty string.
        docstring = match.group(1) if match.group(1) is not None else match.group(2)
        for line in docstring.splitlines():
            stripped = line.strip()
            if stripped:
                return stripped
        return self._NO_SHORT_DESCRIPTION

    def list_scripts(self, group: str) -> List[Dict[str, str]]:
        """List the visible scripts of a group with their details.

        Every ``.py`` file in the group folder that has no usable entry in
        ``scripts_description.json`` gets one auto-populated (display name
        from the file name, short description from its docstring) and the
        file is saved back.  Scripts flagged ``hidden`` are left out.

        Returns:
            A list of dicts with ``filename``, ``display_name``,
            ``short_description`` and ``long_description``, sorted by display
            name.  An empty list when the group does not exist or an error
            occurs.
        """
        group_path = self._get_group_path(group)
        if not group_path:
            return []

        descriptions = self._load_script_descriptions(group)
        updated = False
        scripts_details = []

        try:
            # List the .py files in the group directory.
            script_files = [
                f
                for f in os.listdir(group_path)
                if f.endswith(".py") and os.path.isfile(os.path.join(group_path, f))
            ]

            for filename in script_files:
                default_name = self._default_display_name(filename)
                details = descriptions.get(filename)
                # A missing or malformed (non-object) entry is rebuilt so one
                # bad record cannot blank out the whole listing.
                if not isinstance(details, dict):
                    print(
                        f"Script '{filename}' no encontrado en descripciones, auto-populando."
                    )
                    script_path = os.path.join(group_path, filename)
                    details = {
                        "display_name": default_name,  # Default name
                        "short_description": self._extract_short_description(
                            script_path
                        ),
                        "long_description": "",
                        "hidden": False,
                    }
                    descriptions[filename] = details
                    updated = True

                # Only non-hidden scripts are reported.
                if details.get("hidden", False):
                    continue
                scripts_details.append(
                    {
                        "filename": filename,  # Real file name
                        "display_name": details.get("display_name", default_name),
                        "short_description": details.get(
                            "short_description", self._NO_SHORT_DESCRIPTION
                        ),
                        "long_description": details.get("long_description", ""),
                    }
                )

            if updated:
                self._save_script_descriptions(group, descriptions)

            # Sort by display_name for consistency; str() keeps the sort from
            # failing if a hand-edited entry holds a non-string display name.
            scripts_details.sort(key=lambda x: str(x["display_name"]))
            return scripts_details

        except FileNotFoundError:
            return []
        except Exception as e:
            print(f"Error listando scripts para el grupo {group}: {e}")
            return []

    def get_script_details(self, group_id: str, script_filename: str) -> Dict[str, Any]:
        """Get the full details of a specific script.

        Returns:
            The stored entry for ``script_filename``, or a default dict
            (short description ``"No encontrado."``) when there is none.
        """
        descriptions = self._load_script_descriptions(group_id)
        # list_scripts normally creates the entry, so the default is a fallback.
        return descriptions.get(
            script_filename,
            {
                "display_name": self._default_display_name(script_filename),
                "short_description": "No encontrado.",
                "long_description": "",
                "hidden": False,
            },
        )

    def update_script_details(
        self, group_id: str, script_filename: str, details: Dict[str, Any]
    ) -> Dict[str, str]:
        """Update the stored details of a specific script.

        For an existing entry, each of ``display_name``, ``short_description``,
        ``long_description`` and ``hidden`` is taken from ``details`` when
        present and otherwise keeps its stored (or default) value.  When the
        script exists on disk but has no usable entry, one is created; its
        short description is extracted from the script's docstring.

        Returns:
            ``{"status": "success"}`` or ``{"status": "error", "message": ...}``.
        """
        descriptions = self._load_script_descriptions(group_id)
        default_name = self._default_display_name(script_filename)
        entry = descriptions.get(script_filename)

        if isinstance(entry, dict):
            # Make sure the expected fields are present and update them.
            field_defaults = (
                ("display_name", default_name),
                ("short_description", ""),
                ("long_description", ""),
                ("hidden", False),
            )
            for field, fallback in field_defaults:
                entry[field] = details.get(field, entry.get(field, fallback))

            if self._save_script_descriptions(group_id, descriptions):
                return {"status": "success"}
            return {
                "status": "error",
                "message": "Fallo al guardar las descripciones de los scripts.",
            }

        # Try to create the entry if the script exists but is not in the
        # JSON file (rare case).
        group_path = self._get_group_path(group_id)
        script_path = os.path.join(group_path, script_filename) if group_path else None
        script_found = bool(
            script_path
            and self._is_inside_groups_root(script_path)
            and os.path.exists(script_path)
        )
        if not script_found:
            return {
                "status": "error",
                "message": f"Script '{script_filename}' no encontrado en las descripciones ni en el sistema de archivos.",
            }

        print(
            f"Advertencia: El script '{script_filename}' existe pero no estaba en descriptions.json. Creando entrada."
        )
        descriptions[script_filename] = {
            "display_name": details.get("display_name", default_name),
            # Use the extracted description rather than the caller's value.
            "short_description": self._extract_short_description(script_path),
            "long_description": details.get("long_description", ""),
            "hidden": details.get("hidden", False),
        }
        if self._save_script_descriptions(group_id, descriptions):
            return {"status": "success"}
        return {
            "status": "error",
            "message": "Fallo al guardar las descripciones de los scripts después de crear la entrada.",
        }

    def execute_script(
        self, group: str, script_name: str, broadcast_fn=None
    ) -> Dict[str, Any]:
        """Delegate to ``ScriptExecutor.execute_script``.

        ScriptExecutor uses the callbacks passed in ``__init__`` to get/set
        the execution throttling state.
        """
        return self.script_executor.execute_script(group, script_name, broadcast_fn)

    def stop_script(
        self, group: str, script_name: str, broadcast_fn=None
    ) -> Dict[str, Any]:
        """Delegate to ``ScriptExecutor.stop_script`` to stop the script."""
        return self.script_executor.stop_script(group, script_name, broadcast_fn)