"""Flask application exposing HTTP and WebSocket endpoints for script groups.

All persistent state (configs, schemas, scripts, working directories, log
file) is delegated to a module-level ``ConfigurationManager`` instance.
"""

from datetime import datetime
import json
import os

from flask import Flask, render_template, request, jsonify, url_for
from flask_sock import Sock

from config_manager import ConfigurationManager

app = Flask(
    __name__, static_url_path="", static_folder="static", template_folder="templates"
)
sock = Sock(app)
config_manager = ConfigurationManager()

# Global set holding the currently active WebSocket connections.
websocket_connections = set()


@sock.route("/ws")
def handle_websocket(ws):
    """Register a WebSocket client and rebroadcast every message it sends.

    The connection is tracked in ``websocket_connections`` for as long as the
    receive loop runs and is always unregistered on exit.
    """
    try:
        websocket_connections.add(ws)
        while True:
            message = ws.receive()
            if message:
                broadcast_message(message)
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        # discard(), not remove(): broadcast_message() may already have pruned
        # this socket as dead, and remove() would then raise KeyError.
        websocket_connections.discard(ws)


def _strip_leading_timestamps(msg):
    """Drop every leading ``[...]`` prefix made only of digits and colons."""
    while msg.startswith("["):
        closing = msg.find("]")
        if closing == -1:
            break
        if not msg[1:closing].replace(":", "").isdigit():
            break
        msg = msg[closing + 1 :].strip()
    return msg


def broadcast_message(message):
    """Send a message to every active WebSocket client and append it to the log.

    Args:
        message: Either a list of already-split messages, or a single string
            (or UTF-8 encoded bytes) that is split into non-empty, stripped
            lines.

    Each resulting message has any leading timestamp-like prefixes removed and
    a single ``[HH:MM:SS] `` timestamp prepended before being written to
    ``config_manager.log_file`` and sent to the clients. Connections whose
    ``send`` raises are removed from ``websocket_connections``.
    """
    dead_connections = set()
    timestamp = datetime.now().strftime("[%H:%M:%S] ")

    if isinstance(message, list):
        # A list is taken as already split into individual messages.
        messages = message
    else:
        # Binary WebSocket frames arrive as bytes; decode so the str-based
        # processing below does not raise TypeError.
        if isinstance(message, (bytes, bytearray)):
            message = message.decode("utf-8", errors="replace")
        # A single message is split into its non-empty lines.
        messages = [line.strip() for line in message.splitlines() if line.strip()]

    if not messages:
        return

    # Open the log once per broadcast instead of once per message.
    with open(config_manager.log_file, "a", encoding="utf-8") as log:
        for msg in messages:
            # Strip duplicated timestamps, then add exactly one.
            formatted_msg = f"{timestamp}{_strip_leading_timestamps(msg)}"

            log.write(f"{formatted_msg}\n")

            # Iterate over a snapshot: handlers may mutate the set concurrently.
            for ws in list(websocket_connections):
                try:
                    if ws.connected:
                        ws.send(f"{formatted_msg}\n")
                except Exception:
                    dead_connections.add(ws)

    # Prune the connections that failed while sending.
    websocket_connections.difference_update(dead_connections)


@app.route("/api/execute_script", methods=["POST"])
def execute_script():
    """Run the script named in the JSON body (``group``, ``script``).

    Returns the result of ``config_manager.execute_script`` as JSON. On any
    exception the error text is broadcast to the log/clients and returned
    under the ``error`` key.
    """
    try:
        script_group = request.json["group"]
        script_name = request.json["script"]

        # broadcast_message is handed over as the output callback.
        result = config_manager.execute_script(
            script_group, script_name, broadcast_message
        )
        return jsonify(result)
    except Exception as e:
        error_msg = f"Error ejecutando script: {str(e)}"
        broadcast_message(error_msg)
        return jsonify({"error": error_msg})


@app.route("/")
def index():
    """Render the main page with the available script groups."""
    script_groups = config_manager.get_script_groups()
    return render_template("index.html", script_groups=script_groups)


# The URL variable is required: the view takes ``level`` as an argument.
@app.route("/api/config/<level>", methods=["GET", "POST"])
def handle_config(level):
    """Read (GET) or update (POST) the configuration for ``level``.

    The optional ``group`` query parameter is forwarded to the manager. A GET
    for a configuration whose file is missing returns an empty object.
    """
    group = request.args.get("group")
    if request.method == "GET":
        try:
            return jsonify(config_manager.get_config(level, group))
        except FileNotFoundError:
            return jsonify({})
    else:
        data = request.json
        config_manager.update_config(level, data, group)
        return jsonify({"status": "success"})


@app.route("/api/schema/<level>", methods=["GET", "POST"])
def handle_schema(level):
    """Read (GET) or update (POST) the schema for ``level``.

    The optional ``group`` query parameter is forwarded to the manager.
    """
    group = request.args.get("group")
    if request.method == "GET":
        return jsonify(config_manager.get_schema(level, group))
    else:
        data = request.json
        config_manager.update_schema(level, data, group)
        return jsonify({"status": "success"})


@app.route("/api/scripts/<group>")
def get_scripts(group):
    """Return the scripts listed by the manager for ``group`` as JSON."""
    return jsonify(config_manager.list_scripts(group))


@app.route("/api/working-directory", methods=["POST"])
def set_working_directory():
    """Set the working directory of a group from the JSON body.

    The body must contain non-empty ``path`` and ``group`` fields; otherwise
    an error payload is returned.
    """
    data = request.json
    if not data:
        return jsonify({"status": "error", "message": "No data provided"})

    path = data.get("path")
    group = data.get("group")

    if not path or not group:
        return jsonify(
            {
                "status": "error",
                "message": f"Missing required fields. Path: {path}, Group: {group}",
            }
        )

    print(f"Setting working directory - Path: {path}, Group: {group}")  # Debug line
    return jsonify(config_manager.set_work_dir(group, path))


@app.route("/api/working-directory/<group>", methods=["GET"])
def get_working_directory(group):
    """Return the working directory of ``group`` and whether it is set."""
    path = config_manager.get_work_dir(group)
    return jsonify({"path": path, "status": "success" if path else "not_set"})


@app.route("/api/browse-directories")
def browse_directories():
    """Open a native directory-picker dialog and return the chosen path.

    The ``current_path`` query parameter is used as the initial directory when
    it exists; otherwise the directory containing this file is used.
    """
    import tkinter as tk
    from tkinter import filedialog

    # Resolve the initial directory.
    current_dir = request.args.get("current_path")
    if not current_dir or not os.path.exists(current_dir):
        current_dir = os.path.dirname(os.path.abspath(__file__))

    # Create a hidden root window that keeps the dialog on top.
    root = tk.Tk()
    try:
        root.attributes("-topmost", True)
        root.withdraw()

        directory = filedialog.askdirectory(
            initialdir=current_dir, title="Seleccionar Directorio de Trabajo"
        )
    finally:
        # Always tear the Tk root down, even if the dialog raised.
        root.destroy()

    if directory:
        return jsonify({"status": "success", "path": directory})
    return jsonify({"status": "cancelled"})


@app.route("/api/logs", methods=["GET", "DELETE"])
def handle_logs():
    """Return the log contents (GET) or clear the log (DELETE)."""
    if request.method == "GET":
        return jsonify({"logs": config_manager.read_log()})
    else:  # DELETE
        success = config_manager.clear_log()
        return jsonify({"status": "success" if success else "error"})


@app.route("/api/group-description/<group>", methods=["GET", "POST"])
def handle_group_description(group):
    """Read (GET) or write (POST) ``description.json`` of a script group.

    A GET for a group without a description file returns a default
    description. A POST failure is reported with HTTP status 500.
    """
    # NOTE(review): ``group`` comes from the URL and is joined into a
    # filesystem path unchecked — confirm whether it needs validation.
    description_path = os.path.join(
        config_manager.script_groups_path, group, "description.json"
    )

    if request.method == "GET":
        try:
            with open(description_path, "r", encoding="utf-8") as f:
                return jsonify(json.load(f))
        except FileNotFoundError:
            return jsonify(
                {
                    "name": group,
                    "description": "Sin descripción",
                    "version": "1.0",
                    "author": "Unknown",
                }
            )
    else:  # POST
        try:
            data = request.json
            os.makedirs(os.path.dirname(description_path), exist_ok=True)
            with open(description_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            return jsonify({"status": "success"})
        except Exception as e:
            return jsonify({"status": "error", "message": str(e)}), 500


@app.route("/api/directory-history/<group>")
def get_directory_history(group):
    """Return the directory history the manager holds for ``group``."""
    history = config_manager.get_directory_history(group)
    return jsonify(history)


if __name__ == "__main__":
    app.run(debug=True)