import os import json import subprocess import sys import threading import time from typing import Dict, Any, List, Optional from datetime import datetime import uuid class PythonLauncherManager: def __init__(self, data_path: str): self.data_path = data_path self.launcher_config_path = os.path.join(data_path, "python_launcher_projects.json") self.favorites_path = os.path.join(data_path, "python_launcher_favorites.json") self.history_path = os.path.join(data_path, "python_launcher_history.json") self.script_metadata_path = os.path.join(data_path, "python_launcher_script_metadata.json") # Procesos en ejecución para Python (servidores, etc.) self.running_processes = {} self.process_lock = threading.Lock() # Inicializar archivos si no existen self._initialize_files() def _initialize_files(self): """Crear archivos de configuración por defecto si no existen""" # Inicializar python_launcher_projects.json if not os.path.exists(self.launcher_config_path): default_config = { "version": "1.0", "projects": [], "categories": { "MCP Servers": { "color": "#3B82F6", "icon": "🔌", "subcategories": ["Anthropic", "Custom", "OpenAI"] }, "Flask Apps": { "color": "#10B981", "icon": "🌐", "subcategories": ["API", "Web App", "Microservice"] }, "Scripts": { "color": "#8B5CF6", "icon": "📜", "subcategories": ["Automatización", "Utiles", "Procesamiento"] }, "Bots": { "color": "#F59E0B", "icon": "🤖", "subcategories": ["Discord", "Telegram", "Slack"] }, "Data Processing": { "color": "#EF4444", "icon": "📊", "subcategories": ["ETL", "Analysis", "ML"] }, "Otros": { "color": "#6B7280", "icon": "📁", "subcategories": ["Misceláneos"] } }, "settings": { "default_execution_directory": "project_directory", "enable_argument_validation": True, "max_history_entries": 100, "auto_cleanup_days": 30, "default_python_env": "base" } } with open(self.launcher_config_path, 'w', encoding='utf-8') as f: json.dump(default_config, f, indent=2, ensure_ascii=False) # Inicializar python_launcher_favorites.json if not os.path.exists(self.favorites_path): default_favorites = {"favorites": []} with open(self.favorites_path, 'w', encoding='utf-8') as f: json.dump(default_favorites, f, indent=2, ensure_ascii=False) # Inicializar python_launcher_history.json if not os.path.exists(self.history_path): default_history = { "history": [], "settings": { "max_entries": 100, "auto_cleanup_days": 30, "track_execution_time": True, "track_arguments": True } } with open(self.history_path, 'w', encoding='utf-8') as f: json.dump(default_history, f, indent=2, ensure_ascii=False) # Inicializar python_launcher_script_metadata.json if not os.path.exists(self.script_metadata_path): default_metadata = { "version": "1.0", "script_metadata": {} } with open(self.script_metadata_path, 'w', encoding='utf-8') as f: json.dump(default_metadata, f, indent=2, ensure_ascii=False) def get_python_projects(self) -> List[Dict[str, Any]]: """Obtener todos los proyectos Python""" try: with open(self.launcher_config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config.get("projects", []) except Exception as e: print(f"Error loading Python projects: {e}") return [] def get_python_project(self, project_id: str) -> Optional[Dict[str, Any]]: """Obtener un proyecto específico por ID""" projects = self.get_python_projects() for project in projects: if project.get("id") == project_id: return project return None def add_python_project(self, project_data: Dict[str, Any]) -> Dict[str, str]: """Agregar nuevo proyecto Python""" try: # Validar datos requeridos required_fields = ["name", "directory"] for field in required_fields: if not project_data.get(field): return {"status": "error", "message": f"Campo requerido: {field}"} # Validar que el directorio existe if not os.path.isdir(project_data["directory"]): return {"status": "error", "message": "El directorio especificado no existe"} # Generar ID único si no se proporciona if not project_data.get("id"): project_data["id"] = str(uuid.uuid4())[:8] # Verificar que el ID no exista if self.get_python_project(project_data["id"]): return {"status": "error", "message": "Ya existe un proyecto con este ID"} # Agregar campos por defecto current_time = datetime.now().isoformat() + "Z" project_data.setdefault("description", "") project_data.setdefault("category", "Otros") project_data.setdefault("version", "1.0") project_data.setdefault("author", "") project_data.setdefault("tags", []) project_data.setdefault("python_env", "base") # Entorno Python por defecto project_data.setdefault("created_date", current_time) project_data["updated_date"] = current_time # Cargar configuración y agregar proyecto with open(self.launcher_config_path, 'r', encoding='utf-8') as f: config = json.load(f) config["projects"].append(project_data) with open(self.launcher_config_path, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) return {"status": "success", "message": "Proyecto agregado exitosamente"} except Exception as e: return {"status": "error", "message": f"Error agregando proyecto: {str(e)}"} def update_python_project(self, project_id: str, project_data: Dict[str, Any]) -> Dict[str, str]: """Actualizar proyecto existente""" try: with open(self.launcher_config_path, 'r', encoding='utf-8') as f: config = json.load(f) # Buscar y actualizar el proyecto project_found = False for i, project in enumerate(config["projects"]): if project["id"] == project_id: # Mantener ID y fechas de creación project_data["id"] = project_id project_data["created_date"] = project.get("created_date", datetime.now().isoformat() + "Z") project_data["updated_date"] = datetime.now().isoformat() + "Z" config["projects"][i] = project_data project_found = True break if not project_found: return {"status": "error", "message": "Proyecto no encontrado"} with open(self.launcher_config_path, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) return {"status": "success", "message": "Proyecto actualizado exitosamente"} except Exception as e: return {"status": "error", "message": f"Error actualizando proyecto: {str(e)}"} def delete_python_project(self, project_id: str) -> Dict[str, str]: """Eliminar proyecto Python""" try: with open(self.launcher_config_path, 'r', encoding='utf-8') as f: config = json.load(f) # Filtrar el proyecto a eliminar original_count = len(config["projects"]) config["projects"] = [p for p in config["projects"] if p["id"] != project_id] if len(config["projects"]) == original_count: return {"status": "error", "message": "Proyecto no encontrado"} with open(self.launcher_config_path, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) # Limpiar metadatos y favoritos relacionados self._cleanup_script_metadata_for_project(project_id) self._cleanup_favorites_for_project(project_id) return {"status": "success", "message": "Proyecto eliminado exitosamente"} except Exception as e: return {"status": "error", "message": f"Error eliminando proyecto: {str(e)}"} def get_project_scripts(self, project_id: str) -> List[Dict[str, Any]]: """Obtener scripts de un proyecto (solo .py visibles)""" project = self.get_python_project(project_id) if not project: return [] project_dir = project["directory"] if not os.path.isdir(project_dir): return [] scripts = [] script_metadata = self._load_script_metadata() # Buscar archivos .py en el directorio del proyecto for filename in os.listdir(project_dir): if filename.endswith('.py') and not filename.startswith('__'): script_path = os.path.join(project_dir, filename) if os.path.isfile(script_path): # Obtener metadatos del script metadata_key = f"{project_id}:{filename}" metadata = script_metadata.get("script_metadata", {}).get(metadata_key, {}) # Solo mostrar scripts no ocultos if not metadata.get("hidden", False): scripts.append({ "filename": filename, "display_name": metadata.get("display_name", filename.replace('.py', '')), "description": metadata.get("description", ""), "tags": metadata.get("tags", []), "arguments": metadata.get("arguments", []), "is_server": metadata.get("is_server", False), # Indica si es un servidor que corre en background "server_port": metadata.get("server_port", ""), "requires_background": metadata.get("requires_background", False) }) return sorted(scripts, key=lambda x: x["display_name"]) def get_all_project_scripts(self, project_id: str) -> List[Dict[str, Any]]: """Obtener TODOS los scripts de un proyecto (incluyendo ocultos) para gestión""" project = self.get_python_project(project_id) if not project: return [] project_dir = project["directory"] if not os.path.isdir(project_dir): return [] scripts = [] script_metadata = self._load_script_metadata() # Buscar archivos .py en el directorio del proyecto for filename in os.listdir(project_dir): if filename.endswith('.py') and not filename.startswith('__'): script_path = os.path.join(project_dir, filename) if os.path.isfile(script_path): # Obtener metadatos del script metadata_key = f"{project_id}:{filename}" metadata = script_metadata.get("script_metadata", {}).get(metadata_key, {}) scripts.append({ "filename": filename, "display_name": metadata.get("display_name", filename.replace('.py', '')), "description": metadata.get("description", ""), "tags": metadata.get("tags", []), "arguments": metadata.get("arguments", []), "hidden": metadata.get("hidden", False), "is_server": metadata.get("is_server", False), "server_port": metadata.get("server_port", ""), "requires_background": metadata.get("requires_background", False) }) return sorted(scripts, key=lambda x: x["display_name"]) def get_script_metadata(self, project_id: str, script_name: str) -> Dict[str, Any]: """Obtener metadatos de un script específico""" script_metadata = self._load_script_metadata() metadata_key = f"{project_id}:{script_name}" return script_metadata.get("script_metadata", {}).get(metadata_key, {}) def update_script_metadata(self, project_id: str, script_name: str, metadata: Dict[str, Any]) -> Dict[str, str]: """Actualizar metadatos de un script""" try: script_metadata = self._load_script_metadata() metadata_key = f"{project_id}:{script_name}" if "script_metadata" not in script_metadata: script_metadata["script_metadata"] = {} script_metadata["script_metadata"][metadata_key] = metadata self._save_script_metadata(script_metadata) return {"status": "success", "message": "Metadatos actualizados exitosamente"} except Exception as e: return {"status": "error", "message": f"Error actualizando metadatos: {str(e)}"} def get_available_python_envs(self) -> List[Dict[str, str]]: """Obtener lista de entornos de Python/Miniconda disponibles""" try: envs = [{"name": "base", "display_name": "Base (Sistema)", "path": sys.executable}] # Intentar encontrar Miniconda miniconda_paths = [ r"C:\Users\migue\miniconda3", r"C:\ProgramData\miniconda3", r"C:\miniconda3", os.path.expanduser("~/miniconda3"), os.path.expanduser("~/anaconda3") ] for base_path in miniconda_paths: if os.path.exists(base_path): envs_path = os.path.join(base_path, "envs") if os.path.exists(envs_path): for env_name in os.listdir(envs_path): env_path = os.path.join(envs_path, env_name) python_exe = os.path.join(env_path, "python.exe") if os.path.exists(python_exe): envs.append({ "name": env_name, "display_name": f"{env_name} (Miniconda)", "path": python_exe }) break # Solo usar el primer Miniconda encontrado return envs except Exception as e: print(f"Error getting Python environments: {e}") return [{"name": "base", "display_name": "Base (Sistema)", "path": sys.executable}] def execute_python_script(self, project_id: str, script_name: str, script_args: List[str], broadcast_func, working_dir: str = None, run_in_background: bool = False) -> Dict[str, Any]: """Ejecutar script Python con argumentos opcionales""" try: project = self.get_python_project(project_id) if not project: return {"error": "Proyecto no encontrado"} # Construir ruta del script script_path = os.path.join(project["directory"], script_name) if not os.path.exists(script_path): return {"error": f"Script '{script_name}' no encontrado"} # Determinar directorio de trabajo if working_dir and os.path.isdir(working_dir): work_dir = working_dir else: work_dir = project["directory"] # Obtener ejecutable de Python python_env = project.get("python_env", "base") python_exe = self._get_python_executable(python_env) # Construir comando cmd = [python_exe, script_path] + script_args # ID único para esta ejecución execution_id = str(uuid.uuid4())[:8] start_time = time.time() broadcast_func(f"🚀 Ejecutando script: {script_name}") broadcast_func(f"📁 Directorio: {work_dir}") broadcast_func(f"🐍 Python: {python_exe}") if script_args: broadcast_func(f"⚙️ Argumentos: {' '.join(script_args)}") # Agregar a historial history_entry = { "id": execution_id, "project_id": project_id, "script_name": script_name, "arguments": script_args, "working_directory": work_dir, "python_env": python_env, "timestamp": datetime.now().isoformat() + "Z", "status": "running", "execution_time": None } self._add_to_history(history_entry) # Configurar proceso if sys.platform == "win32": creationflags = subprocess.CREATE_NEW_PROCESS_GROUP if run_in_background: # Para procesos en background (servidores), crear ventana nueva creationflags |= subprocess.CREATE_NEW_CONSOLE else: creationflags = 0 # Ejecutar proceso process = subprocess.Popen( cmd, cwd=work_dir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, creationflags=creationflags if sys.platform == "win32" else None ) # Guardar proceso en la lista de procesos activos with self.process_lock: self.running_processes[process.pid] = { "pid": process.pid, "project_id": project_id, "script_name": script_name, "start_time": datetime.now().isoformat() + "Z", "execution_id": execution_id, "working_directory": work_dir, "is_background": run_in_background } broadcast_func(f"✅ Proceso iniciado con PID: {process.pid}") if run_in_background: # Para procesos en background, no esperamos la salida broadcast_func(f"🔄 Script ejecutándose en segundo plano (PID: {process.pid})") return { "status": "success", "message": f"Script '{script_name}' iniciado en segundo plano", "execution_id": execution_id, "pid": process.pid, "background": True } else: # Para scripts normales, leer salida en tiempo real def read_output(): try: for line in iter(process.stdout.readline, ''): if line: broadcast_func(line.rstrip()) except Exception as e: broadcast_func(f"Error leyendo salida: {e}") finally: if process.stdout: process.stdout.close() # Iniciar lectura de salida en hilo separado output_thread = threading.Thread(target=read_output, daemon=True) output_thread.start() # Monitorear finalización del proceso def monitor_completion(): try: return_code = process.wait() end_time = time.time() execution_time = end_time - start_time # Actualizar historial self._update_history_status(execution_id, return_code, execution_time) # Remover de procesos activos with self.process_lock: if process.pid in self.running_processes: del self.running_processes[process.pid] if return_code == 0: broadcast_func(f"✅ Script completado exitosamente (código: {return_code})") else: broadcast_func(f"❌ Script terminó con errores (código: {return_code})") broadcast_func(f"⏱️ Tiempo de ejecución: {execution_time:.2f} segundos") except Exception as e: broadcast_func(f"Error monitoreando proceso: {e}") # Iniciar monitoreo en hilo separado monitor_thread = threading.Thread(target=monitor_completion, daemon=True) monitor_thread.start() return { "status": "success", "message": f"Script '{script_name}' ejecutándose...", "execution_id": execution_id, "pid": process.pid, "background": False } except Exception as e: error_msg = f"Error ejecutando script Python: {str(e)}" broadcast_func(error_msg) return {"error": error_msg} def _get_python_executable(self, env_name: str) -> str: """Obtener ejecutable de Python para el entorno especificado""" if env_name == "base": return sys.executable # Intentar encontrar entorno de conda en todas las ubicaciones posibles miniconda_paths = [ r"C:\Users\migue\miniconda3", r"C:\ProgramData\miniconda3", r"C:\miniconda3", os.path.expanduser("~/miniconda3"), os.path.expanduser("~/anaconda3") ] for base_path in miniconda_paths: if os.path.exists(base_path): env_path = os.path.join(base_path, "envs", env_name) python_exe = os.path.join(env_path, "python.exe") if os.path.exists(python_exe): return python_exe # Fallback al Python del sistema print(f"Warning: Python environment '{env_name}' not found, using system Python") return sys.executable def _load_script_metadata(self) -> Dict[str, Any]: """Cargar metadatos de scripts desde archivo""" try: with open(self.script_metadata_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception: return {"version": "1.0", "script_metadata": {}} def _save_script_metadata(self, metadata: Dict[str, Any]): """Guardar metadatos de scripts en archivo""" try: with open(self.script_metadata_path, 'w', encoding='utf-8') as f: json.dump(metadata, f, indent=2, ensure_ascii=False) except Exception as e: print(f"Error saving script metadata: {e}") def _cleanup_script_metadata_for_project(self, project_id: str): """Limpiar metadatos de scripts al eliminar un proyecto""" try: script_metadata = self._load_script_metadata() if "script_metadata" in script_metadata: # Filtrar metadatos que no pertenezcan al proyecto eliminado script_metadata["script_metadata"] = { k: v for k, v in script_metadata["script_metadata"].items() if not k.startswith(f"{project_id}:") } self._save_script_metadata(script_metadata) except Exception as e: print(f"Error cleaning script metadata for project {project_id}: {e}") def get_favorites(self) -> List[Dict[str, Any]]: """Obtener scripts favoritos""" try: with open(self.favorites_path, 'r', encoding='utf-8') as f: favorites_data = json.load(f) return favorites_data.get("favorites", []) except Exception: return [] def toggle_favorite(self, project_id: str, script_name: str) -> Dict[str, str]: """Agregar o quitar de favoritos""" try: favorites_data = {"favorites": self.get_favorites()} # Buscar si ya está en favoritos favorite_key = f"{project_id}:{script_name}" existing_favorite = None for i, fav in enumerate(favorites_data["favorites"]): if fav.get("project_id") == project_id and fav.get("script_name") == script_name: existing_favorite = i break if existing_favorite is not None: # Quitar de favoritos del favorites_data["favorites"][existing_favorite] message = "Removido de favoritos" is_favorite = False else: # Agregar a favoritos project = self.get_python_project(project_id) if project: script_metadata = self.get_script_metadata(project_id, script_name) favorites_data["favorites"].append({ "project_id": project_id, "project_name": project["name"], "script_name": script_name, "display_name": script_metadata.get("display_name", script_name.replace('.py', '')), "description": script_metadata.get("description", ""), "added_date": datetime.now().isoformat() + "Z" }) message = "Agregado a favoritos" is_favorite = True else: return {"status": "error", "message": "Proyecto no encontrado"} with open(self.favorites_path, 'w', encoding='utf-8') as f: json.dump(favorites_data, f, indent=2, ensure_ascii=False) return {"status": "success", "message": message, "is_favorite": is_favorite} except Exception as e: return {"status": "error", "message": f"Error gestionando favoritos: {str(e)}"} def get_history(self) -> List[Dict[str, Any]]: """Obtener historial de ejecuciones""" try: with open(self.history_path, 'r', encoding='utf-8') as f: history_data = json.load(f) return history_data.get("history", []) except Exception: return [] def clear_history(self) -> Dict[str, str]: """Limpiar historial de ejecuciones""" try: history_data = { "history": [], "settings": { "max_entries": 100, "auto_cleanup_days": 30, "track_execution_time": True, "track_arguments": True } } with open(self.history_path, 'w', encoding='utf-8') as f: json.dump(history_data, f, indent=2, ensure_ascii=False) return {"status": "success", "message": "Historial limpiado exitosamente"} except Exception as e: return {"status": "error", "message": f"Error limpiando historial: {str(e)}"} def get_categories(self) -> Dict[str, Any]: """Obtener categorías disponibles""" try: with open(self.launcher_config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config.get("categories", {}) except Exception: return {} def _add_to_history(self, entry: Dict[str, Any]): """Agregar entrada al historial""" try: history_data = {"history": self.get_history()} # Agregar nueva entrada al inicio history_data["history"].insert(0, entry) # Mantener máximo de entradas max_entries = 100 if len(history_data["history"]) > max_entries: history_data["history"] = history_data["history"][:max_entries] with open(self.history_path, 'w', encoding='utf-8') as f: json.dump(history_data, f, indent=2, ensure_ascii=False) except Exception as e: print(f"Error adding to history: {e}") def _cleanup_favorites_for_project(self, project_id: str): """Limpiar favoritos al eliminar un proyecto""" try: favorites_data = {"favorites": self.get_favorites()} # Filtrar favoritos que no pertenezcan al proyecto eliminado favorites_data["favorites"] = [ fav for fav in favorites_data["favorites"] if fav.get("project_id") != project_id ] with open(self.favorites_path, 'w', encoding='utf-8') as f: json.dump(favorites_data, f, indent=2, ensure_ascii=False) except Exception as e: print(f"Error cleaning favorites for project {project_id}: {e}") def _update_history_status(self, execution_id: str, final_code: int, final_execution_time: float): """Actualizar estado final en el historial""" try: history_data = {"history": self.get_history()} for entry in history_data["history"]: if entry.get("id") == execution_id: entry["status"] = "completed" if final_code == 0 else "error" entry["return_code"] = final_code entry["execution_time"] = final_execution_time break with open(self.history_path, 'w', encoding='utf-8') as f: json.dump(history_data, f, indent=2, ensure_ascii=False) except Exception as e: print(f"Error updating history status: {e}") def focus_process(self, pid: int) -> Dict[str, str]: """Intentar dar foco a un proceso por su PID (Windows)""" try: if sys.platform == "win32": import ctypes from ctypes import wintypes def enum_windows_proc(hwnd, pid): if ctypes.windll.user32.IsWindowVisible(hwnd): process_id = wintypes.DWORD() ctypes.windll.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(process_id)) if process_id.value == pid: ctypes.windll.user32.SetForegroundWindow(hwnd) return False # Detener enumeración return True # Continuar enumeración # Definir el tipo de callback EnumWindowsProc = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM) callback = EnumWindowsProc(enum_windows_proc) ctypes.windll.user32.EnumWindows(callback, pid) return {"status": "success", "message": f"Intentando dar foco al proceso {pid}"} else: return {"status": "info", "message": "Función de foco no disponible en esta plataforma"} except Exception as e: return {"status": "error", "message": f"Error dando foco al proceso: {str(e)}"} def terminate_process(self, pid: int) -> Dict[str, str]: """Terminar un proceso por su PID""" try: with self.process_lock: if pid in self.running_processes: process_info = self.running_processes[pid] del self.running_processes[pid] # Intentar terminar el proceso if sys.platform == "win32": subprocess.run(["taskkill", "/F", "/PID", str(pid)], capture_output=True, check=False) else: import signal try: os.kill(pid, signal.SIGTERM) except ProcessLookupError: pass # Proceso ya terminado return { "status": "success", "message": f"Proceso {pid} terminado ({process_info.get('script_name', 'N/A')})" } else: return {"status": "error", "message": "Proceso no encontrado en la lista de procesos activos"} except Exception as e: return {"status": "error", "message": f"Error terminando proceso: {str(e)}"} def get_running_processes(self) -> List[Dict[str, Any]]: """Obtener lista de procesos en ejecución""" try: with self.process_lock: processes = [] dead_pids = [] for pid, info in self.running_processes.items(): # Verificar si el proceso sigue vivo try: if sys.platform == "win32": result = subprocess.run( ["tasklist", "/FI", f"PID eq {pid}"], capture_output=True, text=True, check=False ) if str(pid) not in result.stdout: dead_pids.append(pid) continue else: os.kill(pid, 0) # No mata el proceso, solo verifica si existe except (ProcessLookupError, subprocess.SubprocessError): dead_pids.append(pid) continue # Agregar información del proceso project = self.get_python_project(info["project_id"]) processes.append({ "pid": pid, "project_id": info["project_id"], "project_name": project["name"] if project else "Proyecto no encontrado", "script_name": info["script_name"], "start_time": info["start_time"], "execution_id": info["execution_id"], "working_directory": info["working_directory"], "is_background": info.get("is_background", False) }) # Limpiar procesos muertos for pid in dead_pids: del self.running_processes[pid] return processes except Exception as e: print(f"Error getting running processes: {e}") return [] def get_markdown_files(self, project_id: str) -> List[Dict[str, Any]]: """Obtener archivos Markdown de un proyecto""" try: project = self.get_python_project(project_id) if not project: return [] project_dir = project["directory"] if not os.path.isdir(project_dir): return [] markdown_files = [] # Buscar archivos .md en el directorio del proyecto for root, dirs, files in os.walk(project_dir): # Excluir directorios comunes que no contienen documentación relevante dirs[:] = [d for d in dirs if d not in ['.git', '__pycache__', '.vscode', 'node_modules']] for filename in files: if filename.lower().endswith('.md'): file_path = os.path.join(root, filename) relative_path = os.path.relpath(file_path, project_dir) # Obtener información básica del archivo try: stat = os.stat(file_path) size = stat.st_size modified = datetime.fromtimestamp(stat.st_mtime).isoformat() + "Z" # Intentar leer las primeras líneas para obtener el título title = filename.replace('.md', '') try: with open(file_path, 'r', encoding='utf-8') as f: first_line = f.readline().strip() if first_line.startswith('#'): title = first_line.lstrip('#').strip() except Exception: pass markdown_files.append({ "filename": filename, "relative_path": relative_path.replace('\\', '/'), # Normalizar separadores "title": title, "size": size, "modified": modified }) except Exception as e: print(f"Error getting file info for {file_path}: {e}") continue # Ordenar por ruta relativa return sorted(markdown_files, key=lambda x: x["relative_path"]) except Exception as e: print(f"Error getting markdown files for project {project_id}: {e}") return [] def read_markdown_file(self, project_id: str, relative_path: str) -> Dict[str, Any]: """Obtener contenido de un archivo Markdown""" try: project = self.get_python_project(project_id) if not project: return {"error": "Proyecto no encontrado"} # Construir ruta completa y validar que esté dentro del proyecto project_dir = os.path.abspath(project["directory"]) file_path = os.path.abspath(os.path.join(project_dir, relative_path)) # Verificar que el archivo esté dentro del directorio del proyecto (seguridad) if not file_path.startswith(project_dir): return {"error": "Acceso no autorizado al archivo"} if not os.path.exists(file_path): return {"error": "Archivo no encontrado"} # Leer contenido del archivo with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Obtener información del archivo stat = os.stat(file_path) return { "content": content, "filename": os.path.basename(file_path), "relative_path": relative_path, "size": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat() + "Z" } except Exception as e: return {"error": f"Error leyendo archivo: {str(e)}"}