SIDEL_ScriptsManager/app/app.py

1130 lines
40 KiB
Python

import os
from flask import (
Flask,
render_template,
request,
jsonify,
session,
redirect,
url_for,
flash,
)
from flask_login import login_user, logout_user, login_required, current_user
from flask_socketio import SocketIO, emit, join_room, leave_room
from werkzeug.security import check_password_hash
import json
from datetime import datetime
from app.config.config import config
from app.config.database import init_db, login_manager
from app.config.permissions import (
require_permission,
can_edit_metadata,
can_access_script,
)
from app.models import User, ScriptGroup, Script, UserProject, ExecutionLog
from app.services.script_discovery import ScriptDiscoveryService
from app.services.script_executor import ScriptExecutor
from app.services.conda_service import CondaService
from app.services.data_manager import DataManager
from app.services.translation_service import TranslationService
from app.services.backup_service import BackupService
def register_routes(app):
"""Register all routes and WebSocket events for the application"""
# Get services from app config
db = app.extensions["sqlalchemy"]
# Initialize services
script_discovery = ScriptDiscoveryService()
script_executor = ScriptExecutor(app)
conda_service = CondaService()
data_manager = DataManager()
translation_service = TranslationService()
backup_service = BackupService()
# Initialize SocketIO
socketio = SocketIO(app, cors_allowed_origins="*")
# Helper functions for script description management
def _load_script_description(description_path, language="en"):
"""Load script description from markdown file with language support."""
import os
if not description_path:
return ""
# Try language-specific file first (e.g., description_es.md)
base_path, ext = os.path.splitext(description_path)
lang_path = f"{base_path}_{language}{ext}"
# Try absolute paths first, then relative to script groups
for path_to_try in [lang_path, description_path]:
if os.path.isabs(path_to_try):
abs_path = path_to_try
else:
# Assume relative to data/script_groups/
abs_path = os.path.join(
app.config.get("DATA_PATH", "data"), "script_groups", path_to_try
)
if os.path.exists(abs_path):
try:
with open(abs_path, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
app.logger.warning(
f"Error reading description file {abs_path}: {e}"
)
# Fallback to English if language-specific file doesn't exist
if language != "en":
return _load_script_description(description_path, "en")
return ""
def _save_script_description(description_path, content, language="en"):
"""Save script description to markdown file with language support."""
import os
if not description_path:
return False
# Use language-specific file (e.g., description_es.md)
base_path, ext = os.path.splitext(description_path)
lang_path = f"{base_path}_{language}{ext}"
# Determine absolute path
if os.path.isabs(lang_path):
abs_path = lang_path
else:
# Assume relative to data/script_groups/
abs_path = os.path.join(
app.config.get("DATA_PATH", "data"), "script_groups", lang_path
)
try:
# Ensure directory exists
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "w", encoding="utf-8") as f:
f.write(content)
return True
except Exception as e:
app.logger.error(f"Error saving description file {abs_path}: {e}")
return False
# User loader for Flask-Login
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# Template context processor for translations
@app.context_processor
def inject_translations():
if current_user.is_authenticated:
user_lang = translation_service.get_user_language(current_user)
translations = translation_service.get_all_translations(user_lang)
return dict(t=translations, current_lang=user_lang)
else:
translations = translation_service.get_all_translations()
return dict(t=translations, current_lang="en")
# Routes
@app.route("/")
def index():
if current_user.is_authenticated:
return redirect(url_for("dashboard"))
return redirect(url_for("login"))
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
user = User.query.filter_by(username=username).first()
if user and user.check_password(password) and user.is_active:
login_user(user)
user.last_login = datetime.utcnow()
db.session.commit()
next_page = request.args.get("next")
return (
redirect(next_page) if next_page else redirect(url_for("dashboard"))
)
else:
flash("Invalid username or password", "error")
return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
logout_user()
return redirect(url_for("login"))
@app.route("/dashboard")
@login_required
def dashboard():
# Discover scripts on dashboard load
script_groups = script_discovery.scan_script_groups()
# Get user's active projects
user_projects = (
data_manager.list_user_projects(current_user.id, None) # All groups
if script_groups
else []
)
return render_template(
"dashboard.html", script_groups=script_groups, user_projects=user_projects
)
@app.route("/script-group/<int:group_id>")
@login_required
def script_group_view(group_id):
group = ScriptGroup.query.get_or_404(group_id)
# Check if user can access this group
if not can_access_script(current_user.user_level, group.required_level):
flash("Insufficient permissions to access this script group", "error")
return redirect(url_for("dashboard"))
scripts = Script.query.filter_by(group_id=group_id, is_active=True).all()
# Filter scripts by user permissions
accessible_scripts = [
script
for script in scripts
if can_access_script(current_user.user_level, script.required_level)
]
# Get user's projects for this group
user_projects = data_manager.list_user_projects(current_user.id, group_id)
# Get user's language preference
user_language = getattr(current_user, "language", "en") or "en"
return render_template(
"script_group.html",
script_group=group,
scripts=accessible_scripts,
user_projects=user_projects,
can_edit=can_edit_metadata(current_user.user_level),
user_language=user_language,
)
# Administration routes
@app.route("/admin")
@login_required
@require_permission("admin")
def admin_dashboard():
return redirect(url_for("admin_users"))
@app.route("/admin/users")
@login_required
@require_permission("admin")
def admin_users():
users = User.query.all()
return render_template("admin/users.html", users=users)
@app.route("/admin/users/create", methods=["GET", "POST"])
@login_required
@require_permission("admin")
def admin_create_user():
if request.method == "POST":
# Handle both JSON and form data
if request.is_json:
data = request.json
else:
data = request.form
username = data.get("username")
email = data.get("email")
user_level = data.get("user_level")
password = data.get("password")
# Validate data
if not all([username, email, user_level, password]):
if request.is_json:
return jsonify({"error": "All fields are required"}), 400
flash("All fields are required", "error")
return render_template("admin/user_form.html")
# Check if user exists
if User.query.filter_by(username=username).first():
if request.is_json:
return jsonify({"error": "Username already exists"}), 400
flash("Username already exists", "error")
return render_template("admin/user_form.html")
# Create new user
from werkzeug.security import generate_password_hash
new_user = User(
username=username,
email=email,
user_level=user_level,
password_hash=generate_password_hash(password),
)
try:
db.session.add(new_user)
db.session.commit()
if request.is_json:
return jsonify({"message": "User created successfully"}), 201
flash("User created successfully", "success")
return redirect(url_for("admin_users"))
except Exception as e:
db.session.rollback()
if request.is_json:
return jsonify({"error": str(e)}), 500
flash(f"Error creating user: {str(e)}", "error")
return render_template("admin/user_form.html")
@app.route("/admin/users/<int:user_id>/edit", methods=["GET", "POST"])
@login_required
@require_permission("admin")
def admin_edit_user(user_id):
user = User.query.get_or_404(user_id)
if request.method == "POST":
# Handle both JSON and form data
if request.is_json:
data = request.json
else:
data = request.form
# Update user data
user.username = data.get("username", user.username)
user.email = data.get("email", user.email)
user.user_level = data.get("user_level", user.user_level)
# Update password if provided
new_password = data.get("password")
if new_password:
from werkzeug.security import generate_password_hash
user.password_hash = generate_password_hash(new_password)
try:
db.session.commit()
if request.is_json:
return jsonify({"message": "User updated successfully"})
flash("User updated successfully", "success")
return redirect(url_for("admin_users"))
except Exception as e:
db.session.rollback()
if request.is_json:
return jsonify({"error": str(e)}), 500
flash(f"Error updating user: {str(e)}", "error")
return render_template("admin/user_form.html", user=user)
@app.route("/admin/users/<int:user_id>/delete", methods=["POST"])
@login_required
@require_permission("admin")
def admin_delete_user(user_id):
user = User.query.get_or_404(user_id)
# Prevent deleting own account
if user.id == current_user.id:
if request.is_json:
return jsonify({"error": "Cannot delete your own account"}), 400
flash("Cannot delete your own account", "error")
return redirect(url_for("admin_users"))
try:
db.session.delete(user)
db.session.commit()
if request.is_json:
return jsonify({"message": "User deleted successfully"})
flash("User deleted successfully", "success")
except Exception as e:
db.session.rollback()
if request.is_json:
return jsonify({"error": str(e)}), 500
flash(f"Error deleting user: {str(e)}", "error")
return redirect(url_for("admin_users"))
@app.route("/admin/conda")
@login_required
@require_permission("developer")
def admin_conda():
"""Conda environments management page."""
environments = []
conda_available = conda_service.is_available()
if conda_available:
try:
environments = conda_service.refresh_environments()
except Exception as e:
flash(f"Error refreshing conda environments: {str(e)}", "error")
return render_template(
"admin/conda.html",
environments=environments,
conda_available=conda_available,
)
@app.route("/admin/conda/refresh", methods=["POST"])
@login_required
@require_permission("developer")
def admin_conda_refresh():
"""Refresh conda environments."""
try:
if conda_service.is_available():
environments = conda_service.refresh_environments()
flash(f"Refreshed {len(environments)} conda environments", "success")
else:
flash("Conda is not available on this system", "error")
except Exception as e:
flash(f"Error refreshing conda environments: {str(e)}", "error")
return redirect(url_for("admin_conda"))
@app.route("/admin/backup")
@login_required
@require_permission("admin")
def admin_backup():
"""Backup management page."""
try:
backup_status = backup_service.get_backup_status()
backups = backup_service.list_available_backups()
except Exception as e:
flash(f"Error getting backup information: {str(e)}", "error")
backup_status = {}
backups = []
return render_template(
"admin/backup.html", backup_status=backup_status, backups=backups
)
@app.route("/admin/backup/create", methods=["POST"])
@login_required
@require_permission("admin")
def admin_backup_create():
"""Create manual backup."""
try:
description = request.form.get("description", "Manual backup")
backup_result = backup_service.create_manual_backup(description)
flash(
f"Backup created successfully: {backup_result['backup_file']}",
"success",
)
except Exception as e:
flash(f"Error creating backup: {str(e)}", "error")
return redirect(url_for("admin_backup"))
@app.route("/admin/backup/delete/<backup_date>", methods=["POST"])
@login_required
@require_permission("admin")
def admin_backup_delete(backup_date):
"""Delete specific backup."""
try:
if backup_service.delete_backup(backup_date):
flash(f"Backup {backup_date} deleted successfully", "success")
else:
flash(f"Failed to delete backup {backup_date}", "error")
except Exception as e:
flash(f"Error deleting backup: {str(e)}", "error")
return redirect(url_for("admin_backup"))
# API Routes
@app.route("/api/script-groups")
@login_required
def api_script_groups():
groups = ScriptGroup.query.filter_by(is_active=True).all()
accessible_groups = [
group.to_dict()
for group in groups
if can_access_script(current_user.user_level, group.required_level)
]
return jsonify(accessible_groups)
@app.route("/api/script-groups/<int:group_id>/scripts")
@login_required
def api_group_scripts(group_id):
group = ScriptGroup.query.get_or_404(group_id)
if not can_access_script(current_user.user_level, group.required_level):
return jsonify({"error": "Insufficient permissions"}), 403
scripts = Script.query.filter_by(group_id=group_id, is_active=True).all()
accessible_scripts = [
script.to_dict()
for script in scripts
if can_access_script(current_user.user_level, script.required_level)
]
return jsonify(accessible_scripts)
@app.route("/api/script-groups/<int:group_id>", methods=["GET"])
@login_required
def api_get_group(group_id):
"""Get group details for editing."""
group = ScriptGroup.query.get_or_404(group_id)
if not can_access_script(current_user.user_level, group.required_level):
return jsonify({"error": "Insufficient permissions"}), 403
return jsonify(group.to_dict())
@app.route("/api/script-groups/<int:group_id>", methods=["PUT"])
@login_required
def api_update_group(group_id):
"""Update group settings."""
if current_user.user_level not in ["admin", "superuser"]:
return jsonify({"error": "Admin privileges required"}), 403
group = ScriptGroup.query.get_or_404(group_id)
data = request.json
try:
# Update allowed fields
if "conda_environment" in data:
group.conda_environment = data["conda_environment"]
if "name" in data:
group.name = data["name"]
if "description" in data:
group.description = data["description"]
if "required_level" in data:
group.required_level = data["required_level"]
db.session.commit()
# Update metadata.json file to prevent discovery service override
if "conda_environment" in data:
script_discovery.update_group_metadata_file(
group, {"conda_environment": data["conda_environment"]}
)
return jsonify({"success": True, "group": group.to_dict()})
except Exception as e:
db.session.rollback()
return jsonify({"error": str(e)}), 500
@app.route("/api/scripts/<int:script_id>/execute", methods=["POST"])
@login_required
def api_execute_script(script_id):
print(
f"[API_EXEC] Execute request for script_id={script_id}, user={current_user.username}"
)
script = Script.query.get_or_404(script_id)
print(
f"[API_EXEC] Found script: {script.filename} in group: {script.script_group.name}"
)
if not can_access_script(current_user.user_level, script.required_level):
print(
f"[API_EXEC] Permission denied: user_level={current_user.user_level}, required={script.required_level}"
)
return jsonify({"error": "Insufficient permissions"}), 403
parameters = request.json.get("parameters", {}) if request.json else {}
project_id = request.json.get("project_id") if request.json else None
print(f"[API_EXEC] Parameters: {parameters}")
print(f"[API_EXEC] Project ID from request: {project_id}")
# If no project_id specified, use active project from session
if not project_id:
active_project_id = session.get("active_project_id")
print(f"[API_EXEC] Active project ID from session: {active_project_id}")
if active_project_id:
# Verify the project belongs to current user and group
active_project = UserProject.query.filter_by(
id=active_project_id,
user_id=current_user.id,
group_id=script.group_id,
).first()
if active_project:
project_id = active_project_id
print(f"[API_EXEC] Using verified active project: {project_id}")
else:
print(f"[API_EXEC] Active project verification failed")
print(f"[API_EXEC] Final project_id: {project_id}")
print(f"[API_EXEC] Calling script_executor.execute_script...")
result = script_executor.execute_script(
script_id, current_user.id, parameters, project_id
)
print(f"[API_EXEC] Script executor result: {result}")
return jsonify(result)
@app.route("/api/scripts/<int:script_id>/stop", methods=["POST"])
@login_required
def api_stop_script(script_id):
process_id = request.json.get("process_id")
if not process_id:
return jsonify({"error": "Process ID required"}), 400
success = script_executor.stop_script(process_id, current_user.id)
return jsonify({"success": success})
@app.route("/api/scripts/check-interface", methods=["POST"])
@login_required
def api_check_script_interface():
"""Check if a script interface is ready"""
url = request.json.get("url")
if not url:
return jsonify({"error": "URL required"}), 400
try:
import requests
# Try to make a simple request to the script interface
response = requests.get(url, timeout=2)
return jsonify(
{
"ready": response.status_code == 200,
"status_code": response.status_code,
}
)
except requests.exceptions.RequestException:
return jsonify({"ready": False, "status_code": None})
@app.route("/api/scripts/<int:script_id>", methods=["GET"])
@login_required
def api_get_script(script_id):
"""Get script details with multilingual descriptions."""
script = Script.query.get_or_404(script_id)
# Check if user has access to the script group
group_level = script.script_group.required_level
if not can_access_script(current_user.user_level, group_level):
return jsonify({"error": "Insufficient permissions"}), 403
# Get user's language preference
user_language = getattr(current_user, "language", "en") or "en"
# Load long description if available
long_description = ""
if script.description_long_path:
long_description = _load_script_description(
script.description_long_path, user_language
)
script_data = script.to_dict()
script_data["description_long"] = long_description
script_data["user_language"] = user_language
return jsonify(script_data)
@app.route("/api/scripts/<int:script_id>", methods=["PUT"])
@login_required
def api_update_script(script_id):
"""Update script metadata (admin only)."""
if not can_edit_metadata(current_user.user_level):
return jsonify({"error": "Admin privileges required"}), 403
script = Script.query.get_or_404(script_id)
# Check if user has access to the script group
group_level = script.script_group.required_level
if not can_access_script(current_user.user_level, group_level):
return jsonify({"error": "Insufficient permissions"}), 403
data = request.get_json()
try:
# Update basic fields
if "display_name" in data:
script.display_name = data["display_name"].strip()
if "description" in data:
script.description = data["description"].strip()
# Update long description if provided
if "description_long" in data and script.description_long_path:
_save_script_description(
script.description_long_path,
data["description_long"],
data.get("language", "en"),
)
# Update timestamp
script.last_modified = datetime.utcnow()
db.session.commit()
return jsonify(
{
"success": True,
"message": "Script updated successfully",
"script": script.to_dict(),
}
)
except Exception as e:
db.session.rollback()
error_msg = f"Failed to update script: {str(e)}"
return jsonify({"error": error_msg}), 500
@app.route("/api/scripts/<int:script_id>/logs", methods=["GET"])
@login_required
def api_get_script_logs(script_id):
"""Get execution logs for a script."""
script = Script.query.get_or_404(script_id)
# Check if user has access to the script group
group_level = script.script_group.required_level
if not can_access_script(current_user.user_level, group_level):
return jsonify({"error": "Insufficient permissions"}), 403
# Get recent execution logs for this script
logs = (
ExecutionLog.query.filter_by(script_id=script_id)
.order_by(ExecutionLog.start_time.desc())
.limit(10)
.all()
)
log_data = []
for log in logs:
log_dict = log.to_dict()
# Combine output and error_output for display
combined_output = ""
if log.output:
combined_output += log.output
if log.error_output:
if combined_output:
combined_output += "\n--- STDERR ---\n"
combined_output += log.error_output
log_dict["combined_output"] = combined_output
# Format for frontend consumption
formatted_log = {
"message": combined_output or "No output available",
"level": log.status or "info",
"timestamp": log.start_time.isoformat() if log.start_time else "",
"raw_data": log_dict, # Include full log data for reference
}
log_data.append(formatted_log)
return jsonify({"success": True, "logs": log_data})
@app.route("/api/conda/environments")
@login_required
def api_conda_environments():
"""Get available conda environments."""
if current_user.user_level not in ["admin", "superuser"]:
return jsonify({"error": "Admin privileges required"}), 403
if conda_service.is_available():
environments = conda_service.refresh_environments()
return jsonify([env.to_dict() for env in environments])
else:
return jsonify({"error": "Conda not available"}), 503
@app.route("/api/script-groups/refresh", methods=["POST"])
@login_required
@require_permission("operator")
def api_refresh_script_groups():
"""Refresh all script groups - rediscover scripts."""
try:
# Re-discover all script groups
script_groups = script_discovery.scan_script_groups()
return jsonify(
{
"success": True,
"message": f"Refreshed {len(script_groups)} script groups",
"groups_count": len(script_groups),
}
)
except Exception as e:
return (
jsonify(
{
"success": False,
"error": f"Error refreshing script groups: {str(e)}",
}
),
500,
)
@app.route("/api/script-groups/<int:group_id>/refresh", methods=["GET", "POST"])
@login_required
@require_permission("operator")
def api_refresh_group_scripts(group_id):
"""Refresh scripts for a specific group."""
try:
group = ScriptGroup.query.get_or_404(group_id)
# Check if user can access this group
if not can_access_script(current_user.user_level, group.required_level):
return jsonify({"error": "Insufficient permissions"}), 403
# Re-discover scripts in this specific group
from pathlib import Path
group_path = Path(group.directory_path)
if group_path.exists():
script_discovery.discover_scripts_in_group(group, group_path)
db.session.commit()
# Count refreshed scripts
scripts_count = Script.query.filter_by(
group_id=group_id, is_active=True
).count()
return jsonify(
{
"success": True,
"message": f"Refreshed {scripts_count} scripts in group '{group.name}'",
"scripts_count": scripts_count,
}
)
else:
return (
jsonify(
{
"success": False,
"error": f"Group directory not found: {group.directory_path}",
}
),
404,
)
except Exception as e:
db.session.rollback()
return (
jsonify(
{
"success": False,
"error": f"Error refreshing group scripts: {str(e)}",
}
),
500,
)
@app.route("/api/projects", methods=["GET", "POST"])
@login_required
def api_projects():
if request.method == "POST":
data = request.json
project_name = data.get("project_name")
group_id = data.get("group_id")
description = data.get("description", "")
try:
project = data_manager.create_project(
current_user.id, group_id, project_name, description
)
return jsonify(project.to_dict()), 201
except ValueError as e:
return jsonify({"error": str(e)}), 400
else:
# GET - list user's projects
group_id = request.args.get("group_id", type=int)
projects = data_manager.list_user_projects(current_user.id, group_id)
return jsonify(projects)
@app.route("/api/projects/<int:project_id>", methods=["DELETE"])
@login_required
def api_delete_project(project_id):
try:
success = data_manager.delete_project(current_user.id, project_id)
if success:
return jsonify({"success": True})
else:
return jsonify({"error": "Project not found"}), 404
except ValueError as e:
return jsonify({"error": str(e)}), 400
@app.route("/api/projects/<int:project_id>", methods=["PUT"])
@login_required
def api_update_project(project_id):
"""Update project details."""
data = request.json
try:
project = UserProject.query.filter_by(
id=project_id, user_id=current_user.id
).first()
if not project:
return jsonify({"error": "Project not found"}), 404
if "project_name" in data:
project.project_name = data["project_name"]
if "description" in data:
project.description = data["description"]
project.last_accessed = datetime.utcnow()
db.session.commit()
return jsonify(project.to_dict())
except Exception as e:
db.session.rollback()
return jsonify({"error": str(e)}), 500
@app.route("/api/projects/<int:project_id>/set-active", methods=["POST"])
@login_required
def api_set_active_project(project_id):
"""Set active project for current session."""
try:
project = UserProject.query.filter_by(
id=project_id, user_id=current_user.id
).first()
if not project:
return jsonify({"error": "Project not found"}), 404
# Store in session
session["active_project_id"] = project_id
session["active_group_id"] = project.group_id
# Update last accessed
project.last_accessed = datetime.utcnow()
db.session.commit()
return jsonify({"success": True, "active_project": project.to_dict()})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/projects/active", methods=["GET"])
@login_required
def api_get_active_project():
"""Get current active project."""
active_project_id = session.get("active_project_id")
if not active_project_id:
return jsonify({"active_project": None})
project = UserProject.query.filter_by(
id=active_project_id, user_id=current_user.id
).first()
if not project:
# Clear invalid session data
session.pop("active_project_id", None)
session.pop("active_group_id", None)
return jsonify({"active_project": None})
return jsonify({"active_project": project.to_dict()})
@app.route("/api/projects/<int:project_id>/backup", methods=["POST"])
@login_required
def api_backup_project(project_id):
"""Create project data backup."""
try:
project = UserProject.query.filter_by(
id=project_id, user_id=current_user.id
).first()
if not project:
return jsonify({"error": "Project not found"}), 404
backup_path = data_manager.backup_project_data(
current_user.id, project.group_id, project.project_name
)
return jsonify(
{
"success": True,
"backup_path": str(backup_path),
"message": (
f"Project '{project.project_name}' " f"backed up successfully"
),
}
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/projects/<int:project_id>/files", methods=["GET"])
@login_required
def api_project_files(project_id):
"""List files in project directory."""
try:
project = UserProject.query.filter_by(
id=project_id, user_id=current_user.id
).first()
if not project:
return jsonify({"error": "Project not found"}), 404
files = data_manager.list_project_files(
current_user.id, project.group_id, project.project_name
)
return jsonify({"files": files})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route(
"/api/projects/<int:project_id>/files/<filename>",
methods=["GET", "POST", "DELETE"],
)
@login_required
def api_project_file(project_id, filename):
"""Manage individual project files."""
try:
project = UserProject.query.filter_by(
id=project_id, user_id=current_user.id
).first()
if not project:
return jsonify({"error": "Project not found"}), 404
if request.method == "GET":
content = data_manager.get_project_file_content(
current_user.id, project.group_id, project.project_name, filename
)
if content is None:
return jsonify({"error": "File not found"}), 404
return jsonify({"content": content})
elif request.method == "POST":
content = request.json.get("content", "")
success = data_manager.save_project_file_content(
current_user.id,
project.group_id,
project.project_name,
filename,
content,
)
if success:
return jsonify({"success": True})
else:
return jsonify({"error": "Failed to save file"}), 500
elif request.method == "DELETE":
# Delete file implementation
project_path = data_manager.get_user_project_path(
current_user.id, project.group_id, project.project_name
)
file_path = project_path / filename
if file_path.exists():
file_path.unlink()
return jsonify({"success": True})
else:
return jsonify({"error": "File not found"}), 404
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/user/preferences", methods=["POST"])
@login_required
def api_update_preferences():
data = request.json
if "language" in data:
if translation_service.set_user_language(current_user, data["language"]):
db.session.commit()
if "theme" in data:
current_user.preferred_theme = data["theme"]
db.session.commit()
return jsonify({"success": True})
@app.route("/api/i18n/languages")
def api_languages():
return jsonify(translation_service.get_supported_languages())
@app.route("/api/i18n/<language>")
def api_translations(language):
translations = translation_service.get_all_translations(language)
return jsonify(translations)
# WebSocket events
@socketio.on("connect", namespace="/logs")
def handle_connect():
if current_user.is_authenticated:
join_room(f"user_{current_user.id}")
emit("connected", {"status": "Connected to logs"})
@socketio.on("disconnect", namespace="/logs")
def handle_disconnect():
if current_user.is_authenticated:
leave_room(f"user_{current_user.id}")
# Error handlers
@app.errorhandler(404)
def not_found(error):
return render_template("404.html"), 404
@app.errorhandler(403)
def forbidden(error):
return render_template("403.html"), 403
@app.errorhandler(500)
def internal_error(error):
db.session.rollback()
return render_template("500.html"), 500
# Health check endpoint
@app.route("/health")
def health_check():
return jsonify(
{
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"conda_available": conda_service.is_available(),
"active_scripts": len(script_executor.active_processes),
}
)
return socketio
def create_app(config_name="default"):
"""Create and configure Flask application."""
app = Flask(__name__)
# Load configuration
app.config.from_object(config[config_name])
# Add custom Jinja2 filters
@app.template_filter("fromjson")
def fromjson_filter(value):
"""Parse JSON string to Python object"""
try:
return json.loads(value) if isinstance(value, str) else value
except (json.JSONDecodeError, TypeError):
return {}
# Initialize database
db = init_db(app)
return app
# Create application instance for direct execution
if __name__ == "__main__":
app = create_app(os.getenv("FLASK_CONFIG", "default"))
# Register routes
socketio = register_routes(app)
# Development server
socketio.run(app, host="127.0.0.1", port=5000, debug=True)