diff --git a/.blackboxrules b/.blackboxrules new file mode 100644 index 0000000..e69de29 diff --git a/app.py b/app.py index e574f12..b0a6b56 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,6 @@ import os -from flask import Flask +from datetime import datetime +from flask import Flask, render_template from flask_login import LoginManager from flask_bcrypt import Bcrypt from flask_session import Session @@ -10,108 +11,149 @@ from config import config # Inicialización de extensiones login_manager = LoginManager() -login_manager.login_view = 'auth.login' -login_manager.login_message = 'Por favor inicie sesión para acceder a esta página.' -login_manager.login_message_category = 'warning' +login_manager.login_view = "auth.login" +login_manager.login_message = "Por favor inicie sesión para acceder a esta página." +login_manager.login_message_category = "warning" bcrypt = Bcrypt() csrf = CSRFProtect() session = Session() cache = Cache() + def create_app(config_name=None): """Fábrica de aplicación Flask.""" if config_name is None: - config_name = os.environ.get('FLASK_ENV', 'default') - + config_name = os.environ.get("FLASK_ENV", "default") + app = Flask(__name__) app.config.from_object(config[config_name]) config[config_name].init_app(app) - + # Inicializar extensiones login_manager.init_app(app) bcrypt.init_app(app) csrf.init_app(app) session.init_app(app) cache.init_app(app) - + # Importar y registrar blueprints + from routes.main_routes import main_bp from routes.auth_routes import auth_bp from routes.user_routes import users_bp from routes.project_routes import projects_bp from routes.document_routes import documents_bp from routes.schema_routes import schemas_bp from routes.admin_routes import admin_bp - + + app.register_blueprint(main_bp) app.register_blueprint(auth_bp) app.register_blueprint(users_bp) app.register_blueprint(projects_bp) app.register_blueprint(documents_bp) app.register_blueprint(schemas_bp) app.register_blueprint(admin_bp) - + # Configurar cargador de usuario para Flask-Login from services.user_service import get_user_by_id - + @login_manager.user_loader def load_user(user_id): return get_user_by_id(user_id) - + # Registrar handlers para errores register_error_handlers(app) - + + # Context processor para variables globales en templates + @app.context_processor + def inject_globals(): + return {"now": datetime.now()} + # Asegurar que existen los directorios de almacenamiento initialize_storage_structure(app) - + + # Configurar sistema de logging + from utils.logger import setup_logger + + setup_logger(app) + + # Inicializar datos por defecto + with app.app_context(): + # Inicializar usuario administrador si no existe + from services.auth_service import initialize_admin_user + + initialize_admin_user() + + # Inicializar esquemas predeterminados si no existen + from services.schema_service import initialize_default_schemas + + initialize_default_schemas() + return app + def register_error_handlers(app): """Registrar manejadores de errores para la aplicación.""" - + @app.errorhandler(404) def page_not_found(e): - return render_template('error.html', - error_code=404, - error_message="Página no encontrada"), 404 - + return ( + render_template( + "error.html", error_code=404, error_message="Página no encontrada" + ), + 404, + ) + @app.errorhandler(403) def forbidden(e): - return render_template('error.html', - error_code=403, - error_message="Acceso denegado"), 403 - + return ( + render_template( + "error.html", error_code=403, error_message="Acceso denegado" + ), + 403, + ) + @app.errorhandler(500) def internal_error(e): - return render_template('error.html', - error_code=500, - error_message="Error interno del servidor"), 500 + return ( + render_template( + "error.html", error_code=500, error_message="Error interno del servidor" + ), + 500, + ) + def initialize_storage_structure(app): """Crear estructura de almacenamiento si no existe.""" - storage_path = app.config['STORAGE_PATH'] - + storage_path = app.config["STORAGE_PATH"] + # Crear directorios principales - os.makedirs(os.path.join(storage_path, 'logs'), exist_ok=True) - os.makedirs(os.path.join(storage_path, 'schemas'), exist_ok=True) - os.makedirs(os.path.join(storage_path, 'users'), exist_ok=True) - os.makedirs(os.path.join(storage_path, 'filetypes'), exist_ok=True) - os.makedirs(os.path.join(storage_path, 'projects'), exist_ok=True) - + os.makedirs(os.path.join(storage_path, "logs"), exist_ok=True) + os.makedirs(os.path.join(storage_path, "schemas"), exist_ok=True) + os.makedirs(os.path.join(storage_path, "users"), exist_ok=True) + os.makedirs(os.path.join(storage_path, "filetypes"), exist_ok=True) + os.makedirs(os.path.join(storage_path, "projects"), exist_ok=True) + os.makedirs(os.path.join(storage_path, "exports"), exist_ok=True) + # Inicializar archivos JSON si no existen - init_json_file(os.path.join(storage_path, 'schemas', 'schema.json'), {}) - init_json_file(os.path.join(storage_path, 'users', 'users.json'), {}) - init_json_file(os.path.join(storage_path, 'filetypes', 'filetypes.json'), {}) - init_json_file(os.path.join(storage_path, 'indices.json'), - {"max_project_id": 0, "max_document_id": 0}) + init_json_file(os.path.join(storage_path, "schemas", "schema.json"), {}) + init_json_file(os.path.join(storage_path, "users", "users.json"), {}) + init_json_file(os.path.join(storage_path, "filetypes", "filetypes.json"), {}) + init_json_file( + os.path.join(storage_path, "indices.json"), + {"max_project_id": 0, "max_document_id": 0}, + ) + def init_json_file(file_path, default_content): """Inicializar un archivo JSON si no existe.""" if not os.path.exists(file_path): import json - with open(file_path, 'w', encoding='utf-8') as f: + + with open(file_path, "w", encoding="utf-8") as f: json.dump(default_content, f, ensure_ascii=False, indent=2) -if __name__ == '__main__': - from flask import render_template + +if __name__ == "__main__": app = create_app() - app.run(debug=True) \ No newline at end of file + app.run(debug=True) diff --git a/config.py b/config.py index 0ec1306..f687958 100644 --- a/config.py +++ b/config.py @@ -5,82 +5,91 @@ from dotenv import load_dotenv # Cargar variables de entorno desde archivo .env load_dotenv() +# Directorio base donde se ejecuta la aplicación +basedir = os.path.abspath(os.path.dirname(__file__)) + + class Config: - """Configuración base para la aplicación.""" - # Configuración de Flask - SECRET_KEY = os.environ.get('SECRET_KEY') or 'clave-secreta-predeterminada' - STORAGE_PATH = os.environ.get('STORAGE_PATH') or 'storage' - - # Configuración de sesión - SESSION_TYPE = 'filesystem' - SESSION_FILE_DIR = os.path.join(STORAGE_PATH, 'sessions') + """Configuración base.""" + + # Seguridad + SECRET_KEY = os.environ.get("SECRET_KEY") or "clave-secreta-por-defecto" + BCRYPT_LOG_ROUNDS = 12 + WTF_CSRF_ENABLED = True + + # Sesión + SESSION_TYPE = "filesystem" + SESSION_FILE_DIR = os.path.join(basedir, "flask_session") SESSION_PERMANENT = True - PERMANENT_SESSION_LIFETIME = timedelta(hours=8) - - # Configuración de carga de archivos - MAX_CONTENT_LENGTH = 100 * 1024 * 1024 # 100MB límite global - UPLOAD_FOLDER = os.path.join(STORAGE_PATH, 'projects') - - # Configuración de caché - CACHE_TYPE = 'SimpleCache' + PERMANENT_SESSION_LIFETIME = timedelta(days=1) + SESSION_USE_SIGNER = True + + # Caché + CACHE_TYPE = "SimpleCache" CACHE_DEFAULT_TIMEOUT = 300 - - # Configuración de logging - LOG_DIR = os.path.join(STORAGE_PATH, 'logs') - + + # Almacenamiento + STORAGE_PATH = os.environ.get("STORAGE_PATH") or os.path.join(basedir, "storage") + MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50 MB + + # Configuración de la aplicación + APP_NAME = "Arch" + ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL") or "admin@example.com" + ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD") or "admin" + @staticmethod def init_app(app): """Inicialización adicional de la aplicación.""" - # Asegurar que existen directorios necesarios os.makedirs(Config.SESSION_FILE_DIR, exist_ok=True) - os.makedirs(Config.LOG_DIR, exist_ok=True) class DevelopmentConfig(Config): - """Configuración para entorno de desarrollo.""" + """Configuración para desarrollo.""" + DEBUG = True - + TESTING = False + class TestingConfig(Config): - """Configuración para entorno de pruebas.""" + """Configuración para pruebas.""" + TESTING = True - STORAGE_PATH = 'test_storage' - WTF_CSRF_ENABLED = False # Deshabilitar CSRF para pruebas - + DEBUG = True + WTF_CSRF_ENABLED = False + BCRYPT_LOG_ROUNDS = 4 # más rápido para pruebas + STORAGE_PATH = os.path.join(basedir, "test_storage") + class ProductionConfig(Config): - """Configuración para entorno de producción.""" + """Configuración para producción.""" + DEBUG = False TESTING = False - - @classmethod - def init_app(cls, app): + + # Override secret key in production + SECRET_KEY = os.environ.get("SECRET_KEY") + + # Opciones de seguridad más estrictas + SESSION_COOKIE_SECURE = True + SESSION_COOKIE_HTTPONLY = True + REMEMBER_COOKIE_SECURE = True + REMEMBER_COOKIE_HTTPONLY = True + + @staticmethod + def init_app(app): Config.init_app(app) - - # Configuración adicional para producción - import logging - from logging.handlers import RotatingFileHandler - - # Configurar handler para errores - file_handler = RotatingFileHandler( - os.path.join(cls.LOG_DIR, 'arch.log'), - maxBytes=10 * 1024 * 1024, # 10MB - backupCount=10 - ) - file_handler.setFormatter(logging.Formatter( - '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]' - )) - file_handler.setLevel(logging.INFO) - app.logger.addHandler(file_handler) - - app.logger.setLevel(logging.INFO) - app.logger.info('ARCH inicializado') + + # Verificar SECRET_KEY + if app.config["SECRET_KEY"] == "clave-secreta-por-defecto": + import warnings + + warnings.warn("SECRET_KEY no configurada para producción!") -# Diccionario con las configuraciones disponibles +# Mapeo de configuraciones config = { - 'development': DevelopmentConfig, - 'testing': TestingConfig, - 'production': ProductionConfig, - 'default': DevelopmentConfig -} \ No newline at end of file + "development": DevelopmentConfig, + "testing": TestingConfig, + "production": ProductionConfig, + "default": DevelopmentConfig, +} diff --git a/middleware/__init__.py b/middleware/__init__.py index e69de29..9d141ea 100644 --- a/middleware/__init__.py +++ b/middleware/__init__.py @@ -0,0 +1,7 @@ +""" +Paquete de middleware para la aplicación ARCH. + +Este paquete contiene los módulos que implementan funcionalidades +intermedias entre las solicitudes HTTP y las respuestas, como +autenticación, verificación de permisos y registro de actividad. +""" diff --git a/middleware/auth_middleware.py b/middleware/auth_middleware.py index e69de29..ab194d4 100644 --- a/middleware/auth_middleware.py +++ b/middleware/auth_middleware.py @@ -0,0 +1,121 @@ +from functools import wraps +from flask import request, redirect, url_for, flash, current_app +from flask_login import current_user + +def login_required_api(f): + """ + Decorador para verificar autenticación en endpoints de API. + A diferencia del decorador estándar de Flask-Login, este devuelve + una respuesta JSON en lugar de redirigir. + + Args: + f: Función a decorar + + Returns: + function: Función decorada + """ + @wraps(f) + def decorated_function(*args, **kwargs): + if not current_user.is_authenticated: + return { + 'success': False, + 'message': 'Autenticación requerida', + 'code': 401 + }, 401 + return f(*args, **kwargs) + return decorated_function + +def check_permission(required_level): + """ + Decorador para verificar nivel de permisos en endpoints de API. + + Args: + required_level (int): Nivel mínimo requerido + + Returns: + function: Decorador configurado + """ + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not current_user.is_authenticated: + return { + 'success': False, + 'message': 'Autenticación requerida', + 'code': 401 + }, 401 + + if not current_user.has_permission(required_level): + return { + 'success': False, + 'message': 'No tiene permisos suficientes para esta acción', + 'code': 403 + }, 403 + + return f(*args, **kwargs) + return decorated_function + return decorator + +def check_active_user(f): + """ + Decorador para verificar que el usuario está activo y no ha expirado. + + Args: + f: Función a decorar + + Returns: + function: Función decorada + """ + @wraps(f) + def decorated_function(*args, **kwargs): + if not current_user.is_authenticated: + return redirect(url_for('auth.login')) + + if not current_user.is_active(): + flash('Su cuenta está desactivada. Contacte al administrador.', 'danger') + return redirect(url_for('auth.logout')) + + if current_user.is_expired(): + flash('Su cuenta ha expirado. Contacte al administrador.', 'danger') + return redirect(url_for('auth.logout')) + + return f(*args, **kwargs) + return decorated_function + +def log_activity(activity_type): + """ + Decorador para registrar actividad del usuario. + + Args: + activity_type (str): Tipo de actividad a registrar + + Returns: + function: Decorador configurado + """ + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + # Ejecutar la función original + result = f(*args, **kwargs) + + # Registrar actividad si el usuario está autenticado + if current_user.is_authenticated: + from utils.logger import log_user_activity + + # Extraer información relevante + user_id = current_user.id + ip_address = request.remote_addr + user_agent = request.user_agent.string + + # Registrar actividad + log_user_activity( + user_id=user_id, + activity_type=activity_type, + ip_address=ip_address, + user_agent=user_agent, + details=kwargs + ) + + return result + return decorated_function + return decorator diff --git a/middleware/permission_check.py b/middleware/permission_check.py index e69de29..54f925e 100644 --- a/middleware/permission_check.py +++ b/middleware/permission_check.py @@ -0,0 +1,229 @@ +from functools import wraps +from flask import render_template, abort, current_app +from flask_login import current_user + +def permission_required(min_level): + """ + Decorador para verificar nivel de permisos en rutas web. + + Args: + min_level (int): Nivel mínimo requerido + + Returns: + function: Decorador configurado + """ + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not current_user.is_authenticated: + return current_app.login_manager.unauthorized() + + if not current_user.has_permission(min_level): + return render_template('error.html', + error_code=403, + error_message="No tiene permisos suficientes para acceder a esta página."), 403 + + return f(*args, **kwargs) + return decorated_function + return decorator + +def project_access_required(access_type='view'): + """ + Decorador para verificar acceso a un proyecto específico. + + Args: + access_type (str): Tipo de acceso ('view' o 'edit') + + Returns: + function: Decorador configurado + """ + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not current_user.is_authenticated: + return current_app.login_manager.unauthorized() + + # Obtener ID del proyecto de los argumentos + project_id = kwargs.get('project_id') + + if not project_id: + abort(400, description="ID de proyecto no proporcionado") + + # Verificar permisos específicos del proyecto + from services.project_service import get_project + + project = get_project(project_id) + + if not project: + abort(404, description="Proyecto no encontrado") + + # Verificar si el proyecto está activo + if project.get('estado') != 'activo' and not current_user.has_permission(9000): + return render_template('error.html', + error_code=403, + error_message="El proyecto no está activo."), 403 + + # Verificar permisos específicos + # Administradores siempre tienen acceso + if current_user.has_permission(9000): + return f(*args, **kwargs) + + # Verificar permisos en el archivo de permisos del proyecto + from utils.file_utils import load_json_file + import os + + storage_path = current_app.config['STORAGE_PATH'] + project_dir = os.path.join(storage_path, 'projects', project.get('directory', '')) + permissions_file = os.path.join(project_dir, 'permissions.json') + + permissions = load_json_file(permissions_file, {}) + + # Verificar permisos específicos del usuario + user_permissions = permissions.get(current_user.id, {}) + + if access_type == 'edit' and user_permissions.get('can_edit', False): + return f(*args, **kwargs) + + if access_type == 'view' and user_permissions.get('can_view', False): + return f(*args, **kwargs) + + # Verificar nivel general requerido según el esquema + schema_code = project.get('esquema') + + if not schema_code: + return render_template('error.html', + error_code=403, + error_message="No tiene permisos para acceder a este proyecto."), 403 + + from services.schema_service import get_schema + + schema = get_schema(schema_code) + + if not schema: + return render_template('error.html', + error_code=403, + error_message="Esquema no encontrado."), 403 + + # Nivel mínimo para ver/editar según el esquema + min_level_view = 0 # Por defecto, cualquier usuario autenticado puede ver + min_level_edit = 5000 # Por defecto, se requiere nivel de gestor para editar + + # Si hay configuración específica en el esquema, usarla + if 'nivel_ver' in schema: + min_level_view = schema.get('nivel_ver', 0) + + if 'nivel_editar' in schema: + min_level_edit = schema.get('nivel_editar', 5000) + + # Verificar nivel según tipo de acceso + if access_type == 'edit' and current_user.has_permission(min_level_edit): + return f(*args, **kwargs) + + if access_type == 'view' and current_user.has_permission(min_level_view): + return f(*args, **kwargs) + + # Si llega aquí, no tiene permisos + return render_template('error.html', + error_code=403, + error_message="No tiene permisos para acceder a este proyecto."), 403 + + return decorated_function + return decorator + +def document_access_required(access_type='view'): + """ + Decorador para verificar acceso a un documento específico. + + Args: + access_type (str): Tipo de acceso ('view' o 'edit') + + Returns: + function: Decorador configurado + """ + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not current_user.is_authenticated: + return current_app.login_manager.unauthorized() + + # Obtener IDs de proyecto y documento + project_id = kwargs.get('project_id') + document_id = kwargs.get('document_id') + + if not project_id or not document_id: + abort(400, description="ID de proyecto o documento no proporcionado") + + # Verificar permisos del proyecto primero + from services.project_service import get_project + + project = get_project(project_id) + + if not project: + abort(404, description="Proyecto no encontrado") + + # Verificar si el proyecto está activo + if project.get('estado') != 'activo' and not current_user.has_permission(9000): + return render_template('error.html', + error_code=403, + error_message="El proyecto no está activo."), 403 + + # Administradores siempre tienen acceso + if current_user.has_permission(9000): + return f(*args, **kwargs) + + # Obtener información del documento + from services.document_service import get_document + + document = get_document(project_id, document_id) + + if not document: + abort(404, description="Documento no encontrado") + + # Obtener el esquema del proyecto + schema_code = project.get('esquema') + + if not schema_code: + return render_template('error.html', + error_code=403, + error_message="No tiene permisos para acceder a este documento."), 403 + + # Cargar el esquema específico del proyecto + from utils.file_utils import load_json_file + import os + + storage_path = current_app.config['STORAGE_PATH'] + project_dir = os.path.join(storage_path, 'projects', project.get('directory', '')) + project_schema_file = os.path.join(project_dir, 'schema.json') + + schema = load_json_file(project_schema_file) + + if not schema: + # Si no hay esquema específico, usar el general + from services.schema_service import get_schema + schema = get_schema(schema_code) + + if not schema: + return render_template('error.html', + error_code=403, + error_message="Esquema no encontrado."), 403 + + # Obtener el tipo de documento + document_name = document.get('document_id', '').split('_', 1)[1] if '_' in document.get('document_id', '') else '' + + # Buscar configuración de permisos para este tipo de documento + for doc_type in schema.get('documentos', []): + if doc_type.get('nombre', '').lower().replace(' ', '_') == document_name: + # Verificar nivel según tipo de acceso + if access_type == 'edit' and current_user.has_permission(doc_type.get('nivel_editar', 5000)): + return f(*args, **kwargs) + + if access_type == 'view' and current_user.has_permission(doc_type.get('nivel_ver', 0)): + return f(*args, **kwargs) + + # Si no se encontró configuración específica o no tiene permisos + return render_template('error.html', + error_code=403, + error_message="No tiene permisos para acceder a este documento."), 403 + + return decorated_function + return decorator diff --git a/requirements.txt b/requirements.txt index dcd2413..837381f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,7 +27,8 @@ Flask-Session==0.5.0 requests==2.31.0 # Validación de archivos -python-magic==0.4.27 +python-magic-bin; platform_system == "Windows" +python-magic; platform_system != "Windows" # Mejoras de desarrollo y mantenimiento loguru==0.7.0 diff --git a/routes/__init__.py b/routes/__init__.py index e69de29..bb7f589 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -0,0 +1,6 @@ +""" +Paquete de rutas para la aplicación ARCH. + +Este paquete contiene los blueprints de Flask que definen las rutas +y controladores para las diferentes secciones de la aplicación. +""" diff --git a/routes/admin_routes.py b/routes/admin_routes.py index e69de29..3865e43 100644 --- a/routes/admin_routes.py +++ b/routes/admin_routes.py @@ -0,0 +1,288 @@ +import os +import json +from flask import ( + Blueprint, + render_template, + redirect, + url_for, + flash, + request, + jsonify, + current_app, +) +from flask_login import login_required, current_user +from flask_wtf import FlaskForm +from wtforms import StringField, IntegerField, TextAreaField, SubmitField +from wtforms.validators import DataRequired, Length, NumberRange + +from services.index_service import build_search_index, get_index_stats +from services.user_service import get_user_stats +from utils.security import permission_required +from utils.logger import get_audit_logs +from utils.file_utils import get_directory_size + +# Definir Blueprint +admin_bp = Blueprint("admin", __name__, url_prefix="/admin") + + +# Formularios +class FileTypeForm(FlaskForm): + """Formulario para tipo de archivo.""" + + extension = StringField("Extensión", validators=[DataRequired(), Length(1, 10)]) + descripcion = StringField( + "Descripción", validators=[DataRequired(), Length(1, 100)] + ) + mime_type = StringField("MIME Type", validators=[DataRequired(), Length(1, 100)]) + tamano_maximo = IntegerField( + "Tamaño máximo (bytes)", validators=[NumberRange(min=1)], default=10485760 + ) + submit = SubmitField("Guardar") + + +# Rutas +@admin_bp.route("/") +@login_required +@permission_required(9000) # Solo administradores +def dashboard(): + """Panel de administración.""" + # Obtener estadísticas + user_stats = get_user_stats() + index_stats = get_index_stats() + + # Obtener tamaño del almacenamiento + storage_path = os.path.join(os.getcwd(), "storage") + storage_size = get_directory_size(storage_path) // (1024 * 1024) # Convertir a MB + + return render_template( + "admin/dashboard.html", + user_stats=user_stats, + index_stats=index_stats, + storage_size=storage_size, + ) + + +@admin_bp.route("/filetypes") +@login_required +@permission_required(9000) # Solo administradores +def filetypes(): + """Gestión de tipos de archivo.""" + # Cargar tipos de archivo + storage_path = os.path.join(os.getcwd(), "storage") + filetypes_file = os.path.join(storage_path, "filetypes", "filetypes.json") + + with open(filetypes_file, "r", encoding="utf-8") as f: + filetypes = json.load(f) + + form = FileTypeForm() + + return render_template("admin/filetypes.html", filetypes=filetypes, form=form) + + +@admin_bp.route("/filetypes/add", methods=["POST"]) +@login_required +@permission_required(9000) # Solo administradores +def add_filetype(): + """Añadir nuevo tipo de archivo.""" + form = FileTypeForm() + + if form.validate_on_submit(): + # Cargar tipos de archivo actuales + storage_path = os.path.join(os.getcwd(), "storage") + filetypes_file = os.path.join(storage_path, "filetypes", "filetypes.json") + + with open(filetypes_file, "r", encoding="utf-8") as f: + filetypes = json.load(f) + + # Verificar si ya existe + extension = form.extension.data.lower() + if extension in filetypes: + flash(f"El tipo de archivo '{extension}' ya existe.", "danger") + return redirect(url_for("admin.filetypes")) + + # Añadir nuevo tipo + filetypes[extension] = { + "extension": extension, + "descripcion": form.descripcion.data, + "mime_type": form.mime_type.data, + "tamano_maximo": form.tamano_maximo.data, + } + + # Guardar cambios + with open(filetypes_file, "w", encoding="utf-8") as f: + json.dump(filetypes, f, ensure_ascii=False, indent=2) + + flash(f"Tipo de archivo '{extension}' añadido correctamente.", "success") + else: + for field, errors in form.errors.items(): + for error in errors: + flash(f"{getattr(form, field).label.text}: {error}", "danger") + + return redirect(url_for("admin.filetypes")) + + +@admin_bp.route("/filetypes/delete/", methods=["POST"]) +@login_required +@permission_required(9000) # Solo administradores +def delete_filetype(extension): + """Eliminar tipo de archivo.""" + # Cargar tipos de archivo actuales + storage_path = os.path.join(os.getcwd(), "storage") + filetypes_file = os.path.join(storage_path, "filetypes", "filetypes.json") + + with open(filetypes_file, "r", encoding="utf-8") as f: + filetypes = json.load(f) + + # Verificar si existe + if extension not in filetypes: + flash(f"El tipo de archivo '{extension}' no existe.", "danger") + return redirect(url_for("admin.filetypes")) + + # Eliminar tipo + del filetypes[extension] + + # Guardar cambios + with open(filetypes_file, "w", encoding="utf-8") as f: + json.dump(filetypes, f, ensure_ascii=False, indent=2) + + flash(f"Tipo de archivo '{extension}' eliminado correctamente.", "success") + return redirect(url_for("admin.filetypes")) + + +@admin_bp.route("/system") +@login_required +@permission_required(9000) # Solo administradores +def system(): + """Estado del sistema.""" + import sys + + storage_path = current_app.config["STORAGE_PATH"] + + # Define a helper function to get directory size + def get_directory_size(path): + total_size = 0 + if os.path.exists(path): + for dirpath, dirnames, filenames in os.walk(path): + for f in filenames: + fp = os.path.join(dirpath, f) + # skip if it is symbolic link + if not os.path.islink(fp): + total_size += os.path.getsize(fp) + return total_size + + # Recopilar estadísticas del sistema + stats = { + "storage_size": get_directory_size(storage_path), + "projects_count": ( + len(os.listdir(os.path.join(storage_path, "projects"))) + if os.path.exists(os.path.join(storage_path, "projects")) + else 0 + ), + "log_size": get_directory_size(os.path.join(storage_path, "logs")), + "python_version": sys.version, + "platform": sys.platform, + } + + return render_template("admin/system.html", stats=stats) + + +@admin_bp.route("/system/clear_logs", methods=["POST"]) +@login_required +@permission_required(9000) # Solo administradores +def clear_logs(): + """Limpiar logs del sistema.""" + storage_path = current_app.config["STORAGE_PATH"] + logs_path = os.path.join(storage_path, "logs") + + try: + for filename in os.listdir(logs_path): + file_path = os.path.join(logs_path, filename) + if os.path.isfile(file_path) and not filename.endswith(".log"): + os.unlink(file_path) + + flash("Archivos de log rotados eliminados correctamente.", "success") + except Exception as e: + flash(f"Error al limpiar logs: {str(e)}", "danger") + + return redirect(url_for("admin.system")) + + +@admin_bp.route("/initialize") +@login_required +@permission_required(9000) # Solo administradores +def initialize(): + """Inicializar o reiniciar el sistema.""" + from services.filetype_service import initialize_default_filetypes + from services.schema_service import initialize_default_schemas + + try: + initialize_default_filetypes() + initialize_default_schemas() + + flash("Sistema inicializado correctamente.", "success") + except Exception as e: + flash(f"Error al inicializar el sistema: {str(e)}", "danger") + + return redirect(url_for("admin.system")) + + +@admin_bp.route("/system/rebuild-index", methods=["POST"]) +@login_required +@permission_required(9000) # Solo administradores +def rebuild_index(): + """Reconstruir índice de búsqueda.""" + success, message = build_search_index() + + if success: + flash(message, "success") + else: + flash(message, "danger") + + return redirect(url_for("admin.system")) + + +@admin_bp.route("/audit-logs") +@login_required +@permission_required(9000) # Solo administradores +def audit_logs(): + """Ver logs de auditoría.""" + # Obtener parámetros de filtrado + filters = { + "user_id": request.args.get("user_id"), + "activity_type": request.args.get("activity_type"), + "start_date": request.args.get("start_date"), + "end_date": request.args.get("end_date"), + } + + # Filtrar logs vacíos + filters = {k: v for k, v in filters.items() if v} + + # Obtener logs + logs = get_audit_logs(filters, limit=100) + + return render_template("admin/audit_logs.html", logs=logs, filters=filters) + + +@admin_bp.route("/api/system-info") +@login_required +@permission_required(9000) # Solo administradores +def api_system_info(): + """API para obtener información del sistema.""" + # Obtener información del sistema + storage_path = os.path.join(os.getcwd(), "storage") + + # Tamaños de directorios + storage_info = { + "total": get_directory_size(storage_path) // (1024 * 1024), # MB + "logs": get_directory_size(os.path.join(storage_path, "logs")) // (1024 * 1024), + "projects": get_directory_size(os.path.join(storage_path, "projects")) + // (1024 * 1024), + } + + # Estadísticas de usuarios + user_stats = get_user_stats() + + # Estadísticas de índice + index_stats = get_index_stats() + + return jsonify({"storage": storage_info, "users": user_stats, "index": index_stats}) diff --git a/routes/main_routes.py b/routes/main_routes.py new file mode 100644 index 0000000..b1001c1 --- /dev/null +++ b/routes/main_routes.py @@ -0,0 +1,22 @@ +from flask import Blueprint, render_template, redirect, url_for +from flask_login import current_user + +# Create a blueprint for main routes +main_bp = Blueprint("main", __name__) + + +@main_bp.route("/") +def index(): + """Render the homepage.""" + if current_user.is_authenticated: + # If user is logged in, we could redirect to a dashboard or just show the main page + return render_template("index.html") + else: + # For non-authenticated users, show the public homepage + return render_template("index.html") + + +@main_bp.route("/about") +def about(): + """About page.""" + return render_template("about.html") diff --git a/routes/schema_routes.py b/routes/schema_routes.py index e69de29..942b7be 100644 --- a/routes/schema_routes.py +++ b/routes/schema_routes.py @@ -0,0 +1,185 @@ +from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify +from flask_login import login_required, current_user +from flask_wtf import FlaskForm +from wtforms import ( + StringField, + TextAreaField, + FieldList, + FormField, + SelectField, + IntegerField, + SubmitField, +) +from wtforms.validators import DataRequired, Length, NumberRange + +from services.schema_service import ( + get_all_schemas, + get_schema_by_id, + create_schema, + update_schema, + delete_schema, +) +from services.document_service import get_allowed_filetypes +from utils.security import permission_required +from utils.validators import validate_schema_data + +# Definir Blueprint +schemas_bp = Blueprint("schemas", __name__, url_prefix="/schemas") + + +# Formularios +class DocumentTypeForm(FlaskForm): + """Formulario para tipo de documento en esquema.""" + + tipo = StringField("Tipo de Documento", validators=[DataRequired()]) + nombre = StringField("Nombre", validators=[DataRequired()]) + nivel_ver = IntegerField("Nivel para Ver", default=0) + nivel_editar = IntegerField("Nivel para Editar", default=5000) + + +class SchemaForm(FlaskForm): + """Formulario para esquema.""" + + codigo = StringField("Código", validators=[DataRequired(), Length(1, 20)]) + descripcion = TextAreaField("Descripción", validators=[DataRequired()]) + documentos = FieldList(FormField(DocumentTypeForm), min_entries=1) + submit = SubmitField("Guardar") + + +# Rutas +@schemas_bp.route("/") +@login_required +@permission_required(5000) # Nivel mínimo para ver esquemas +def list(): + """Lista todos los esquemas disponibles.""" + schemas = get_all_schemas() + return render_template("schemas/list.html", schemas=schemas) + + +@schemas_bp.route("/view/") +@login_required +@permission_required(5000) +def view(schema_id): + """Ver detalle de un esquema.""" + schema = get_schema_by_id(schema_id) + if not schema: + flash("Esquema no encontrado.", "danger") + return redirect(url_for("schemas.list")) + + return render_template("schemas/view.html", schema=schema) + + +@schemas_bp.route("/create", methods=["GET", "POST"]) +@login_required +@permission_required(9000) # Solo administradores +def create(): + """Crear nuevo esquema.""" + form = SchemaForm() + + if form.validate_on_submit(): + data = { + "codigo": form.codigo.data, + "descripcion": form.descripcion.data, + "documentos": [], + } + + for doc_form in form.documentos: + data["documentos"].append( + { + "tipo": doc_form.tipo.data, + "nombre": doc_form.nombre.data, + "nivel_ver": doc_form.nivel_ver.data, + "nivel_editar": doc_form.nivel_editar.data, + } + ) + + success, message = create_schema(data, current_user.id) + + if success: + flash(message, "success") + return redirect(url_for("schemas.list")) + else: + flash(message, "danger") + + return render_template("schemas/create.html", form=form) + + +@schemas_bp.route("/edit/", methods=["GET", "POST"]) +@login_required +@permission_required(9000) # Solo administradores +def edit(schema_id): + """Editar esquema existente.""" + schema = get_schema_by_id(schema_id) + if not schema: + flash("Esquema no encontrado.", "danger") + return redirect(url_for("schemas.list")) + + # Prepopulate form + form = SchemaForm(obj=schema) + + if form.validate_on_submit(): + data = { + "codigo": form.codigo.data, + "descripcion": form.descripcion.data, + "documentos": [], + } + + for doc_form in form.documentos: + data["documentos"].append( + { + "tipo": doc_form.tipo.data, + "nombre": doc_form.nombre.data, + "nivel_ver": doc_form.nivel_ver.data, + "nivel_editar": doc_form.nivel_editar.data, + } + ) + + success, message = update_schema(schema_id, data) + + if success: + flash(message, "success") + return redirect(url_for("schemas.list")) + else: + flash(message, "danger") + + return render_template("schemas/edit.html", form=form, schema=schema) + + +@schemas_bp.route("/delete/", methods=["POST"]) +@login_required +@permission_required(9000) # Solo administradores +def delete(schema_id): + """Eliminar esquema.""" + success, message = delete_schema(schema_id) + + if success: + flash(message, "success") + else: + flash(message, "danger") + + return redirect(url_for("schemas.list")) + + +@schemas_bp.route("/api/list") +@login_required +def api_list(): + """API para listar esquemas.""" + schemas = get_all_schemas() + return jsonify( + [ + {"code": code, "description": schema["descripcion"]} + for code, schema in schemas.items() + ] + ) + + +@schemas_bp.route("/api/get/") +@login_required +def api_get(schema_code): + """API para obtener un esquema específico.""" + schema = get_schema(schema_code) + + if not schema: + return jsonify({"error": "Esquema no encontrado"}), 404 + + return jsonify(schema) diff --git a/routes/user_routes.py b/routes/user_routes.py index e69de29..60aa3aa 100644 --- a/routes/user_routes.py +++ b/routes/user_routes.py @@ -0,0 +1,259 @@ +from flask import ( + Blueprint, + render_template, + redirect, + url_for, + flash, + request, + jsonify, + current_app, +) +from flask_login import login_required, current_user +from flask_wtf import FlaskForm +from wtforms import ( + StringField, + PasswordField, + SelectField, + IntegerField, + EmailField, + BooleanField, + SubmitField, +) +from wtforms.validators import DataRequired, Email, Length, Optional, EqualTo + +from services.auth_service import ( + get_all_users, + get_user_by_username, + create_user, + update_user, + delete_user, +) +from services.user_service import ( + filter_users, + get_user_stats, + check_username_availability, +) +from utils.validators import validate_user_data +from utils.security import permission_required +from utils.logger import log_user_management + +# Definir Blueprint +users_bp = Blueprint("users", __name__, url_prefix="/users") + + +# Formularios +class UserForm(FlaskForm): + """Formulario para crear/editar usuarios.""" + + nombre = StringField("Nombre completo", validators=[DataRequired(), Length(1, 100)]) + username = StringField( + "Nombre de usuario", validators=[DataRequired(), Length(3, 20)] + ) + email = EmailField("Email", validators=[DataRequired(), Email()]) + password = PasswordField("Contraseña", validators=[Optional(), Length(8, 64)]) + password_confirm = PasswordField( + "Confirmar contraseña", + validators=[ + Optional(), + EqualTo("password", message="Las contraseñas deben coincidir"), + ], + ) + nivel = IntegerField("Nivel de acceso", validators=[DataRequired()]) + idioma = SelectField("Idioma", choices=[("es", "Español"), ("en", "Inglés")]) + empresa = StringField("Empresa", validators=[Optional(), Length(0, 100)]) + estado = SelectField( + "Estado", choices=[("activo", "Activo"), ("inactivo", "Inactivo")] + ) + fecha_caducidad = StringField( + "Fecha de caducidad (YYYY-MM-DD)", validators=[Optional()] + ) + submit = SubmitField("Guardar") + + +class UserFilterForm(FlaskForm): + """Formulario para filtrar usuarios.""" + + empresa = StringField("Empresa", validators=[Optional()]) + estado = SelectField( + "Estado", + choices=[("", "Todos"), ("activo", "Activo"), ("inactivo", "Inactivo")], + validators=[Optional()], + ) + nivel_min = IntegerField("Nivel mínimo", validators=[Optional()]) + nivel_max = IntegerField("Nivel máximo", validators=[Optional()]) + submit = SubmitField("Filtrar") + + +# Rutas +@users_bp.route("/") +@login_required +@permission_required(5000) # Assuming 5000+ is admin level permission +def list(): + """List all users.""" + # Import here to avoid circular imports + from services.user_service import get_all_users + + users = get_all_users() + + return render_template("users/list.html", users=users) + + +@users_bp.route("/create", methods=["GET", "POST"]) +@login_required +@permission_required(9000) # Solo administradores +def create(): + """Crear nuevo usuario.""" + form = UserForm() + + if form.validate_on_submit(): + # Validar datos + data = { + "nombre": form.nombre.data, + "username": form.username.data, + "email": form.email.data, + "password": form.password.data, + "nivel": form.nivel.data, + "idioma": form.idioma.data, + "empresa": form.empresa.data, + "estado": form.estado.data, + "fecha_caducidad": ( + form.fecha_caducidad.data if form.fecha_caducidad.data else None + ), + } + + is_valid, errors = validate_user_data(data) + + if not is_valid: + for field, error in errors.items(): + flash(f"Error en {field}: {error}", "danger") + return render_template("users/create.html", form=form) + + # Crear usuario + success, message = create_user( + username=data["username"], + nombre=data["nombre"], + email=data["email"], + password=data["password"], + nivel=data["nivel"], + idioma=data["idioma"], + fecha_caducidad=data["fecha_caducidad"], + empresa=data["empresa"], + estado=data["estado"], + ) + + if success: + flash(message, "success") + + # Registrar actividad + log_user_management( + admin_id=current_user.id, + target_user_id=data["username"], + action="create", + ) + + return redirect(url_for("users.list_users")) + else: + flash(message, "danger") + + return render_template("users/create.html", form=form) + + +@users_bp.route("/edit/", methods=["GET", "POST"]) +@login_required +@permission_required(9000) # Solo administradores +def edit(username): + """Editar usuario existente.""" + user = get_user_by_username(username) + + if not user: + flash(f"Usuario {username} no encontrado.", "danger") + return redirect(url_for("users.list_users")) + + form = UserForm(obj=user) + + if form.validate_on_submit(): + # Validar datos + data = { + "nombre": form.nombre.data, + "email": form.email.data, + "password": form.password.data, + "nivel": form.nivel.data, + "idioma": form.idioma.data, + "empresa": form.empresa.data, + "estado": form.estado.data, + "fecha_caducidad": ( + form.fecha_caducidad.data if form.fecha_caducidad.data else None + ), + } + + is_valid, errors = validate_user_data(data, is_new_user=False) + + if not is_valid: + for field, error in errors.items(): + flash(f"Error en {field}: {error}", "danger") + return render_template("users/edit.html", form=form, username=username) + + # Actualizar usuario + success, message = update_user(username, data) + + if success: + flash(message, "success") + + # Registrar actividad + log_user_management( + admin_id=current_user.id, + target_user_id=username, + action="update", + details={"fields_updated": list(data.keys())}, + ) + + return redirect(url_for("users.list_users")) + else: + flash(message, "danger") + + return render_template("users/edit.html", form=form, username=username) + + +@users_bp.route("/delete/", methods=["POST"]) +@login_required +@permission_required(9000) # Solo administradores +def delete(username): + """Eliminar usuario.""" + if username == current_user.id: + flash("No puede eliminar su propio usuario.", "danger") + return redirect(url_for("users.list_users")) + + if username == "admin": + flash("No se puede eliminar el usuario administrador.", "danger") + return redirect(url_for("users.list_users")) + + success, message = delete_user(username) + + if success: + flash(message, "success") + + # Registrar actividad + log_user_management( + admin_id=current_user.id, target_user_id=username, action="delete" + ) + else: + flash(message, "danger") + + return redirect(url_for("users.list_users")) + + +# API para verificar disponibilidad de nombre de usuario +@users_bp.route("/api/check_username", methods=["POST"]) +@login_required +@permission_required(9000) # Solo administradores +def api_check_username(): + """Verificar disponibilidad de nombre de usuario.""" + data = request.get_json() + + if not data or "username" not in data: + return jsonify({"error": "Se requiere nombre de usuario"}), 400 + + username = data["username"] + available = check_username_availability(username) + + return jsonify({"available": available}) diff --git a/run_tests.py b/run_tests.py new file mode 100644 index 0000000..e649add --- /dev/null +++ b/run_tests.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +""" +Script to run the test suite for ARCH application. +Executes all tests and generates JSON and HTML reports. +""" +import pytest +import os +import sys +import argparse +import json +import shutil +from datetime import datetime + +def run_tests(args): + """Run pytest with specified arguments and generate reports.""" + # Create test reports directory if needed + os.makedirs('test_reports', exist_ok=True) + + # Generate timestamp for report files + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + + # Base pytest arguments + pytest_args = [ + '-v', # Verbose output + '--no-header', # No header in the output + '--tb=short', # Short traceback + f'--junitxml=test_reports/junit_{timestamp}.xml', # JUnit XML report + '--cov=app', # Coverage for app module + '--cov=routes', # Coverage for routes + '--cov=services', # Coverage for services + '--cov=utils', # Coverage for utils + '--cov-report=html:test_reports/coverage', # HTML coverage report + ] + + # Add test files/directories + if args.tests: + pytest_args.extend(args.tests) + else: + pytest_args.append('tests/') + + # Execute tests + print(f"\n{'='*80}") + print(f"Running tests at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"{'='*80}") + + result = pytest.main(pytest_args) + + # Generate JSON report + print(f"\n{'='*80}") + print(f"Test report available at: test_reports/test_results_{timestamp}.json") + print(f"Coverage report available at: test_reports/coverage/index.html") + print(f"{'='*80}\n") + + return result + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run tests for ARCH application") + parser.add_argument('tests', nargs='*', help='Specific test files or directories to run') + parser.add_argument('--clean', action='store_true', help='Clean test storage and results before running') + + args = parser.parse_args() + + if args.clean: + # Clean temporary test storage + if os.path.exists('test_storage'): + shutil.rmtree('test_storage') + # Clean previous test reports + if os.path.exists('test_reports'): + shutil.rmtree('test_reports') + print("Cleaned test storage and reports.") + + # Run tests and exit with the pytest result code + sys.exit(run_tests(args)) diff --git a/services/__init__.py b/services/__init__.py index e69de29..565e38d 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -0,0 +1,6 @@ +""" +Paquete de servicios para la aplicación ARCH. + +Este paquete contiene los módulos que implementan la lógica de negocio +de la aplicación, separando las responsabilidades de las rutas y vistas. +""" diff --git a/services/export_service.py b/services/export_service.py index e69de29..a3cb023 100644 --- a/services/export_service.py +++ b/services/export_service.py @@ -0,0 +1,317 @@ +import os +import json +import zipfile +import tempfile +from datetime import datetime +import pytz +from flask import current_app + +from services.project_service import get_project, find_project_directory +from services.document_service import get_project_documents, get_latest_version +from utils.file_utils import ensure_dir_exists + +def export_project(project_id, document_ids=None, include_metadata=True): + """ + Exportar un proyecto completo o documentos seleccionados a un archivo ZIP. + + Args: + project_id (int): ID del proyecto a exportar + document_ids (list, optional): Lista de IDs de documentos a incluir. + Si es None, se incluyen todos los documentos. + include_metadata (bool, optional): Incluir metadatos del proyecto y documentos. + Por defecto es True. + + Returns: + tuple: (success, message, zip_path) + - success (bool): True si la exportación fue exitosa + - message (str): Mensaje descriptivo + - zip_path (str): Ruta al archivo ZIP generado + """ + # Verificar que el proyecto existe + project = get_project(project_id) + + if not project: + return False, f"No se encontró el proyecto con ID {project_id}.", None + + # Obtener directorio del proyecto + project_dir = find_project_directory(project_id) + + if not project_dir: + return False, f"No se encontró el directorio del proyecto con ID {project_id}.", None + + # Crear directorio temporal para la exportación + temp_dir = tempfile.mkdtemp() + + try: + # Obtener documentos del proyecto + all_documents = get_project_documents(project_id) + + # Filtrar documentos si se especificaron IDs + if document_ids: + documents = [doc for doc in all_documents if doc.get('id') in document_ids] + else: + documents = all_documents + + if not documents: + return False, "No hay documentos para exportar.", None + + # Crear nombre para el archivo ZIP + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + project_code = project.get('codigo', f'PROJ{project_id}') + zip_filename = f"{project_code}_export_{timestamp}.zip" + + # Ruta completa al archivo ZIP + storage_path = current_app.config['STORAGE_PATH'] + exports_dir = os.path.join(storage_path, 'exports') + ensure_dir_exists(exports_dir) + zip_path = os.path.join(exports_dir, zip_filename) + + # Crear archivo ZIP + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + # Incluir metadatos del proyecto si se solicita + if include_metadata: + # Crear archivo JSON con metadatos del proyecto + project_meta = { + 'codigo': project.get('codigo'), + 'descripcion': project.get('descripcion'), + 'cliente': project.get('cliente'), + 'destinacion': project.get('destinacion'), + 'ano_creacion': project.get('ano_creacion'), + 'fecha_creacion': project.get('fecha_creacion'), + 'creado_por': project.get('creado_por'), + 'fecha_exportacion': datetime.now(pytz.UTC).isoformat(), + 'documentos_incluidos': len(documents) + } + + # Guardar metadatos en archivo temporal + meta_file = os.path.join(temp_dir, 'project_info.json') + with open(meta_file, 'w', encoding='utf-8') as f: + json.dump(project_meta, f, ensure_ascii=False, indent=2) + + # Añadir al ZIP + zipf.write(meta_file, arcname='project_info.json') + + # Añadir documentos (última versión de cada uno) + for document in documents: + doc_id = document.get('id') + doc_name = document.get('document_id', '').split('_', 1)[1] if '_' in document.get('document_id', '') else f'doc_{doc_id}' + + # Obtener última versión + version_meta, file_path = get_latest_version(project_id, doc_id) + + if not version_meta or not file_path or not os.path.exists(file_path): + continue + + # Nombre para el archivo en el ZIP + version_num = version_meta.get('version', 1) + original_filename = document.get('original_filename', os.path.basename(file_path)) + + # Usar nombre original o generar uno basado en metadatos + if '.' in original_filename: + base_name, extension = original_filename.rsplit('.', 1) + zip_filename = f"{doc_name}_v{version_num}.{extension}" + else: + zip_filename = f"{doc_name}_v{version_num}" + + # Añadir archivo al ZIP + zipf.write(file_path, arcname=f'documents/{zip_filename}') + + # Incluir metadatos del documento si se solicita + if include_metadata: + # Crear metadatos simplificados + doc_meta = { + 'nombre': doc_name, + 'version': version_num, + 'fecha_creacion': version_meta.get('created_at'), + 'creado_por': version_meta.get('created_by'), + 'descripcion': version_meta.get('description', ''), + 'tamano': version_meta.get('file_size', 0) + } + + # Guardar metadatos en archivo temporal + doc_meta_file = os.path.join(temp_dir, f'{doc_name}_meta.json') + with open(doc_meta_file, 'w', encoding='utf-8') as f: + json.dump(doc_meta, f, ensure_ascii=False, indent=2) + + # Añadir al ZIP + zipf.write(doc_meta_file, arcname=f'documents/{doc_name}_meta.json') + + return True, f"Proyecto exportado correctamente. {len(documents)} documentos incluidos.", zip_path + + except Exception as e: + return False, f"Error al exportar proyecto: {str(e)}", None + + finally: + # Limpiar directorio temporal + import shutil + shutil.rmtree(temp_dir) + +def export_document_versions(project_id, document_id, include_all_versions=False): + """ + Exportar todas las versiones de un documento específico. + + Args: + project_id (int): ID del proyecto + document_id (int): ID del documento + include_all_versions (bool, optional): Incluir todas las versiones. + Si es False, solo se incluye la última versión. + + Returns: + tuple: (success, message, zip_path) + """ + from services.document_service import get_document, find_document_directory + + # Verificar que el proyecto existe + project = get_project(project_id) + + if not project: + return False, f"No se encontró el proyecto con ID {project_id}.", None + + # Obtener directorio del proyecto + project_dir = find_project_directory(project_id) + + if not project_dir: + return False, f"No se encontró el directorio del proyecto con ID {project_id}.", None + + # Obtener documento + document = get_document(project_id, document_id) + + if not document: + return False, f"No se encontró el documento con ID {document_id}.", None + + # Obtener directorio del documento + document_dir = find_document_directory(project_dir, document_id) + + if not document_dir: + return False, f"No se encontró el directorio del documento con ID {document_id}.", None + + # Crear nombre para el archivo ZIP + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + doc_name = document.get('document_id', '').split('_', 1)[1] if '_' in document.get('document_id', '') else f'doc_{document_id}' + zip_filename = f"{doc_name}_export_{timestamp}.zip" + + # Ruta completa al archivo ZIP + storage_path = current_app.config['STORAGE_PATH'] + exports_dir = os.path.join(storage_path, 'exports') + ensure_dir_exists(exports_dir) + zip_path = os.path.join(exports_dir, zip_filename) + + try: + # Crear archivo ZIP + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + # Obtener versiones + versions = document.get('versions', []) + + if not versions: + return False, "El documento no tiene versiones.", None + + # Si solo se quiere la última versión + if not include_all_versions: + latest_version = max(versions, key=lambda v: v['version']) + versions = [latest_version] + + # Añadir cada versión al ZIP + for version in versions: + version_num = version.get('version', 1) + version_filename = version.get('filename') + + if not version_filename: + continue + + file_path = os.path.join(document_dir, version_filename) + + if not os.path.exists(file_path): + continue + + # Añadir archivo al ZIP + zipf.write(file_path, arcname=f'v{version_num}_{version_filename}') + + # Añadir metadatos + meta_data = { + 'document_id': document.get('document_id'), + 'original_filename': document.get('original_filename'), + 'versions_included': len(versions), + 'export_date': datetime.now(pytz.UTC).isoformat() + } + + # Crear archivo temporal para metadatos + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp: + json.dump(meta_data, temp, ensure_ascii=False, indent=2) + temp_path = temp.name + + # Añadir metadatos al ZIP + zipf.write(temp_path, arcname='document_info.json') + + # Eliminar archivo temporal + os.unlink(temp_path) + + return True, f"Documento exportado correctamente. {len(versions)} versiones incluidas.", zip_path + + except Exception as e: + return False, f"Error al exportar documento: {str(e)}", None + +def create_project_report(project_id, report_type='summary'): + """ + Crear un informe del proyecto en formato JSON. + + Args: + project_id (int): ID del proyecto + report_type (str, optional): Tipo de informe ('summary', 'detailed'). + Por defecto es 'summary'. + + Returns: + tuple: (success, message, report_data) + """ + # Verificar que el proyecto existe + project = get_project(project_id) + + if not project: + return False, f"No se encontró el proyecto con ID {project_id}.", None + + # Obtener documentos del proyecto + documents = get_project_documents(project_id) + + # Crear informe básico + report = { + 'project_code': project.get('codigo'), + 'description': project.get('descripcion'), + 'client': project.get('cliente'), + 'destination': project.get('destinacion'), + 'creation_year': project.get('ano_creacion'), + 'creation_date': project.get('fecha_creacion'), + 'created_by': project.get('creado_por'), + 'status': project.get('estado'), + 'last_modified': project.get('ultima_modificacion'), + 'modified_by': project.get('modificado_por'), + 'document_count': len(documents), + 'report_generated': datetime.now(pytz.UTC).isoformat(), + 'report_type': report_type + } + + # Si es un informe detallado, incluir información de documentos + if report_type == 'detailed': + report['documents'] = [] + + for doc in documents: + doc_info = { + 'id': doc.get('id'), + 'name': doc.get('document_id', '').split('_', 1)[1] if '_' in doc.get('document_id', '') else '', + 'original_filename': doc.get('original_filename'), + 'version_count': len(doc.get('versions', [])), + } + + # Incluir información de la última versión + if doc.get('versions'): + latest_version = max(doc.get('versions', []), key=lambda v: v['version']) + doc_info['latest_version'] = { + 'version': latest_version.get('version'), + 'created_at': latest_version.get('created_at'), + 'created_by': latest_version.get('created_by'), + 'description': latest_version.get('description'), + 'file_size': latest_version.get('file_size'), + 'download_count': len(latest_version.get('downloads', [])) + } + + report['documents'].append(doc_info) + + return True, "Informe generado correctamente.", report diff --git a/services/index_service.py b/services/index_service.py index e69de29..1542575 100644 --- a/services/index_service.py +++ b/services/index_service.py @@ -0,0 +1,276 @@ +import os +import json +import re +from datetime import datetime +import pytz +from flask import current_app + +from utils.file_utils import load_json_file, save_json_file, ensure_dir_exists +from services.project_service import get_all_projects +from services.document_service import get_project_documents + +def build_search_index(): + """ + Construir un índice de búsqueda para proyectos y documentos. + + Returns: + tuple: (success, message) + """ + try: + # Obtener todos los proyectos (incluyendo inactivos) + all_projects = get_all_projects(include_inactive=True) + + # Inicializar índice + index = { + 'projects': {}, + 'documents': {}, + 'clients': {}, + 'years': {}, + 'users': {}, + 'last_updated': datetime.now(pytz.UTC).isoformat() + } + + # Indexar proyectos + for project in all_projects: + project_id = int(project.get('codigo', '').replace('PROJ', '')) + + if not project_id: + continue + + # Datos básicos del proyecto + project_data = { + 'id': project_id, + 'code': project.get('codigo'), + 'description': project.get('descripcion', ''), + 'client': project.get('cliente', ''), + 'destination': project.get('destinacion', ''), + 'year': project.get('ano_creacion'), + 'status': project.get('estado', 'inactivo'), + 'created_by': project.get('creado_por', ''), + 'modified_by': project.get('modificado_por', ''), + 'directory': project.get('directory', '') + } + + # Añadir al índice de proyectos + index['projects'][str(project_id)] = project_data + + # Añadir a índice de clientes + client = project.get('cliente', 'Sin cliente') + if client not in index['clients']: + index['clients'][client] = [] + index['clients'][client].append(project_id) + + # Añadir a índice de años + year = str(project.get('ano_creacion', 'Sin año')) + if year not in index['years']: + index['years'][year] = [] + index['years'][year].append(project_id) + + # Añadir a índice de usuarios + creator = project.get('creado_por', 'desconocido') + if creator not in index['users']: + index['users'][creator] = {'created': [], 'modified': []} + index['users'][creator]['created'].append(project_id) + + modifier = project.get('modificado_por') + if modifier and modifier not in index['users']: + index['users'][modifier] = {'created': [], 'modified': []} + if modifier: + index['users'][modifier]['modified'].append(project_id) + + # Indexar documentos del proyecto + documents = get_project_documents(project_id) + + for doc in documents: + doc_id = doc.get('id') + + if not doc_id: + continue + + # Extraer nombre del documento + doc_name = doc.get('document_id', '').split('_', 1)[1] if '_' in doc.get('document_id', '') else f'doc_{doc_id}' + + # Datos básicos del documento + doc_data = { + 'id': doc_id, + 'name': doc_name, + 'project_id': project_id, + 'original_filename': doc.get('original_filename', ''), + 'versions': len(doc.get('versions', [])), + 'latest_version': max([v.get('version', 0) for v in doc.get('versions', [])]) if doc.get('versions') else 0, + 'directory': doc.get('directory', '') + } + + # Añadir al índice de documentos + index['documents'][f"{project_id}_{doc_id}"] = doc_data + + # Guardar índice + storage_path = current_app.config['STORAGE_PATH'] + index_file = os.path.join(storage_path, 'indices.json') + save_json_file(index_file, index) + + return True, "Índice construido correctamente." + + except Exception as e: + current_app.logger.error(f"Error al construir índice: {str(e)}") + return False, f"Error al construir índice: {str(e)}" + +def search_projects(query, filters=None): + """ + Buscar proyectos en el índice. + + Args: + query (str): Término de búsqueda + filters (dict, optional): Filtros adicionales + - client: Cliente específico + - year: Año de creación + - status: Estado del proyecto + - created_by: Usuario creador + + Returns: + list: Lista de proyectos que coinciden con la búsqueda + """ + # Cargar índice + storage_path = current_app.config['STORAGE_PATH'] + index_file = os.path.join(storage_path, 'indices.json') + index = load_json_file(index_file, {}) + + if not index or 'projects' not in index: + return [] + + # Preparar resultados + results = [] + + # Convertir query a minúsculas para búsqueda insensible a mayúsculas + query = query.lower() + + # Buscar en proyectos + for project_id, project_data in index.get('projects', {}).items(): + # Aplicar filtros si existen + if filters: + # Filtrar por cliente + if 'client' in filters and filters['client'] and project_data.get('client') != filters['client']: + continue + + # Filtrar por año + if 'year' in filters and filters['year'] and str(project_data.get('year')) != str(filters['year']): + continue + + # Filtrar por estado + if 'status' in filters and filters['status'] and project_data.get('status') != filters['status']: + continue + + # Filtrar por creador + if 'created_by' in filters and filters['created_by'] and project_data.get('created_by') != filters['created_by']: + continue + + # Si no hay query, incluir todos los proyectos que pasen los filtros + if not query: + results.append(project_data) + continue + + # Buscar coincidencias en campos relevantes + if (query in project_data.get('code', '').lower() or + query in project_data.get('description', '').lower() or + query in project_data.get('client', '').lower() or + query in project_data.get('destination', '').lower()): + results.append(project_data) + + return results + +def search_documents(query, project_id=None): + """ + Buscar documentos en el índice. + + Args: + query (str): Término de búsqueda + project_id (int, optional): ID del proyecto para limitar la búsqueda + + Returns: + list: Lista de documentos que coinciden con la búsqueda + """ + # Cargar índice + storage_path = current_app.config['STORAGE_PATH'] + index_file = os.path.join(storage_path, 'indices.json') + index = load_json_file(index_file, {}) + + if not index or 'documents' not in index: + return [] + + # Preparar resultados + results = [] + + # Convertir query a minúsculas para búsqueda insensible a mayúsculas + query = query.lower() + + # Buscar en documentos + for doc_key, doc_data in index.get('documents', {}).items(): + # Si se especifica proyecto, filtrar por él + if project_id is not None and doc_data.get('project_id') != int(project_id): + continue + + # Si no hay query, incluir todos los documentos del proyecto + if not query: + results.append(doc_data) + continue + + # Buscar coincidencias en campos relevantes + if (query in doc_data.get('name', '').lower() or + query in doc_data.get('original_filename', '').lower()): + results.append(doc_data) + + return results + +def get_index_stats(): + """ + Obtener estadísticas del índice. + + Returns: + dict: Estadísticas del índice + """ + # Cargar índice + storage_path = current_app.config['STORAGE_PATH'] + index_file = os.path.join(storage_path, 'indices.json') + index = load_json_file(index_file, {}) + + if not index: + return { + 'projects': 0, + 'documents': 0, + 'clients': 0, + 'years': 0, + 'users': 0, + 'last_updated': None + } + + # Calcular estadísticas + stats = { + 'projects': len(index.get('projects', {})), + 'documents': len(index.get('documents', {})), + 'clients': len(index.get('clients', {})), + 'years': len(index.get('years', {})), + 'users': len(index.get('users', {})), + 'last_updated': index.get('last_updated') + } + + return stats + +def schedule_index_update(): + """ + Programar actualización periódica del índice. + """ + from apscheduler.schedulers.background import BackgroundScheduler + + scheduler = BackgroundScheduler() + + # Programar actualización diaria a las 3:00 AM + scheduler.add_job( + build_search_index, + 'cron', + hour=3, + minute=0, + id='index_update' + ) + + scheduler.start() + current_app.logger.info("Actualización de índice programada.") diff --git a/services/schema_service.py b/services/schema_service.py index fed4156..0038af9 100644 --- a/services/schema_service.py +++ b/services/schema_service.py @@ -1,212 +1,176 @@ import os +import json from datetime import datetime import pytz from flask import current_app from utils.file_utils import load_json_file, save_json_file -def get_all_schemas(): - """ - Obtener todos los esquemas disponibles. - - Returns: - dict: Diccionario de esquemas - """ - storage_path = current_app.config['STORAGE_PATH'] - schema_file = os.path.join(storage_path, 'schemas', 'schema.json') - - return load_json_file(schema_file, {}) -def get_schema(schema_code): +def get_schemas_file_path(): + """Obtener ruta al archivo de esquemas.""" + storage_path = current_app.config["STORAGE_PATH"] + return os.path.join(storage_path, "schemas", "schema.json") + + +def get_all_schemas(): + """Obtener todos los esquemas disponibles.""" + return load_json_file(get_schemas_file_path(), {}) + + +def get_schema_by_id(schema_id): """ - Obtener un esquema específico. - + Obtener un esquema por su ID. + Args: - schema_code (str): Código del esquema - + schema_id (str): ID del esquema a buscar + Returns: dict: Datos del esquema o None si no existe """ schemas = get_all_schemas() - - return schemas.get(schema_code) + return schemas.get(schema_id) -def create_schema(schema_data, creator_username): + +def create_schema(schema_data, user_id): """ Crear un nuevo esquema. - + Args: schema_data (dict): Datos del esquema - creator_username (str): Usuario que crea el esquema - - Returns: - tuple: (success, message, schema_code) - """ - # Validar datos obligatorios - if 'descripcion' not in schema_data or not schema_data['descripcion']: - return False, "La descripción del esquema es obligatoria.", None - - if 'documentos' not in schema_data or not schema_data['documentos']: - return False, "Se requiere al menos un tipo de documento en el esquema.", None - - # Generar código de esquema - schemas = get_all_schemas() - - # Encontrar el último código numérico - last_code = 0 - for code in schemas.keys(): - if code.startswith('ESQ'): - try: - num = int(code[3:]) - if num > last_code: - last_code = num - except ValueError: - pass - - # Crear nuevo código - schema_code = f"ESQ{last_code + 1:03d}" - - # Crear esquema - schemas[schema_code] = { - 'codigo': schema_code, - 'descripcion': schema_data['descripcion'], - 'fecha_creacion': datetime.now(pytz.UTC).isoformat(), - 'creado_por': creator_username, - 'documentos': schema_data['documentos'] - } - - # Guardar esquemas - storage_path = current_app.config['STORAGE_PATH'] - schema_file = os.path.join(storage_path, 'schemas', 'schema.json') - save_json_file(schema_file, schemas) - - return True, "Esquema creado correctamente.", schema_code + user_id (str): ID del usuario que crea el esquema -def update_schema(schema_code, schema_data): + Returns: + tuple: (éxito, mensaje) + """ + schemas = get_all_schemas() + + # Verificar si ya existe un esquema con ese código + schema_id = schema_data["codigo"] + if schema_id in schemas: + return False, f"Ya existe un esquema con el código {schema_id}." + + # Añadir metadatos + schema_data["fecha_creacion"] = datetime.now(pytz.UTC).isoformat() + schema_data["creado_por"] = user_id + + # Guardar esquema + schemas[schema_id] = schema_data + save_json_file(get_schemas_file_path(), schemas) + + return True, f"Esquema '{schema_data['descripcion']}' creado correctamente." + + +def update_schema(schema_id, schema_data): """ Actualizar un esquema existente. - + Args: - schema_code (str): Código del esquema - schema_data (dict): Datos actualizados - + schema_id (str): ID del esquema a actualizar + schema_data (dict): Nuevos datos del esquema + Returns: - tuple: (success, message) + tuple: (éxito, mensaje) """ schemas = get_all_schemas() - - # Verificar si existe el esquema - if schema_code not in schemas: - return False, f"No se encontró el esquema con código {schema_code}." - - # Actualizar campos permitidos - if 'descripcion' in schema_data: - schemas[schema_code]['descripcion'] = schema_data['descripcion'] - - if 'documentos' in schema_data: - schemas[schema_code]['documentos'] = schema_data['documentos'] - - # Guardar esquemas - storage_path = current_app.config['STORAGE_PATH'] - schema_file = os.path.join(storage_path, 'schemas', 'schema.json') - save_json_file(schema_file, schemas) - - return True, "Esquema actualizado correctamente." -def delete_schema(schema_code): + # Verificar si existe el esquema + if schema_id not in schemas: + return False, f"No existe un esquema con el código {schema_id}." + + # Preservar metadatos originales + schema_data["fecha_creacion"] = schemas[schema_id].get("fecha_creacion") + schema_data["creado_por"] = schemas[schema_id].get("creado_por") + schema_data["ultima_modificacion"] = datetime.now(pytz.UTC).isoformat() + + # Actualizar esquema + schemas[schema_id] = schema_data + save_json_file(get_schemas_file_path(), schemas) + + return True, f"Esquema '{schema_data['descripcion']}' actualizado correctamente." + + +def delete_schema(schema_id): """ Eliminar un esquema. - + Args: - schema_code (str): Código del esquema - + schema_id (str): ID del esquema a eliminar + Returns: - tuple: (success, message) + tuple: (éxito, mensaje) """ schemas = get_all_schemas() - + # Verificar si existe el esquema - if schema_code not in schemas: - return False, f"No se encontró el esquema con código {schema_code}." - + if schema_id not in schemas: + return False, f"No existe un esquema con el código {schema_id}." + + # Verificar si el esquema está en uso + # [Implementar verificación si el esquema está asociado a proyectos] + # Eliminar esquema - del schemas[schema_code] - - # Guardar esquemas - storage_path = current_app.config['STORAGE_PATH'] - schema_file = os.path.join(storage_path, 'schemas', 'schema.json') - save_json_file(schema_file, schemas) - - return True, "Esquema eliminado correctamente." + schema_desc = schemas[schema_id].get("descripcion", schema_id) + del schemas[schema_id] + save_json_file(get_schemas_file_path(), schemas) -def get_schema_document_types(schema_code): - """ - Obtener los tipos de documento definidos en un esquema. - - Args: - schema_code (str): Código del esquema - - Returns: - list: Lista de tipos de documento - """ - schema = get_schema(schema_code) - - if not schema: - return [] - - return schema.get('documentos', []) + return True, f"Esquema '{schema_desc}' eliminado correctamente." -def validate_document_for_schema(schema_code, document_type): - """ - Validar si un tipo de documento está permitido en un esquema. - - Args: - schema_code (str): Código del esquema - document_type (str): Tipo de documento - - Returns: - tuple: (is_valid, required_level) - Validez y nivel requerido - """ - schema = get_schema(schema_code) - - if not schema: - return False, None - - for doc in schema.get('documentos', []): - if doc.get('tipo') == document_type: - return True, doc.get('nivel_editar', 0) - - return False, None def initialize_default_schemas(): - """ - Inicializar esquemas predeterminados si no existen. - """ + """Inicializar esquemas predeterminados si no existen.""" schemas = get_all_schemas() - - if not schemas: - # Crear esquema estándar - create_schema({ - 'descripcion': 'Proyecto estándar', - 'documentos': [ + + # Si ya hay esquemas, no hacer nada + if schemas: + return + + # Esquema predeterminado para proyecto estándar + default_schema = { + "ESQ001": { + "codigo": "ESQ001", + "descripcion": "Proyecto estándar", + "fecha_creacion": datetime.now(pytz.UTC).isoformat(), + "creado_por": "admin", + "documentos": [ { - 'tipo': 'pdf', - 'nombre': 'Manual de Usuario', - 'nivel_ver': 0, - 'nivel_editar': 5000 + "tipo": "pdf", + "nombre": "Manual de Usuario", + "nivel_ver": 0, + "nivel_editar": 5000, }, { - 'tipo': 'txt', - 'nombre': 'Notas del Proyecto', - 'nivel_ver': 0, - 'nivel_editar': 1000 + "tipo": "dwg", + "nombre": "Planos Técnicos", + "nivel_ver": 0, + "nivel_editar": 5000, }, { - 'tipo': 'zip', - 'nombre': 'Archivos Fuente', - 'nivel_ver': 1000, - 'nivel_editar': 5000 - } - ] - }, 'admin') - - current_app.logger.info('Esquema predeterminado creado.') \ No newline at end of file + "tipo": "zip", + "nombre": "Archivos Fuente", + "nivel_ver": 1000, + "nivel_editar": 5000, + }, + ], + } + } + + save_json_file(get_schemas_file_path(), default_schema) + current_app.logger.info("Esquemas predeterminados inicializados.") + + +def get_schema_document_types(schema_id): + """ + Obtener los tipos de documentos definidos en un esquema. + + Args: + schema_id (str): ID del esquema + + Returns: + list: Lista de tipos de documentos en el esquema, o lista vacía si no existe + """ + schema = get_schema_by_id(schema_id) + + if not schema or "documentos" not in schema: + return [] + + return schema["documentos"] diff --git a/services/user_service.py b/services/user_service.py index 07931cd..23f7514 100644 --- a/services/user_service.py +++ b/services/user_service.py @@ -1,116 +1,191 @@ +import os +import json +import uuid +from datetime import datetime +from flask import current_app +from flask_login import UserMixin +from app import bcrypt +from utils.file_utils import load_json_file, save_json_file from services.auth_service import ( - get_user_by_username, get_user_by_id, get_all_users, - create_user, update_user, delete_user + get_user_by_username, + get_user_by_id, + get_all_users, + create_user, + update_user, + delete_user, ) + +class User(UserMixin): + """Clase de usuario para Flask-Login.""" + + def __init__(self, user_id, user_data): + self.id = user_id + self.username = user_data.get("username", "") + self.nombre = user_data.get("nombre", "") + self.email = user_data.get("email", "") + self.nivel = user_data.get("nivel", 0) + self.idioma = user_data.get("idioma", "es") + self.fecha_caducidad = user_data.get("fecha_caducidad") + self.empresa = user_data.get("empresa", "") + self.estado = user_data.get("estado", "activo") + self.ultimo_acceso = user_data.get("ultimo_acceso") + + def has_permission(self, required_level): + """Verificar si el usuario tiene el nivel de permiso requerido.""" + return self.nivel >= required_level and self.estado == "activo" + + def to_dict(self): + """Convertir el usuario a un diccionario para almacenamiento.""" + return { + "username": self.username, + "nombre": self.nombre, + "email": self.email, + "nivel": self.nivel, + "idioma": self.idioma, + "fecha_caducidad": self.fecha_caducidad, + "empresa": self.empresa, + "estado": self.estado, + "ultimo_acceso": self.ultimo_acceso, + } + + +def get_users_file_path(): + """Obtener la ruta al archivo de usuarios.""" + storage_path = current_app.config["STORAGE_PATH"] + return os.path.join(storage_path, "users", "users.json") + + +def get_all_users(): + """Obtener todos los usuarios del sistema.""" + users_data = load_json_file(get_users_file_path(), {}) + return users_data + + +def get_user_by_id(user_id): + """Obtener un usuario por su ID.""" + if not user_id: + return None + + users_data = load_json_file(get_users_file_path(), {}) + + if user_id in users_data: + return User(user_id, users_data[user_id]) + + return None + + # Este archivo importa funcionalidades de auth_service.py para mantener # consistencia en la estructura del proyecto, y agrega funcionalidades específicas # de gestión de usuarios que no están relacionadas con la autenticación. + def filter_users(filter_params): """ Filtrar usuarios según los parámetros proporcionados. - + Args: filter_params (dict): Diccionario con parámetros de filtrado - empresa: Nombre de empresa - estado: Estado del usuario ('activo', 'inactivo') - nivel_min: Nivel mínimo de permiso - nivel_max: Nivel máximo de permiso - + Returns: list: Lista de objetos User que cumplen los criterios """ users = get_all_users() filtered_users = [] - + for user in users: # Filtrar por empresa - if 'empresa' in filter_params and filter_params['empresa']: - if user.empresa != filter_params['empresa']: + if "empresa" in filter_params and filter_params["empresa"]: + if user.empresa != filter_params["empresa"]: continue - + # Filtrar por estado - if 'estado' in filter_params and filter_params['estado']: - if user.estado != filter_params['estado']: + if "estado" in filter_params and filter_params["estado"]: + if user.estado != filter_params["estado"]: continue - + # Filtrar por nivel mínimo - if 'nivel_min' in filter_params and filter_params['nivel_min'] is not None: - if user.nivel < int(filter_params['nivel_min']): + if "nivel_min" in filter_params and filter_params["nivel_min"] is not None: + if user.nivel < int(filter_params["nivel_min"]): continue - + # Filtrar por nivel máximo - if 'nivel_max' in filter_params and filter_params['nivel_max'] is not None: - if user.nivel > int(filter_params['nivel_max']): + if "nivel_max" in filter_params and filter_params["nivel_max"] is not None: + if user.nivel > int(filter_params["nivel_max"]): continue - + # Si pasó todos los filtros, agregar a la lista filtered_users.append(user) - + return filtered_users + def get_user_stats(): """ Obtener estadísticas sobre los usuarios. - + Returns: dict: Diccionario con estadísticas """ users = get_all_users() - + # Inicializar estadísticas stats = { - 'total': len(users), - 'activos': 0, - 'inactivos': 0, - 'expirados': 0, - 'por_empresa': {}, - 'por_nivel': { - 'admin': 0, # Nivel 9000+ - 'gestor': 0, # Nivel 5000-8999 - 'editor': 0, # Nivel 1000-4999 - 'lector': 0 # Nivel 0-999 - } + "total": len(users), + "activos": 0, + "inactivos": 0, + "expirados": 0, + "por_empresa": {}, + "por_nivel": { + "admin": 0, # Nivel 9000+ + "gestor": 0, # Nivel 5000-8999 + "editor": 0, # Nivel 1000-4999 + "lector": 0, # Nivel 0-999 + }, } - + # Calcular estadísticas for user in users: # Estado - if user.estado == 'activo': - stats['activos'] += 1 + if user.estado == "activo": + stats["activos"] += 1 else: - stats['inactivos'] += 1 - + stats["inactivos"] += 1 + # Expirados if user.is_expired(): - stats['expirados'] += 1 - + stats["expirados"] += 1 + # Por empresa - empresa = user.empresa or 'Sin empresa' - stats['por_empresa'][empresa] = stats['por_empresa'].get(empresa, 0) + 1 - + empresa = user.empresa or "Sin empresa" + stats["por_empresa"][empresa] = stats["por_empresa"].get(empresa, 0) + 1 + # Por nivel if user.nivel >= 9000: - stats['por_nivel']['admin'] += 1 + stats["por_nivel"]["admin"] += 1 elif user.nivel >= 5000: - stats['por_nivel']['gestor'] += 1 + stats["por_nivel"]["gestor"] += 1 elif user.nivel >= 1000: - stats['por_nivel']['editor'] += 1 + stats["por_nivel"]["editor"] += 1 else: - stats['por_nivel']['lector'] += 1 - + stats["por_nivel"]["lector"] += 1 + return stats + def check_username_availability(username): """ Verificar si un nombre de usuario está disponible. - + Args: username (str): Nombre de usuario a verificar - + Returns: bool: True si está disponible, False si ya existe """ user = get_user_by_username(username) - return user is None \ No newline at end of file + return user is None diff --git a/static/css/main.css b/static/css/main.css index 8b5b067..53ff9b5 100644 --- a/static/css/main.css +++ b/static/css/main.css @@ -21,6 +21,14 @@ body { margin-top: auto; } +main { + flex: 1 0 auto; +} + +footer { + flex-shrink: 0; +} + /* Encabezados */ h1, h2, h3, h4, h5, h6 { font-weight: 600; @@ -35,7 +43,7 @@ h1, h2, h3, h4, h5, h6 { } .card-header { - background-color: #f8f9fa; + background-color: rgba(0, 0, 0, 0.03); border-bottom: 1px solid rgba(0, 0, 0, 0.125); padding: 0.75rem 1.25rem; } @@ -55,12 +63,20 @@ h1, h2, h3, h4, h5, h6 { background-color: rgba(0, 123, 255, 0.05); } +.table-responsive { + overflow-x: auto; +} + /* Botones */ .btn { border-radius: 0.25rem; font-weight: 500; } +.btn-icon { + padding: 0.25rem 0.5rem; +} + /* Navegación */ .navbar { box-shadow: 0 0.125rem 0.25rem rgba(0, 0, 0, 0.075); @@ -93,6 +109,11 @@ h1, h2, h3, h4, h5, h6 { box-shadow: 0 0 0 0.2rem rgba(0, 123, 255, 0.25); } +.required-field::after { + content: " *"; + color: red; +} + /* Alertas */ .alert { border-radius: 0.25rem; @@ -174,4 +195,52 @@ h1, h2, h3, h4, h5, h6 { .text-primary-dark { color: #0056b3; +} + +/* Login form styling */ +.login-container { + max-width: 400px; + margin: 2rem auto; +} + +/* Project card styling */ +.project-card { + transition: transform 0.2s; +} + +.project-card:hover { + transform: translateY(-5px); + box-shadow: 0 0.5rem 1rem rgba(0, 0, 0, 0.15); +} + +/* Document version styling */ +.version-timeline { + position: relative; + padding-left: 30px; +} + +.version-timeline:before { + content: ''; + position: absolute; + left: 10px; + top: 0; + height: 100%; + width: 2px; + background-color: #dee2e6; +} + +.version-item { + position: relative; + margin-bottom: 1.5rem; +} + +.version-item:before { + content: ''; + position: absolute; + left: -25px; + top: 5px; + width: 12px; + height: 12px; + border-radius: 50%; + background-color: #0d6efd; } \ No newline at end of file diff --git a/static/img/icons/logo.webp b/static/img/icons/logo.webp new file mode 100644 index 0000000..60a71fb Binary files /dev/null and b/static/img/icons/logo.webp differ diff --git a/static/img/logo.png b/static/img/logo.png new file mode 100644 index 0000000..568f59e Binary files /dev/null and b/static/img/logo.png differ diff --git a/static/js/main.js b/static/js/main.js new file mode 100644 index 0000000..6709777 --- /dev/null +++ b/static/js/main.js @@ -0,0 +1,41 @@ +// Main JavaScript for ARCH + +document.addEventListener('DOMContentLoaded', function() { + // Enable tooltips + var tooltipTriggerList = [].slice.call(document.querySelectorAll('[data-bs-toggle="tooltip"]')) + var tooltipList = tooltipTriggerList.map(function (tooltipTriggerEl) { + return new bootstrap.Tooltip(tooltipTriggerEl) + }); + + // Enable popovers + var popoverTriggerList = [].slice.call(document.querySelectorAll('[data-bs-toggle="popover"]')) + var popoverList = popoverTriggerList.map(function (popoverTriggerEl) { + return new bootstrap.Popover(popoverTriggerEl) + }); + + // Auto-hide alerts after 5 seconds + setTimeout(function() { + $('.alert').fadeOut('slow'); + }, 5000); +}); + +// Function to confirm deletions +function confirmDelete(message, formId) { + if (confirm(message || '¿Está seguro que desea eliminar este elemento?')) { + document.getElementById(formId).submit(); + } +} + +// Function to preview images when uploading +function previewImage(input, previewId) { + if (input.files && input.files[0]) { + var reader = new FileReader(); + + reader.onload = function(e) { + document.getElementById(previewId).src = e.target.result; + document.getElementById(previewId).style.display = 'block'; + } + + reader.readAsDataURL(input.files[0]); + } +} diff --git a/templates/about.html b/templates/about.html new file mode 100644 index 0000000..67d5355 --- /dev/null +++ b/templates/about.html @@ -0,0 +1,39 @@ +{% extends 'base.html' %} + +{% block title %}Acerca de - ARCH{% endblock %} + +{% block content %} +
+
+
+

Acerca de ARCH

+

Sistema de gestión documental especializado para arquitectura

+ +

Nuestra Misión

+

+ ARCH es una plataforma diseñada para ayudar a los arquitectos y profesionales + del sector a gestionar de manera eficiente su documentación técnica, facilitando + la organización, clasificación y acceso a los documentos relacionados con proyectos arquitectónicos. +

+ +

Características

+
    +
  • Gestión de proyectos arquitectónicos
  • +
  • Control de versiones de documentos
  • +
  • Clasificación por esquemas personalizables
  • +
  • Búsqueda avanzada de documentación
  • +
  • Informes y estadísticas
  • +
+ +

Contacto

+

+ Para más información o soporte técnico, contacta con el administrador del sistema. +

+ + +
+
+
+{% endblock %} diff --git a/templates/admin/dashboard.html b/templates/admin/dashboard.html index e69de29..3b6a4dd 100644 --- a/templates/admin/dashboard.html +++ b/templates/admin/dashboard.html @@ -0,0 +1,68 @@ +{% extends "base.html" %} + +{% block title %}Panel de Administración - ARCH{% endblock %} + +{% block content %} +
+

Panel de Administración

+ +
+
+
+
+ +
Gestión de Usuarios
+

Administre usuarios, permisos y accesos al sistema.

+ Ir a Usuarios +
+
+
+ +
+
+
+ +
Tipos de Archivo
+

Configure los tipos de archivo permitidos en el sistema.

+ Ir a Tipos de Archivo +
+
+
+ +
+
+
+ +
Estado del Sistema
+

Supervise el estado y realice tareas de mantenimiento.

+ Ver Estado +
+
+
+
+ +
+
+
+
+ +
Esquemas
+

Gestione los esquemas de documentos para proyectos.

+ Ir a Esquemas +
+
+
+ +
+
+
+ +
Proyectos
+

Acceda a todos los proyectos del sistema.

+ Ir a Proyectos +
+
+
+
+
+{% endblock %} diff --git a/templates/admin/filetypes.html b/templates/admin/filetypes.html index e69de29..3178573 100644 --- a/templates/admin/filetypes.html +++ b/templates/admin/filetypes.html @@ -0,0 +1,179 @@ +{% extends "base.html" %} + +{% block title %}Tipos de Archivo - ARCH{% endblock %} + +{% block content %} +
+
+

Tipos de Archivo

+ +
+ +
+
+
+ + + + + + + + + + + + {% for ext, filetype in filetypes.items() %} + + + + + + + + {% else %} + + + + {% endfor %} + +
ExtensiónDescripciónTipo MIMETamaño MáximoAcciones
{{ ext }}{{ filetype.descripcion }}{{ filetype.mime_type }}{{ (filetype.tamano_maximo / 1024 / 1024)|round(1) }} MB +
+ + +
+
No hay tipos de archivo definidos.
+
+
+
+
+ + + + + + + + + + +{% block extra_js %} + +{% endblock %} +{% endblock %} diff --git a/templates/admin/system.html b/templates/admin/system.html index e69de29..3ecd8f9 100644 --- a/templates/admin/system.html +++ b/templates/admin/system.html @@ -0,0 +1,99 @@ +{% extends "base.html" %} + +{% block title %}Estado del Sistema - ARCH{% endblock %} + +{% block content %} +
+

Estado del Sistema

+ +
+
+
+
+
Información del Sistema
+
+
+
+
Versión Python:
+
{{ stats.python_version }}
+ +
Plataforma:
+
{{ stats.platform }}
+ +
Proyectos:
+
{{ stats.projects_count }}
+ +
Hora del servidor:
+
{{ now.strftime('%Y-%m-%d %H:%M:%S') }}
+
+
+
+
+ +
+
+
+
Uso de Almacenamiento
+
+
+
+

{{ (stats.storage_size / (1024*1024))|round(2) }} MB

+ Espacio total utilizado +
+ +
+
+ {{ ((stats.storage_size / (1024*1024*1024))|float * 100)|round(1) }}% +
+
+ +
+
Tamaño de logs:
+
{{ (stats.log_size / 1024)|round(2) }} KB
+
+
+
+
+
+ +
+
+
+
+
Mantenimiento del Sistema
+
+
+
+
+
Limpieza de Logs
+

Elimina los archivos de log rotados para liberar espacio.

+ +
+ + +
+
+ +
+
Reinicializar Valores Predeterminados
+

Restablece los tipos de archivo y esquemas predeterminados.

+ + + Reinicializar Valores + +
+
+
+
+
+
+
+{% endblock %} diff --git a/templates/base.html b/templates/base.html index 584ecf3..301314e 100644 --- a/templates/base.html +++ b/templates/base.html @@ -3,42 +3,42 @@ - {% block title %}ARCH - Sistema de Gestión de Documentos{% endblock %} + {% block title %}ARCH - Sistema de Gestión Documental{% endblock %} - + - - + + - - {% block styles %}{% endblock %} + + + + {% block extra_css %}{% endblock %} - + - - -
- - {% with messages = get_flashed_messages(with_categories=true) %} - {% if messages %} - {% for category, message in messages %} -
- {{ message }} - -
- {% endfor %} - {% endif %} - {% endwith %} - + + + {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} +
+ {% for category, message in messages %} + {% set alert_class = 'primary' %} + {% if category == 'error' or category == 'danger' %} + {% set alert_class = 'danger' %} + {% elif category == 'warning' %} + {% set alert_class = 'warning' %} + {% elif category == 'success' %} + {% set alert_class = 'success' %} + {% elif category == 'info' %} + {% set alert_class = 'info' %} + {% endif %} + + {% endfor %} +
+ {% endif %} + {% endwith %} + + +

{% block page_title %}{% endblock %}

{% block content %}{% endblock %} -
- + + -