232 lines
7.1 KiB
Python
232 lines
7.1 KiB
Python
from flask import Flask, render_template, request, jsonify, url_for
|
|
from flask_sock import Sock
|
|
from config_manager import ConfigurationManager
|
|
from datetime import datetime
|
|
import os
|
|
import json # Added import
|
|
|
|
# Flask application; static files are served from the URL root ("" prefix).
app = Flask(
    __name__,
    static_url_path="",
    static_folder="static",
    template_folder="templates",
)
sock = Sock(app)
config_manager = ConfigurationManager()

# Module-level set holding the WebSocket connections that are currently active
websocket_connections = set()
|
|
|
|
|
|
@sock.route("/ws")
def handle_websocket(ws):
    """Serve a single WebSocket client connected to ``/ws``.

    The connection is registered in ``websocket_connections`` and every
    non-empty message received from it is handed to ``broadcast_message``.
    The receive loop only ends when an exception is raised (by
    ``ws.receive()`` or by the broadcast); the error is printed and the
    connection is unregistered.

    Args:
        ws: The WebSocket connection object supplied by ``flask_sock``.
    """
    try:
        websocket_connections.add(ws)
        while True:
            message = ws.receive()
            if message:
                broadcast_message(message)
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        # discard() instead of remove(): broadcast_message() may already have
        # pruned this socket as dead, in which case remove() raises KeyError
        # from inside the cleanup path.
        websocket_connections.discard(ws)
|
|
|
|
|
|
def _as_text(value):
    """Return *value* as ``str``, decoding bytes-like payloads as UTF-8."""
    if isinstance(value, (bytes, bytearray)):
        # ws.receive() may hand over bytes (binary frames); undecodable bytes
        # are replaced instead of aborting the whole broadcast.
        return bytes(value).decode("utf-8", errors="replace")
    return str(value)


def _strip_leading_timestamps(msg):
    """Drop every leading ``[..]`` prefix made only of digits and colons."""
    while msg.startswith("[") and "]" in msg:
        end = msg.index("]")
        # Only prefixes such as "[12:34:56]" count as timestamps; any other
        # bracketed text (e.g. "[ERROR]") is part of the message and is kept.
        if not msg[1:end].replace(":", "").isdigit():
            break
        msg = msg[end + 1 :].strip()
    return msg


def broadcast_message(message):
    """Send a message to every active WebSocket connection and log it.

    Each resulting line has any leading timestamp prefixes removed, gets a
    single ``[HH:MM:SS] `` prefix (taken once per call), is appended to
    ``config_manager.log_file`` and is sent to every connection in
    ``websocket_connections``. Connections whose ``send`` raises are removed
    from that set.

    Args:
        message: A string (split into non-empty, stripped lines), a bytes
            payload (decoded as UTF-8 first), or a list/tuple of messages
            (each item used as one line, as-is). ``None`` is ignored.
    """
    if message is None:
        return

    # A list of messages is processed item by item; a single message is
    # split into its non-empty lines.
    if isinstance(message, (list, tuple)):
        messages = [_as_text(item) for item in message]
    else:
        messages = [
            line.strip() for line in _as_text(message).splitlines() if line.strip()
        ]
    if not messages:
        return

    # Strip duplicated timestamps, then add exactly one
    timestamp = datetime.now().strftime("[%H:%M:%S] ")
    formatted = [f"{timestamp}{_strip_leading_timestamps(msg)}" for msg in messages]

    # Write to the log file: one open/append per call instead of one per line
    with open(config_manager.log_file, "a", encoding="utf-8") as f:
        f.writelines(f"{line}\n" for line in formatted)

    # Send to all WebSocket clients. Iterate over a snapshot because the
    # module-level set can be modified by connection handlers meanwhile.
    dead_connections = set()
    for ws in list(websocket_connections):
        try:
            if ws.connected:
                for line in formatted:
                    ws.send(f"{line}\n")
        except Exception:
            dead_connections.add(ws)

    # Clean up dead connections
    websocket_connections.difference_update(dead_connections)
|
|
|
|
|
|
@app.route("/api/execute_script", methods=["POST"])
def execute_script():
    """Run the script named in the JSON body and return its result as JSON.

    The body must provide the keys ``"group"`` and ``"script"``. Any exception
    (including a missing key) is broadcast to the WebSocket clients and
    returned as ``{"error": ...}``.
    """
    try:
        group_name = request.json["group"]
        target = request.json["script"]
        # broadcast_message is handed to the config manager along with the
        # script identifiers
        outcome = config_manager.execute_script(group_name, target, broadcast_message)
    except Exception as exc:
        failure = f"Error ejecutando script: {exc!s}"
        broadcast_message(failure)
        return jsonify({"error": failure})
    else:
        return jsonify(outcome)
|
|
|
|
|
|
@app.route("/")
def index():
    """Render ``index.html`` with the script groups from the config manager."""
    return render_template(
        "index.html",
        script_groups=config_manager.get_script_groups(),
    )
|
|
|
|
|
|
@app.route("/api/config/<level>", methods=["GET", "POST"])
def handle_config(level):
    """Read (GET) or update (POST) the configuration for *level*.

    The optional ``group`` query parameter is forwarded to the config manager.
    GET answers with an empty JSON object when the manager raises
    ``FileNotFoundError``; POST stores the JSON body and reports success.
    """
    group = request.args.get("group")

    if request.method != "GET":
        payload = request.json
        config_manager.update_config(level, payload, group)
        return jsonify({"status": "success"})

    try:
        current = config_manager.get_config(level, group)
    except FileNotFoundError:
        return jsonify({})
    return jsonify(current)
|
|
|
|
|
|
@app.route("/api/schema/<level>", methods=["GET", "POST"])
def handle_schema(level):
    """Read (GET) or update (POST) the schema for *level*.

    The optional ``group`` query parameter is forwarded to the config manager.
    POST stores the JSON body and reports success.
    """
    group = request.args.get("group")

    if request.method != "GET":
        payload = request.json
        config_manager.update_schema(level, payload, group)
        return jsonify({"status": "success"})

    schema = config_manager.get_schema(level, group)
    return jsonify(schema)
|
|
|
|
|
|
@app.route("/api/scripts/<group>")
def get_scripts(group):
    """Return, as JSON, what ``config_manager.list_scripts`` reports for *group*."""
    scripts = config_manager.list_scripts(group)
    return jsonify(scripts)
|
|
|
|
|
|
@app.route("/api/working-directory", methods=["POST"])
def set_working_directory():
    """Set the working directory of a script group from the JSON body.

    Expects the keys ``"path"`` and ``"group"``. An empty body or a missing /
    empty field produces an ``{"status": "error", ...}`` answer; otherwise the
    result of ``config_manager.set_work_dir`` is returned as JSON.
    """
    payload = request.json
    if not payload:
        return jsonify({"status": "error", "message": "No data provided"})

    target_path, group_name = payload.get("path"), payload.get("group")
    if not (target_path and group_name):
        problem = {
            "status": "error",
            "message": f"Missing required fields. Path: {target_path}, Group: {group_name}",
        }
        return jsonify(problem)

    # Debug trace of the request that is about to be applied
    print(f"Setting working directory - Path: {target_path}, Group: {group_name}")
    outcome = config_manager.set_work_dir(group_name, target_path)
    return jsonify(outcome)
|
|
|
|
|
|
@app.route("/api/working-directory/<group>", methods=["GET"])
def get_working_directory(group):
    """Return the working directory stored for *group*.

    ``status`` is ``"success"`` when the config manager returns a truthy path
    and ``"not_set"`` otherwise.
    """
    work_dir = config_manager.get_work_dir(group)
    if work_dir:
        status = "success"
    else:
        status = "not_set"
    return jsonify({"path": work_dir, "status": status})
|
|
|
|
|
|
@app.route("/api/browse-directories")
def browse_directories():
    """Open a native directory picker and return the chosen path as JSON.

    The dialog starts in the ``current_path`` query parameter when it names an
    existing directory, otherwise in the directory containing this file.

    Returns:
        ``{"status": "success", "path": ...}`` when a directory was chosen,
        ``{"status": "cancelled"}`` when the dialog returned nothing.
    """
    import tkinter as tk
    from tkinter import filedialog

    # Determine the initial directory. isdir() rather than exists(): a path
    # that points at a file is not a usable starting directory.
    current_dir = request.args.get("current_path")
    if not current_dir or not os.path.isdir(current_dir):
        current_dir = os.path.dirname(os.path.abspath(__file__))

    # Create and configure the tkinter root window
    root = tk.Tk()
    try:
        root.attributes("-topmost", True)  # Keep the window always on top
        root.withdraw()

        # Open the directory selection dialog
        directory = filedialog.askdirectory(
            initialdir=current_dir, title="Seleccionar Directorio de Trabajo"
        )
    finally:
        # Always destroy the root window, even if the dialog raised, so a
        # failed request does not leave a Tk instance behind.
        root.destroy()

    if directory:
        return jsonify({"status": "success", "path": directory})
    return jsonify({"status": "cancelled"})
|
|
|
|
|
|
@app.route("/api/logs", methods=["GET", "DELETE"])
def handle_logs():
    """Return the log contents (GET) or clear the log (DELETE).

    DELETE reports ``"success"`` or ``"error"`` depending on the truthiness of
    what ``config_manager.clear_log`` returns.
    """
    if request.method != "GET":  # DELETE
        cleared = config_manager.clear_log()
        outcome = "success" if cleared else "error"
        return jsonify({"status": outcome})

    return jsonify({"logs": config_manager.read_log()})
|
|
|
|
|
|
@app.route("/api/group-description/<group>", methods=["GET", "POST"])
def handle_group_description(group):
    """Read (GET) or write (POST) ``description.json`` of a script group.

    The file lives at ``<script_groups_path>/<group>/description.json``.

    Returns:
        GET: the stored JSON document, or a default description when the file
        does not exist; a JSON error with status 500 when the file cannot be
        parsed.
        POST: ``{"status": "success"}`` after writing the JSON body, or a JSON
        error with status 500 when anything fails.
        Both: a JSON error with status 400 when *group* is not a plain
        directory name.
    """
    # *group* comes straight from the URL and is joined into a filesystem
    # path: reject anything that could leave script_groups_path ("..", or a
    # path separator, including the backslash that is a separator on Windows).
    if group in {"", ".", ".."} or "/" in group or "\\" in group:
        return jsonify({"status": "error", "message": "Invalid group name"}), 400

    description_path = os.path.join(
        config_manager.script_groups_path, group, "description.json"
    )

    if request.method == "GET":
        try:
            with open(description_path, "r", encoding="utf-8") as f:
                description = json.load(f)
        except FileNotFoundError:
            return jsonify(
                {
                    "name": group,
                    "description": "Sin descripción",
                    "version": "1.0",
                    "author": "Unknown",
                }
            )
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # A corrupt file is reported in the same JSON shape as POST
            # failures instead of surfacing as an unhandled server error.
            return jsonify({"status": "error", "message": str(e)}), 500
        return jsonify(description)
    else:  # POST
        try:
            data = request.json
            os.makedirs(os.path.dirname(description_path), exist_ok=True)
            with open(description_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            return jsonify({"status": "success"})
        except Exception as e:
            return jsonify({"status": "error", "message": str(e)}), 500
|
|
|
|
|
|
@app.route("/api/directory-history/<group>")
def get_directory_history(group):
    """Return, as JSON, the directory history the config manager holds for *group*."""
    return jsonify(config_manager.get_directory_history(group))
|
|
|
|
|
|
# Script entry point: start the Flask server with debug mode enabled.
if __name__ == "__main__":
    app.run(debug=True)
|