import os from flask import ( Flask, render_template, request, jsonify, session, redirect, url_for, flash, ) from flask_login import login_user, logout_user, login_required, current_user from flask_socketio import SocketIO, emit, join_room, leave_room from werkzeug.security import check_password_hash import json from datetime import datetime from app.config.config import config from app.config.database import init_db, login_manager from app.config.permissions import ( require_permission, can_edit_metadata, can_access_script, ) from app.models import User, ScriptGroup, Script, UserProject, ExecutionLog from app.services.script_discovery import ScriptDiscoveryService from app.services.script_executor import ScriptExecutor from app.services.conda_service import CondaService from app.services.data_manager import DataManager from app.services.translation_service import TranslationService from app.services.backup_service import BackupService def register_routes(app): """Register all routes and WebSocket events for the application""" # Get services from app config db = app.extensions["sqlalchemy"] # Initialize services script_discovery = ScriptDiscoveryService() script_executor = ScriptExecutor(app) conda_service = CondaService() data_manager = DataManager() translation_service = TranslationService() backup_service = BackupService() # Register blueprints from app.routes.proxy_routes import proxy_bp app.register_blueprint(proxy_bp) # Initialize SocketIO socketio = SocketIO(app, cors_allowed_origins="*") # Helper functions for script description management def _load_script_description(description_path, language="en"): """Load script description from markdown file with language support.""" import os if not description_path: return "" # Try language-specific file first (e.g., description_es.md) base_path, ext = os.path.splitext(description_path) lang_path = f"{base_path}_{language}{ext}" # Try absolute paths first, then relative to script groups for path_to_try in [lang_path, description_path]: if os.path.isabs(path_to_try): abs_path = path_to_try else: # Assume relative to data/script_groups/ abs_path = os.path.join( app.config.get("DATA_PATH", "data"), "script_groups", path_to_try ) if os.path.exists(abs_path): try: with open(abs_path, "r", encoding="utf-8") as f: return f.read() except Exception as e: app.logger.warning( f"Error reading description file {abs_path}: {e}" ) # Fallback to English if language-specific file doesn't exist if language != "en": return _load_script_description(description_path, "en") return "" def _save_script_description(description_path, content, language="en"): """Save script description to markdown file with language support.""" import os if not description_path: return False # Use language-specific file (e.g., description_es.md) base_path, ext = os.path.splitext(description_path) lang_path = f"{base_path}_{language}{ext}" # Determine absolute path if os.path.isabs(lang_path): abs_path = lang_path else: # Assume relative to data/script_groups/ abs_path = os.path.join( app.config.get("DATA_PATH", "data"), "script_groups", lang_path ) try: # Ensure directory exists os.makedirs(os.path.dirname(abs_path), exist_ok=True) with open(abs_path, "w", encoding="utf-8") as f: f.write(content) return True except Exception as e: app.logger.error(f"Error saving description file {abs_path}: {e}") return False # User loader for Flask-Login @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) # Template context processor for translations @app.context_processor def inject_translations(): if current_user.is_authenticated: user_lang = translation_service.get_user_language(current_user) translations = translation_service.get_all_translations(user_lang) return dict(t=translations, current_lang=user_lang) else: translations = translation_service.get_all_translations() return dict(t=translations, current_lang="en") # Routes @app.route("/") def index(): if current_user.is_authenticated: return redirect(url_for("dashboard")) return redirect(url_for("login")) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username = request.form.get("username") password = request.form.get("password") user = User.query.filter_by(username=username).first() if user and user.check_password(password) and user.is_active: login_user(user) user.last_login = datetime.utcnow() db.session.commit() next_page = request.args.get("next") return ( redirect(next_page) if next_page else redirect(url_for("dashboard")) ) else: flash("Invalid username or password", "error") return render_template("login.html") @app.route("/logout") @login_required def logout(): logout_user() return redirect(url_for("login")) @app.route("/dashboard") @login_required def dashboard(): # Discover scripts on dashboard load script_groups = script_discovery.scan_script_groups() # Get user's active projects user_projects = ( data_manager.list_user_projects(current_user.id, None) # All groups if script_groups else [] ) return render_template( "dashboard.html", script_groups=script_groups, user_projects=user_projects ) @app.route("/script-group/") @login_required def script_group_view(group_id): group = ScriptGroup.query.get_or_404(group_id) # Check if user can access this group if not can_access_script(current_user.user_level, group.required_level): flash("Insufficient permissions to access this script group", "error") return redirect(url_for("dashboard")) scripts = Script.query.filter_by(group_id=group_id, is_active=True).all() # Filter scripts by user permissions accessible_scripts = [ script for script in scripts if can_access_script(current_user.user_level, script.required_level) ] # Get user's projects for this group user_projects = data_manager.list_user_projects(current_user.id, group_id) # Get user's language preference user_language = getattr(current_user, "language", "en") or "en" return render_template( "script_group.html", script_group=group, scripts=accessible_scripts, user_projects=user_projects, can_edit=can_edit_metadata(current_user.user_level), user_language=user_language, ) @app.route("/script//loading") @login_required def script_loading_page(script_id): """ Página de loading intermedia que se muestra mientras el script se inicia """ script = Script.query.get_or_404(script_id) # Check if user can access this script if not can_access_script(current_user.user_level, script.required_level): flash("Insufficient permissions to access this script", "error") return redirect(url_for("dashboard")) # Get active project for this script's group active_project_id = session.get("active_project_id") project = None if active_project_id: project = UserProject.query.filter_by( id=active_project_id, user_id=current_user.id, group_id=script.group_id, ).first() if not project: flash("No active project found for this script group", "error") return redirect(url_for("script_group_view", group_id=script.group_id)) # Generate proxy URL proxy_url = f"/project/{project.id}/script/{script.id}/user/{current_user.id}/" return render_template( "script_loading.html", script=script, script_group=script.script_group, project=project, proxy_url=proxy_url, ) # Administration routes @app.route("/admin") @login_required @require_permission("admin") def admin_dashboard(): return redirect(url_for("admin_users")) @app.route("/admin/users") @login_required @require_permission("admin") def admin_users(): users = User.query.all() return render_template("admin/users.html", users=users) @app.route("/admin/users/create", methods=["GET", "POST"]) @login_required @require_permission("admin") def admin_create_user(): if request.method == "POST": # Handle both JSON and form data if request.is_json: data = request.json else: data = request.form username = data.get("username") email = data.get("email") user_level = data.get("user_level") password = data.get("password") # Validate data if not all([username, email, user_level, password]): if request.is_json: return jsonify({"error": "All fields are required"}), 400 flash("All fields are required", "error") return render_template("admin/user_form.html") # Check if user exists if User.query.filter_by(username=username).first(): if request.is_json: return jsonify({"error": "Username already exists"}), 400 flash("Username already exists", "error") return render_template("admin/user_form.html") # Create new user from werkzeug.security import generate_password_hash new_user = User( username=username, email=email, user_level=user_level, password_hash=generate_password_hash(password), ) try: db.session.add(new_user) db.session.commit() if request.is_json: return jsonify({"message": "User created successfully"}), 201 flash("User created successfully", "success") return redirect(url_for("admin_users")) except Exception as e: db.session.rollback() if request.is_json: return jsonify({"error": str(e)}), 500 flash(f"Error creating user: {str(e)}", "error") return render_template("admin/user_form.html") @app.route("/admin/users//edit", methods=["GET", "POST"]) @login_required @require_permission("admin") def admin_edit_user(user_id): user = User.query.get_or_404(user_id) if request.method == "POST": # Handle both JSON and form data if request.is_json: data = request.json else: data = request.form # Update user data user.username = data.get("username", user.username) user.email = data.get("email", user.email) user.user_level = data.get("user_level", user.user_level) # Update password if provided new_password = data.get("password") if new_password: from werkzeug.security import generate_password_hash user.password_hash = generate_password_hash(new_password) try: db.session.commit() if request.is_json: return jsonify({"message": "User updated successfully"}) flash("User updated successfully", "success") return redirect(url_for("admin_users")) except Exception as e: db.session.rollback() if request.is_json: return jsonify({"error": str(e)}), 500 flash(f"Error updating user: {str(e)}", "error") return render_template("admin/user_form.html", user=user) @app.route("/admin/users//delete", methods=["POST"]) @login_required @require_permission("admin") def admin_delete_user(user_id): user = User.query.get_or_404(user_id) # Prevent deleting own account if user.id == current_user.id: if request.is_json: return jsonify({"error": "Cannot delete your own account"}), 400 flash("Cannot delete your own account", "error") return redirect(url_for("admin_users")) try: db.session.delete(user) db.session.commit() if request.is_json: return jsonify({"message": "User deleted successfully"}) flash("User deleted successfully", "success") except Exception as e: db.session.rollback() if request.is_json: return jsonify({"error": str(e)}), 500 flash(f"Error deleting user: {str(e)}", "error") return redirect(url_for("admin_users")) @app.route("/admin/conda") @login_required @require_permission("developer") def admin_conda(): """Conda environments management page.""" environments = [] conda_available = conda_service.is_available() if conda_available: try: environments = conda_service.refresh_environments() except Exception as e: flash(f"Error refreshing conda environments: {str(e)}", "error") return render_template( "admin/conda.html", environments=environments, conda_available=conda_available, ) @app.route("/admin/conda/refresh", methods=["POST"]) @login_required @require_permission("developer") def admin_conda_refresh(): """Refresh conda environments.""" try: if conda_service.is_available(): environments = conda_service.refresh_environments() flash(f"Refreshed {len(environments)} conda environments", "success") else: flash("Conda is not available on this system", "error") except Exception as e: flash(f"Error refreshing conda environments: {str(e)}", "error") return redirect(url_for("admin_conda")) @app.route("/admin/backup") @login_required @require_permission("admin") def admin_backup(): """Backup management page.""" try: backup_status = backup_service.get_backup_status() backups = backup_service.list_available_backups() except Exception as e: flash(f"Error getting backup information: {str(e)}", "error") backup_status = {} backups = [] return render_template( "admin/backup.html", backup_status=backup_status, backups=backups ) @app.route("/admin/backup/create", methods=["POST"]) @login_required @require_permission("admin") def admin_backup_create(): """Create manual backup.""" try: description = request.form.get("description", "Manual backup") backup_result = backup_service.create_manual_backup(description) flash( f"Backup created successfully: {backup_result['backup_file']}", "success", ) except Exception as e: flash(f"Error creating backup: {str(e)}", "error") return redirect(url_for("admin_backup")) @app.route("/admin/backup/delete/", methods=["POST"]) @login_required @require_permission("admin") def admin_backup_delete(backup_date): """Delete specific backup.""" try: if backup_service.delete_backup(backup_date): flash(f"Backup {backup_date} deleted successfully", "success") else: flash(f"Failed to delete backup {backup_date}", "error") except Exception as e: flash(f"Error deleting backup: {str(e)}", "error") return redirect(url_for("admin_backup")) # API Routes @app.route("/api/script-groups") @login_required def api_script_groups(): groups = ScriptGroup.query.filter_by(is_active=True).all() accessible_groups = [ group.to_dict() for group in groups if can_access_script(current_user.user_level, group.required_level) ] return jsonify(accessible_groups) @app.route("/api/script-groups//scripts") @login_required def api_group_scripts(group_id): group = ScriptGroup.query.get_or_404(group_id) if not can_access_script(current_user.user_level, group.required_level): return jsonify({"error": "Insufficient permissions"}), 403 scripts = Script.query.filter_by(group_id=group_id, is_active=True).all() accessible_scripts = [ script.to_dict() for script in scripts if can_access_script(current_user.user_level, script.required_level) ] return jsonify(accessible_scripts) @app.route("/api/script-groups/", methods=["GET"]) @login_required def api_get_group(group_id): """Get group details for editing.""" group = ScriptGroup.query.get_or_404(group_id) if not can_access_script(current_user.user_level, group.required_level): return jsonify({"error": "Insufficient permissions"}), 403 return jsonify(group.to_dict()) @app.route("/api/script-groups/", methods=["PUT"]) @login_required def api_update_group(group_id): """Update group settings.""" if current_user.user_level not in ["admin", "superuser"]: return jsonify({"error": "Admin privileges required"}), 403 group = ScriptGroup.query.get_or_404(group_id) data = request.json try: # Update allowed fields if "conda_environment" in data: group.conda_environment = data["conda_environment"] if "name" in data: group.name = data["name"] if "description" in data: group.description = data["description"] if "required_level" in data: group.required_level = data["required_level"] db.session.commit() # Update metadata.json file to prevent discovery service override if "conda_environment" in data: script_discovery.update_group_metadata_file( group, {"conda_environment": data["conda_environment"]} ) return jsonify({"success": True, "group": group.to_dict()}) except Exception as e: db.session.rollback() return jsonify({"error": str(e)}), 500 @app.route("/api/scripts//execute", methods=["POST"]) @login_required def api_execute_script(script_id): print( f"[API_EXEC] Execute request for script_id={script_id}, user={current_user.username}" ) script = Script.query.get_or_404(script_id) print( f"[API_EXEC] Found script: {script.filename} in group: {script.script_group.name}" ) if not can_access_script(current_user.user_level, script.required_level): print( f"[API_EXEC] Permission denied: user_level={current_user.user_level}, required={script.required_level}" ) return jsonify({"error": "Insufficient permissions"}), 403 parameters = request.json.get("parameters", {}) if request.json else {} project_id = request.json.get("project_id") if request.json else None print(f"[API_EXEC] Parameters: {parameters}") print(f"[API_EXEC] Project ID from request: {project_id}") # If no project_id specified, use active project from session if not project_id: print(f"[API_EXEC] Session contents: {dict(session)}") active_project_id = session.get("active_project_id") print(f"[API_EXEC] Active project ID from session: {active_project_id}") if active_project_id: # Verify the project belongs to current user and group active_project = UserProject.query.filter_by( id=active_project_id, user_id=current_user.id, group_id=script.group_id, ).first() if active_project: project_id = active_project_id print(f"[API_EXEC] Using verified active project: {project_id}") else: print(f"[API_EXEC] Active project verification failed") print(f"[API_EXEC] Searched for: id={active_project_id}, user_id={current_user.id}, group_id={script.group_id}") print(f"[API_EXEC] Final project_id: {project_id}") print(f"[API_EXEC] Calling script_executor.execute_script_with_proxy...") result = script_executor.execute_script_with_proxy( script_id, current_user.id, project_id, parameters ) print(f"[API_EXEC] Script executor result: {result}") print(f"[API_EXEC] About to return JSON response...") response = jsonify(result) print(f"[API_EXEC] JSON response created successfully") return response @app.route("/api/scripts//stop", methods=["POST"]) @login_required def api_stop_script(script_id): process_id = request.json.get("process_id") if not process_id: return jsonify({"error": "Process ID required"}), 400 success = script_executor.stop_script(process_id, current_user.id) return jsonify({"success": success}) @app.route("/api/scripts/check-interface", methods=["POST"]) @login_required def api_check_script_interface(): """Check if a script interface is ready""" url = request.json.get("url") if not url: return jsonify({"error": "URL required"}), 400 try: import requests # Try to make a simple request to the script interface response = requests.get(url, timeout=2) return jsonify( { "ready": response.status_code == 200, "status_code": response.status_code, } ) except requests.exceptions.RequestException: return jsonify({"ready": False, "status_code": None}) @app.route("/api/scripts/", methods=["GET"]) @login_required def api_get_script(script_id): """Get script details with multilingual descriptions.""" script = Script.query.get_or_404(script_id) # Check if user has access to the script group group_level = script.script_group.required_level if not can_access_script(current_user.user_level, group_level): return jsonify({"error": "Insufficient permissions"}), 403 # Get user's language preference user_language = getattr(current_user, "language", "en") or "en" # Load long description if available long_description = "" if script.description_long_path: long_description = _load_script_description( script.description_long_path, user_language ) script_data = script.to_dict() script_data["description_long"] = long_description script_data["user_language"] = user_language return jsonify(script_data) @app.route("/api/scripts/", methods=["PUT"]) @login_required def api_update_script(script_id): """Update script metadata (admin only).""" if not can_edit_metadata(current_user.user_level): return jsonify({"error": "Admin privileges required"}), 403 script = Script.query.get_or_404(script_id) # Check if user has access to the script group group_level = script.script_group.required_level if not can_access_script(current_user.user_level, group_level): return jsonify({"error": "Insufficient permissions"}), 403 data = request.get_json() try: # Update basic fields if "display_name" in data: script.display_name = data["display_name"].strip() if "description" in data: script.description = data["description"].strip() # Update long description if provided if "description_long" in data and script.description_long_path: _save_script_description( script.description_long_path, data["description_long"], data.get("language", "en"), ) # Update timestamp script.last_modified = datetime.utcnow() db.session.commit() return jsonify( { "success": True, "message": "Script updated successfully", "script": script.to_dict(), } ) except Exception as e: db.session.rollback() error_msg = f"Failed to update script: {str(e)}" return jsonify({"error": error_msg}), 500 @app.route("/api/scripts//logs", methods=["GET"]) @login_required def api_get_script_logs(script_id): """Get execution logs for a script.""" script = Script.query.get_or_404(script_id) # Check if user has access to the script group group_level = script.script_group.required_level if not can_access_script(current_user.user_level, group_level): return jsonify({"error": "Insufficient permissions"}), 403 # Get recent execution logs for this script logs = ( ExecutionLog.query.filter_by(script_id=script_id) .order_by(ExecutionLog.start_time.desc()) .limit(10) .all() ) log_data = [] for log in logs: log_dict = log.to_dict() # Combine output and error_output for display combined_output = "" if log.output: combined_output += log.output if log.error_output: if combined_output: combined_output += "\n--- STDERR ---\n" combined_output += log.error_output log_dict["combined_output"] = combined_output # Format for frontend consumption formatted_log = { "message": combined_output or "No output available", "level": log.status or "info", "timestamp": log.start_time.isoformat() if log.start_time else "", "raw_data": log_dict, # Include full log data for reference } log_data.append(formatted_log) return jsonify({"success": True, "logs": log_data}) @app.route("/api/conda/environments") @login_required def api_conda_environments(): """Get available conda environments.""" if current_user.user_level not in ["admin", "superuser"]: return jsonify({"error": "Admin privileges required"}), 403 if conda_service.is_available(): environments = conda_service.refresh_environments() return jsonify([env.to_dict() for env in environments]) else: return jsonify({"error": "Conda not available"}), 503 @app.route("/api/script-groups/refresh", methods=["POST"]) @login_required @require_permission("operator") def api_refresh_script_groups(): """Refresh all script groups - rediscover scripts.""" try: # Re-discover all script groups script_groups = script_discovery.scan_script_groups() return jsonify( { "success": True, "message": f"Refreshed {len(script_groups)} script groups", "groups_count": len(script_groups), } ) except Exception as e: return ( jsonify( { "success": False, "error": f"Error refreshing script groups: {str(e)}", } ), 500, ) @app.route("/api/script-groups//refresh", methods=["GET", "POST"]) @login_required @require_permission("operator") def api_refresh_group_scripts(group_id): """Refresh scripts for a specific group.""" try: group = ScriptGroup.query.get_or_404(group_id) # Check if user can access this group if not can_access_script(current_user.user_level, group.required_level): return jsonify({"error": "Insufficient permissions"}), 403 # Re-discover scripts in this specific group from pathlib import Path group_path = Path(group.directory_path) if group_path.exists(): script_discovery.discover_scripts_in_group(group, group_path) db.session.commit() # Count refreshed scripts scripts_count = Script.query.filter_by( group_id=group_id, is_active=True ).count() return jsonify( { "success": True, "message": f"Refreshed {scripts_count} scripts in group '{group.name}'", "scripts_count": scripts_count, } ) else: return ( jsonify( { "success": False, "error": f"Group directory not found: {group.directory_path}", } ), 404, ) except Exception as e: db.session.rollback() return ( jsonify( { "success": False, "error": f"Error refreshing group scripts: {str(e)}", } ), 500, ) @app.route("/api/projects", methods=["GET", "POST"]) @login_required def api_projects(): if request.method == "POST": data = request.json project_name = data.get("project_name") group_id = data.get("group_id") description = data.get("description", "") try: project = data_manager.create_project( current_user.id, group_id, project_name, description ) return jsonify(project.to_dict()), 201 except ValueError as e: return jsonify({"error": str(e)}), 400 else: # GET - list user's projects group_id = request.args.get("group_id", type=int) projects = data_manager.list_user_projects(current_user.id, group_id) return jsonify(projects) @app.route("/api/projects/", methods=["DELETE"]) @login_required def api_delete_project(project_id): try: success = data_manager.delete_project(current_user.id, project_id) if success: return jsonify({"success": True}) else: return jsonify({"error": "Project not found"}), 404 except ValueError as e: return jsonify({"error": str(e)}), 400 @app.route("/api/projects/", methods=["PUT"]) @login_required def api_update_project(project_id): """Update project details.""" data = request.json try: project = UserProject.query.filter_by( id=project_id, user_id=current_user.id ).first() if not project: return jsonify({"error": "Project not found"}), 404 if "project_name" in data: project.project_name = data["project_name"] if "description" in data: project.description = data["description"] project.last_accessed = datetime.utcnow() db.session.commit() return jsonify(project.to_dict()) except Exception as e: db.session.rollback() return jsonify({"error": str(e)}), 500 @app.route("/api/projects//set-active", methods=["POST"]) @login_required def api_set_active_project(project_id): """Set active project for current session.""" try: print(f"[SET_ACTIVE] Setting project {project_id} as active for user {current_user.id}") project = UserProject.query.filter_by( id=project_id, user_id=current_user.id ).first() if not project: print(f"[SET_ACTIVE] Project {project_id} not found for user {current_user.id}") return jsonify({"error": "Project not found"}), 404 # Store in session session["active_project_id"] = project_id session["active_group_id"] = project.group_id print(f"[SET_ACTIVE] Stored in session: active_project_id={project_id}, active_group_id={project.group_id}") print(f"[SET_ACTIVE] Session contents after setting: {dict(session)}") # Update last accessed project.last_accessed = datetime.utcnow() db.session.commit() return jsonify({"success": True, "active_project": project.to_dict()}) except Exception as e: print(f"[SET_ACTIVE] Error: {e}") return jsonify({"error": str(e)}), 500 @app.route("/api/projects/active", methods=["GET"]) @login_required def api_get_active_project(): """Get current active project.""" active_project_id = session.get("active_project_id") if not active_project_id: return jsonify({"active_project": None}) project = UserProject.query.filter_by( id=active_project_id, user_id=current_user.id ).first() if not project: # Clear invalid session data session.pop("active_project_id", None) session.pop("active_group_id", None) return jsonify({"active_project": None}) return jsonify({"active_project": project.to_dict()}) @app.route("/api/projects//backup", methods=["POST"]) @login_required def api_backup_project(project_id): """Create project data backup.""" try: project = UserProject.query.filter_by( id=project_id, user_id=current_user.id ).first() if not project: return jsonify({"error": "Project not found"}), 404 backup_path = data_manager.backup_project_data( current_user.id, project.group_id, project.project_name ) return jsonify( { "success": True, "backup_path": str(backup_path), "message": ( f"Project '{project.project_name}' " f"backed up successfully" ), } ) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/projects//files", methods=["GET"]) @login_required def api_project_files(project_id): """List files in project directory.""" try: project = UserProject.query.filter_by( id=project_id, user_id=current_user.id ).first() if not project: return jsonify({"error": "Project not found"}), 404 files = data_manager.list_project_files( current_user.id, project.group_id, project.project_name ) return jsonify({"files": files}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route( "/api/projects//files/", methods=["GET", "POST", "DELETE"], ) @login_required def api_project_file(project_id, filename): """Manage individual project files.""" try: project = UserProject.query.filter_by( id=project_id, user_id=current_user.id ).first() if not project: return jsonify({"error": "Project not found"}), 404 if request.method == "GET": content = data_manager.get_project_file_content( current_user.id, project.group_id, project.project_name, filename ) if content is None: return jsonify({"error": "File not found"}), 404 return jsonify({"content": content}) elif request.method == "POST": content = request.json.get("content", "") success = data_manager.save_project_file_content( current_user.id, project.group_id, project.project_name, filename, content, ) if success: return jsonify({"success": True}) else: return jsonify({"error": "Failed to save file"}), 500 elif request.method == "DELETE": # Delete file implementation project_path = data_manager.get_user_project_path( current_user.id, project.group_id, project.project_name ) file_path = project_path / filename if file_path.exists(): file_path.unlink() return jsonify({"success": True}) else: return jsonify({"error": "File not found"}), 404 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/user/preferences", methods=["POST"]) @login_required def api_update_preferences(): data = request.json if "language" in data: if translation_service.set_user_language(current_user, data["language"]): db.session.commit() if "theme" in data: current_user.preferred_theme = data["theme"] db.session.commit() return jsonify({"success": True}) @app.route("/api/i18n/languages") def api_languages(): return jsonify(translation_service.get_supported_languages()) @app.route("/api/i18n/") def api_translations(language): translations = translation_service.get_all_translations(language) return jsonify(translations) # WebSocket events @socketio.on("connect", namespace="/logs") def handle_connect(): if current_user.is_authenticated: join_room(f"user_{current_user.id}") emit("connected", {"status": "Connected to logs"}) @socketio.on("disconnect", namespace="/logs") def handle_disconnect(): if current_user.is_authenticated: leave_room(f"user_{current_user.id}") # Error handlers @app.errorhandler(404) def not_found(error): return render_template("404.html"), 404 @app.errorhandler(403) def forbidden(error): return render_template("403.html"), 403 @app.errorhandler(500) def internal_error(error): db.session.rollback() return render_template("500.html"), 500 # Health check endpoint @app.route("/health") def health_check(): return jsonify( { "status": "healthy", "timestamp": datetime.utcnow().isoformat(), "conda_available": conda_service.is_available(), "active_scripts": len(script_executor.active_processes), } ) return socketio def create_app(config_name="default"): """Create and configure Flask application.""" app = Flask(__name__) # Load configuration config_obj = config[config_name] app.config.from_object(config_obj) # Apply security configuration if present if hasattr(config_obj, 'SECURITY_CONFIG'): security_config = config_obj.SECURITY_CONFIG # Apply session cookie settings if 'session_cookie_secure' in security_config: app.config['SESSION_COOKIE_SECURE'] = security_config['session_cookie_secure'] if 'session_cookie_httponly' in security_config: app.config['SESSION_COOKIE_HTTPONLY'] = security_config['session_cookie_httponly'] if 'session_cookie_samesite' in security_config: app.config['SESSION_COOKIE_SAMESITE'] = security_config['session_cookie_samesite'] # Add custom Jinja2 filters @app.template_filter("fromjson") def fromjson_filter(value): """Parse JSON string to Python object""" try: return json.loads(value) if isinstance(value, str) else value except (json.JSONDecodeError, TypeError): return {} # Initialize database db = init_db(app) return app # Create application instance for direct execution if __name__ == "__main__": app = create_app(os.getenv("FLASK_CONFIG", "default")) # Register routes socketio = register_routes(app) # Development server socketio.run(app, host="127.0.0.1", port=5000, debug=True)