""" Script Proxy Service - Gestiona la ejecución y proxy de scripts Flask internos Permite múltiples proyectos por usuario con puertos internos no expuestos """ import requests import threading import time import subprocess import os import signal import json import psutil from datetime import datetime, timedelta from typing import Dict, Optional, Tuple, List from dataclasses import dataclass, asdict from flask import current_app import logging from contextlib import contextmanager logger = logging.getLogger(__name__) @dataclass class RunningScript: """Información de un script en ejecución""" project_id: str script_id: str user_id: str port: int process: subprocess.Popen workspace_path: str started_at: datetime last_activity: datetime script_name: str = "" status: str = "running" # running, stopping, stopped, error @property def script_key(self) -> str: """Clave única para identificar el script""" return f"{self.project_id}_{self.script_id}_{self.user_id}" @property def is_active(self) -> bool: """Verifica si el script ha tenido actividad reciente""" return (datetime.now() - self.last_activity).seconds < 300 # 5 minutos def to_dict(self) -> dict: """Convierte a diccionario para serialización""" data = asdict(self) data['started_at'] = self.started_at.isoformat() data['last_activity'] = self.last_activity.isoformat() data.pop('process', None) # No serializar el proceso return data class ScriptProxyService: """ Servicio que gestiona la ejecución de scripts Flask como procesos internos y hace de proxy para las peticiones HTTP """ def __init__(self, port_range_start: int = 5200, port_range_end: int = 5400): self.port_range_start = port_range_start self.port_range_end = port_range_end self.running_scripts: Dict[str, RunningScript] = {} self.port_pool = set(range(port_range_start, port_range_end + 1)) self.used_ports = set() self.lock = threading.RLock() # Configuración self.script_timeout = 3600 # 1 hora sin actividad self.health_check_interval = 30 # 30 segundos self.cleanup_interval = 300 # 5 minutos # Iniciar threads de monitoreo self._start_monitoring_threads() logger.info(f"ScriptProxyService iniciado con puertos {port_range_start}-{port_range_end}") def _start_monitoring_threads(self): """Inicia los threads de monitoreo y limpieza""" self._cleanup_thread = threading.Thread( target=self._cleanup_inactive_scripts, daemon=True, name="ScriptCleanup" ) self._cleanup_thread.start() self._health_check_thread = threading.Thread( target=self._health_check_loop, daemon=True, name="ScriptHealthCheck" ) self._health_check_thread.start() def _get_available_port(self) -> Optional[int]: """Obtiene un puerto disponible del pool""" with self.lock: available_ports = self.port_pool - self.used_ports if not available_ports: logger.warning("No hay puertos disponibles, intentando limpiar scripts inactivos") self._force_cleanup_inactive() available_ports = self.port_pool - self.used_ports if available_ports: port = min(available_ports) self.used_ports.add(port) return port logger.error("Pool de puertos agotado") return None def _release_port(self, port: int): """Libera un puerto del pool""" with self.lock: self.used_ports.discard(port) logger.debug(f"Puerto {port} liberado") def _is_port_responding(self, port: int, timeout: int = 5) -> bool: """Verifica si un puerto está respondiendo""" try: response = requests.get( f"http://localhost:{port}/health", timeout=timeout, headers={'User-Agent': 'ScriptsManager-HealthCheck/1.0'} ) return response.status_code == 200 except Exception as e: logger.debug(f"Puerto {port} no responde: {e}") return False def _is_script_alive(self, script: RunningScript) -> bool: """Verifica si un script está vivo y funcionando""" if not script.process: return False try: # Verificar si el proceso existe if script.process.poll() is not None: return False # Verificar si responde HTTP return self._is_port_responding(script.port, timeout=3) except Exception as e: logger.debug(f"Error verificando script {script.script_key}: {e}") return False def start_script(self, project_id: str, script_id: str, user_id: str, script_content: str, script_name: str = "", parameters: Dict = None, environment: Dict = None) -> Tuple[bool, str, Optional[int]]: """ Inicia un script Flask en un puerto interno Returns: (success, message, port) """ script_key = f"{project_id}_{script_id}_{user_id}" with self.lock: # Verificar si ya está ejecutándose if script_key in self.running_scripts: existing = self.running_scripts[script_key] if self._is_script_alive(existing): existing.last_activity = datetime.now() logger.info(f"Script {script_key} ya está ejecutándose en puerto {existing.port}") return True, "Script already running", existing.port else: # Limpiar script muerto logger.info(f"Script {script_key} encontrado muerto, limpiando...") self._cleanup_script(script_key) # Obtener puerto disponible port = self._get_available_port() if not port: return False, "No hay puertos disponibles", None try: # Crear workspace para el script workspace_path = self._create_workspace(project_id, script_id, user_id) # Preparar el script Flask script_file_path = self._prepare_script_file( workspace_path, script_content, port, parameters, environment ) # Iniciar el proceso process = self._start_script_process(script_file_path, workspace_path, environment) if not process: self._release_port(port) return False, "Error iniciando el proceso del script", None # Registrar script ejecutándose running_script = RunningScript( project_id=project_id, script_id=script_id, user_id=user_id, port=port, process=process, workspace_path=workspace_path, started_at=datetime.now(), last_activity=datetime.now(), script_name=script_name or f"Script-{script_id}", status="starting" ) self.running_scripts[script_key] = running_script # Esperar a que el script esté listo if self._wait_for_script_ready(port, timeout=30): running_script.status = "running" logger.info(f"Script {script_key} iniciado exitosamente en puerto {port}") return True, "Script iniciado correctamente", port else: # Fallo al iniciar, limpiar self._cleanup_script(script_key) return False, "Script no respondió en tiempo esperado", None except Exception as e: self._release_port(port) logger.error(f"Error iniciando script {script_key}: {e}") return False, f"Error: {str(e)}", None def _create_workspace(self, project_id: str, script_id: str, user_id: str) -> str: """Crea el workspace para el script""" workspace_path = os.path.join( "/app/workspaces", f"user_{user_id}", f"project_{project_id}", f"script_{script_id}" ) os.makedirs(workspace_path, exist_ok=True) # Crear directorios estándar for subdir in ['static', 'templates', 'data', 'logs']: os.makedirs(os.path.join(workspace_path, subdir), exist_ok=True) return workspace_path def _prepare_script_file(self, workspace_path: str, script_content: str, port: int, parameters: Dict = None, environment: Dict = None) -> str: """Prepara el archivo del script con configuración Flask""" # Wrappear el script del usuario en una aplicación Flask flask_wrapper = f''' import sys import os sys.path.insert(0, "/app") # Configuración del workspace WORKSPACE_PATH = "{workspace_path}" os.chdir(WORKSPACE_PATH) # Variables disponibles para el script del usuario PROJECT_WORKSPACE = WORKSPACE_PATH PARAMETERS = {parameters or {}} ENVIRONMENT = {environment or {}} # Importar Flask y configurar aplicación from flask import Flask, request, jsonify, render_template, send_file, redirect, url_for import threading import signal import atexit app = Flask(__name__) app.config['SECRET_KEY'] = 'script-session-key' # Health check endpoint requerido @app.route('/health') def health_check(): return {{"status": "ok", "workspace": WORKSPACE_PATH}}, 200 # Función para limpiar al cerrar def cleanup(): print("Script shutting down...") atexit.register(cleanup) signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0)) # === CÓDIGO DEL USUARIO === {script_content} # === FIN CÓDIGO DEL USUARIO === if __name__ == "__main__": app.run(host="127.0.0.1", port={port}, debug=False, threaded=True) ''' script_file_path = os.path.join(workspace_path, "script_app.py") with open(script_file_path, 'w', encoding='utf-8') as f: f.write(flask_wrapper) # Guardar metadatos metadata = { 'port': port, 'workspace': workspace_path, 'parameters': parameters, 'environment': environment, 'created_at': datetime.now().isoformat() } with open(os.path.join(workspace_path, 'metadata.json'), 'w') as f: json.dump(metadata, f, indent=2) return script_file_path def _start_script_process(self, script_file_path: str, workspace_path: str, environment: Dict = None) -> Optional[subprocess.Popen]: """Inicia el proceso del script""" try: # Preparar environment env = os.environ.copy() env.update(environment or {}) env['PYTHONPATH'] = '/app' env['WORKSPACE_PATH'] = workspace_path # Activar entorno conda si está disponible conda_env = env.get('CONDA_DEFAULT_ENV', 'scriptsmanager') # Comando para ejecutar el script cmd = [ 'bash', '-c', f'source activate {conda_env} && python {script_file_path}' ] # Iniciar proceso process = subprocess.Popen( cmd, cwd=workspace_path, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid # Crear nuevo group para fácil cleanup ) logger.info(f"Proceso iniciado con PID {process.pid} para script en {workspace_path}") return process except Exception as e: logger.error(f"Error iniciando proceso del script: {e}") return None def _wait_for_script_ready(self, port: int, timeout: int = 30) -> bool: """Espera a que el script esté listo para recibir peticiones""" start_time = time.time() while time.time() - start_time < timeout: if self._is_port_responding(port): return True time.sleep(1) return False def stop_script(self, project_id: str, script_id: str, user_id: str) -> Tuple[bool, str]: """Detiene un script específico""" script_key = f"{project_id}_{script_id}_{user_id}" with self.lock: if script_key not in self.running_scripts: return False, "Script no encontrado" script = self.running_scripts[script_key] script.status = "stopping" try: # Intentar terminar gracefully if script.process.poll() is None: os.killpg(os.getpgid(script.process.pid), signal.SIGTERM) # Esperar terminación try: script.process.wait(timeout=10) except subprocess.TimeoutExpired: # Forzar terminación os.killpg(os.getpgid(script.process.pid), signal.SIGKILL) script.process.wait() self._cleanup_script(script_key) logger.info(f"Script {script_key} detenido exitosamente") return True, "Script detenido correctamente" except Exception as e: logger.error(f"Error deteniendo script {script_key}: {e}") return False, f"Error deteniendo script: {str(e)}" def _cleanup_script(self, script_key: str): """Limpia un script del registro""" if script_key in self.running_scripts: script = self.running_scripts[script_key] self._release_port(script.port) script.status = "stopped" del self.running_scripts[script_key] logger.debug(f"Script {script_key} limpiado del registro") def _cleanup_inactive_scripts(self): """Thread que limpia scripts inactivos periódicamente""" while True: try: time.sleep(self.cleanup_interval) self._force_cleanup_inactive() except Exception as e: logger.error(f"Error en cleanup thread: {e}") def _force_cleanup_inactive(self): """Fuerza la limpieza de scripts inactivos o muertos""" with self.lock: to_cleanup = [] for script_key, script in self.running_scripts.items(): # Verificar si está vivo if not self._is_script_alive(script): to_cleanup.append(script_key) continue # Verificar timeout de inactividad inactive_time = (datetime.now() - script.last_activity).seconds if inactive_time > self.script_timeout: to_cleanup.append(script_key) logger.info(f"Script {script_key} inactivo por {inactive_time}s, marcado para limpieza") # Limpiar scripts marcados for script_key in to_cleanup: try: script = self.running_scripts.get(script_key) if script and script.process.poll() is None: # Terminar proceso si sigue vivo os.killpg(os.getpgid(script.process.pid), signal.SIGTERM) self._cleanup_script(script_key) except Exception as e: logger.error(f"Error limpiando script {script_key}: {e}") def _health_check_loop(self): """Thread que verifica la salud de los scripts""" while True: try: time.sleep(self.health_check_interval) with self.lock: for script_key, script in list(self.running_scripts.items()): if not self._is_script_alive(script): logger.warning(f"Script {script_key} no responde, marcando para limpieza") script.status = "error" except Exception as e: logger.error(f"Error en health check thread: {e}") def get_running_scripts(self, user_id: str = None, project_id: str = None) -> List[dict]: """Obtiene la lista de scripts en ejecución""" with self.lock: scripts = [] for script in self.running_scripts.values(): if user_id and script.user_id != user_id: continue if project_id and script.project_id != project_id: continue scripts.append(script.to_dict()) return scripts def update_script_activity(self, project_id: str, script_id: str, user_id: str): """Actualiza la última actividad de un script""" script_key = f"{project_id}_{script_id}_{user_id}" with self.lock: if script_key in self.running_scripts: self.running_scripts[script_key].last_activity = datetime.now() def get_script_info(self, project_id: str, script_id: str, user_id: str) -> Optional[dict]: """Obtiene información de un script específico""" script_key = f"{project_id}_{script_id}_{user_id}" with self.lock: script = self.running_scripts.get(script_key) return script.to_dict() if script else None def get_proxy_stats(self) -> dict: """Obtiene estadísticas del servicio de proxy""" with self.lock: return { 'total_scripts': len(self.running_scripts), 'used_ports': len(self.used_ports), 'available_ports': len(self.port_pool) - len(self.used_ports), 'port_range': f"{self.port_range_start}-{self.port_range_end}", 'uptime_hours': (datetime.now() - datetime.now()).total_seconds() / 3600, 'scripts_by_status': { status: len([s for s in self.running_scripts.values() if s.status == status]) for status in ['running', 'starting', 'stopping', 'error'] } } # Instancia global del servicio proxy_service = None def get_proxy_service() -> ScriptProxyService: """Obtiene la instancia global del servicio de proxy""" global proxy_service if proxy_service is None: proxy_service = ScriptProxyService() return proxy_service