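"""Configuration manager for a script-execution backend.

Handles three configuration levels (general, script group, and working
directory), JSON schema maintenance, a timestamped log file, and script
execution with real-time output via an optional broadcast callback.
"""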
import os
import json
import subprocess
import re
import time
from datetime import datetime
from typing import Dict, Any, List


class ConfigurationManager:
    def __init__(self):
        self.base_path = os.path.dirname(os.path.abspath(__file__))
        self.data_path = os.path.join(self.base_path, "data")
        self.script_groups_path = os.path.join(
            self.base_path, "backend", "script_groups"
        )
        self.working_directory = None
        self.log_file = os.path.join(self.data_path, "log.txt")
        self._init_log_file()
        # Throttling state: when the last script ran, and the minimum
        # spacing enforced between consecutive executions
        self.last_execution_time = 0
        self.min_execution_interval = 1  # seconds

    def _init_log_file(self):
        """Initialize the log file if it doesn't exist."""
        if not os.path.exists(self.data_path):
            os.makedirs(self.data_path)
        if not os.path.exists(self.log_file):
            with open(self.log_file, "w", encoding="utf-8") as f:
                f.write("")

    def append_log(self, message: str) -> None:
        """Append a message to the log file, timestamping each line."""
        try:
            timestamp = datetime.now().strftime("[%H:%M:%S] ")
            # Skip empty lines and add the timestamp only to lines with content
            lines = message.split("\n")
            lines_with_timestamp = []
            for line in lines:
                if line.strip():
                    # Only prepend a timestamp if the line doesn't already have one
                    if not line.strip().startswith("["):
                        line = f"{timestamp}{line}"
                    lines_with_timestamp.append(f"{line}\n")

            if lines_with_timestamp:
                with open(self.log_file, "a", encoding="utf-8") as f:
                    f.writelines(lines_with_timestamp)
        except Exception as e:
            print(f"Error writing to log file: {e}")

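    # Illustrative log output (hypothetical messages): each non-empty line
    # gains a timestamp unless it already starts with one.
    #
    #     [14:02:31] Starting execution of example.py
    #     [14:02:32] Execution completed
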
    def read_last_log_line(self) -> str:
        """Read the last non-empty line from the log file."""
        try:
            with open(self.log_file, "r", encoding="utf-8") as f:
                # Scan the lines in reverse and return the last non-empty one
                lines = f.readlines()
                for line in reversed(lines):
                    if line.strip():
                        return line
                return ""
        except Exception as e:
            print(f"Error reading last log line: {e}")
            return ""

    def read_log(self) -> str:
        """Read the entire log file."""
        try:
            with open(self.log_file, "r", encoding="utf-8") as f:
                return f.read()
        except Exception as e:
            print(f"Error reading log file: {e}")
            return ""

    def clear_log(self) -> bool:
        """Clear the log file."""
        try:
            with open(self.log_file, "w", encoding="utf-8") as f:
                f.write("")
            return True
        except Exception as e:
            print(f"Error clearing log file: {e}")
            return False

    def set_working_directory(self, path: str) -> Dict[str, str]:
        """Set and validate the working directory."""
        if not os.path.exists(path):
            return {"status": "error", "message": "Directory does not exist"}

        self.working_directory = path

        # Create a default data.json if it doesn't exist
        data_path = os.path.join(path, "data.json")
        if not os.path.exists(data_path):
            with open(data_path, "w") as f:
                json.dump({}, f, indent=2)

        return {"status": "success", "path": path}

    def get_script_groups(self) -> List[Dict[str, Any]]:
        """Return the list of available script groups with their descriptions."""
        groups = []
        for d in os.listdir(self.script_groups_path):
            group_path = os.path.join(self.script_groups_path, d)
            if os.path.isdir(group_path):
                description = self._get_group_description(group_path)
                groups.append(
                    {
                        "id": d,
                        "name": description.get("name", d),
                        "description": description.get(
                            "description", "No description"
                        ),
                        "version": description.get("version", "1.0"),
                        "author": description.get("author", "Unknown"),
                    }
                )
        return groups

    def _get_group_description(self, group_path: str) -> Dict[str, Any]:
        """Get the description for a script group."""
        description_file = os.path.join(group_path, "description.json")
        try:
            if os.path.exists(description_file):
                with open(description_file, "r", encoding="utf-8") as f:
                    return json.load(f)
        except Exception as e:
            print(f"Error reading group description: {e}")
        return {}

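    # For illustration, a description.json matching the keys consumed by
    # get_script_groups might look like this (hypothetical values):
    #
    #     {
    #         "name": "Data Tools",
    #         "description": "Scripts for batch data processing",
    #         "version": "1.0",
    #         "author": "Unknown"
    #     }
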
    def get_config(self, level: str, group: str = None) -> Dict[str, Any]:
        """Get configuration for the specified level."""
        if level == "1":
            path = os.path.join(self.data_path, "data.json")
        elif level == "2":
            path = os.path.join(self.script_groups_path, group, "data.json")
        elif level == "3":
            if not self.working_directory:
                return {}  # Return empty config if working directory not set
            path = os.path.join(self.working_directory, "data.json")
        else:
            return {}  # Unknown level: avoid referencing an undefined path

        try:
            with open(path, "r") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}  # Return empty config if the file doesn't exist

    def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
        """Get the schema for the specified level."""
        try:
            # Clean the level parameter (keep only the part before any "-")
            level = str(level).split("-")[0]

            # Determine the schema path based on the level
            if level == "1":
                path = os.path.join(self.data_path, "esquema_general.json")
            elif level == "2":
                path = os.path.join(
                    self.script_groups_path, group, "esquema_group.json"
                )
            elif level == "3":
                if not group:
                    return {"type": "object", "properties": {}}
                path = os.path.join(self.script_groups_path, group, "esquema_work.json")
            else:
                return {"type": "object", "properties": {}}

            # Read the schema if the file exists
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    schema = json.load(f)
                    return (
                        schema
                        if isinstance(schema, dict)
                        else {"type": "object", "properties": {}}
                    )

            # Otherwise create and persist a default schema
            default_schema = {"type": "object", "properties": {}}
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                json.dump(default_schema, f, indent=2)
            return default_schema

        except Exception as e:
            print(f"Error loading schema: {str(e)}")
            return {"type": "object", "properties": {}}

    def update_schema(
        self, level: str, data: Dict[str, Any], group: str = None
    ) -> Dict[str, str]:
        """Update the schema for the specified level and clean the matching config."""
        try:
            # Determine the schema and config paths
            if level == "1":
                schema_path = os.path.join(self.data_path, "esquema_general.json")
                config_path = os.path.join(self.data_path, "data.json")
            elif level == "2":
                schema_path = os.path.join(
                    self.script_groups_path, group, "esquema_group.json"
                )
                config_path = os.path.join(self.script_groups_path, group, "data.json")
            elif level == "3":
                if not group:
                    return {
                        "status": "error",
                        "message": "Group is required for level 3",
                    }
                schema_path = os.path.join(
                    self.script_groups_path, group, "esquema_work.json"
                )
                config_path = (
                    os.path.join(self.working_directory, "data.json")
                    if self.working_directory
                    else None
                )
            else:
                return {"status": "error", "message": "Invalid level"}

            # Ensure the directory exists
            os.makedirs(os.path.dirname(schema_path), exist_ok=True)

            # Validate the schema structure, wrapping bare property maps
            if (
                not isinstance(data, dict)
                or "type" not in data
                or "properties" not in data
            ):
                data = {
                    "type": "object",
                    "properties": data if isinstance(data, dict) else {},
                }

            # Write the schema
            with open(schema_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

            # Clean the corresponding config file
            self._clean_config_for_schema(config_path, data)

            return {"status": "success"}

        except Exception as e:
            print(f"Error updating schema: {str(e)}")
            return {"status": "error", "message": str(e)}

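    # Illustrative update_schema payloads (hypothetical property names): the
    # validation above wraps bare property maps, so both forms are accepted.
    #
    #     {"type": "object", "properties": {"host": {"type": "string"}}}
    #     {"host": {"type": "string"}}  # wrapped into the form above
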
    def _clean_config_for_schema(
        self, config_path: str, schema: Dict[str, Any]
    ) -> None:
        """Clean a configuration file to match the schema structure."""
        if not config_path or not os.path.exists(config_path):
            return

        try:
            # Load the current configuration
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)

            # Clean the configuration recursively
            cleaned_config = self._clean_object_against_schema(config, schema)

            # Save the cleaned configuration
            with open(config_path, "w", encoding="utf-8") as f:
                json.dump(cleaned_config, f, indent=2, ensure_ascii=False)

        except Exception as e:
            print(f"Error cleaning config: {str(e)}")

    def _clean_object_against_schema(
        self, data: Dict[str, Any], schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Recursively clean an object to match the schema structure."""
        if not isinstance(data, dict) or not isinstance(schema, dict):
            return {}

        result = {}
        schema_props = schema.get("properties", {})

        for key, value in data.items():
            # Only keep fields that exist in the schema
            if key in schema_props:
                prop_schema = schema_props[key]

                # Recurse into nested objects
                if prop_schema.get("type") == "object":
                    result[key] = self._clean_object_against_schema(value, prop_schema)
                # For enums, keep the value only if it is still valid
                elif "enum" in prop_schema:
                    if value in prop_schema["enum"]:
                        result[key] = value
                # For other types, keep the value as-is
                else:
                    result[key] = value

        return result

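    # Illustrative example (hypothetical values): with a schema of
    #     {"properties": {"mode": {"enum": ["fast", "safe"]}, "name": {"type": "string"}}}
    # cleaning {"mode": "slow", "name": "x", "stale": 1} yields {"name": "x"}:
    # "stale" is not in the schema, and "slow" is not a valid enum value.
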
    def update_config(
        self, level: str, data: Dict[str, Any], group: str = None
    ) -> Dict[str, str]:
        """Update configuration for the specified level."""
        if level == "3" and not self.working_directory:
            return {"status": "error", "message": "Working directory not set"}

        if level == "1":
            path = os.path.join(self.data_path, "data.json")
        elif level == "2":
            path = os.path.join(self.script_groups_path, group, "data.json")
        elif level == "3":
            path = os.path.join(self.working_directory, "data.json")
        else:
            return {"status": "error", "message": "Invalid level"}

        with open(path, "w") as f:
            json.dump(data, f, indent=2)
        return {"status": "success"}

    def list_scripts(self, group: str) -> List[Dict[str, str]]:
        """List all scripts in a group with their descriptions."""
        try:
            scripts_dir = os.path.join(self.script_groups_path, group)
            scripts = []

            if not os.path.exists(scripts_dir):
                print(f"Directory not found: {scripts_dir}")
                return []

            for file in os.listdir(scripts_dir):
                # Include any .py file in the group directory
                if file.endswith(".py"):
                    path = os.path.join(scripts_dir, file)
                    description = self._extract_script_description(path)
                    print(
                        f"Found script: {file} with description: {description}"
                    )  # Debug
                    scripts.append({"name": file, "description": description})

            print(f"Total scripts found: {len(scripts)}")  # Debug
            return scripts
        except Exception as e:
            print(f"Error listing scripts: {str(e)}")  # Debug
            return []

    def _extract_script_description(self, script_path: str) -> str:
        """Extract a description from the script's docstring or initial comment."""
        try:
            with open(script_path, "r", encoding="utf-8") as f:
                content = f.read()

            # Try to find a docstring
            docstring_match = re.search(r'"""(.*?)"""', content, re.DOTALL)
            if docstring_match:
                return docstring_match.group(1).strip()

            # Fall back to the first comment line
            comment_match = re.search(r"^#\s*(.*?)$", content, re.MULTILINE)
            if comment_match:
                return comment_match.group(1).strip()

            return "No description available"
        except Exception as e:
            print(f"Error extracting description from {script_path}: {str(e)}")
            return "Error reading script description"

    def execute_script(
        self, group: str, script_name: str, broadcast_fn=None
    ) -> Dict[str, Any]:
        """Execute a script with real-time logging via a WebSocket broadcast function."""
        # Throttle executions that arrive too quickly
        current_time = time.time()
        time_since_last = current_time - self.last_execution_time

        if time_since_last < self.min_execution_interval:
            message = (
                f"Please wait {self.min_execution_interval} second(s) "
                "between executions"
            )
            if broadcast_fn:
                broadcast_fn(message)
            return {"status": "throttled", "error": message}

        self.last_execution_time = current_time
        script_path = os.path.join(self.script_groups_path, group, script_name)

        if not os.path.exists(script_path):
            if broadcast_fn:
                broadcast_fn("Error: Script not found")
            return {"status": "error", "error": "Script not found"}

        # Get the working directory
        working_dir = self.get_work_dir(group)
        if not working_dir:
            if broadcast_fn:
                broadcast_fn("Error: Working directory not set")
            return {"status": "error", "error": "Working directory not set"}

        # Prepare the environment configurations
        configs = {
            "level1": self.get_config("1"),
            "level2": self.get_config("2", group),
            "level3": self.get_config("3", group) if working_dir else {},
            "working_directory": working_dir,
        }

        try:
            if broadcast_fn:
                broadcast_fn(f"Starting execution of {script_name}")

            # Execute the script with the configured environment
            process = subprocess.Popen(
                ["python", script_path],
                cwd=working_dir,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
                env=dict(
                    os.environ,
                    SCRIPT_CONFIGS=json.dumps(configs),
                    PYTHONIOENCODING="utf-8",
                ),
            )

            # Stream stdout in real time
            while True:
                line = process.stdout.readline()
                if not line and process.poll() is not None:
                    break
                if line and broadcast_fn:
                    broadcast_fn(line.rstrip())

            # Report any errors
            stderr = process.stderr.read()
            if stderr and broadcast_fn:
                broadcast_fn(f"ERROR: {stderr.strip()}")

            # Signal completion
            if broadcast_fn:
                broadcast_fn("Execution completed")

            return {
                "status": "success" if process.returncode == 0 else "error",
                "error": stderr if stderr else None,
            }

        except Exception as e:
            error_msg = str(e)
            if broadcast_fn:
                broadcast_fn(f"Unexpected error: {error_msg}")
            return {"status": "error", "error": error_msg}

    def get_work_dir(self, group: str) -> str:
        """Get the working directory path for a script group."""
        work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
        try:
            with open(work_dir_path, "r") as f:
                data = json.load(f)
                path = data.get("path", "")
                # Normalize path separators
                if path:
                    path = os.path.normpath(path)
                # Update the instance attribute if the path is valid
                if path and os.path.exists(path):
                    self.working_directory = path
                return path
        except (FileNotFoundError, json.JSONDecodeError):
            return ""

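    # For illustration, the work_dir.json consumed above is assumed to look
    # like this (hypothetical paths):
    #
    #     {
    #         "path": "C:/projects/current",
    #         "history": ["C:/projects/current", "C:/projects/old"]
    #     }
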
    def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
        """Set the working directory for a script group and update its history."""
        # Normalize the incoming path
        path = os.path.normpath(path)

        if not os.path.exists(path):
            return {"status": "error", "message": "Directory does not exist"}

        work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")

        try:
            # Load existing data or start fresh
            try:
                with open(work_dir_path, "r") as f:
                    data = json.load(f)
                    # Normalize the paths already in the history
                    if "history" in data:
                        data["history"] = [os.path.normpath(p) for p in data["history"]]
            except (FileNotFoundError, json.JSONDecodeError):
                data = {"path": "", "history": []}

            # Update the current path
            data["path"] = path

            # Update the history
            if "history" not in data:
                data["history"] = []

            # Drop the path from the history if it is already there
            # (comparing normalized paths)
            data["history"] = [p for p in data["history"] if os.path.normpath(p) != path]

            # Put the path at the front of the history
            data["history"].insert(0, path)

            # Keep only the last 10 directories
            data["history"] = data["history"][:10]

            # Save the updated data
            with open(work_dir_path, "w") as f:
                json.dump(data, f, indent=2)

            # Update the instance attribute
            self.working_directory = path

            # Create data.json in the working directory if it doesn't exist
            data_path = os.path.join(path, "data.json")
            if not os.path.exists(data_path):
                with open(data_path, "w") as f:
                    json.dump({}, f, indent=2)

            return {"status": "success", "path": path}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_directory_history(self, group: str) -> List[str]:
        """Get the directory history for a script group."""
        work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
        try:
            with open(work_dir_path, "r") as f:
                data = json.load(f)
                # Normalize every path in the history
                history = [os.path.normpath(p) for p in data.get("history", [])]
                # Keep only directories that still exist
                return [p for p in history if os.path.exists(p)]
        except (FileNotFoundError, json.JSONDecodeError):
            return []
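

# Minimal usage sketch (assumptions: a script group named "example_group"
# exists under backend/script_groups and contains "example_script.py";
# both names are hypothetical):
if __name__ == "__main__":
    manager = ConfigurationManager()

    # Choose a working directory for the group; this also records it in the
    # group's history and creates a default data.json inside it
    print(manager.set_work_dir("example_group", os.path.expanduser("~")))
    print(manager.get_directory_history("example_group"))

    # Read the three configuration levels that execute_script merges into
    # the SCRIPT_CONFIGS environment variable
    print(manager.get_config("1"))
    print(manager.get_config("2", "example_group"))
    print(manager.get_config("3", "example_group"))

    # Discover scripts and run one, echoing its output to stdout
    for script in manager.list_scripts("example_group"):
        print(script["name"], "-", script["description"])
    manager.execute_script("example_group", "example_script.py", broadcast_fn=print)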