# ParamManagerScripts/lib/python_launcher_manager.py
#
# 900 lines
# 40 KiB
# Python

import os
import json
import subprocess
import sys
import threading
import time
from typing import Dict, Any, List, Optional
from datetime import datetime
import uuid
class PythonLauncherManager:
def __init__(self, data_path: str):
    """Set up the storage paths and the in-memory process registry.

    Args:
        data_path: Directory that holds the launcher's JSON files. It is
            created when it does not exist yet.
    """
    # Make sure the data directory exists before any default file is written;
    # otherwise the very first start on a clean machine fails with
    # FileNotFoundError. An empty path means "current directory".
    if data_path:
        os.makedirs(data_path, exist_ok=True)
    self.data_path = data_path
    self.launcher_config_path = os.path.join(data_path, "python_launcher_projects.json")
    self.favorites_path = os.path.join(data_path, "python_launcher_favorites.json")
    self.history_path = os.path.join(data_path, "python_launcher_history.json")
    self.script_metadata_path = os.path.join(data_path, "python_launcher_script_metadata.json")
    # Python processes started by this manager (servers, etc.), keyed by PID.
    self.running_processes = {}
    # Guards every access to running_processes.
    self.process_lock = threading.Lock()
    # Write default files for any that are missing.
    self._initialize_files()
def _initialize_files(self):
    """Seed each launcher JSON file with default content when it is absent.

    Files that already exist are left untouched.
    """

    def write_if_missing(target_path, payload):
        # Never overwrite a file the user may already have edited.
        if os.path.exists(target_path):
            return
        with open(target_path, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=2, ensure_ascii=False)

    # (name, color, icon, subcategories) for the built-in categories.
    category_specs = (
        ("MCP Servers", "#3B82F6", "🔌", ["Anthropic", "Custom", "OpenAI"]),
        ("Flask Apps", "#10B981", "🌐", ["API", "Web App", "Microservice"]),
        ("Scripts", "#8B5CF6", "📜", ["Automatización", "Utiles", "Procesamiento"]),
        ("Bots", "#F59E0B", "🤖", ["Discord", "Telegram", "Slack"]),
        ("Data Processing", "#EF4444", "📊", ["ETL", "Analysis", "ML"]),
        ("Otros", "#6B7280", "📁", ["Misceláneos"]),
    )
    categories = {
        name: {"color": color, "icon": icon, "subcategories": subcategories}
        for name, color, icon, subcategories in category_specs
    }

    # python_launcher_projects.json
    write_if_missing(self.launcher_config_path, {
        "version": "1.0",
        "projects": [],
        "categories": categories,
        "settings": {
            "default_execution_directory": "project_directory",
            "enable_argument_validation": True,
            "max_history_entries": 100,
            "auto_cleanup_days": 30,
            "default_python_env": "base",
        },
    })

    # python_launcher_favorites.json
    write_if_missing(self.favorites_path, {"favorites": []})

    # python_launcher_history.json
    write_if_missing(self.history_path, {
        "history": [],
        "settings": {
            "max_entries": 100,
            "auto_cleanup_days": 30,
            "track_execution_time": True,
            "track_arguments": True,
        },
    })

    # python_launcher_script_metadata.json
    write_if_missing(self.script_metadata_path, {
        "version": "1.0",
        "script_metadata": {},
    })
def get_python_projects(self) -> List[Dict[str, Any]]:
    """Return every registered Python project.

    Any failure while reading or parsing the config file is printed and
    reported as an empty list.
    """
    try:
        with open(self.launcher_config_path, 'r', encoding='utf-8') as config_file:
            loaded = json.load(config_file)
        projects = loaded.get("projects", [])
    except Exception as exc:
        print(f"Error loading Python projects: {exc}")
        projects = []
    return projects
def get_python_project(self, project_id: str) -> Optional[Dict[str, Any]]:
    """Return the first project whose "id" equals project_id, or None."""
    return next(
        (
            candidate
            for candidate in self.get_python_projects()
            if candidate.get("id") == project_id
        ),
        None,
    )
def add_python_project(self, project_data: Dict[str, Any]) -> Dict[str, str]:
    """Register a new Python project in the launcher config.

    project_data is completed in place: an "id" is generated when missing and
    default values are filled in for the optional fields.

    Returns:
        A dict with "status" ("success" or "error") and a "message".
    """
    try:
        # Both fields must be present and non-empty.
        missing_field = next(
            (name for name in ("name", "directory") if not project_data.get(name)),
            None,
        )
        if missing_field is not None:
            return {"status": "error", "message": f"Campo requerido: {missing_field}"}

        # The project directory has to exist on disk.
        if not os.path.isdir(project_data["directory"]):
            return {"status": "error", "message": "El directorio especificado no existe"}

        # Generate a short unique id unless the caller supplied one.
        if not project_data.get("id"):
            project_data["id"] = str(uuid.uuid4())[:8]

        # Reject ids that are already taken.
        if self.get_python_project(project_data["id"]):
            return {"status": "error", "message": "Ya existe un proyecto con este ID"}

        # Fill in defaults without overriding values the caller provided.
        stamp = datetime.now().isoformat() + "Z"
        optional_defaults = (
            ("description", ""),
            ("category", "Otros"),
            ("version", "1.0"),
            ("author", ""),
            ("tags", []),
            ("python_env", "base"),  # default Python environment
            ("created_date", stamp),
        )
        for key, fallback in optional_defaults:
            project_data.setdefault(key, fallback)
        project_data["updated_date"] = stamp

        # Append to the stored project list and write the config back.
        with open(self.launcher_config_path, 'r', encoding='utf-8') as source:
            stored = json.load(source)
        stored["projects"].append(project_data)
        with open(self.launcher_config_path, 'w', encoding='utf-8') as sink:
            json.dump(stored, sink, indent=2, ensure_ascii=False)

        return {"status": "success", "message": "Proyecto agregado exitosamente"}
    except Exception as exc:
        return {"status": "error", "message": f"Error agregando proyecto: {str(exc)}"}
def update_python_project(self, project_id: str, project_data: Dict[str, Any]) -> Dict[str, str]:
    """Replace the stored project that has the given id with project_data.

    The id and the original creation date are carried over into
    project_data; "updated_date" is set to the current time.

    Returns:
        A dict with "status" ("success" or "error") and a "message".
    """
    try:
        with open(self.launcher_config_path, 'r', encoding='utf-8') as source:
            stored = json.load(source)

        entries = stored["projects"]
        # Position of the first project with a matching id, if any.
        position = next(
            (index for index, existing in enumerate(entries) if existing["id"] == project_id),
            None,
        )
        if position is None:
            return {"status": "error", "message": "Proyecto no encontrado"}

        # Keep the id and the creation date of the stored entry.
        previous = entries[position]
        project_data["id"] = project_id
        project_data["created_date"] = previous.get("created_date", datetime.now().isoformat() + "Z")
        project_data["updated_date"] = datetime.now().isoformat() + "Z"
        entries[position] = project_data

        with open(self.launcher_config_path, 'w', encoding='utf-8') as sink:
            json.dump(stored, sink, indent=2, ensure_ascii=False)
        return {"status": "success", "message": "Proyecto actualizado exitosamente"}
    except Exception as exc:
        return {"status": "error", "message": f"Error actualizando proyecto: {str(exc)}"}
def delete_python_project(self, project_id: str) -> Dict[str, str]:
    """Remove a project from the config, then drop its metadata and favorites.

    Returns:
        A dict with "status" ("success" or "error") and a "message".
    """
    try:
        with open(self.launcher_config_path, 'r', encoding='utf-8') as source:
            stored = json.load(source)

        # Everything except the project being deleted.
        survivors = [entry for entry in stored["projects"] if entry["id"] != project_id]
        if len(survivors) == len(stored["projects"]):
            return {"status": "error", "message": "Proyecto no encontrado"}
        stored["projects"] = survivors

        with open(self.launcher_config_path, 'w', encoding='utf-8') as sink:
            json.dump(stored, sink, indent=2, ensure_ascii=False)

        # Remove script metadata and favorites that referenced the project.
        self._cleanup_script_metadata_for_project(project_id)
        self._cleanup_favorites_for_project(project_id)
        return {"status": "success", "message": "Proyecto eliminado exitosamente"}
    except Exception as exc:
        return {"status": "error", "message": f"Error eliminando proyecto: {str(exc)}"}
def get_project_scripts(self, project_id: str) -> List[Dict[str, Any]]:
    """List the visible .py scripts that sit directly in a project directory.

    Files starting with "__" and scripts whose metadata marks them as hidden
    are skipped. Subdirectories are not searched.

    Args:
        project_id: Id of the project to inspect.

    Returns:
        One dict per script (filename, display_name, description, tags,
        arguments, is_server, server_port, requires_background), sorted by
        display_name. Empty when the project or its directory is missing.
    """
    project = self.get_python_project(project_id)
    if not project:
        return []
    project_dir = project["directory"]
    if not os.path.isdir(project_dir):
        return []

    all_metadata = self._load_script_metadata().get("script_metadata", {})
    scripts = []
    for filename in os.listdir(project_dir):
        if not filename.endswith('.py') or filename.startswith('__'):
            continue
        if not os.path.isfile(os.path.join(project_dir, filename)):
            continue
        metadata = all_metadata.get(f"{project_id}:{filename}", {})
        # Hidden scripts are only shown in the management view.
        if metadata.get("hidden", False):
            continue
        scripts.append({
            "filename": filename,
            # Strip only the trailing extension; str.replace would also eat
            # any ".py" inside the name (e.g. "a.py.py" -> "a").
            "display_name": metadata.get("display_name", filename[:-len('.py')]),
            "description": metadata.get("description", ""),
            "tags": metadata.get("tags", []),
            "arguments": metadata.get("arguments", []),
            # True when the script is a server meant to run in the background.
            "is_server": metadata.get("is_server", False),
            "server_port": metadata.get("server_port", ""),
            "requires_background": metadata.get("requires_background", False)
        })
    return sorted(scripts, key=lambda item: item["display_name"])
def get_all_project_scripts(self, project_id: str) -> List[Dict[str, Any]]:
    """List every .py script directly in a project directory, hidden ones included.

    Intended for the management view: each entry carries a "hidden" flag
    instead of being filtered out. Files starting with "__" are skipped and
    subdirectories are not searched.

    Args:
        project_id: Id of the project to inspect.

    Returns:
        One dict per script, sorted by display_name. Empty when the project
        or its directory is missing.
    """
    project = self.get_python_project(project_id)
    if not project:
        return []
    project_dir = project["directory"]
    if not os.path.isdir(project_dir):
        return []

    all_metadata = self._load_script_metadata().get("script_metadata", {})
    scripts = []
    for filename in os.listdir(project_dir):
        if not filename.endswith('.py') or filename.startswith('__'):
            continue
        if not os.path.isfile(os.path.join(project_dir, filename)):
            continue
        metadata = all_metadata.get(f"{project_id}:{filename}", {})
        scripts.append({
            "filename": filename,
            # Strip only the trailing extension; str.replace would also eat
            # any ".py" inside the name (e.g. "a.py.py" -> "a").
            "display_name": metadata.get("display_name", filename[:-len('.py')]),
            "description": metadata.get("description", ""),
            "tags": metadata.get("tags", []),
            "arguments": metadata.get("arguments", []),
            "hidden": metadata.get("hidden", False),
            "is_server": metadata.get("is_server", False),
            "server_port": metadata.get("server_port", ""),
            "requires_background": metadata.get("requires_background", False)
        })
    return sorted(scripts, key=lambda item: item["display_name"])
def get_script_metadata(self, project_id: str, script_name: str) -> Dict[str, Any]:
    """Return the stored metadata of one script, or {} when there is none."""
    entries = self._load_script_metadata().get("script_metadata", {})
    return entries.get(f"{project_id}:{script_name}", {})
def update_script_metadata(self, project_id: str, script_name: str, metadata: Dict[str, Any]) -> Dict[str, str]:
    """Store metadata for one script, replacing any previous entry.

    Returns:
        A dict with "status" ("success" or "error") and a "message".
    """
    try:
        store = self._load_script_metadata()
        # Create the container on first use, then overwrite the script's entry.
        store.setdefault("script_metadata", {})[f"{project_id}:{script_name}"] = metadata
        self._save_script_metadata(store)
    except Exception as exc:
        return {"status": "error", "message": f"Error actualizando metadatos: {str(exc)}"}
    return {"status": "success", "message": "Metadatos actualizados exitosamente"}
def get_available_python_envs(self) -> List[Dict[str, str]]:
    """List the Python interpreters that scripts can be run with.

    The first entry is always "base" (the interpreter running this program).
    After that come the environments found under the "envs" folder of the
    first Miniconda/Anaconda installation that exists among the known
    locations.

    Returns:
        Dicts with "name", "display_name" and "path". On any error only the
        "base" entry is returned.
    """
    base_env = {"name": "base", "display_name": "Base (Sistema)", "path": sys.executable}
    try:
        envs = [dict(base_env)]
        conda_roots = [
            r"C:\Users\migue\miniconda3",
            r"C:\ProgramData\miniconda3",
            r"C:\miniconda3",
            os.path.expanduser("~/miniconda3"),
            os.path.expanduser("~/anaconda3")
        ]
        for conda_root in conda_roots:
            if not os.path.exists(conda_root):
                continue
            envs_dir = os.path.join(conda_root, "envs")
            if os.path.exists(envs_dir):
                for env_name in os.listdir(envs_dir):
                    env_dir = os.path.join(envs_dir, env_name)
                    # Windows environments keep python.exe at the top level;
                    # POSIX environments keep the interpreter in bin/.
                    candidates = (
                        os.path.join(env_dir, "python.exe"),
                        os.path.join(env_dir, "bin", "python"),
                    )
                    python_exe = next((c for c in candidates if os.path.exists(c)), None)
                    if python_exe:
                        envs.append({
                            "name": env_name,
                            "display_name": f"{env_name} (Miniconda)",
                            "path": python_exe
                        })
            # Only the first installation found is used.
            break
        return envs
    except Exception as e:
        print(f"Error getting Python environments: {e}")
        return [dict(base_env)]
def execute_python_script(self, project_id: str, script_name: str, script_args: List[str],
                          broadcast_func, working_dir: str = None, run_in_background: bool = False) -> Dict[str, Any]:
    """Start a project script in a child Python process.

    The call returns as soon as the process has been started. Progress is
    reported through broadcast_func, the run is added to the history and the
    process is tracked in running_processes until it exits.

    Args:
        project_id: Id of the project that owns the script.
        script_name: File name of the script, relative to the project directory.
        script_args: Command-line arguments for the script; None means none.
        broadcast_func: Callable taking one string; receives status messages
            and, for foreground runs, every output line of the script.
        working_dir: Directory to run in. The project directory is used when
            this is empty or not an existing directory.
        run_in_background: When true the script's output is not captured
            (on Windows it runs in its own console window).

    Returns:
        On success a dict with "status", "message", "execution_id", "pid" and
        "background"; otherwise a dict with a single "error" message.
    """
    try:
        project = self.get_python_project(project_id)
        if not project:
            return {"error": "Proyecto no encontrado"}

        script_path = os.path.join(project["directory"], script_name)
        if not os.path.exists(script_path):
            return {"error": f"Script '{script_name}' no encontrado"}

        # Fall back to the project directory when no usable directory is given.
        if working_dir and os.path.isdir(working_dir):
            work_dir = working_dir
        else:
            work_dir = project["directory"]

        python_env = project.get("python_env", "base")
        python_exe = self._get_python_executable(python_env)

        # Treat a missing argument list as "no arguments".
        if script_args is None:
            script_args = []
        cmd = [python_exe, script_path] + script_args

        # Short unique id that links the history entry to this run.
        execution_id = str(uuid.uuid4())[:8]
        start_time = time.time()

        broadcast_func(f"🚀 Ejecutando script: {script_name}")
        broadcast_func(f"📁 Directorio: {work_dir}")
        broadcast_func(f"🐍 Python: {python_exe}")
        if script_args:
            broadcast_func(f"⚙️ Argumentos: {' '.join(script_args)}")

        self._add_to_history({
            "id": execution_id,
            "project_id": project_id,
            "script_name": script_name,
            "arguments": script_args,
            "working_directory": work_dir,
            "python_env": python_env,
            "timestamp": datetime.now().isoformat() + "Z",
            "status": "running",
            "execution_time": None
        })

        # creationflags must be exactly 0 outside Windows: Popen rejects any
        # other value there, including None.
        creationflags = 0
        if sys.platform == "win32":
            creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
            if run_in_background:
                # Background processes (servers) get their own console window.
                creationflags |= subprocess.CREATE_NEW_CONSOLE

        if run_in_background:
            # Nobody reads a background process's output, so it must not go
            # into a pipe: once the pipe buffer fills the child would block.
            # On Windows the output goes to the new console instead.
            stdout_target = None if sys.platform == "win32" else subprocess.DEVNULL
            stderr_target = stdout_target
        else:
            stdout_target = subprocess.PIPE
            stderr_target = subprocess.STDOUT

        process = subprocess.Popen(
            cmd,
            cwd=work_dir,
            stdout=stdout_target,
            stderr=stderr_target,
            text=True,
            # Undecodable bytes must not kill the reader thread.
            errors="replace",
            bufsize=1,
            creationflags=creationflags
        )

        with self.process_lock:
            self.running_processes[process.pid] = {
                "pid": process.pid,
                "project_id": project_id,
                "script_name": script_name,
                "start_time": datetime.now().isoformat() + "Z",
                "execution_id": execution_id,
                "working_directory": work_dir,
                "is_background": run_in_background
            }
        broadcast_func(f"✅ Proceso iniciado con PID: {process.pid}")

        def read_output():
            # Forward the script's output line by line until the pipe closes.
            try:
                for line in iter(process.stdout.readline, ''):
                    if line:
                        broadcast_func(line.rstrip())
            except Exception as e:
                broadcast_func(f"Error leyendo salida: {e}")
            finally:
                if process.stdout:
                    process.stdout.close()

        def monitor_completion():
            # Wait for the process, then record the outcome and stop tracking it.
            try:
                return_code = process.wait()
                execution_time = time.time() - start_time
                self._update_history_status(execution_id, return_code, execution_time)
                with self.process_lock:
                    self.running_processes.pop(process.pid, None)
                if return_code == 0:
                    broadcast_func(f"✅ Script completado exitosamente (código: {return_code})")
                else:
                    broadcast_func(f"❌ Script terminó con errores (código: {return_code})")
                broadcast_func(f"⏱️ Tiempo de ejecución: {execution_time:.2f} segundos")
            except Exception as e:
                broadcast_func(f"Error monitoreando proceso: {e}")

        if run_in_background:
            broadcast_func(f"🔄 Script ejecutándose en segundo plano (PID: {process.pid})")
        else:
            threading.Thread(target=read_output, daemon=True).start()
        # Background runs are monitored as well, so the child is reaped and
        # its history entry does not stay "running" forever.
        threading.Thread(target=monitor_completion, daemon=True).start()

        if run_in_background:
            return {
                "status": "success",
                "message": f"Script '{script_name}' iniciado en segundo plano",
                "execution_id": execution_id,
                "pid": process.pid,
                "background": True
            }
        return {
            "status": "success",
            "message": f"Script '{script_name}' ejecutándose...",
            "execution_id": execution_id,
            "pid": process.pid,
            "background": False
        }
    except Exception as e:
        error_msg = f"Error ejecutando script Python: {str(e)}"
        broadcast_func(error_msg)
        return {"error": error_msg}
def _get_python_executable(self, env_name: str) -> str:
    """Return the interpreter path for a named Python environment.

    "base" maps to the interpreter running this program. Any other name is
    looked up under the "envs" folder of the known Miniconda/Anaconda
    locations. When nothing is found a warning is printed and the current
    interpreter is returned.
    """
    if env_name == "base":
        return sys.executable

    conda_roots = [
        r"C:\Users\migue\miniconda3",
        r"C:\ProgramData\miniconda3",
        r"C:\miniconda3",
        os.path.expanduser("~/miniconda3"),
        os.path.expanduser("~/anaconda3")
    ]
    for conda_root in conda_roots:
        if not os.path.exists(conda_root):
            continue
        env_dir = os.path.join(conda_root, "envs", env_name)
        # Windows environments keep python.exe at the top level; POSIX
        # environments keep the interpreter in bin/.
        for candidate in (os.path.join(env_dir, "python.exe"),
                          os.path.join(env_dir, "bin", "python")):
            if os.path.exists(candidate):
                return candidate

    # Fall back to the interpreter running this program.
    print(f"Warning: Python environment '{env_name}' not found, using system Python")
    return sys.executable
def _load_script_metadata(self) -> Dict[str, Any]:
    """Read the script metadata file; return an empty default on any failure."""
    try:
        with open(self.script_metadata_path, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
    except Exception:
        loaded = {"version": "1.0", "script_metadata": {}}
    return loaded
def _save_script_metadata(self, metadata: Dict[str, Any]):
    """Write the script metadata file.

    Failures are printed, not raised.
    """
    try:
        with open(self.script_metadata_path, 'w', encoding='utf-8') as handle:
            json.dump(metadata, handle, ensure_ascii=False, indent=2)
    except Exception as exc:
        print(f"Error saving script metadata: {exc}")
def _cleanup_script_metadata_for_project(self, project_id: str):
    """Drop all script metadata of a project that is being deleted.

    Failures are printed, not raised.
    """
    try:
        store = self._load_script_metadata()
        if "script_metadata" in store:
            # Metadata keys have the form "<project_id>:<script file name>".
            owned_prefix = f"{project_id}:"
            remaining = {}
            for key, value in store["script_metadata"].items():
                if not key.startswith(owned_prefix):
                    remaining[key] = value
            store["script_metadata"] = remaining
            self._save_script_metadata(store)
    except Exception as exc:
        print(f"Error cleaning script metadata for project {project_id}: {exc}")
def get_favorites(self) -> List[Dict[str, Any]]:
    """Return the favorite scripts; an empty list on any read or parse failure."""
    try:
        with open(self.favorites_path, 'r', encoding='utf-8') as handle:
            stored = json.load(handle)
        favorites = stored.get("favorites", [])
    except Exception:
        favorites = []
    return favorites
def toggle_favorite(self, project_id: str, script_name: str) -> Dict[str, Any]:
    """Add a script to the favorites, or remove it when it is already there.

    Args:
        project_id: Id of the project that owns the script.
        script_name: File name of the script.

    Returns:
        A dict with "status" and "message"; on success also "is_favorite"
        (bool) telling the new state.
    """
    try:
        favorites = self.get_favorites()
        position = next(
            (
                index for index, fav in enumerate(favorites)
                if fav.get("project_id") == project_id and fav.get("script_name") == script_name
            ),
            None,
        )

        if position is not None:
            # Already a favorite: remove it.
            del favorites[position]
            message = "Removido de favoritos"
            is_favorite = False
        else:
            project = self.get_python_project(project_id)
            if not project:
                return {"status": "error", "message": "Proyecto no encontrado"}
            script_metadata = self.get_script_metadata(project_id, script_name)
            # Strip only a trailing ".py"; str.replace would also remove any
            # ".py" inside the name (e.g. "a.py.py" -> "a").
            default_name = script_name[:-len('.py')] if script_name.endswith('.py') else script_name
            favorites.append({
                "project_id": project_id,
                "project_name": project["name"],
                "script_name": script_name,
                "display_name": script_metadata.get("display_name", default_name),
                "description": script_metadata.get("description", ""),
                "added_date": datetime.now().isoformat() + "Z"
            })
            message = "Agregado a favoritos"
            is_favorite = True

        with open(self.favorites_path, 'w', encoding='utf-8') as f:
            json.dump({"favorites": favorites}, f, indent=2, ensure_ascii=False)
        return {"status": "success", "message": message, "is_favorite": is_favorite}
    except Exception as e:
        return {"status": "error", "message": f"Error gestionando favoritos: {str(e)}"}
def get_history(self) -> List[Dict[str, Any]]:
    """Return the execution history; an empty list on any read or parse failure."""
    try:
        with open(self.history_path, 'r', encoding='utf-8') as handle:
            stored = json.load(handle)
        entries = stored.get("history", [])
    except Exception:
        entries = []
    return entries
def clear_history(self) -> Dict[str, str]:
    """Overwrite the history file with an empty history and default settings.

    Returns:
        A dict with "status" ("success" or "error") and a "message".
    """
    try:
        default_settings = {
            "max_entries": 100,
            "auto_cleanup_days": 30,
            "track_execution_time": True,
            "track_arguments": True,
        }
        empty_history = {"history": [], "settings": default_settings}
        with open(self.history_path, 'w', encoding='utf-8') as handle:
            json.dump(empty_history, handle, indent=2, ensure_ascii=False)
    except Exception as exc:
        return {"status": "error", "message": f"Error limpiando historial: {str(exc)}"}
    return {"status": "success", "message": "Historial limpiado exitosamente"}
def get_categories(self) -> Dict[str, Any]:
    """Return the configured project categories; {} on any read or parse failure."""
    try:
        with open(self.launcher_config_path, 'r', encoding='utf-8') as handle:
            stored = json.load(handle)
        categories = stored.get("categories", {})
    except Exception:
        categories = {}
    return categories
def _add_to_history(self, entry: Dict[str, Any]):
    """Put a new entry at the top of the execution history file.

    The rest of the file (notably its "settings" section) is kept as it is.
    The history is trimmed to settings["max_entries"] when that is a positive
    integer, otherwise to 100 entries. Failures are printed, not raised.
    """
    try:
        # Load the whole document, not just the entries, so that rewriting
        # the file does not throw away the "settings" section.
        try:
            with open(self.history_path, 'r', encoding='utf-8') as f:
                history_data = json.load(f)
        except Exception:
            history_data = {}
        if not isinstance(history_data, dict):
            history_data = {}

        history = history_data.get("history")
        if not isinstance(history, list):
            history = []
        # Newest entry first.
        history.insert(0, entry)

        max_entries = 100
        settings = history_data.get("settings")
        if isinstance(settings, dict):
            configured = settings.get("max_entries")
            if isinstance(configured, int) and not isinstance(configured, bool) and configured > 0:
                max_entries = configured
        history_data["history"] = history[:max_entries]

        with open(self.history_path, 'w', encoding='utf-8') as f:
            json.dump(history_data, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"Error adding to history: {e}")
def _cleanup_favorites_for_project(self, project_id: str):
    """Remove every favorite that belongs to a project that is being deleted.

    Failures are printed, not raised.
    """
    try:
        kept = []
        for favorite in self.get_favorites():
            # Keep only favorites owned by other projects.
            if favorite.get("project_id") != project_id:
                kept.append(favorite)
        with open(self.favorites_path, 'w', encoding='utf-8') as handle:
            json.dump({"favorites": kept}, handle, indent=2, ensure_ascii=False)
    except Exception as exc:
        print(f"Error cleaning favorites for project {project_id}: {exc}")
def _update_history_status(self, execution_id: str, final_code: int, final_execution_time: float):
    """Record the outcome of a finished run in the history file.

    The first entry whose "id" equals execution_id gets its status set to
    "completed" (exit code 0) or "error", plus the exit code and the run
    time. The rest of the file (notably its "settings" section) is kept as
    it is. Failures are printed, not raised.
    """
    try:
        # Load the whole document, not just the entries, so that rewriting
        # the file does not throw away the "settings" section.
        try:
            with open(self.history_path, 'r', encoding='utf-8') as f:
                history_data = json.load(f)
        except Exception:
            history_data = {}
        if not isinstance(history_data, dict):
            history_data = {}

        history = history_data.get("history")
        if not isinstance(history, list):
            history = []
        history_data["history"] = history

        for entry in history:
            if entry.get("id") == execution_id:
                entry["status"] = "completed" if final_code == 0 else "error"
                entry["return_code"] = final_code
                entry["execution_time"] = final_execution_time
                break

        with open(self.history_path, 'w', encoding='utf-8') as f:
            json.dump(history_data, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"Error updating history status: {e}")
def focus_process(self, pid: int) -> Dict[str, str]:
    """Try to bring a window of the process with the given PID to the foreground.

    Only implemented for Windows ("win32"); on other platforms an "info"
    result is returned and nothing is done.

    Args:
        pid: Process id whose window should receive focus.

    Returns:
        A dict with "status" ("success", "info" or "error") and a "message".
    """
    try:
        if sys.platform == "win32":
            import ctypes
            from ctypes import wintypes
            # Callback invoked by EnumWindows for each window; the second
            # argument is the value passed to EnumWindows below (the PID).
            def enum_windows_proc(hwnd, pid):
                if ctypes.windll.user32.IsWindowVisible(hwnd):
                    process_id = wintypes.DWORD()
                    # Fills process_id with the id of the process that owns hwnd.
                    ctypes.windll.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(process_id))
                    if process_id.value == pid:
                        ctypes.windll.user32.SetForegroundWindow(hwnd)
                        return False  # Stop enumerating
                return True  # Keep enumerating
            # Define the callback type
            EnumWindowsProc = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM)
            callback = EnumWindowsProc(enum_windows_proc)
            ctypes.windll.user32.EnumWindows(callback, pid)
            # NOTE(review): "success" is returned whether or not a matching
            # window was found — the message only says an attempt was made.
            return {"status": "success", "message": f"Intentando dar foco al proceso {pid}"}
        else:
            return {"status": "info", "message": "Función de foco no disponible en esta plataforma"}
    except Exception as e:
        return {"status": "error", "message": f"Error dando foco al proceso: {str(e)}"}
def terminate_process(self, pid: int) -> Dict[str, str]:
    """Stop a process that was started by this manager.

    The PID is removed from running_processes before the kill is attempted.
    Windows uses "taskkill /F"; other platforms send SIGTERM.

    Returns:
        A dict with "status" ("success" or "error") and a "message".
    """
    try:
        with self.process_lock:
            if pid not in self.running_processes:
                return {"status": "error", "message": "Proceso no encontrado en la lista de procesos activos"}

            # Stop tracking first, then try to end the process.
            process_info = self.running_processes.pop(pid)
            if sys.platform == "win32":
                subprocess.run(["taskkill", "/F", "/PID", str(pid)],
                               capture_output=True, check=False)
            else:
                import signal
                try:
                    os.kill(pid, signal.SIGTERM)
                except ProcessLookupError:
                    # The process has already exited.
                    pass
            return {
                "status": "success",
                "message": f"Proceso {pid} terminado ({process_info.get('script_name', 'N/A')})"
            }
    except Exception as exc:
        return {"status": "error", "message": f"Error terminando proceso: {str(exc)}"}
def get_running_processes(self) -> List[Dict[str, Any]]:
    """Return the tracked processes that still exist, forgetting the rest.

    Each tracked PID is probed ("tasklist" on Windows, signal 0 elsewhere);
    PIDs that are gone are removed from running_processes. Any unexpected
    error is printed and reported as an empty list.
    """
    try:
        with self.process_lock:
            alive = []
            gone = []
            for pid, info in self.running_processes.items():
                # Probe whether the process still exists.
                try:
                    if sys.platform == "win32":
                        listing = subprocess.run(
                            ["tasklist", "/FI", f"PID eq {pid}"],
                            capture_output=True, text=True, check=False
                        )
                        still_running = str(pid) in listing.stdout
                    else:
                        # Signal 0 only checks for existence; nothing is delivered.
                        os.kill(pid, 0)
                        still_running = True
                except (ProcessLookupError, subprocess.SubprocessError):
                    still_running = False

                if not still_running:
                    gone.append(pid)
                    continue

                owner = self.get_python_project(info["project_id"])
                alive.append({
                    "pid": pid,
                    "project_id": info["project_id"],
                    "project_name": owner["name"] if owner else "Proyecto no encontrado",
                    "script_name": info["script_name"],
                    "start_time": info["start_time"],
                    "execution_id": info["execution_id"],
                    "working_directory": info["working_directory"],
                    "is_background": info.get("is_background", False)
                })

            # Forget processes that no longer exist.
            for pid in gone:
                del self.running_processes[pid]
            return alive
    except Exception as exc:
        print(f"Error getting running processes: {exc}")
        return []
def get_markdown_files(self, project_id: str) -> List[Dict[str, Any]]:
    """List the Markdown files of a project, searching subdirectories too.

    Directories named .git, __pycache__, .vscode and node_modules are not
    entered. A file's title is its first line when that line is a Markdown
    heading, otherwise the file name without its extension.

    Args:
        project_id: Id of the project to inspect.

    Returns:
        Dicts with "filename", "relative_path" (forward slashes), "title",
        "size" and "modified", sorted by relative_path. Empty when the
        project or its directory is missing, or on an unexpected error.
    """
    try:
        project = self.get_python_project(project_id)
        if not project:
            return []
        project_dir = project["directory"]
        if not os.path.isdir(project_dir):
            return []

        skipped_dirs = ['.git', '__pycache__', '.vscode', 'node_modules']
        markdown_files = []
        for root, dirs, files in os.walk(project_dir):
            # Prune in place so os.walk does not descend into these folders.
            dirs[:] = [d for d in dirs if d not in skipped_dirs]
            for filename in files:
                if not filename.lower().endswith('.md'):
                    continue
                file_path = os.path.join(root, filename)
                relative_path = os.path.relpath(file_path, project_dir)
                try:
                    stat = os.stat(file_path)
                    # Drop only the extension, whatever its case; a
                    # case-sensitive replace('.md', '') missed "README.MD"
                    # and mangled names such as "a.md.md".
                    title = os.path.splitext(filename)[0]
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            first_line = f.readline().strip()
                        if first_line.startswith('#'):
                            # An empty heading ("#") keeps the file-name title.
                            title = first_line.lstrip('#').strip() or title
                    except (OSError, UnicodeError):
                        # Unreadable content: keep the file-name title.
                        pass
                    markdown_files.append({
                        "filename": filename,
                        # Normalize Windows separators.
                        "relative_path": relative_path.replace('\\', '/'),
                        "title": title,
                        "size": stat.st_size,
                        "modified": datetime.fromtimestamp(stat.st_mtime).isoformat() + "Z"
                    })
                except Exception as e:
                    print(f"Error getting file info for {file_path}: {e}")
                    continue
        return sorted(markdown_files, key=lambda item: item["relative_path"])
    except Exception as e:
        print(f"Error getting markdown files for project {project_id}: {e}")
        return []
def read_markdown_file(self, project_id: str, relative_path: str) -> Dict[str, Any]:
    """Read a file that lives inside a project directory.

    Args:
        project_id: Id of the project that owns the file.
        relative_path: Path of the file relative to the project directory.

    Returns:
        A dict with "content", "filename", "relative_path", "size" and
        "modified"; or a dict with a single "error" message when the project
        is unknown, the path points outside the project, the file does not
        exist, or reading fails.
    """
    try:
        project = self.get_python_project(project_id)
        if not project:
            return {"error": "Proyecto no encontrado"}

        project_dir = os.path.abspath(project["directory"])
        file_path = os.path.abspath(os.path.join(project_dir, relative_path))

        # Security: the resolved path must stay inside the project directory.
        # Compare whole path components; a plain string prefix test would
        # accept a sibling such as "<project>-secret/file".
        try:
            inside_project = (
                os.path.normcase(os.path.commonpath([project_dir, file_path]))
                == os.path.normcase(project_dir)
            )
        except ValueError:
            # Raised e.g. when the two paths are on different drives.
            inside_project = False
        if not inside_project:
            return {"error": "Acceso no autorizado al archivo"}

        if not os.path.exists(file_path):
            return {"error": "Archivo no encontrado"}

        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        stat = os.stat(file_path)
        return {
            "content": content,
            "filename": os.path.basename(file_path),
            "relative_path": relative_path,
            "size": stat.st_size,
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat() + "Z"
        }
    except Exception as e:
        return {"error": f"Error leyendo archivo: {str(e)}"}