commit d92eeb8c756920ebadaa9de019be3732518c24ef
Author: Miguel
Date:   Sat Feb 8 23:38:04 2025 +0100

    First working version

diff --git a/__pycache__/config_manager.cpython-310.pyc b/__pycache__/config_manager.cpython-310.pyc
new file mode 100644
index 0000000..f48bd70
Binary files /dev/null and b/__pycache__/config_manager.cpython-310.pyc differ
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..681807f
--- /dev/null
+++ b/app.py
@@ -0,0 +1,151 @@
+from flask import Flask, render_template, request, jsonify
+from flask_sock import Sock
+from config_manager import ConfigurationManager
+import os
+
+app = Flask(__name__)
+sock = Sock(app)
+config_manager = ConfigurationManager()
+
+# Global set holding the active WebSocket connections
+websocket_connections = set()
+
+
+@sock.route("/ws")
+def handle_websocket(ws):
+    try:
+        websocket_connections.add(ws)
+        while True:
+            message = ws.receive()
+            if message:
+                broadcast_message(message)
+    except Exception as e:
+        print(f"WebSocket error: {e}")
+    finally:
+        websocket_connections.remove(ws)
+
+
+def broadcast_message(message):
+    """Send a message to every active WebSocket connection."""
+    dead_connections = set()
+    for ws in websocket_connections:
+        try:
+            ws.send(message)
+        except Exception:
+            dead_connections.add(ws)
+
+    # Drop connections that are no longer alive
+    websocket_connections.difference_update(dead_connections)
+
+
+@app.route("/api/execute_script", methods=["POST"])
+def execute_script():
+    try:
+        script_group = request.json["group"]
+        script_name = request.json["script"]
+
+        # Run the script and collect its result
+        result = config_manager.execute_script(
+            script_group, script_name, broadcast_message
+        )
+
+        return jsonify(result)
+    except Exception as e:
+        error_msg = f"Error ejecutando script: {str(e)}"
+        broadcast_message(error_msg)
+        return jsonify({"error": error_msg})
+
+
+@app.route("/")
+def index():
+    script_groups = config_manager.get_script_groups()
+    return render_template("index.html", script_groups=script_groups)
+
+
+@app.route("/api/config/<level>", methods=["GET", "POST"])
+def handle_config(level):
+    group = request.args.get("group")
+    if request.method == "GET":
+        try:
+            return jsonify(config_manager.get_config(level, group))
+        except FileNotFoundError:
+            return jsonify({})
+    else:
+        data = request.json
+        config_manager.update_config(level, data, group)
+        return jsonify({"status": "success"})
+
+
+@app.route("/api/schema/<level>", methods=["GET", "POST"])
+def handle_schema(level):
+    group = request.args.get("group")
+    if request.method == "GET":
+        return jsonify(config_manager.get_schema(level, group))
+    else:
+        data = request.json
+        config_manager.update_schema(level, data, group)
+        return jsonify({"status": "success"})
+
+
+@app.route("/api/scripts/<group>")
+def get_scripts(group):
+    return jsonify(config_manager.list_scripts(group))
+
+
+@app.route("/api/working-directory", methods=["POST"])
+def set_working_directory():
+    data = request.json
+    if not data:
+        return jsonify({"status": "error", "message": "No data provided"})
+
+    path = data.get("path")
+    group = data.get("group")
+
+    if not path or not group:
+        return jsonify(
+            {
+                "status": "error",
+                "message": f"Missing required fields. Path: {path}, Group: {group}",
+            }
+        )
+
+    print(f"Setting working directory - Path: {path}, Group: {group}")  # Debug line
+    return jsonify(config_manager.set_work_dir(group, path))
+
+
+@app.route("/api/working-directory/<group>", methods=["GET"])
+def get_working_directory(group):
+    path = config_manager.get_work_dir(group)
+    return jsonify({"path": path, "status": "success" if path else "not_set"})
+
+
+@app.route("/api/browse-directories")
+def browse_directories():
+    import tkinter as tk
+    from tkinter import filedialog
+
+    # Determine the initial directory
+    current_dir = request.args.get("current_path")
+    if not current_dir or not os.path.exists(current_dir):
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+
+    # Create and configure the main tkinter window
+    root = tk.Tk()
+    root.attributes("-topmost", True)  # Keep the window always on top
+    root.withdraw()
+
+    # Open the directory selection dialog
+    directory = filedialog.askdirectory(
+        initialdir=current_dir, title="Seleccionar Directorio de Trabajo"
+    )
+
+    # Destroy the tkinter window
+    root.destroy()
+
+    if directory:
+        return jsonify({"status": "success", "path": directory})
+    return jsonify({"status": "cancelled"})
+
+
+if __name__ == "__main__":
+    app.run(debug=True)
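
For reference, the endpoints added above can also be exercised outside the browser UI. A minimal sketch using the requests package (assumptions: the default Flask development server at http://127.0.0.1:5000, the bundled example_group, and an illustrative working-directory path):

    import requests

    BASE = "http://127.0.0.1:5000"  # default Flask dev server address (assumption)

    # Point example_group at a working directory (illustrative path)
    r = requests.post(f"{BASE}/api/working-directory",
                      json={"group": "example_group", "path": "C:/Estudio"})
    print(r.json())

    # Run one of the group's scripts; its output is also broadcast over the /ws WebSocket
    r = requests.post(f"{BASE}/api/execute_script",
                      json={"group": "example_group", "script": "x1.py"})
    print(r.json())
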
diff --git a/backend/script_groups/esquema.json b/backend/script_groups/esquema.json
new file mode 100644
index 0000000..16cbb07
--- /dev/null
+++ b/backend/script_groups/esquema.json
@@ -0,0 +1,20 @@
+{
+  "properties": {
+    "batch_size": {
+      "description": "N\u00famero de elementos a procesar por lote",
+      "title": "Tama\u00f1o de Lote",
+      "type": "number"
+    },
+    "input_dir": {
+      "description": "Ruta al directorio de archivos de entrada",
+      "title": "Directorio de Entrada",
+      "type": "string"
+    },
+    "output_dir": {
+      "description": "Ruta al directorio para archivos generados",
+      "title": "Directorio de Salida",
+      "type": "string"
+    }
+  },
+  "type": "object"
+}
\ No newline at end of file
diff --git a/backend/script_groups/example_group/data.json b/backend/script_groups/example_group/data.json
new file mode 100644
index 0000000..680b684
--- /dev/null
+++ b/backend/script_groups/example_group/data.json
@@ -0,0 +1,5 @@
+{
+  "input_dir": "D:/Datos/Entrada",
+  "output_dir": "D:/Datos/Salida",
+  "batch_size": 50
+}
diff --git a/backend/script_groups/example_group/esquema.json b/backend/script_groups/example_group/esquema.json
new file mode 100644
index 0000000..74e1f04
--- /dev/null
+++ b/backend/script_groups/example_group/esquema.json
@@ -0,0 +1,21 @@
+{
+  "type": "object",
+  "properties": {
+    "project_name": {
+      "type": "string",
+      "title": "Nombre del Proyecto",
+      "description": "Identificador único del proyecto"
+    },
+    "process_type": {
+      "type": "string",
+      "title": "Tipo de Proceso",
+      "enum": ["basic", "advanced", "full"],
+      "description": "Nivel de procesamiento a aplicar"
+    },
+    "debug_mode": {
+      "type": "boolean",
+      "title": "Modo Debug",
+      "description": "Activar logging detallado"
+    }
+  }
+}
diff --git a/backend/script_groups/example_group/work_dir.json b/backend/script_groups/example_group/work_dir.json
new file mode 100644
index 0000000..92dc8d3
--- /dev/null
+++ b/backend/script_groups/example_group/work_dir.json
@@ -0,0 +1,3 @@
+{
+  "path": "C:/Estudio"
+}
\ No newline at end of file
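
The esquema.json files follow JSON Schema conventions (properties, type, enum), although this commit does not ship a validator. If one wanted to check a data.json against its schema, a minimal sketch with the third-party jsonschema package (an assumption; it is not a dependency of this project) could look like:

    import json
    from jsonschema import validate, ValidationError  # external package, assumed installed

    with open("backend/script_groups/example_group/esquema.json", encoding="utf-8") as f:
        schema = json.load(f)
    with open("backend/script_groups/example_group/data.json", encoding="utf-8") as f:
        data = json.load(f)

    try:
        validate(instance=data, schema=schema)  # raises if data does not match the schema
        print("data.json is valid against esquema.json")
    except ValidationError as e:
        print(f"Invalid configuration: {e.message}")
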
diff --git a/backend/script_groups/example_group/x1.py b/backend/script_groups/example_group/x1.py
new file mode 100644
index 0000000..6dfde88
--- /dev/null
+++ b/backend/script_groups/example_group/x1.py
@@ -0,0 +1,28 @@
+"""
+Test script that prints the configurations and performs a simple task.
+It demonstrates how to access the configurations of the three levels.
+"""
+
+import json
+import os
+import time
+
+def main():
+    # Load configurations from the environment variable
+    configs = json.loads(os.environ.get('SCRIPT_CONFIGS', '{}'))
+
+    print("=== Ejecutando Script de Prueba 1 ===")
+    print("\nConfiguraciones cargadas:")
+    print("Nivel 1:", json.dumps(configs.get('level1', {}), indent=2))
+    print("Nivel 2:", json.dumps(configs.get('level2', {}), indent=2))
+    print("Nivel 3:", json.dumps(configs.get('level3', {}), indent=2))
+
+    print("\nSimulando procesamiento...")
+    for i in range(5):
+        time.sleep(1)
+        print(f"Progreso: {(i+1)*20}%")
+
+    print("\n¡Proceso completado!")
+
+if __name__ == '__main__':
+    main()
diff --git a/backend/script_groups/example_group/x2.py b/backend/script_groups/example_group/x2.py
new file mode 100644
index 0000000..cd17f26
--- /dev/null
+++ b/backend/script_groups/example_group/x2.py
@@ -0,0 +1,36 @@
+"""
+Test script that simulates a data-analysis process.
+It demonstrates error handling and logging.
+"""
+
+import json
+import os
+import time
+import random
+
+def main():
+    configs = json.loads(os.environ.get('SCRIPT_CONFIGS', '{}'))
+
+    print("=== Ejecutando Script de Prueba 2 ===")
+    print("\nIniciando análisis de datos simulado...")
+
+    # Simulate a process that may fail
+    try:
+        for i in range(3):
+            time.sleep(1)
+            print(f"Analizando lote {i+1}...")
+
+            # Simulate a random error
+            if random.random() < 0.3:
+                raise Exception("Error simulado en el procesamiento")
+
+            print(f"Lote {i+1} completado exitosamente")
+
+        print("\n¡Análisis completado sin errores!")
+
+    except Exception as e:
+        print(f"\nERROR: {str(e)}")
+        print("El proceso se detuvo debido a un error")
+
+if __name__ == '__main__':
+    main()
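
These test scripts read everything from the SCRIPT_CONFIGS environment variable, which execute_script in config_manager.py (later in this diff) injects as serialized JSON. To try one outside the web app, something along these lines should work; the config values below are made up:

    import json, os, subprocess

    env = dict(os.environ)
    env["SCRIPT_CONFIGS"] = json.dumps({
        "level1": {"model": "gpt-3.5-turbo"},   # illustrative values only
        "level2": {"batch_size": 50},
        "level3": {"debug_mode": True},
    })
    subprocess.run(
        ["python", "backend/script_groups/example_group/x1.py"],
        env=env, check=True,
    )
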
diff --git a/claude_file_organizer.py b/claude_file_organizer.py
new file mode 100644
index 0000000..2889dae
--- /dev/null
+++ b/claude_file_organizer.py
@@ -0,0 +1,171 @@
+import os
+import shutil
+from pathlib import Path
+import re
+
+class ClaudeProjectOrganizer:
+    def __init__(self):
+        self.source_dir = Path.cwd()
+        self.claude_dir = self.source_dir / 'claude'
+        self.file_mapping = {}
+
+    def should_skip_directory(self, dir_name):
+        skip_dirs = {'.git', '__pycache__', 'venv', 'env', '.pytest_cache', '.vscode', 'claude'}
+        return dir_name in skip_dirs
+
+    def get_comment_prefix(self, file_extension):
+        """Return the comment prefix for the given file extension."""
+        comment_styles = {
+            '.py': '#',
+            '.js': '//',
+            '.css': '/*',
+            '.html': '<!--',
+        }
+        return comment_styles.get(file_extension.lower(), None)
+
+    def get_comment_suffix(self, file_extension):
+        """Return the comment suffix for the given file extension."""
+        comment_suffixes = {
+            '.css': ' */',
+            '.html': ' -->',
+        }
+        return comment_suffixes.get(file_extension.lower(), '')
+
+    def normalize_path(self, path_str: str) -> str:
+        """Normalize the path to use forward slashes."""
+        return str(path_str).replace('\\', '/')
+
+    def check_existing_path_comment(self, content: str, normalized_path: str, comment_prefix: str) -> bool:
+        """Check whether a comment with the path already exists in the file."""
+        # Escape special regex characters in the comment prefix
+        escaped_prefix = re.escape(comment_prefix)
+
+        # Build patterns that match both forward and backward slashes
+        forward_pattern = f"{escaped_prefix}\\s*{re.escape(normalized_path)}\\b"
+        backward_path = normalized_path.replace('/', '\\\\')  # double backslash for the pattern
+        backward_pattern = f"{escaped_prefix}\\s*{re.escape(backward_path)}"
+
+        # Look only at the first lines of the file
+        first_lines = content.split('\n')[:5]
+        for line in first_lines:
+            if (re.search(forward_pattern, line) or
+                re.search(backward_pattern, line)):
+                return True
+        return False
+
+    def add_path_comment(self, file_path: Path, content: str) -> str:
+        """Prepend a comment with the file path if it is not already present."""
+        relative_path = file_path.relative_to(self.source_dir)
+        normalized_path = self.normalize_path(relative_path)
+        comment_prefix = self.get_comment_prefix(file_path.suffix)
+
+        if comment_prefix is None:
+            return content
+
+        comment_suffix = self.get_comment_suffix(file_path.suffix)
+
+        # Skip if the comment is already there
+        if self.check_existing_path_comment(content, normalized_path, comment_prefix):
+            print(f" - Comentario de ruta ya existe en {file_path}")
+            return content
+
+        path_comment = f"{comment_prefix} {normalized_path}{comment_suffix}\n"
+
+        # For HTML files, insert after the doctype if present
+        if file_path.suffix.lower() == '.html':
+            if content.lower().startswith('<!doctype'):
+                doctype_end = content.find('>') + 1
+                return content[:doctype_end] + '\n' + path_comment + content[doctype_end:]
+
+        return path_comment + content
+
+    def clean_claude_directory(self):
+        if self.claude_dir.exists():
+            shutil.rmtree(self.claude_dir)
+        self.claude_dir.mkdir()
+        print(f"Directorio claude limpiado: {self.claude_dir}")
+
+    def copy_files(self):
+        self.clean_claude_directory()
+
+        for root, dirs, files in os.walk(self.source_dir):
+            dirs[:] = [d for d in dirs if not self.should_skip_directory(d)]
+            current_path = Path(root)
+
+            for file in files:
+                file_path = current_path / file
+
+                if file.endswith(('.py', '.js', '.css', '.html', '.json', '.yml', '.yaml',
+                                  '.tsx', '.ts', '.jsx', '.scss', '.less')):
+                    target_path = self.claude_dir / file
+
+                    # If the file already exists in the claude directory, add a numeric suffix
+                    if target_path.exists():
+                        base = target_path.stem
+                        ext = target_path.suffix
+                        counter = 1
+                        while target_path.exists():
+                            target_path = self.claude_dir / f"{base}_{counter}{ext}"
+                            counter += 1
+
+                    try:
+                        # Read the file content
+                        with open(file_path, 'r', encoding='utf-8') as f:
+                            content = f.read()
+
+                        # Add the path comment if it is missing
+                        modified_content = self.add_path_comment(file_path, content)
+
+                        # Write the new content
+                        with open(target_path, 'w', encoding='utf-8', newline='\n') as f:
+                            f.write(modified_content)
+
+                        self.file_mapping[str(file_path)] = target_path.name
+                        print(f"Copiado: {file_path} -> {target_path}")
+
+                    except UnicodeDecodeError:
+                        print(f"Advertencia: No se pudo procesar {file_path} como texto. Copiando sin modificar...")
+                        shutil.copy2(file_path, target_path)
+                    except Exception as e:
+                        print(f"Error procesando {file_path}: {str(e)}")
+
+    def generate_tree_report(self):
+        """Generate the report as a visual tree."""
+        report = ["Estructura del proyecto original:\n"]
+
+        def add_to_report(path, prefix="", is_last=True):
+            report.append(prefix + ("└── " if is_last else "├── ") + path.name)
+
+            if path.is_dir() and not self.should_skip_directory(path.name):
+                children = sorted(path.iterdir(), key=lambda x: (x.is_file(), x.name))
+                children = [c for c in children if not (c.is_dir() and self.should_skip_directory(c.name))]
+
+                for i, child in enumerate(children):
+                    is_last_child = i == len(children) - 1
+                    new_prefix = prefix + ("    " if is_last else "│   ")
+                    add_to_report(child, new_prefix, is_last_child)
+
+        add_to_report(self.source_dir)
+
+        report_path = self.claude_dir / "project_structure.txt"
+        with open(report_path, "w", encoding="utf-8") as f:
+            f.write("\n".join(report))
+        print(f"\nReporte generado en: {report_path}")
+
+def main():
+    try:
+        print("Iniciando organización de archivos para Claude...")
+        organizer = ClaudeProjectOrganizer()
+        organizer.copy_files()
+        organizer.generate_tree_report()
+        print("\n¡Proceso completado exitosamente!")
+    except Exception as e:
+        print(f"\nError durante la ejecución: {str(e)}")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
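
claude_file_organizer.py is a standalone helper with its own main(); a quick illustration of the path comment it prepends, runnable from any directory (the content string here is made up):

    from claude_file_organizer import ClaudeProjectOrganizer

    org = ClaudeProjectOrganizer()  # source_dir is the current working directory
    sample = org.add_path_comment(org.source_dir / "app.py", "print('hi')\n")
    print(sample)  # "# app.py" on the first line, followed by the original content
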
diff --git a/config_manager.py b/config_manager.py
new file mode 100644
index 0000000..e58bf81
--- /dev/null
+++ b/config_manager.py
@@ -0,0 +1,261 @@
+import os
+import json
+import subprocess
+import re
+from typing import Dict, Any, List
+
+
+class ConfigurationManager:
+    def __init__(self):
+        self.base_path = os.path.dirname(os.path.abspath(__file__))
+        self.data_path = os.path.join(self.base_path, "data")
+        self.script_groups_path = os.path.join(
+            self.base_path, "backend", "script_groups"
+        )
+        self.working_directory = None
+
+    def set_working_directory(self, path: str) -> Dict[str, str]:
+        """Set and validate working directory."""
+        if not os.path.exists(path):
+            return {"status": "error", "message": "Directory does not exist"}
+
+        self.working_directory = path
+
+        # Create default data.json if it doesn't exist
+        data_path = os.path.join(path, "data.json")
+        if not os.path.exists(data_path):
+            with open(data_path, "w") as f:
+                json.dump({}, f, indent=2)
+
+        return {"status": "success", "path": path}
+
+    def get_script_groups(self) -> List[str]:
+        """Returns list of available script groups."""
+        return [
+            d
+            for d in os.listdir(self.script_groups_path)
+            if os.path.isdir(os.path.join(self.script_groups_path, d))
+        ]
+
+    def get_config(self, level: str, group: str = None) -> Dict[str, Any]:
+        """Get configuration for specified level."""
+        if level == "1":
+            path = os.path.join(self.data_path, "data.json")
+        elif level == "2":
+            path = os.path.join(self.script_groups_path, group, "data.json")
+        elif level == "3":
+            if not self.working_directory:
+                return {}  # Return empty config if working directory not set
+            path = os.path.join(self.working_directory, "data.json")
+
+        try:
+            with open(path, "r") as f:
+                return json.load(f)
+        except FileNotFoundError:
+            return {}  # Return empty config if file doesn't exist
+
+    def get_schema(self, level: str, group: str = None) -> Dict[str, Any]:
+        """Get schema for specified level."""
+        # Clean level parameter (remove -form suffix if present)
+        level = level.split("-")[0]
+
+        try:
+            if level == "1":
+                path = os.path.join(self.data_path, "esquema.json")
+            elif level == "2":
+                path = os.path.join(self.script_groups_path, "esquema.json")
+            elif level == "3":
+                if not group:
+                    return {}  # Return empty schema if no group is specified
+                path = os.path.join(self.script_groups_path, group, "esquema.json")
+            else:
+                return {}  # Return empty schema for invalid levels
+
+            with open(path, "r") as f:
+                return json.load(f)
+        except FileNotFoundError:
+            return {}  # Return empty schema if file doesn't exist
+        except json.JSONDecodeError:
+            return {}  # Return empty schema if file is invalid JSON
+
+    def update_config(
+        self, level: str, data: Dict[str, Any], group: str = None
+    ) -> Dict[str, str]:
+        """Update configuration for specified level."""
+        if level == "3" and not self.working_directory:
+            return {"status": "error", "message": "Working directory not set"}
+
+        if level == "1":
+            path = os.path.join(self.data_path, "data.json")
+        elif level == "2":
+            path = os.path.join(self.script_groups_path, group, "data.json")
+        elif level == "3":
+            path = os.path.join(self.working_directory, "data.json")
+
+        with open(path, "w") as f:
+            json.dump(data, f, indent=2)
+
+    def update_schema(
+        self, level: str, data: Dict[str, Any], group: str = None
+    ) -> None:
+        """Update schema for specified level."""
+        if level == "1":
+            path = os.path.join(self.data_path, "esquema.json")
+        elif level == "2":
+            path = os.path.join(self.script_groups_path, "esquema.json")
+        elif level == "3":
+            path = os.path.join(self.script_groups_path, group, "esquema.json")
+
+        with open(path, "w") as f:
+            json.dump(data, f, indent=2)
+
+    def list_scripts(self, group: str) -> List[Dict[str, str]]:
+        """List all scripts in a group with their descriptions."""
+        try:
+            scripts_dir = os.path.join(self.script_groups_path, group)
+            scripts = []
+
+            if not os.path.exists(scripts_dir):
+                print(f"Directory not found: {scripts_dir}")
+                return []
+
+            for file in os.listdir(scripts_dir):
+                # Include any .py file in the group directory
+                if file.endswith(".py"):
+                    path = os.path.join(scripts_dir, file)
+                    description = self._extract_script_description(path)
+                    print(
+                        f"Found script: {file} with description: {description}"
+                    )  # Debug line
+                    scripts.append({"name": file, "description": description})
+
+            print(f"Total scripts found: {len(scripts)}")  # Debug line
+            return scripts
+        except Exception as e:
+            print(f"Error listing scripts: {str(e)}")  # Debug line
+            return []
+
+    def _extract_script_description(self, script_path: str) -> str:
+        """Extract description from script's docstring or initial comments."""
+        try:
+            with open(script_path, "r", encoding="utf-8") as f:
+                content = f.read()
+
+            # Try to find docstring
+            docstring_match = re.search(r'"""(.*?)"""', content, re.DOTALL)
+            if docstring_match:
+                return docstring_match.group(1).strip()
+
+            # Try to find initial comment
+            comment_match = re.search(r"^#\s*(.*?)$", content, re.MULTILINE)
+            if comment_match:
+                return comment_match.group(1).strip()
+
+            return "No description available"
+        except Exception as e:
+            print(
+                f"Error extracting description from {script_path}: {str(e)}"
+            )  # Debug line
+            return "Error reading script description"
+
+    def execute_script(
+        self, group: str, script_name: str, broadcast_fn=None
+    ) -> Dict[str, Any]:
+        """Execute script with real-time logging via WebSocket broadcast function."""
+        script_path = os.path.join(self.script_groups_path, group, script_name)
+        if not os.path.exists(script_path):
+            return {"error": "Script not found"}
+
+        # Get the script group's working directory
+        working_dir = self.get_work_dir(group)
+        if not working_dir:
+            return {"error": "Working directory not set for this script group"}
+
+        configs = {
+            "level1": self.get_config("1"),
+            "level2": self.get_config("2", group),
+            "level3": self.get_config("3") if self.working_directory else {},
+        }
+
+        try:
+            if broadcast_fn:
+                broadcast_fn(f"\nIniciando ejecución de {script_name}...\n")
+
+            process = subprocess.Popen(
+                ["python", script_path],
+                cwd=working_dir or self.base_path,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                text=True,
+                bufsize=1,
+                env=dict(os.environ, **{"SCRIPT_CONFIGS": json.dumps(configs)}),
+            )
+
+            output = []
+            while True:
+                line = process.stdout.readline()
+                if not line and process.poll() is not None:
+                    break
+                if line:
+                    output.append(line)
+                    if broadcast_fn:
+                        broadcast_fn(line)
+
+            stderr = process.stderr.read()
+            if stderr:
+                if broadcast_fn:
+                    broadcast_fn(f"\nERROR: {stderr}\n")
+                output.append(f"ERROR: {stderr}")
+
+            if broadcast_fn:
+                broadcast_fn("\nEjecución completada.\n")
+
+            return {
+                "output": "".join(output),
+                "error": stderr if stderr else None,
+                "status": "success" if process.returncode == 0 else "error",
+            }
+        except Exception as e:
+            error_msg = str(e)
+            if broadcast_fn:
+                broadcast_fn(f"\nError inesperado: {error_msg}\n")
+            return {"error": error_msg}
+
+    def get_work_dir(self, group: str) -> str:
+        """Get working directory path for a script group."""
+        work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
+        try:
+            with open(work_dir_path, "r") as f:
+                data = json.load(f)
+            path = data.get("path", "")
+            # Update the instance attribute if the path is valid
+            if path and os.path.exists(path):
+                self.working_directory = path
+            return path
+        except (FileNotFoundError, json.JSONDecodeError):
+            return ""
+
+    def set_work_dir(self, group: str, path: str) -> Dict[str, str]:
+        """Set working directory path for a script group."""
+        if not os.path.exists(path):
+            return {"status": "error", "message": "Directory does not exist"}
+
+        work_dir_path = os.path.join(self.script_groups_path, group, "work_dir.json")
+
+        try:
+            # Save the path to work_dir.json
+            with open(work_dir_path, "w") as f:
+                json.dump({"path": path}, f, indent=2)
+
+            # Update the instance attribute
+            self.working_directory = path
+
+            # Create data.json in the working directory if it doesn't exist
+            data_path = os.path.join(path, "data.json")
+            if not os.path.exists(data_path):
+                with open(data_path, "w") as f:
+                    json.dump({}, f, indent=2)
+
+            return {"status": "success", "path": path}
+        except Exception as e:
+            return {"status": "error", "message": str(e)}
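
ConfigurationManager can also be driven directly from a Python shell, without the Flask layer; passing print as the broadcast function streams the script output to stdout. A short sketch (the group, script, and path refer to the bundled example and are illustrative; the directory must already exist):

    from config_manager import ConfigurationManager

    cm = ConfigurationManager()
    print(cm.get_script_groups())                   # e.g. ['example_group']
    print(cm.list_scripts("example_group"))         # names plus docstring-derived descriptions
    cm.set_work_dir("example_group", "C:/Estudio")  # illustrative path
    result = cm.execute_script("example_group", "x1.py", broadcast_fn=print)
    print(result.get("status", result))
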
diff --git a/data/data.json b/data/data.json
new file mode 100644
index 0000000..e599297
--- /dev/null
+++ b/data/data.json
@@ -0,0 +1,6 @@
+{
+  "api_key": "your-api-key-here",
+  "model": "gpt-3.5-turbo",
+  "max_tokens": 1000,
+  "temperature": 0.7
+}
diff --git a/data/esquema.json b/data/esquema.json
new file mode 100644
index 0000000..88b52aa
--- /dev/null
+++ b/data/esquema.json
@@ -0,0 +1,32 @@
+{
+  "properties": {
+    "api_key": {
+      "description": "Tu clave de API para servicios externos",
+      "title": "API Key",
+      "type": "string"
+    },
+    "max_tokens": {
+      "description": "N\u00famero m\u00e1ximo de tokens por respuesta",
+      "title": "Tokens M\u00e1ximos",
+      "type": "number"
+    },
+    "model": {
+      "description": "Modelo de lenguaje a utilizar",
+      "enum": [
+        "gpt-3.5-turbo",
+        "gpt-4",
+        "claude-v1"
+      ],
+      "title": "Modelo LLM",
+      "type": "string"
+    },
+    "temperature": {
+      "description": "Creatividad de las respuestas (0-1)",
+      "maximum": 1,
+      "minimum": 0,
+      "title": "Temperatura",
+      "type": "number"
+    }
+  },
+  "type": "object"
+}
\ No newline at end of file
diff --git a/templates/index.html b/templates/index.html
new file mode 100644
index 0000000..4fd9867
--- /dev/null
+++ b/templates/index.html
@@ -0,0 +1,504 @@
[The 504 added lines of templates/index.html were stripped of their markup in this capture and cannot be reconstructed here. The page is titled "Script Parameter Manager" and contains sections for: Configuración General (Nivel 1), Grupo de Scripts, Configuración del Grupo (Nivel 2), Directorio de Trabajo, Configuración del Directorio (Nivel 3), Scripts Disponibles, and Logs.]
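
The Logs section of the template is fed by the /ws endpoint defined in app.py. Outside the browser, the same stream can be watched with the third-party websocket-client package (an assumption; any WebSocket client would do), assuming the server runs on the default port:

    from websocket import create_connection  # pip install websocket-client (assumption)

    ws = create_connection("ws://127.0.0.1:5000/ws")
    try:
        while True:
            print(ws.recv(), end="")  # print broadcast log lines as they arrive
    finally:
        ws.close()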