ParamManagerScripts/lib/launcher_manager.py

1018 lines
45 KiB
Python

import os
import json
import subprocess
import sys
import threading
import time
from typing import Dict, Any, List, Optional
from datetime import datetime
import uuid
class LauncherManager:
def __init__(self, data_path: str):
self.data_path = data_path
self.launcher_config_path = os.path.join(data_path, "launcher_scripts.json")
self.favorites_path = os.path.join(data_path, "launcher_favorites.json")
self.history_path = os.path.join(data_path, "launcher_history.json")
self.script_metadata_path = os.path.join(data_path, "launcher_script_metadata.json")
# Inicializar archivos si no existen
self._initialize_files()
def _initialize_files(self):
"""Crear archivos de configuración por defecto si no existen"""
# Inicializar launcher_scripts.json
if not os.path.exists(self.launcher_config_path):
default_config = {
"version": "1.0",
"groups": [],
"categories": {
"Herramientas": {
"color": "#3B82F6",
"icon": "🔧",
"subcategories": ["Generales", "Desarrollo", "Sistema"]
},
"Análisis": {
"color": "#10B981",
"icon": "📊",
"subcategories": ["Datos", "Estadísticas", "Visualización"]
},
"Utilidades": {
"color": "#8B5CF6",
"icon": "⚙️",
"subcategories": ["Archivos", "Texto", "Conversión"]
},
"Desarrollo": {
"color": "#F59E0B",
"icon": "💻",
"subcategories": ["Code", "Testing", "Deploy"]
},
"Visualización": {
"color": "#EF4444",
"icon": "📈",
"subcategories": ["Gráficos", "Reportes", "Dashboard"]
},
"Otros": {
"color": "#6B7280",
"icon": "📁",
"subcategories": ["Misceláneos"]
}
},
"settings": {
"default_execution_directory": "script_directory",
"enable_argument_validation": True,
"max_history_entries": 100,
"auto_cleanup_days": 30
}
}
with open(self.launcher_config_path, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=2, ensure_ascii=False)
# Inicializar launcher_favorites.json
if not os.path.exists(self.favorites_path):
default_favorites = {"favorites": []}
with open(self.favorites_path, 'w', encoding='utf-8') as f:
json.dump(default_favorites, f, indent=2, ensure_ascii=False)
# Inicializar launcher_history.json
if not os.path.exists(self.history_path):
default_history = {
"history": [],
"settings": {
"max_entries": 100,
"auto_cleanup_days": 30,
"track_execution_time": True,
"track_arguments": True
}
}
with open(self.history_path, 'w', encoding='utf-8') as f:
json.dump(default_history, f, indent=2, ensure_ascii=False)
# Inicializar launcher_script_metadata.json
if not os.path.exists(self.script_metadata_path):
default_metadata = {
"version": "1.0",
"script_metadata": {}
}
with open(self.script_metadata_path, 'w', encoding='utf-8') as f:
json.dump(default_metadata, f, indent=2, ensure_ascii=False)
def get_launcher_groups(self) -> List[Dict[str, Any]]:
"""Obtener todos los grupos de scripts GUI"""
try:
with open(self.launcher_config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get("groups", [])
except Exception as e:
print(f"Error loading launcher groups: {e}")
return []
def get_launcher_group(self, group_id: str) -> Optional[Dict[str, Any]]:
"""Obtener un grupo específico por ID"""
groups = self.get_launcher_groups()
for group in groups:
if group.get("id") == group_id:
return group
return None
def add_launcher_group(self, group_data: Dict[str, Any]) -> Dict[str, str]:
"""Agregar nuevo grupo de scripts GUI"""
try:
# Validar datos requeridos
required_fields = ["name", "directory"]
for field in required_fields:
if not group_data.get(field):
return {"status": "error", "message": f"Campo requerido: {field}"}
# Validar que el directorio existe
if not os.path.isdir(group_data["directory"]):
return {"status": "error", "message": "El directorio especificado no existe"}
# Generar ID único si no se proporciona
if not group_data.get("id"):
group_data["id"] = str(uuid.uuid4())[:8]
# Verificar que el ID no exista
if self.get_launcher_group(group_data["id"]):
return {"status": "error", "message": "Ya existe un grupo con este ID"}
# Agregar campos por defecto
current_time = datetime.now().isoformat() + "Z"
group_data.setdefault("description", "")
group_data.setdefault("category", "Otros")
group_data.setdefault("version", "1.0")
group_data.setdefault("author", "")
group_data.setdefault("tags", [])
group_data.setdefault("python_env", "base") # Entorno por defecto
group_data.setdefault("created_date", current_time)
group_data["updated_date"] = current_time
# Cargar configuración y agregar grupo
with open(self.launcher_config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
config["groups"].append(group_data)
with open(self.launcher_config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
return {"status": "success", "message": "Grupo agregado exitosamente"}
except Exception as e:
return {"status": "error", "message": f"Error agregando grupo: {str(e)}"}
def update_launcher_group(self, group_id: str, group_data: Dict[str, Any]) -> Dict[str, str]:
"""Actualizar grupo existente"""
try:
with open(self.launcher_config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# Buscar y actualizar el grupo
group_found = False
for i, group in enumerate(config["groups"]):
if group["id"] == group_id:
# Mantener ID y fechas de creación
group_data["id"] = group_id
group_data["created_date"] = group.get("created_date", datetime.now().isoformat() + "Z")
group_data["updated_date"] = datetime.now().isoformat() + "Z"
config["groups"][i] = group_data
group_found = True
break
if not group_found:
return {"status": "error", "message": "Grupo no encontrado"}
with open(self.launcher_config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
return {"status": "success", "message": "Grupo actualizado exitosamente"}
except Exception as e:
return {"status": "error", "message": f"Error actualizando grupo: {str(e)}"}
def delete_launcher_group(self, group_id: str) -> Dict[str, str]:
"""Eliminar grupo de scripts GUI"""
try:
with open(self.launcher_config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
# Filtrar el grupo a eliminar
original_count = len(config["groups"])
config["groups"] = [g for g in config["groups"] if g["id"] != group_id]
if len(config["groups"]) == original_count:
return {"status": "error", "message": "Grupo no encontrado"}
with open(self.launcher_config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
# Limpiar favoritos del grupo eliminado
self._cleanup_favorites_for_group(group_id)
# Limpiar metadatos de scripts del grupo eliminado
self._cleanup_script_metadata_for_group(group_id)
return {"status": "success", "message": "Grupo eliminado exitosamente"}
except Exception as e:
return {"status": "error", "message": f"Error eliminando grupo: {str(e)}"}
def get_group_scripts(self, group_id: str) -> List[Dict[str, Any]]:
"""Obtener scripts de un grupo específico con metadatos"""
try:
print(f"[DEBUG] Loading scripts for group: {group_id}")
group = self.get_launcher_group(group_id)
if not group:
print(f"[DEBUG] Group {group_id} not found")
return []
directory = group["directory"]
print(f"[DEBUG] Group directory: {directory}")
if not os.path.isdir(directory):
print(f"[DEBUG] Directory {directory} does not exist or is not a directory")
return []
# Cargar metadatos de scripts
script_metadata = self._load_script_metadata()
scripts = []
python_files = [f for f in os.listdir(directory) if f.endswith('.py') and not f.startswith('_')]
print(f"[DEBUG] Found {len(python_files)} Python files: {python_files}")
for file in python_files:
script_path = os.path.join(directory, file)
if os.path.isfile(script_path):
# Clave para metadatos
metadata_key = f"{group_id}_{file}"
metadata = script_metadata.get(metadata_key, {})
# Verificar si está oculto
if metadata.get("hidden", False):
print(f"[DEBUG] Script {file} is hidden, skipping")
continue
script_info = {
"name": file,
"display_name": metadata.get("display_name", file[:-3]),
"description": metadata.get("description", ""),
"long_description": metadata.get("long_description", ""),
"path": script_path,
"size": os.path.getsize(script_path),
"modified": datetime.fromtimestamp(os.path.getmtime(script_path)).isoformat(),
"hidden": metadata.get("hidden", False)
}
scripts.append(script_info)
print(f"[DEBUG] Added script: {script_info['display_name']} ({file})")
print(f"[DEBUG] Returning {len(scripts)} scripts for group {group_id}")
return sorted(scripts, key=lambda x: x["display_name"])
except Exception as e:
print(f"Error getting scripts for group {group_id}: {e}")
import traceback
traceback.print_exc()
return []
def get_all_group_scripts(self, group_id: str) -> List[Dict[str, Any]]:
"""Obtener TODOS los scripts de un grupo (incluyendo ocultos) para gestión"""
try:
group = self.get_launcher_group(group_id)
if not group:
return []
directory = group["directory"]
if not os.path.isdir(directory):
return []
# Cargar metadatos de scripts
script_metadata = self._load_script_metadata()
scripts = []
for file in os.listdir(directory):
if file.endswith('.py') and not file.startswith('_'):
script_path = os.path.join(directory, file)
if os.path.isfile(script_path):
# Clave para metadatos
metadata_key = f"{group_id}_{file}"
metadata = script_metadata.get(metadata_key, {})
scripts.append({
"name": file,
"display_name": metadata.get("display_name", file[:-3]),
"description": metadata.get("description", ""),
"long_description": metadata.get("long_description", ""),
"path": script_path,
"size": os.path.getsize(script_path),
"modified": datetime.fromtimestamp(os.path.getmtime(script_path)).isoformat(),
"hidden": metadata.get("hidden", False)
})
return sorted(scripts, key=lambda x: x["name"])
except Exception as e:
print(f"Error getting all scripts for group {group_id}: {e}")
return []
def get_script_metadata(self, group_id: str, script_name: str) -> Dict[str, Any]:
"""Obtener metadatos de un script específico"""
try:
script_metadata = self._load_script_metadata()
metadata_key = f"{group_id}_{script_name}"
return script_metadata.get(metadata_key, {
"display_name": script_name[:-3] if script_name.endswith('.py') else script_name,
"description": "",
"long_description": "",
"hidden": False
})
except Exception as e:
print(f"Error getting script metadata for {group_id}/{script_name}: {e}")
return {}
def update_script_metadata(self, group_id: str, script_name: str, metadata: Dict[str, Any]) -> Dict[str, str]:
"""Actualizar metadatos de un script"""
try:
script_metadata = self._load_script_metadata()
metadata_key = f"{group_id}_{script_name}"
# Actualizar metadatos
script_metadata[metadata_key] = {
"display_name": metadata.get("display_name", script_name[:-3]),
"description": metadata.get("description", ""),
"long_description": metadata.get("long_description", ""),
"hidden": metadata.get("hidden", False),
"updated_date": datetime.now().isoformat() + "Z"
}
# Guardar
self._save_script_metadata(script_metadata)
return {"status": "success", "message": "Metadatos actualizados exitosamente"}
except Exception as e:
return {"status": "error", "message": f"Error actualizando metadatos: {str(e)}"}
def get_available_python_envs(self) -> List[Dict[str, str]]:
"""Obtener lista de entornos de Python/Miniconda disponibles"""
try:
envs = [{"name": "base", "display_name": "Base (Sistema)", "path": sys.executable}]
# Intentar encontrar Miniconda
miniconda_paths = [
r"C:\Users\migue\miniconda3",
r"C:\ProgramData\miniconda3",
r"C:\miniconda3",
os.path.expanduser("~/miniconda3"),
os.path.expanduser("~/anaconda3")
]
for base_path in miniconda_paths:
if os.path.exists(base_path):
envs_path = os.path.join(base_path, "envs")
if os.path.exists(envs_path):
for env_name in os.listdir(envs_path):
env_path = os.path.join(envs_path, env_name)
python_exe = os.path.join(env_path, "python.exe")
if os.path.exists(python_exe):
envs.append({
"name": env_name,
"display_name": f"{env_name} (Miniconda)",
"path": python_exe
})
break # Solo usar el primer Miniconda encontrado
return envs
except Exception as e:
print(f"Error getting Python environments: {e}")
return [{"name": "base", "display_name": "Base (Sistema)", "path": sys.executable}]
def execute_gui_script(self, group_id: str, script_name: str, script_args: List[str],
broadcast_func, working_dir: str = None, use_pythonw: bool = False) -> Dict[str, Any]:
"""Ejecutar script GUI con argumentos opcionales y entorno específico
Args:
use_pythonw: Si True, usa pythonw.exe (sin logging), si False usa python.exe (con logging)
"""
try:
group = self.get_launcher_group(group_id)
if not group:
return {"status": "error", "message": "Grupo no encontrado"}
script_path = os.path.join(group["directory"], script_name)
if not os.path.isfile(script_path):
return {"status": "error", "message": "Script no encontrado"}
# Determinar el ejecutable de Python a usar
python_env = group.get("python_env", "base")
python_executable = self._get_python_executable(python_env, use_pythonw)
# Determinar directorio de trabajo
if working_dir and os.path.isdir(working_dir):
exec_working_dir = working_dir
else:
exec_working_dir = group["directory"] # Por defecto directorio del script
# Configurar variables de entorno para UTF-8
env = os.environ.copy()
env['PYTHONIOENCODING'] = 'utf-8'
env['PYTHONLEGACYWINDOWSSTDIO'] = '0'
# Variables adicionales para encoding
env['LANG'] = 'en_US.UTF-8'
env['LC_ALL'] = 'en_US.UTF-8'
env['PYTHONUNBUFFERED'] = '1'
# Forzar UTF-8 en consola de Windows
if sys.platform == "win32":
env['PYTHONUTF8'] = '1'
env['PYTHONLEGACYWINDOWSFSENCODING'] = '0'
# Construir comando con flag UTF-8
if use_pythonw:
cmd = [python_executable, script_path] + script_args
else:
# Para python.exe, agregar flag UTF-8 explícitamente
if python_executable.endswith('python.exe'):
cmd = [python_executable, '-X', 'utf8', script_path] + script_args
else:
cmd = [python_executable, script_path] + script_args
broadcast_func(f"Ejecutando script GUI: {script_name}")
broadcast_func(f"Entorno Python: {python_env}")
broadcast_func(f"Ejecutable: {'pythonw.exe (sin logging)' if use_pythonw else 'python.exe (con logging)'}")
broadcast_func(f"Comando: {' '.join(cmd)}")
broadcast_func(f"Directorio: {exec_working_dir}")
broadcast_func(f"Encoding: UTF-8 (PYTHONUTF8=1, PYTHONIOENCODING=utf-8)")
if '-X' in cmd and 'utf8' in cmd:
broadcast_func("Usando flag -X utf8 para forzar UTF-8")
broadcast_func("=" * 50)
# Ejecutar script
start_time = datetime.now()
if use_pythonw:
# Con pythonw.exe: No capturar salida, solo ejecutar
try:
process = subprocess.Popen(
cmd,
cwd=exec_working_dir,
env=env,
creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == "win32" else 0
)
broadcast_func("Script GUI ejecutado sin logging (pythonw.exe)")
broadcast_func(f"PID: {process.pid}")
broadcast_func("=" * 50)
status = "running"
except subprocess.SubprocessError as e:
error_msg = f"Error ejecutando con pythonw.exe: {str(e)}"
broadcast_func(error_msg)
return {"status": "error", "message": error_msg}
else:
# Con python.exe: Capturar salida para logging
try:
process = subprocess.Popen(
cmd,
cwd=exec_working_dir,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Combinar stderr con stdout
text=True,
encoding='utf-8', # Forzar UTF-8
universal_newlines=True,
bufsize=1 # Line buffered
)
# Leer salida en tiempo real
def read_output():
try:
for line in iter(process.stdout.readline, ''):
if line.strip(): # Solo enviar líneas no vacías
broadcast_func(line.rstrip())
process.stdout.close()
except Exception as e:
broadcast_func(f"Error leyendo salida: {str(e)}")
# Iniciar thread para leer salida
output_thread = threading.Thread(target=read_output, daemon=True)
output_thread.start()
# Esperar más tiempo para capturar salida inicial y detectar finalización
time.sleep(2)
# Verificar si el proceso sigue corriendo
poll_result = process.poll()
if poll_result is not None:
# El proceso terminó
execution_time = (datetime.now() - start_time).total_seconds()
# Esperar a que termine el thread de lectura
output_thread.join(timeout=1)
if poll_result == 0:
broadcast_func("=" * 50)
broadcast_func(f"Script completado exitosamente en {execution_time:.2f}s")
status = "success"
else:
broadcast_func("=" * 50)
broadcast_func(f"Script terminó con código de error: {poll_result}")
status = "error"
else:
# El proceso sigue corriendo (típico para GUIs)
broadcast_func("=" * 50)
broadcast_func(f"Script GUI iniciado correctamente (PID: {process.pid})")
broadcast_func("Nota: El script sigue ejecutándose en segundo plano")
status = "running"
# Iniciar un thread separado para monitorear la finalización
def monitor_completion():
try:
final_code = process.wait() # Esperar hasta que termine
end_time = datetime.now()
final_execution_time = (end_time - start_time).total_seconds()
# Actualizar el historial cuando termine
self._update_history_status(execution_id, final_code, final_execution_time)
if final_code == 0:
broadcast_func(f"[{end_time.strftime('%H:%M:%S')}] Script {script_name} completado exitosamente ({final_execution_time:.2f}s)")
else:
broadcast_func(f"[{end_time.strftime('%H:%M:%S')}] Script {script_name} terminó con error (código: {final_code})")
except Exception as e:
broadcast_func(f"Error monitoreando finalización: {str(e)}")
monitor_thread = threading.Thread(target=monitor_completion, daemon=True)
monitor_thread.start()
except subprocess.SubprocessError as e:
error_msg = f"Error ejecutando con python.exe: {str(e)}"
broadcast_func(error_msg)
return {"status": "error", "message": error_msg}
# Registrar en historial
execution_id = str(uuid.uuid4())[:8]
self._add_to_history({
"id": execution_id,
"group_id": group_id,
"script_name": script_name,
"executed_date": start_time.isoformat() + "Z",
"arguments": script_args,
"working_directory": exec_working_dir,
"python_env": python_env,
"executable_type": "pythonw.exe" if use_pythonw else "python.exe",
"status": status,
"pid": process.pid,
"execution_time": (datetime.now() - start_time).total_seconds() if status != "running" else None
})
broadcast_func(f"ID de ejecución: {execution_id}")
return {
"status": "success",
"message": "Script GUI ejecutado exitosamente",
"execution_id": execution_id,
"pid": process.pid
}
except Exception as e:
error_msg = f"Error ejecutando script GUI: {str(e)}"
broadcast_func(error_msg)
return {"status": "error", "message": error_msg}
def _get_python_executable(self, env_name: str, use_pythonw: bool = False) -> str:
"""Obtener el ejecutable de Python para un entorno específico
Args:
env_name: Nombre del entorno Python
use_pythonw: Si True, usa pythonw.exe (sin consola), si False usa python.exe (con logging)
"""
if env_name == "base":
# Para el sistema base
base_dir = os.path.dirname(sys.executable)
if use_pythonw:
pythonw_path = os.path.join(base_dir, "pythonw.exe")
if os.path.exists(pythonw_path):
return pythonw_path
python_path = os.path.join(base_dir, "python.exe")
if os.path.exists(python_path):
return python_path
return sys.executable
# Buscar en entornos de Miniconda
miniconda_paths = [
r"C:\Users\migue\miniconda3",
r"C:\ProgramData\miniconda3",
r"C:\miniconda3",
os.path.expanduser("~/miniconda3"),
os.path.expanduser("~/anaconda3")
]
for base_path in miniconda_paths:
if use_pythonw:
# Intentar pythonw.exe para GUI sin consola
env_pythonw_path = os.path.join(base_path, "envs", env_name, "pythonw.exe")
if os.path.exists(env_pythonw_path):
return env_pythonw_path
# Intentar python.exe para logging
env_python_path = os.path.join(base_path, "envs", env_name, "python.exe")
if os.path.exists(env_python_path):
return env_python_path
# Fallback final al sistema
base_dir = os.path.dirname(sys.executable)
if use_pythonw:
pythonw_path = os.path.join(base_dir, "pythonw.exe")
if os.path.exists(pythonw_path):
return pythonw_path
return sys.executable
def _load_script_metadata(self) -> Dict[str, Any]:
"""Cargar metadatos de scripts"""
try:
with open(self.script_metadata_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get("script_metadata", {})
except Exception as e:
print(f"Error loading script metadata: {e}")
return {}
def _save_script_metadata(self, metadata: Dict[str, Any]):
"""Guardar metadatos de scripts"""
try:
data = {
"version": "1.0",
"script_metadata": metadata,
"updated_date": datetime.now().isoformat() + "Z"
}
with open(self.script_metadata_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Error saving script metadata: {e}")
def _cleanup_script_metadata_for_group(self, group_id: str):
"""Limpiar metadatos de scripts de un grupo eliminado"""
try:
script_metadata = self._load_script_metadata()
# Filtrar metadatos del grupo eliminado
filtered_metadata = {k: v for k, v in script_metadata.items() if not k.startswith(f"{group_id}_")}
self._save_script_metadata(filtered_metadata)
except Exception as e:
print(f"Error cleaning up script metadata for group {group_id}: {e}")
def get_favorites(self) -> List[Dict[str, Any]]:
"""Obtener lista de favoritos"""
try:
with open(self.favorites_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get("favorites", [])
except Exception as e:
print(f"Error loading favorites: {e}")
return []
def toggle_favorite(self, group_id: str, script_name: str) -> Dict[str, str]:
"""Agregar o quitar script de favoritos"""
try:
with open(self.favorites_path, 'r', encoding='utf-8') as f:
data = json.load(f)
favorites = data.get("favorites", [])
favorite_id = f"{group_id}_{script_name}"
# Buscar si ya existe
existing_favorite = None
for i, fav in enumerate(favorites):
if fav["group_id"] == group_id and fav["script_name"] == script_name:
existing_favorite = i
break
if existing_favorite is not None:
# Quitar de favoritos
favorites.pop(existing_favorite)
action = "removed"
else:
# Agregar a favoritos
favorites.append({
"id": favorite_id,
"group_id": group_id,
"script_name": script_name,
"added_date": datetime.now().isoformat() + "Z",
"execution_count": 0,
"last_executed": None
})
action = "added"
data["favorites"] = favorites
with open(self.favorites_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return {"status": "success", "action": action}
except Exception as e:
return {"status": "error", "message": f"Error managing favorite: {str(e)}"}
def get_history(self) -> List[Dict[str, Any]]:
"""Obtener historial de ejecución"""
try:
with open(self.history_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get("history", [])
except Exception as e:
print(f"Error loading history: {e}")
return []
def clear_history(self) -> Dict[str, str]:
"""Limpiar historial de ejecución"""
try:
with open(self.history_path, 'r', encoding='utf-8') as f:
data = json.load(f)
data["history"] = []
with open(self.history_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return {"status": "success", "message": "Historial limpiado exitosamente"}
except Exception as e:
return {"status": "error", "message": f"Error clearing history: {str(e)}"}
def get_categories(self) -> Dict[str, Any]:
"""Obtener categorías disponibles"""
try:
with open(self.launcher_config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get("categories", {})
except Exception as e:
print(f"Error loading categories: {e}")
return {}
def _add_to_history(self, entry: Dict[str, Any]):
"""Agregar entrada al historial"""
try:
with open(self.history_path, 'r', encoding='utf-8') as f:
data = json.load(f)
history = data.get("history", [])
history.insert(0, entry) # Agregar al inicio
# Limitar tamaño del historial
max_entries = data.get("settings", {}).get("max_entries", 100)
if len(history) > max_entries:
history = history[:max_entries]
data["history"] = history
with open(self.history_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Error adding to history: {e}")
def _cleanup_favorites_for_group(self, group_id: str):
"""Limpiar favoritos de un grupo eliminado"""
try:
with open(self.favorites_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Filtrar favoritos del grupo eliminado
data["favorites"] = [f for f in data.get("favorites", []) if f.get("group_id") != group_id]
with open(self.favorites_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Error cleaning up favorites for group {group_id}: {e}")
def _update_history_status(self, execution_id: str, final_code: int, final_execution_time: float):
"""Actualizar el estado del historial de ejecución"""
try:
with open(self.history_path, 'r', encoding='utf-8') as f:
data = json.load(f)
history = data.get("history", [])
for i, entry in enumerate(history):
if entry["id"] == execution_id:
history[i]["status"] = "success" if final_code == 0 else "error"
history[i]["execution_time"] = final_execution_time
break
data["history"] = history
with open(self.history_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Error updating history status: {e}")
def focus_process(self, pid: int) -> Dict[str, str]:
"""Activar el foco de un proceso por su PID"""
try:
if sys.platform == "win32":
import ctypes
from ctypes import wintypes
# Funciones de Windows API
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
# Encontrar ventana por PID
def enum_windows_proc(hwnd, pid):
window_pid = wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(window_pid))
if window_pid.value == pid:
# Traer ventana al frente
user32.SetForegroundWindow(hwnd)
user32.ShowWindow(hwnd, 9) # SW_RESTORE
return False # Detener enumeración
return True
# Enumeración de ventanas
WNDENUMPROC = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM)
enum_proc = WNDENUMPROC(enum_windows_proc)
user32.EnumWindows(enum_proc, pid)
return {"status": "success", "message": f"Proceso {pid} activado"}
else:
return {"status": "error", "message": "Función solo disponible en Windows"}
except Exception as e:
return {"status": "error", "message": f"Error activando proceso: {str(e)}"}
def terminate_process(self, pid: int) -> Dict[str, str]:
"""Cerrar un proceso por su PID"""
try:
import psutil
process = psutil.Process(pid)
process_name = process.name()
# Intentar cerrar suavemente primero
process.terminate()
# Esperar un momento para que cierre
time.sleep(1)
# Si sigue corriendo, forzar cierre
if process.is_running():
process.kill()
return {"status": "success", "message": f"Proceso {process_name} (PID: {pid}) cerrado exitosamente"}
except ImportError:
# Fallback sin psutil
try:
if sys.platform == "win32":
subprocess.run(["taskkill", "/F", "/PID", str(pid)], check=True)
else:
subprocess.run(["kill", "-9", str(pid)], check=True)
return {"status": "success", "message": f"Proceso {pid} cerrado"}
except subprocess.CalledProcessError as e:
return {"status": "error", "message": f"Error cerrando proceso: {str(e)}"}
except Exception as e:
return {"status": "error", "message": f"Error: {str(e)}"}
def get_running_processes(self) -> List[Dict[str, Any]]:
"""Obtener lista de procesos en ejecución del historial"""
try:
history = self.get_history()
running_processes = []
for entry in history:
if entry.get("status") == "running" and entry.get("pid"):
try:
# Verificar si el proceso sigue corriendo
if sys.platform == "win32":
result = subprocess.run(
["tasklist", "/FI", f"PID eq {entry['pid']}", "/FO", "CSV"],
capture_output=True, text=True
)
if str(entry['pid']) in result.stdout:
running_processes.append(entry)
else:
# Linux/Mac
result = subprocess.run(
["ps", "-p", str(entry['pid'])],
capture_output=True, text=True
)
if result.returncode == 0:
running_processes.append(entry)
except Exception:
continue
return running_processes
except Exception as e:
print(f"Error getting running processes: {e}")
return []
def get_markdown_files(self, group_id: str) -> List[Dict[str, Any]]:
"""Obtener archivos Markdown de un grupo (root y subdirectorios hasta 10 archivos)"""
try:
print(f"[DEBUG] Looking for markdown files in group: {group_id}")
group = self.get_launcher_group(group_id)
if not group:
print(f"[DEBUG] Group {group_id} not found for markdown search")
return []
directory = group["directory"]
print(f"[DEBUG] Searching markdown files in directory: {directory}")
if not os.path.isdir(directory):
print(f"[DEBUG] Directory {directory} does not exist for markdown search")
return []
markdown_files = []
# Buscar en directorio root
all_files = os.listdir(directory)
print(f"[DEBUG] All files in directory: {all_files}")
for file in all_files:
if file.lower().endswith('.md'):
file_path = os.path.join(directory, file)
if os.path.isfile(file_path):
print(f"[DEBUG] Found markdown file: {file}")
markdown_files.append({
"name": file,
"display_name": file[:-3], # Sin extensión .md
"path": file_path,
"relative_path": file,
"level": 0, # Root level
"size": os.path.getsize(file_path),
"modified": datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
})
else:
print(f"[DEBUG] {file} is not a file, skipping")
# Buscar en subdirectorios (máximo 1 nivel)
try:
for subdir in all_files:
subdir_path = os.path.join(directory, subdir)
if os.path.isdir(subdir_path) and (not subdir.startswith('.') or subdir.startswith('.doc')):
print(f"[DEBUG] Searching in subdirectory: {subdir}")
subdir_files = os.listdir(subdir_path)
for file in subdir_files:
if file.lower().endswith('.md'):
file_path = os.path.join(subdir_path, file)
if os.path.isfile(file_path):
print(f"[DEBUG] Found markdown file in subdir: {subdir}/{file}")
markdown_files.append({
"name": file,
"display_name": f"{subdir}/{file[:-3]}",
"path": file_path,
"relative_path": f"{subdir}/{file}",
"level": 1, # Subdirectory level
"size": os.path.getsize(file_path),
"modified": datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
})
# Limitar a 10 archivos total
if len(markdown_files) >= 10:
break
if len(markdown_files) >= 10:
break
except PermissionError as e:
print(f"[DEBUG] Permission error accessing subdirectories: {e}")
# Ignorar subdirectorios sin permisos
pass
# Ordenar por modificación (más recientes primero) y limitar a 10
markdown_files.sort(key=lambda x: x["modified"], reverse=True)
print(f"[DEBUG] Found {len(markdown_files)} total markdown files")
return markdown_files[:10]
except Exception as e:
print(f"Error getting markdown files for group {group_id}: {e}")
import traceback
traceback.print_exc()
return []
def read_markdown_file(self, group_id: str, relative_path: str) -> Dict[str, Any]:
"""Leer contenido de un archivo Markdown"""
try:
group = self.get_launcher_group(group_id)
if not group:
return {"status": "error", "message": "Grupo no encontrado"}
file_path = os.path.join(group["directory"], relative_path)
# Verificar que el archivo existe y está dentro del directorio del grupo (seguridad)
if not os.path.isfile(file_path):
return {"status": "error", "message": "Archivo no encontrado"}
if not file_path.startswith(group["directory"]):
return {"status": "error", "message": "Acceso denegado"}
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return {
"status": "success",
"content": content,
"file_name": os.path.basename(file_path),
"file_path": relative_path,
"size": len(content),
"modified": datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
}
except Exception as e:
return {"status": "error", "message": f"Error leyendo archivo: {str(e)}"}