489 lines
18 KiB
Python
489 lines
18 KiB
Python
import subprocess
|
|
import threading
|
|
import time
|
|
import signal
|
|
import os
|
|
import psutil
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, List
|
|
|
|
from app.models import Script, User, ExecutionLog, UserProject
|
|
from app.services.conda_service import CondaService
|
|
from app.services.data_manager import DataManager
|
|
from app.services.port_manager import PortManager
|
|
from app.config.database import db
|
|
from app.config.config import Config
|
|
|
|
|
|
class ScriptExecutor:
    """Service for executing scripts with multi-user support.

    A script is launched as a subprocess (wrapped by the conda service when
    that service reports conda is available), recorded in an ``ExecutionLog``
    row, and watched by a daemon thread until it exits.

    Running processes are tracked in ``active_processes``, keyed by OS process
    id.  The dict is used by the calling thread and by the monitor threads, so
    it is only mutated with single-step operations and only iterated through
    snapshots.
    """

    # Flags that build_script_command always emits itself.  A custom parameter
    # is not allowed to repeat one of them (see build_script_command).
    _RESERVED_FLAGS = frozenset(
        {
            "--data-dir",
            "--user-level",
            "--port",
            "--project-id",
            "--project-name",
            "--theme",
            "--language",
        }
    )

    def __init__(self, app=None):
        """Create the helper services and an empty process registry.

        Args:
            app: Object whose ``app_context()`` is entered by the monitor
                thread before it touches the database.  When it is falsy the
                monitor thread skips the execution-log update.
        """
        self.conda_service = CondaService()
        self.data_manager = DataManager()
        self.port_manager = PortManager(Config.PORT_RANGE_START, Config.PORT_RANGE_END)
        self.active_processes = {}  # {process_id: process_info}
        self.app = app

    def execute_script(
        self,
        script_id: int,
        user_id: int,
        parameters: Optional[Dict] = None,
        project_id: Optional[int] = None,
    ) -> Dict:
        """Execute a script with user context and project isolation.

        Args:
            script_id: Primary key of the ``Script`` to run.
            user_id: Primary key of the ``User`` running it.
            parameters: Optional extra ``{name: value}`` pairs appended to the
                command line as ``--name value``.
            project_id: Project to run in.  It must belong to ``user_id``.
                When falsy, the data manager's default project for the user
                and the script's group is used.

        Returns:
            The dict produced by ``start_script_process`` on success, or
            ``{"error": <message>}`` when a lookup fails, no port is
            available, or preparing/starting the process raises.
        """
        print(
            f"[SCRIPT_EXEC] Starting execution for script_id={script_id}, user_id={user_id}"
        )

        # Get script and user
        script = Script.query.get(script_id)
        user = User.query.get(user_id)

        if not script or not user:
            error_msg = f"Script or user not found: script={script}, user={user}"
            print(f"[SCRIPT_EXEC] ERROR: {error_msg}")
            return {"error": "Script or user not found"}

        print(
            f"[SCRIPT_EXEC] Found script: {script.filename} in group: {script.script_group.name}"
        )
        print(f"[SCRIPT_EXEC] Found user: {user.username} ({user.user_level})")

        # Get or create user project
        if project_id:
            project = UserProject.query.get(project_id)
            if not project or project.user_id != user_id:
                error_msg = (
                    f"Invalid project: project_id={project_id}, user_id={user_id}"
                )
                print(f"[SCRIPT_EXEC] ERROR: {error_msg}")
                return {"error": "Invalid project"}
        else:
            # Use default project
            project = self.data_manager.get_or_create_default_project(
                user_id, script.group_id
            )

        print(f"[SCRIPT_EXEC] Using project: {project.project_name} (id={project.id})")

        # Allocate port for script interface
        port = self.port_manager.allocate_port(script_id, user_id, project.id)
        if not port:
            error_msg = "No available ports"
            print(f"[SCRIPT_EXEC] ERROR: {error_msg}")
            return {"error": "No available ports"}

        print(f"[SCRIPT_EXEC] Allocated port: {port}")

        try:
            # Prepare execution environment
            data_dir = self.data_manager.get_user_project_path(
                user_id, script.group_id, project.project_name
            )
            self.data_manager.ensure_project_directory(
                user_id, script.group_id, project.project_name
            )
            print(f"[SCRIPT_EXEC] Data directory: {data_dir}")

            # Build command
            script_path = Path(script.script_group.directory_path) / script.filename
            print(f"[SCRIPT_EXEC] Script path: {script_path}")

            command = self.build_script_command(
                script_path,
                data_dir,
                user.user_level,
                port,
                project.id,
                project.project_name,
                user.preferred_theme,
                user.preferred_language,
                parameters,
            )
            print(f"[SCRIPT_EXEC] Command built: {' '.join(command)}")
        except Exception as e:
            # Nothing was started yet, so the port is still ours to give back.
            self.port_manager.release_port(port)
            error_msg = str(e)
            print(f"[SCRIPT_EXEC] EXCEPTION: {error_msg}")
            return {"error": error_msg}

        try:
            # Execute script
            result = self.start_script_process(script, user, project, command, port)
        except Exception as e:
            # start_script_process releases the port itself before re-raising,
            # so it must not be released a second time here.
            error_msg = str(e)
            print(f"[SCRIPT_EXEC] EXCEPTION: {error_msg}")
            return {"error": error_msg}

        print(f"[SCRIPT_EXEC] Execution result: {result}")
        return result

    def build_script_command(
        self,
        script_path: Path,
        data_dir: Path,
        user_level: str,
        port: int,
        project_id: int,
        project_name: str,
        theme: str,
        language: str,
        parameters: Optional[Dict] = None,
    ) -> List[str]:
        """Build command to execute script with required parameters.

        The command is ``python <script_path>`` followed by the required
        ``--data-dir``, ``--user-level``, ``--port``, ``--project-id``,
        ``--project-name``, ``--theme`` and ``--language`` flags, then one
        ``--<key> <value>`` pair per custom parameter (values passed through
        ``str``).

        A custom parameter whose flag equals one of the required flags would
        put that flag on the command line twice; which value the launched
        script then honours depends on its argument parser.  Such parameters
        are dropped and a warning is printed.

        Args:
            script_path: Script file to run.
            data_dir: Directory passed as ``--data-dir``.
            user_level: Value passed as ``--user-level``.
            port: Port passed as ``--port``.
            project_id: Value passed as ``--project-id``.
            project_name: Value passed as ``--project-name``.
            theme: Value passed as ``--theme``.
            language: Value passed as ``--language``.
            parameters: Optional custom ``{name: value}`` pairs.

        Returns:
            The argument list, ready for ``subprocess.Popen``.
        """
        # Base command with required parameters
        command = [
            "python",
            str(script_path),
            "--data-dir",
            str(data_dir),
            "--user-level",
            user_level,
            "--port",
            str(port),
            "--project-id",
            str(project_id),
            "--project-name",
            project_name,
            "--theme",
            theme,
            "--language",
            language,
        ]

        # Add custom parameters if provided
        if parameters:
            for key, value in parameters.items():
                flag = f"--{key}"
                if flag in self._RESERVED_FLAGS:
                    print(
                        f"[SCRIPT_EXEC] WARNING: ignoring custom parameter '{key}': "
                        f"{flag} is set by the executor"
                    )
                    continue
                command.extend([flag, str(value)])

        return command

    def _wrap_with_conda(self, conda_env: str, command: List[str]) -> List[str]:
        """Return *command* wrapped for *conda_env*, or unchanged if conda cannot be used."""
        if not self.conda_service.is_available():
            print("[SCRIPT_PROC] Conda not available, using direct command")
            return command

        print("[SCRIPT_PROC] Conda is available, building conda command")
        try:
            # Hand over a copy so the caller's list is left as it was.
            wrapped = self.conda_service.build_conda_command(conda_env, list(command))
            print(f"[SCRIPT_PROC] Original command: {' '.join(command)}")
            print(f"[SCRIPT_PROC] Conda command: {' '.join(wrapped)}")
            return wrapped
        except Exception as e:
            error_msg = f"Could not use conda environment {conda_env}: {e}"
            print(f"[SCRIPT_PROC] WARNING: {error_msg}")
            return command

    def _release_port_once(self, process_info: Dict) -> None:
        """Release the port recorded in *process_info* unless that already happened.

        stop_script, cleanup_orphaned_processes and the monitor thread can all
        reach the same process; the token stored in ``process_info`` makes sure
        only the first of them hands the port back.
        """
        token = object()
        # setdefault is one dict operation: only the first caller gets its own
        # token back, every later caller gets the first caller's token.
        if process_info.setdefault("_port_release_token", token) is token:
            self.port_manager.release_port(process_info["port"])

    def start_script_process(
        self,
        script: "Script",
        user: "User",
        project: "UserProject",
        command: List[str],
        port: int,
    ) -> Dict:
        """Start script process with conda environment.

        Creates the ``ExecutionLog`` row, spawns the subprocess with piped
        stdout/stderr, registers it in ``active_processes`` and starts a
        daemon thread running ``monitor_process``.

        Args:
            script: Script being run; its group supplies the conda
                environment name (``"base"`` when the group has none).
            user: User the run is recorded for.
            project: Project the run belongs to.
            command: Argument list to execute (wrapped by the conda service
                when conda is available).
            port: Port already allocated for this run.

        Returns:
            ``{"success": True, "process_id", "port", "interface_url",
            "log_id"}``.

        Raises:
            Exception: Whatever went wrong is re-raised after the execution
                log (if it exists) was marked ``"failed"``, a child that was
                already spawned was killed, and the port was released.
        """
        print(f"[SCRIPT_PROC] Starting process for script: {script.filename}")

        log_entry = None
        process = None
        process_info = None
        monitor_started = False

        try:
            # Get conda environment for script group
            conda_env = script.script_group.conda_environment or "base"
            print(f"[SCRIPT_PROC] Using conda environment: {conda_env}")

            command = self._wrap_with_conda(conda_env, command)
            print(f"[SCRIPT_PROC] Final command: {' '.join(command)}")

            # Get current working directory for logging
            working_dir = os.getcwd()

            # Create execution log with debug information
            log_entry = ExecutionLog(
                script_id=script.id,
                user_id=user.id,
                status="starting",
                command_executed=" ".join(command),
                conda_environment=conda_env,
                working_directory=working_dir,
                port_allocated=port,
            )
            db.session.add(log_entry)
            db.session.commit()
            print(f"[SCRIPT_PROC] Created execution log with id: {log_entry.id}")
            print(f"[SCRIPT_PROC] Saved command: {' '.join(command)}")
            print(f"[SCRIPT_PROC] Saved conda env: {conda_env}")
            print(f"[SCRIPT_PROC] Saved working dir: {working_dir}")

            # Start process
            print("[SCRIPT_PROC] Starting subprocess...")
            process = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
            )
            print(f"[SCRIPT_PROC] Process started with PID: {process.pid}")

            # Update log with process ID
            log_entry.process_id = process.pid
            db.session.commit()

            # Register process
            process_info = {
                "process": process,
                "script_id": script.id,
                "user_id": user.id,
                "project_id": project.id,
                "port": port,
                "log_id": log_entry.id,
                "start_time": time.time(),
            }
            self.active_processes[process.pid] = process_info
            print(f"[SCRIPT_PROC] Registered process {process.pid} in active_processes")

            # Start monitoring thread
            monitor_thread = threading.Thread(
                target=self.monitor_process, args=(process.pid,), daemon=True
            )
            monitor_thread.start()
            monitor_started = True
            print(f"[SCRIPT_PROC] Started monitoring thread for PID: {process.pid}")

            # Update log
            log_entry.status = "running"
            db.session.commit()
            print("[SCRIPT_PROC] Updated log status to 'running'")

            result = {
                "success": True,
                "process_id": process.pid,
                "port": port,
                "interface_url": f"http://127.0.0.1:{port}",
                "log_id": log_entry.id,
            }
            print(f"[SCRIPT_PROC] Returning success result: {result}")
            return result

        except Exception as e:
            error_msg = str(e)
            print(f"[SCRIPT_PROC] EXCEPTION during process start: {error_msg}")

            if process is not None:
                # The child exists but the launch is reported as failed: stop
                # it so it does not keep using the port handed back below.
                try:
                    process.kill()
                    if not monitor_started:
                        # No monitor thread will reap the child, so do it here.
                        process.wait()
                except Exception as kill_error:
                    print(
                        f"[SCRIPT_PROC] WARNING: could not stop process {process.pid}: {kill_error}"
                    )
                if not monitor_started:
                    # With a monitor thread running, it removes the entry itself.
                    self.active_processes.pop(process.pid, None)

            # Update log with error; a failure here must not skip the port release.
            if log_entry is not None:
                try:
                    log_entry.status = "failed"
                    log_entry.error_output = error_msg
                    log_entry.end_time = db.func.now()
                    db.session.commit()
                except Exception as log_error:
                    db.session.rollback()
                    print(
                        f"[SCRIPT_PROC] WARNING: could not record failure in execution log: {log_error}"
                    )

            # Release port
            if process_info is not None:
                self._release_port_once(process_info)
            else:
                self.port_manager.release_port(port)

            # Bare raise keeps the original traceback.
            raise

    def monitor_process(self, process_id: int):
        """Monitor a running process and handle completion.

        Blocks until the process registered under ``process_id`` exits, stores
        its output, exit code and final status in the execution log (only when
        ``self.app`` is set), then releases the port and removes the registry
        entry.  The port release and the registry cleanup happen even when the
        log update raises.

        Args:
            process_id: Key of the process in ``active_processes``.
        """
        print(f"[SCRIPT_MON] Starting monitoring for PID: {process_id}")

        process_info = self.active_processes.get(process_id)
        if process_info is None:
            print(f"[SCRIPT_MON] ERROR: PID {process_id} not in active_processes")
            return

        process = process_info["process"]
        print(
            f"[SCRIPT_MON] Found process info for script_id: {process_info['script_id']}"
        )

        try:
            print(f"[SCRIPT_MON] Waiting for process {process_id} to complete...")
            # Wait for process completion
            stdout, stderr = process.communicate()

            print(
                f"[SCRIPT_MON] Process {process_id} completed with return code: {process.returncode}"
            )
            print(f"[SCRIPT_MON] STDOUT length: {len(stdout) if stdout else 0}")
            print(f"[SCRIPT_MON] STDERR length: {len(stderr) if stderr else 0}")

            if stderr:
                print(f"[SCRIPT_MON] STDERR content: {stderr[:500]}...")

            # Use application context for database operations
            if self.app:
                print("[SCRIPT_MON] Updating database with application context")
                with self.app.app_context():
                    # Update execution log
                    log_entry = ExecutionLog.query.get(process_info["log_id"])
                    if log_entry:
                        print(
                            f"[SCRIPT_MON] Found log entry {log_entry.id}, updating..."
                        )
                        if log_entry.status == "stopped":
                            # stop_script already recorded a requested stop;
                            # do not relabel it because of the exit code.
                            status = "stopped"
                        elif process.returncode == 0:
                            status = "completed"
                        else:
                            status = "failed"
                        log_entry.status = status
                        log_entry.output = stdout
                        log_entry.error_output = stderr
                        log_entry.exit_code = process.returncode
                        log_entry.end_time = db.func.now()
                        db.session.commit()
                        print(f"[SCRIPT_MON] Updated log entry to status: {status}")
                    else:
                        print(
                            f"[SCRIPT_MON] ERROR: Log entry not found for id: {process_info['log_id']}"
                        )
            else:
                print("[SCRIPT_MON] WARNING: No app context available")

        except Exception as e:
            error_msg = str(e)
            print(f"[SCRIPT_MON] ERROR monitoring process {process_id}: {error_msg}")

        finally:
            # Release port here so a failed log update cannot leak it.
            try:
                print(f"[SCRIPT_MON] Releasing port: {process_info['port']}")
                self._release_port_once(process_info)
            except Exception as e:
                print(f"[SCRIPT_MON] ERROR releasing port for process {process_id}: {e}")

            # Clean up, but only our own entry: stop_script or the cleanup pass
            # may already have removed it.
            if self.active_processes.get(process_id) is process_info:
                print(
                    f"[SCRIPT_MON] Removing process {process_id} from active_processes"
                )
                self.active_processes.pop(process_id, None)
            else:
                print(
                    f"[SCRIPT_MON] WARNING: Process {process_id} not found in cleanup"
                )

    def stop_script(self, process_id: int, user_id: int) -> bool:
        """Stop a running script process.

        Sends ``terminate`` and waits up to 10 seconds, then falls back to
        ``kill``.  Afterwards the execution log is marked ``"stopped"`` and
        the port is released.  The registry entry is removed in every case
        once the ownership check has passed.

        NOTE(review): only the direct child is signalled.  If the conda
        wrapper runs the interpreter as a grandchild it may outlive this
        call - confirm against the conda service.

        Args:
            process_id: Key of the process in ``active_processes``.
            user_id: User requesting the stop; must own the process.

        Returns:
            True when the process was stopped and the log was updated.
            False when the process is unknown, belongs to another user,
            stopping it raised, or the log update failed.
        """
        process_info = self.active_processes.get(process_id)
        if process_info is None:
            return False

        # Check if user owns this process
        if process_info["user_id"] != user_id:
            return False

        try:
            process = process_info["process"]

            # Try graceful termination first
            process.terminate()

            # Wait a bit for graceful shutdown
            try:
                process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # Force kill if necessary
                process.kill()
                process.wait()

            # Update log; the process is already gone, so a failure here must
            # not keep the port allocated.
            logged = True
            try:
                log_entry = ExecutionLog.query.get(process_info["log_id"])
                if log_entry:
                    log_entry.status = "stopped"
                    log_entry.end_time = db.func.now()
                    db.session.commit()
            except Exception as log_error:
                db.session.rollback()
                print(f"Error stopping process {process_id}: {log_error}")
                logged = False

            # Release port
            self._release_port_once(process_info)

            return logged

        except Exception as e:
            print(f"Error stopping process {process_id}: {e}")
            return False

        finally:
            # Clean up; the monitor thread may remove the entry at the same
            # time, so never assume it is still there.
            if self.active_processes.get(process_id) is process_info:
                self.active_processes.pop(process_id, None)

    def get_process_status(self, process_id: int, user_id: int) -> Optional[Dict]:
        """Get status of a running process.

        Args:
            process_id: Key of the process in ``active_processes``.
            user_id: User asking; must own the process.

        Returns:
            None when the process is unknown or owned by another user.
            Otherwise a dict whose ``"status"`` is ``"running"`` (with
            ``cpu_percent``, ``memory_mb``, ``start_time`` and ``port``),
            ``"completed"`` (with ``exit_code``), ``"not_found"``, or
            ``"error"`` (with ``error``).
        """
        process_info = self.active_processes.get(process_id)
        if process_info is None:
            return None

        # Check if user owns this process
        if process_info["user_id"] != user_id:
            return None

        process = process_info["process"]

        try:
            # Check if process is still running
            if process.poll() is not None:
                return {"status": "completed", "exit_code": process.returncode}

            # cpu_percent() measures against the previous call on the same
            # psutil.Process object, so the object is kept between calls; the
            # very first reading for a process is still 0.0.
            ps_process = process_info.get("ps_process")
            if ps_process is None:
                ps_process = psutil.Process(process_id)
                process_info["ps_process"] = ps_process

            # Get CPU and memory usage
            cpu_percent = ps_process.cpu_percent()
            memory_info = ps_process.memory_info()

            return {
                "status": "running",
                "cpu_percent": cpu_percent,
                "memory_mb": memory_info.rss / 1024 / 1024,
                "start_time": process_info["start_time"],
                "port": process_info["port"],
            }

        except psutil.NoSuchProcess:
            return {"status": "not_found"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def cleanup_orphaned_processes(self):
        """Clean up orphaned or zombie processes.

        Every registered process that has finished (or whose ``poll()``
        raises) has its port released and its registry entry removed.
        """
        # Iterate over a snapshot: monitor threads remove entries concurrently.
        for process_id, process_info in list(self.active_processes.items()):
            try:
                finished = process_info["process"].poll() is not None
            except Exception:
                # Process no longer exists
                finished = True

            if not finished:
                continue

            # Release port
            try:
                self._release_port_once(process_info)
            except Exception as e:
                print(f"Error releasing port for process {process_id}: {e}")

            # Remove from active processes
            if self.active_processes.get(process_id) is process_info:
                self.active_processes.pop(process_id, None)

    def get_user_active_processes(self, user_id: int) -> List[Dict]:
        """Get all active processes for a user.

        Args:
            user_id: Owner whose processes are listed.

        Returns:
            One ``get_process_status`` dict per process owned by ``user_id``,
            each extended with ``process_id`` and ``script_id``.
        """
        user_processes = []

        # Iterate over a snapshot: monitor threads remove entries concurrently.
        for process_id, process_info in list(self.active_processes.items()):
            if process_info["user_id"] != user_id:
                continue

            status = self.get_process_status(process_id, user_id)
            if status:
                status["process_id"] = process_id
                status["script_id"] = process_info["script_id"]
                user_processes.append(status)

        return user_processes
|