299 lines
11 KiB
Python
299 lines
11 KiB
Python
import os
|
|
import json
|
|
import subprocess
|
|
import re
|
|
from typing import Dict, Any, List
|
|
|
|
|
|
class ConfigurationManager:
|
|
def __init__(self):
|
|
self.base_path = os.path.dirname(os.path.abspath(__file__))
|
|
self.data_path = os.path.join(self.base_path, "data")
|
|
self.script_groups_path = os.path.join(
|
|
self.base_path, "backend", "script_groups"
|
|
)
|
|
self.working_directory = None
|
|
self.log_file = os.path.join(self.data_path, "log.txt")
|
|
self._init_log_file()
|
|
|
|
def _init_log_file(self):
|
|
"""Initialize log file if it doesn't exist"""
|
|
if not os.path.exists(self.data_path):
|
|
os.makedirs(self.data_path)
|
|
if not os.path.exists(self.log_file):
|
|
with open(self.log_file, "w", encoding="utf-8") as f:
|
|
f.write("")
|
|
|
|
def append_log(self, message: str) -> None:
|
|
"""Append a message to the log file"""
|
|
try:
|
|
with open(self.log_file, "a", encoding="utf-8") as f:
|
|
f.write(message)
|
|
except Exception as e:
|
|
print(f"Error writing to log file: {e}")
|
|
|
|
def read_log(self) -> str:
|
|
"""Read the entire log file"""
|
|
try:
|
|
with open(self.log_file, "r", encoding="utf-8") as f:
|
|
return f.read()
|
|
except Exception as e:
|
|
print(f"Error reading log file: {e}")
|
|
return ""
|
|
|
|
def clear_log(self) -> bool:
|
|
"""Clear the log file"""
|
|
try:
|
|
with open(self.log_file, "w", encoding="utf-8") as f:
|
|
f.write("")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error clearing log file: {e}")
|
|
return False
|
|
|
|
def set_working_directory(self, path: str) -> Dict[str, str]:
|
|
"""Set and validate working directory."""
|
|
if not os.path.exists(path):
|
|
return {"status": "error", "message": "Directory does not exist"}
|
|
|
|
self.working_directory = path
|
|
|
|
# Create default data.json if it doesn't exist
|
|
data_path = os.path.join(path, "data.json")
|
|
if not os.path.exists(data_path):
|
|
with open(data_path, "w") as f:
|
|
json.dump({}, f, indent=2)
|
|
|
|
return {"status": "success", "path": path}
|
|
|
|
def get_script_groups(self) -> List[str]:
|
|
"""Returns list of available script groups."""
|
|
return [
|
|
d
|
|
for d in os.listdir(self.script_groups_path)
|
|
if os.path.isdir(os.path.join(self.script_groups_path, d))
|
|
]
|
|
|
|
def get_config(self, level: str, group: str = None) -> Dict[str, Any]:
|
|
"""Get configuration for specified level."""
|
|
if level == "1":
|
|
path = os.path.join(self.data_path, "data.json")
|
|
elif level == "2":
|
|
path = os.path.join(self.script_groups_path, group, "data.json")
|
|
elif level == "3":
|
|
if not self.working_directory:
|
|
return {} # Return empty config if working directory not set
|
|
path = os.path.join(self.working_directory, "data.json")
|
|
|
|
try:
|
|
with open(path, "r") as f:
|
|
return json.load(f)
|
|
except FileNotFoundError:
|
|
return {} # Return empty config if file doesn't exist
|
|
|
|
def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
|
|
"""Get schema for specified level."""
|
|
# Clean level parameter (remove -form suffix if present)
|
|
level = level.split("-")[0]
|
|
|
|
try:
|
|
if level == "1":
|
|
path = os.path.join(self.data_path, "esquema.json")
|
|
elif level == "2":
|
|
path = os.path.join(self.script_groups_path, "esquema.json")
|
|
elif level == "3":
|
|
if not group:
|
|
return {} # Return empty schema if no group is specified
|
|
path = os.path.join(self.script_groups_path, group, "esquema.json")
|
|
else:
|
|
return {} # Return empty schema for invalid levels
|
|
|
|
with open(path, "r") as f:
|
|
return json.load(f)
|
|
except FileNotFoundError:
|
|
return {} # Return empty schema if file doesn't exist
|
|
except json.JSONDecodeError:
|
|
return {} # Return empty schema if file is invalid JSON
|
|
|
|
def update_config(
|
|
self, level: str, data: Dict[str, Any], group: str = None
|
|
) -> Dict[str, str]:
|
|
"""Update configuration for specified level."""
|
|
if level == "3" and not self.working_directory:
|
|
return {"status": "error", "message": "Working directory not set"}
|
|
|
|
if level == "1":
|
|
path = os.path.join(self.data_path, "data.json")
|
|
elif level == "2":
|
|
path = os.path.join(self.script_groups_path, group, "data.json")
|
|
elif level == "3":
|
|
path = os.path.join(self.working_directory, "data.json")
|
|
|
|
with open(path, "w") as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
def update_schema(
|
|
self, level: str, data: Dict[str, Any], group: str = None
|
|
) -> None:
|
|
"""Update schema for specified level."""
|
|
if level == "1":
|
|
path = os.path.join(self.data_path, "esquema.json")
|
|
elif level == "2":
|
|
path = os.path.join(self.script_groups_path, "esquema.json")
|
|
elif level == "3":
|
|
path = os.path.join(self.script_groups_path, group, "esquema.json")
|
|
|
|
with open(path, "w") as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
def list_scripts(self, group: str) -> List[Dict[str, str]]:
|
|
"""List all scripts in a group with their descriptions."""
|
|
try:
|
|
scripts_dir = os.path.join(self.script_groups_path, group)
|
|
scripts = []
|
|
|
|
if not os.path.exists(scripts_dir):
|
|
print(f"Directory not found: {scripts_dir}")
|
|
return []
|
|
|
|
for file in os.listdir(scripts_dir):
|
|
# Modificar la condición para incluir cualquier archivo .py
|
|
if file.endswith(".py"):
|
|
path = os.path.join(scripts_dir, file)
|
|
description = self._extract_script_description(path)
|
|
print(
|
|
f"Found script: {file} with description: {description}"
|
|
) # Debug line
|
|
scripts.append({"name": file, "description": description})
|
|
|
|
print(f"Total scripts found: {len(scripts)}") # Debug line
|
|
return scripts
|
|
except Exception as e:
|
|
print(f"Error listing scripts: {str(e)}") # Debug line
|
|
return []
|
|
|
|
def _extract_script_description(self, script_path: str) -> str:
|
|
"""Extract description from script's docstring or initial comments."""
|
|
try:
|
|
with open(script_path, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
|
|
# Try to find docstring
|
|
docstring_match = re.search(r'"""(.*?)"""', content, re.DOTALL)
|
|
if docstring_match:
|
|
return docstring_match.group(1).strip()
|
|
|
|
# Try to find initial comment
|
|
comment_match = re.search(r"^#\s*(.*?)$", content, re.MULTILINE)
|
|
if comment_match:
|
|
return comment_match.group(1).strip()
|
|
|
|
return "No description available"
|
|
except Exception as e:
|
|
print(
|
|
f"Error extracting description from {script_path}: {str(e)}"
|
|
) # Debug line
|
|
return "Error reading script description"
|
|
|
|
def execute_script(
|
|
self, group: str, script_name: str, broadcast_fn=None
|
|
) -> Dict[str, Any]:
|
|
"""Execute script with real-time logging via WebSocket broadcast function."""
|
|
script_path = os.path.join(self.script_groups_path, group, script_name)
|
|
if not os.path.exists(script_path):
|
|
return {"error": "Script not found"}
|
|
|
|
# Obtener el directorio de trabajo del grupo
|
|
working_dir = self.get_work_dir(group)
|
|
if not working_dir:
|
|
return {"error": "Working directory not set for this script group"}
|
|
|
|
configs = {
|
|
"level1": self.get_config("1"),
|
|
"level2": self.get_config("2", group),
|
|
"level3": self.get_config("3") if self.working_directory else {},
|
|
}
|
|
|
|
try:
|
|
if broadcast_fn:
|
|
broadcast_fn(f"\nIniciando ejecución de {script_name}...\n")
|
|
|
|
process = subprocess.Popen(
|
|
["python", script_path],
|
|
cwd=working_dir or self.base_path,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
bufsize=1,
|
|
env=dict(os.environ, **{"SCRIPT_CONFIGS": json.dumps(configs)}),
|
|
)
|
|
|
|
output = []
|
|
while True:
|
|
line = process.stdout.readline()
|
|
if not line and process.poll() is not None:
|
|
break
|
|
if line:
|
|
output.append(line)
|
|
if broadcast_fn:
|
|
broadcast_fn(line)
|
|
|
|
stderr = process.stderr.read()
|
|
if stderr:
|
|
if broadcast_fn:
|
|
broadcast_fn(f"\nERROR: {stderr}\n")
|
|
output.append(f"ERROR: {stderr}")
|
|
|
|
if broadcast_fn:
|
|
broadcast_fn("\nEjecución completada.\n")
|
|
|
|
return {
|
|
"output": "".join(output),
|
|
"error": stderr if stderr else None,
|
|
"status": "success" if process.returncode == 0 else "error",
|
|
}
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
if broadcast_fn:
|
|
broadcast_fn(f"\nError inesperado: {error_msg}\n")
|
|
return {"error": error_msg}
|
|
|
|
def get_work_dir(self, group: str) -> str:
|
|
"""Get working directory path for a script group."""
|
|
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
|
|
try:
|
|
with open(work_dir_path, "r") as f:
|
|
data = json.load(f)
|
|
path = data.get("path", "")
|
|
# Actualizar la variable de instancia si hay una ruta válida
|
|
if path and os.path.exists(path):
|
|
self.working_directory = path
|
|
return path
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
return ""
|
|
|
|
def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
|
|
"""Set working directory path for a script group."""
|
|
if not os.path.exists(path):
|
|
return {"status": "error", "message": "Directory does not exist"}
|
|
|
|
work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
|
|
|
|
try:
|
|
# Guardar la ruta en work_dir.json
|
|
with open(work_dir_path, "w") as f:
|
|
json.dump({"path": path}, f, indent=2)
|
|
|
|
# Actualizar la variable de instancia
|
|
self.working_directory = path
|
|
|
|
# Crear data.json en el directorio de trabajo si no existe
|
|
data_path = os.path.join(path, "data.json")
|
|
if not os.path.exists(data_path):
|
|
with open(data_path, "w") as f:
|
|
json.dump({}, f, indent=2)
|
|
|
|
return {"status": "success", "path": path}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|