# ParamManagerScripts/config_manager.py

import os
import json
import subprocess
import re
from typing import Dict, Any, List

class ConfigurationManager:
    def __init__(self):
        self.base_path = os.path.dirname(os.path.abspath(__file__))
        self.data_path = os.path.join(self.base_path, "data")
        self.script_groups_path = os.path.join(
            self.base_path, "backend", "script_groups"
        )
        self.working_directory = None
        self.log_file = os.path.join(self.data_path, "log.txt")
        self._init_log_file()

    def _init_log_file(self):
        """Initialize log file if it doesn't exist"""
        if not os.path.exists(self.data_path):
            os.makedirs(self.data_path)
        if not os.path.exists(self.log_file):
            with open(self.log_file, "w", encoding="utf-8") as f:
                f.write("")

    def append_log(self, message: str) -> None:
        """Append a message to the log file"""
        try:
            with open(self.log_file, "a", encoding="utf-8") as f:
                f.write(message)
        except Exception as e:
            print(f"Error writing to log file: {e}")

    def read_log(self) -> str:
        """Read the entire log file"""
        try:
            with open(self.log_file, "r", encoding="utf-8") as f:
                return f.read()
        except Exception as e:
            print(f"Error reading log file: {e}")
            return ""

    def clear_log(self) -> bool:
        """Clear the log file"""
        try:
            with open(self.log_file, "w", encoding="utf-8") as f:
                f.write("")
            return True
        except Exception as e:
            print(f"Error clearing log file: {e}")
            return False
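
    # Usage sketch (hypothetical, not part of the original module): the three
    # log helpers above form a simple append/read/clear API over data/log.txt.
    #
    #   manager = ConfigurationManager()
    #   manager.append_log("Run started\n")
    #   print(manager.read_log())   # -> "Run started\n"
    #   manager.clear_log()         # -> True on success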

    def set_working_directory(self, path: str) -> Dict[str, str]:
        """Set and validate working directory."""
        if not os.path.exists(path):
            return {"status": "error", "message": "Directory does not exist"}
        self.working_directory = path
        # Create default data.json if it doesn't exist
        data_path = os.path.join(path, "data.json")
        if not os.path.exists(data_path):
            with open(data_path, "w") as f:
                json.dump({}, f, indent=2)
        return {"status": "success", "path": path}
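
    # Usage sketch (hypothetical path): setting a working directory also seeds
    # an empty data.json there, so level-3 configs always have a file to load.
    #
    #   result = manager.set_working_directory("/tmp/my_project")
    #   # -> {"status": "success", "path": "/tmp/my_project"} if the directory exists,
    #   #    {"status": "error", "message": "Directory does not exist"} otherwise.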

    def get_script_groups(self) -> List[Dict[str, Any]]:
        """Return the list of available script groups with their descriptions."""
        groups = []
        for d in os.listdir(self.script_groups_path):
            group_path = os.path.join(self.script_groups_path, d)
            if os.path.isdir(group_path):
                description = self._get_group_description(group_path)
                groups.append(
                    {
                        "id": d,
                        "name": description.get("name", d),
                        "description": description.get(
                            "description", "No description"
                        ),
                        "version": description.get("version", "1.0"),
                        "author": description.get("author", "Unknown"),
                    }
                )
        return groups

    def _get_group_description(self, group_path: str) -> Dict[str, Any]:
        """Get description for a script group."""
        description_file = os.path.join(group_path, "description.json")
        try:
            if os.path.exists(description_file):
                with open(description_file, "r", encoding="utf-8") as f:
                    return json.load(f)
        except Exception as e:
            print(f"Error reading group description: {e}")
        return {}
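
    # Example description.json consumed by _get_group_description (illustrative
    # content only; the field names match the keys read in get_script_groups):
    #
    #   {
    #     "name": "Example Group",
    #     "description": "Scripts for the example workflow",
    #     "version": "1.0",
    #     "author": "Unknown"
    #   }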

    def get_config(self, level: str, group: str = None) -> Dict[str, Any]:
        """Get configuration for specified level."""
        if level == "1":
            path = os.path.join(self.data_path, "data.json")
        elif level == "2":
            path = os.path.join(self.script_groups_path, group, "data.json")
        elif level == "3":
            if not self.working_directory:
                return {}  # Return empty config if working directory not set
            path = os.path.join(self.working_directory, "data.json")
        else:
            return {}  # Unknown level
        try:
            with open(path, "r") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}  # Return empty config if file is missing or invalid
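
    # Usage sketch: the three configuration levels resolve to different files.
    # Level "1" -> <base>/data/data.json, level "2" -> the group's data.json,
    # level "3" -> data.json inside the current working directory.
    #
    #   general = manager.get_config("1")
    #   group_cfg = manager.get_config("2", group="example_group")  # hypothetical group id
    #   project_cfg = manager.get_config("3")  # {} until a working directory is set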

    def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
        """Get schema for specified level."""
        try:
            # Clean level parameter
            level = str(level).split("-")[0]
            # Determine schema path based on level
            if level == "1":
                path = os.path.join(self.data_path, "esquema.json")
                # Try esquema.json first, then schema.json if not found
                if not os.path.exists(path):
                    path = os.path.join(self.data_path, "schema.json")
            elif level == "2":
                path = os.path.join(self.script_groups_path, group, "esquema.json")
                # Try esquema.json first, then schema.json if not found
                if not os.path.exists(path):
                    path = os.path.join(self.script_groups_path, group, "schema.json")
            elif level == "3":
                if not group:
                    return {"type": "object", "properties": {}}
                path = os.path.join(self.script_groups_path, group, "esquema.json")
                # Try esquema.json first, then schema.json if not found
                if not os.path.exists(path):
                    path = os.path.join(self.script_groups_path, group, "schema.json")
            else:
                return {"type": "object", "properties": {}}
            # Read existing schema from whichever file exists
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    schema = json.load(f)
                return (
                    schema
                    if isinstance(schema, dict)
                    else {"type": "object", "properties": {}}
                )
            # Create default schema if no file exists
            default_schema = {"type": "object", "properties": {}}
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                json.dump(default_schema, f, indent=2)
            return default_schema
        except Exception as e:
            print(f"Error loading schema: {str(e)}")
            return {"type": "object", "properties": {}}
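
    # Note: get_schema prefers esquema.json and falls back to schema.json; if
    # neither exists it writes and returns the minimal default shown below.
    #
    #   {"type": "object", "properties": {}}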

    def update_schema(
        self, level: str, data: Dict[str, Any], group: str = None
    ) -> Dict[str, str]:
        """Update schema for specified level and clean corresponding config."""
        try:
            # Determine schema and config paths
            if level == "1":
                schema_path = os.path.join(self.data_path, "esquema.json")
                config_path = os.path.join(self.data_path, "data.json")
            elif level == "2":
                schema_path = os.path.join(
                    self.script_groups_path, group, "esquema.json"
                )
                config_path = os.path.join(self.script_groups_path, group, "data.json")
            elif level == "3":
                if not group:
                    return {
                        "status": "error",
                        "message": "Group is required for level 3",
                    }
                schema_path = os.path.join(
                    self.script_groups_path, group, "esquema.json"
                )
                config_path = (
                    os.path.join(self.working_directory, "data.json")
                    if self.working_directory
                    else None
                )
            else:
                return {"status": "error", "message": "Invalid level"}
            # Ensure directory exists
            os.makedirs(os.path.dirname(schema_path), exist_ok=True)
            # Validate schema structure
            if (
                not isinstance(data, dict)
                or "type" not in data
                or "properties" not in data
            ):
                data = {
                    "type": "object",
                    "properties": data if isinstance(data, dict) else {},
                }
            # Write schema
            with open(schema_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            # Clean corresponding config file
            self._clean_config_for_schema(config_path, data)
            return {"status": "success"}
        except Exception as e:
            print(f"Error updating schema: {str(e)}")
            return {"status": "error", "message": str(e)}
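
    # Usage sketch (hypothetical group id and property): updating a schema also
    # rewrites the matching data.json so stored values stay within the schema.
    #
    #   manager.update_schema(
    #       "2",
    #       {"type": "object", "properties": {"timeout": {"type": "number"}}},
    #       group="example_group",
    #   )
    #   # -> {"status": "success"}; keys not listed under "properties" are dropped
    #   #    from the group's data.json by _clean_config_for_schema.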

    def _clean_config_for_schema(
        self, config_path: str, schema: Dict[str, Any]
    ) -> None:
        """Clean configuration file to match schema structure."""
        if not config_path or not os.path.exists(config_path):
            return
        try:
            # Load current configuration
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
            # Clean configuration recursively
            cleaned_config = self._clean_object_against_schema(config, schema)
            # Save cleaned configuration
            with open(config_path, "w", encoding="utf-8") as f:
                json.dump(cleaned_config, f, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"Error cleaning config: {str(e)}")

    def _clean_object_against_schema(
        self, data: Dict[str, Any], schema: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Recursively clean object to match schema structure."""
        if not isinstance(data, dict) or not isinstance(schema, dict):
            return {}
        result = {}
        schema_props = schema.get("properties", {})
        for key, value in data.items():
            # Keep only fields that exist in the schema
            if key in schema_props:
                prop_schema = schema_props[key]
                # Recurse into nested objects
                if prop_schema.get("type") == "object":
                    result[key] = self._clean_object_against_schema(value, prop_schema)
                # For enums, keep the value only if it is still allowed
                elif "enum" in prop_schema:
                    if value in prop_schema["enum"]:
                        result[key] = value
                # For any other type, keep the value as-is
                else:
                    result[key] = value
        return result
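
    # Worked example (illustrative values): given this schema and config,
    # _clean_object_against_schema drops unknown keys and invalid enum values.
    #
    #   schema = {"type": "object", "properties": {
    #       "mode": {"type": "string", "enum": ["fast", "safe"]},
    #       "retries": {"type": "number"},
    #   }}
    #   data = {"mode": "legacy", "retries": 3, "obsolete": True}
    #   # -> {"retries": 3}   ("mode" fails the enum check, "obsolete" is not in the schema)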

    def update_config(
        self, level: str, data: Dict[str, Any], group: str = None
    ) -> Dict[str, str]:
        """Update configuration for specified level."""
        if level == "3" and not self.working_directory:
            return {"status": "error", "message": "Working directory not set"}
        if level == "1":
            path = os.path.join(self.data_path, "data.json")
        elif level == "2":
            path = os.path.join(self.script_groups_path, group, "data.json")
        elif level == "3":
            path = os.path.join(self.working_directory, "data.json")
        else:
            return {"status": "error", "message": "Invalid level"}
        try:
            with open(path, "w") as f:
                json.dump(data, f, indent=2)
            return {"status": "success"}
        except Exception as e:
            return {"status": "error", "message": str(e)}
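
    # Usage sketch: update_config is the write counterpart of get_config and
    # returns a status dict (hypothetical values shown).
    #
    #   cfg = manager.get_config("2", group="example_group")
    #   cfg["enabled"] = True
    #   manager.update_config("2", cfg, group="example_group")  # -> {"status": "success"}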

    def list_scripts(self, group: str) -> List[Dict[str, str]]:
        """List all scripts in a group with their descriptions."""
        try:
            scripts_dir = os.path.join(self.script_groups_path, group)
            scripts = []
            if not os.path.exists(scripts_dir):
                print(f"Directory not found: {scripts_dir}")
                return []
            for file in os.listdir(scripts_dir):
                # Include any .py file in the group directory
                if file.endswith(".py"):
                    path = os.path.join(scripts_dir, file)
                    description = self._extract_script_description(path)
                    print(
                        f"Found script: {file} with description: {description}"
                    )  # Debug line
                    scripts.append({"name": file, "description": description})
            print(f"Total scripts found: {len(scripts)}")  # Debug line
            return scripts
        except Exception as e:
            print(f"Error listing scripts: {str(e)}")  # Debug line
            return []

    def _extract_script_description(self, script_path: str) -> str:
        """Extract description from script's docstring or initial comments."""
        try:
            with open(script_path, "r", encoding="utf-8") as f:
                content = f.read()
            # Try to find docstring
            docstring_match = re.search(r'"""(.*?)"""', content, re.DOTALL)
            if docstring_match:
                return docstring_match.group(1).strip()
            # Try to find initial comment
            comment_match = re.search(r"^#\s*(.*?)$", content, re.MULTILINE)
            if comment_match:
                return comment_match.group(1).strip()
            return "No description available"
        except Exception as e:
            print(
                f"Error extracting description from {script_path}: {str(e)}"
            )  # Debug line
            return "Error reading script description"
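
    # Example (illustrative script content): a group script that starts with
    #
    #   """Export tags to CSV."""
    #
    # is listed by list_scripts as
    #   {"name": "export_tags.py", "description": "Export tags to CSV."}
    # (the file name here is hypothetical).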

    def execute_script(
        self, group: str, script_name: str, broadcast_fn=None
    ) -> Dict[str, Any]:
        """Execute script with real-time logging via WebSocket broadcast function."""
        script_path = os.path.join(self.script_groups_path, group, script_name)
        if not os.path.exists(script_path):
            return {"error": "Script not found"}
        # Get the working directory configured for this script group
        working_dir = self.get_work_dir(group)
        if not working_dir:
            return {"error": "Working directory not set for this script group"}
        configs = {
            "level1": self.get_config("1"),
            "level2": self.get_config("2", group),
            "level3": self.get_config("3") if self.working_directory else {},
        }
        try:
            if broadcast_fn:
                broadcast_fn(f"\nStarting execution of {script_name}...\n")
            process = subprocess.Popen(
                ["python", script_path],
                cwd=working_dir or self.base_path,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
                env=dict(os.environ, **{"SCRIPT_CONFIGS": json.dumps(configs)}),
            )
            output = []
            while True:
                line = process.stdout.readline()
                if not line and process.poll() is not None:
                    break
                if line:
                    output.append(line)
                    if broadcast_fn:
                        broadcast_fn(line)
            stderr = process.stderr.read()
            if stderr:
                if broadcast_fn:
                    broadcast_fn(f"\nERROR: {stderr}\n")
                output.append(f"ERROR: {stderr}")
            if broadcast_fn:
                broadcast_fn("\nExecution completed.\n")
            return {
                "output": "".join(output),
                "error": stderr if stderr else None,
                "status": "success" if process.returncode == 0 else "error",
            }
        except Exception as e:
            error_msg = str(e)
            if broadcast_fn:
                broadcast_fn(f"\nUnexpected error: {error_msg}\n")
            return {"error": error_msg}
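
    # Usage sketch (hypothetical names): broadcast_fn can be any callable that
    # forwards a line of text, e.g. a WebSocket send wrapper.
    #
    #   def broadcast(line):
    #       print(line, end="")
    #
    #   result = manager.execute_script("example_group", "export_tags.py", broadcast)
    #
    # The child script can read the merged configuration levels from the
    # SCRIPT_CONFIGS environment variable set above, for example:
    #
    #   configs = json.loads(os.environ.get("SCRIPT_CONFIGS", "{}"))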

    def get_work_dir(self, group: str) -> str:
        """Get working directory path for a script group."""
        work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
        try:
            with open(work_dir_path, "r") as f:
                data = json.load(f)
            path = data.get("path", "")
            # Update the instance attribute if the stored path is still valid
            if path and os.path.exists(path):
                self.working_directory = path
            return path
        except (FileNotFoundError, json.JSONDecodeError):
            return ""

    def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
        """Set working directory path for a script group."""
        if not os.path.exists(path):
            return {"status": "error", "message": "Directory does not exist"}
        work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
        try:
            # Persist the path in work_dir.json
            with open(work_dir_path, "w") as f:
                json.dump({"path": path}, f, indent=2)
            # Update the instance attribute
            self.working_directory = path
            # Create data.json in the working directory if it doesn't exist
            data_path = os.path.join(path, "data.json")
            if not os.path.exists(data_path):
                with open(data_path, "w") as f:
                    json.dump({}, f, indent=2)
            return {"status": "success", "path": path}
        except Exception as e:
            return {"status": "error", "message": str(e)}
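

if __name__ == "__main__":
    # Minimal smoke test (assumes it is run from the repository root and that
    # at least one script group exists under backend/script_groups).
    manager = ConfigurationManager()
    manager.append_log("config_manager smoke test\n")
    for group in manager.get_script_groups():
        print(f"{group['id']}: {group['description']}")
        for script in manager.list_scripts(group["id"]):
            print(f"  - {script['name']}: {script['description']}")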