"""Logging setup and audit-trail helpers for the Flask application."""

import heapq
import json
import logging
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

import pytz
from flask import current_app, has_request_context, request

# Size-based rotation settings shared by the general and error log files.
_MAX_LOG_BYTES = 10 * 1024 * 1024
_SIZE_ROTATION_BACKUPS = 10
# Number of daily access-log files to keep.
_ACCESS_LOG_BACKUPS = 30


class RequestFormatter(logging.Formatter):
    """Formatter that adds request-specific info to logs when available.

    Sets ``url``, ``method``, ``remote_addr`` and ``user_agent`` on every
    record so that format strings may reference them; outside a request
    context all four are ``None``.
    """

    def format(self, record):
        if has_request_context():
            record.url = request.url
            record.method = request.method
            record.remote_addr = request.remote_addr
            record.user_agent = request.user_agent
        else:
            record.url = None
            record.method = None
            record.remote_addr = None
            record.user_agent = None
        return super().format(record)


def _ensure_file_handler(logger, path, factory, formatter, level):
    """Return ``logger``'s file handler for ``path``, creating it if absent.

    The named loggers configured here are process-global, so calling
    ``setup_logger`` more than once (app factories, test suites) would
    otherwise stack duplicate handlers on the same file.
    """
    target = os.path.abspath(path)
    handler = None
    for existing in logger.handlers:
        # FileHandler subclasses store the absolute path in ``baseFilename``.
        if getattr(existing, "baseFilename", None) == target:
            handler = existing
            break
    if handler is None:
        handler = factory(path)
    handler.setFormatter(formatter)
    handler.setLevel(level)
    logger.addHandler(handler)  # no-op when the handler is already attached
    return handler


def _size_rotating_handler(path):
    """Build a size-rotated UTF-8 file handler for ``path``."""
    return RotatingFileHandler(
        path,
        maxBytes=_MAX_LOG_BYTES,
        backupCount=_SIZE_ROTATION_BACKUPS,
        encoding="utf-8",
    )


def _daily_rotating_handler(path):
    """Build a UTF-8 file handler for ``path`` that rotates at midnight."""
    return TimedRotatingFileHandler(
        path,
        when="midnight",
        interval=1,
        backupCount=_ACCESS_LOG_BACKUPS,
        encoding="utf-8",
    )


def setup_logger(app):
    """Configure application logging.

    Creates ``<STORAGE_PATH>/logs`` (``STORAGE_PATH`` defaults to
    ``"storage"``) and wires three loggers:

    * ``app``        -> ``app.log``    (size-rotated, level from ``app.debug``)
    * ``app.error``  -> ``error.log``  (size-rotated, ERROR and above)
    * ``app.access`` -> ``access.log`` (rotated daily, INFO and above)

    The general and error handlers are also attached to ``app.logger``, and
    an ``after_request`` hook writes one access entry per non-static request.
    File handlers are reused when one for the same file is already attached,
    so repeated calls do not duplicate log output.

    Args:
        app: Flask application instance

    Returns:
        logging.Logger: the ``app`` system logger.
    """
    log_dir = os.path.join(app.config.get("STORAGE_PATH", "storage"), "logs")
    os.makedirs(log_dir, exist_ok=True)

    # Environment mode is derived from the debug flag rather than ENV.
    env_mode = "development" if app.debug else "production"
    log_level = logging.DEBUG if app.debug else logging.INFO

    log_file = os.path.join(log_dir, "app.log")
    error_log_file = os.path.join(log_dir, "error.log")
    access_log_file = os.path.join(log_dir, "access.log")

    standard_formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(module)s: %(message)s"
    )
    detailed_formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(module)s.%(funcName)s:%(lineno)d: %(message)s"
    )
    access_formatter = RequestFormatter(
        "%(asctime)s - %(remote_addr)s - %(method)s %(url)s - "
        "%(user_agent)s - %(message)s"
    )

    # System logger (general application logs).
    system_logger = logging.getLogger("app")
    system_logger.setLevel(log_level)
    rfh = _ensure_file_handler(
        system_logger, log_file, _size_rotating_handler, standard_formatter, log_level
    )

    # Error logger with more detailed output.
    error_logger = logging.getLogger("app.error")
    error_logger.setLevel(logging.ERROR)
    error_handler = _ensure_file_handler(
        error_logger,
        error_log_file,
        _size_rotating_handler,
        detailed_formatter,
        logging.ERROR,
    )

    # Access logger for HTTP requests.
    access_logger = logging.getLogger("app.access")
    access_logger.setLevel(logging.INFO)
    _ensure_file_handler(
        access_logger,
        access_log_file,
        _daily_rotating_handler,
        access_formatter,
        logging.INFO,
    )

    # Mirror general and error output on Flask's own logger.
    for handler in (rfh, error_handler):
        app.logger.addHandler(handler)
    app.logger.setLevel(log_level)

    # Initial startup log entries.
    system_logger.info(f"Aplicación iniciada en modo: {env_mode}")
    system_logger.info(
        f"Nivel de logging establecido a: {logging.getLevelName(log_level)}"
    )

    @app.after_request
    def log_request(response):
        if not request.path.startswith("/static/"):
            # calculate_content_length() converts a streamed body into an
            # in-memory sequence; only call it when the body is already
            # buffered and otherwise report the Content-Length header.
            if response.is_sequence:
                size = response.calculate_content_length()
            else:
                size = response.content_length
            access_logger.info(f"Status: {response.status_code} - Size: {size}")
        return response

    return system_logger


def log_access(username, endpoint, method, ip_address, status_code):
    """Record an access to the application on the ``app.access`` logger.

    Args:
        username (str): User name
        endpoint (str): Endpoint that was accessed
        method (str): HTTP method
        ip_address (str): IP address
        status_code (int): HTTP status code
    """
    logger = logging.getLogger("app.access")
    logger.info(
        f"Usuario: {username} | Endpoint: {endpoint} | Método: {method} | IP: {ip_address} | Estado: {status_code}"
    )


def log_error(error, username=None):
    """Record an application error on the ``app.error`` logger.

    Args:
        error: Exception or error message; logged via ``str(error)``
        username (str, optional): User name to prefix the entry with
    """
    logger = logging.getLogger("app.error")
    if username:
        logger.error(f"Usuario: {username} | Error: {str(error)}")
    else:
        logger.error(str(error))


def log_system_event(event_type, message):
    """Record a system event on the ``app`` logger.

    Args:
        event_type (str): Event type
        message (str): Descriptive message
    """
    logger = logging.getLogger("app")
    logger.info(f"Evento: {event_type} | {message}")


def log_user_activity(
    user_id, activity_type, ip_address=None, user_agent=None, details=None
):
    """Record a user activity as one JSON object on the ``audit`` logger.

    Explicit ``ip_address`` / ``user_agent`` values always win. When they are
    omitted they are taken from the current request if there is one, and fall
    back to ``"unknown"`` otherwise.

    NOTE(review): no handler for the ``audit`` logger is attached in this
    module; confirm it is configured elsewhere, otherwise these entries are
    never written to ``audit.log``.

    Args:
        user_id (str): User ID
        activity_type (str): Activity type
        ip_address (str, optional): IP address
        user_agent (str, optional): Browser User-Agent
        details (dict, optional): Additional details, stored under ``details``
    """
    logger = logging.getLogger("audit")

    in_request = has_request_context()
    if not ip_address and in_request:
        ip_address = request.remote_addr
    if not user_agent and in_request:
        user_agent = request.user_agent.string

    activity = {
        "timestamp": datetime.now(pytz.UTC).isoformat(),
        "user_id": user_id,
        "activity_type": activity_type,
        "ip_address": ip_address or "unknown",
        "user_agent": user_agent or "unknown",
    }

    if details:
        activity["details"] = details

    logger.info(json.dumps(activity))


def log_document_access(user_id, project_id, document_id, version, action):
    """Record a document access as a ``document_<action>`` audit activity.

    Args:
        user_id (str): User ID
        project_id (int): Project ID
        document_id (int): Document ID
        version (int): Document version
        action (str): Action performed ('view', 'download', 'upload')
    """
    log_user_activity(
        user_id=user_id,
        activity_type=f"document_{action}",
        details={
            "project_id": project_id,
            "document_id": document_id,
            "version": version,
        },
    )


def log_project_activity(user_id, project_id, action, details=None):
    """Record a project action as a ``project_<action>`` audit activity.

    Args:
        user_id (str): User ID
        project_id (int): Project ID
        action (str): Action performed ('create', 'update', 'delete')
        details (dict, optional): Additional details merged over ``project_id``
    """
    activity_details = {"project_id": project_id}
    if details:
        activity_details.update(details)

    log_user_activity(
        user_id=user_id, activity_type=f"project_{action}", details=activity_details
    )


def log_user_management(admin_id, target_user_id, action, details=None):
    """Record a user-management action as a ``user_<action>`` audit activity.

    Args:
        admin_id (str): Administrator ID (stored as the acting ``user_id``)
        target_user_id (str): ID of the user being managed
        action (str): Action performed ('create', 'update', 'delete')
        details (dict, optional): Additional details merged over
            ``target_user_id``
    """
    activity_details = {"target_user_id": target_user_id}
    if details:
        activity_details.update(details)

    log_user_activity(
        user_id=admin_id, activity_type=f"user_{action}", details=activity_details
    )


def _parse_iso_datetime(value):
    """Parse an ISO-8601 string (or accept a datetime) as an aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` because older
    ``datetime.fromisoformat`` versions reject it. Naive values are treated
    as UTC so they can be compared with the UTC timestamps written by
    ``log_user_activity``.

    Raises:
        ValueError: if ``value`` is not a datetime or a parseable string.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str):
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    else:
        raise ValueError(f"unsupported timestamp value: {value!r}")
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=pytz.UTC)
    return parsed


def _iter_audit_entries(path):
    """Yield the JSON object found on each line of the audit log at ``path``.

    Everything before the first ``{`` on a line is treated as a formatter
    prefix and ignored. Lines without JSON, or with invalid JSON, are skipped.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            json_start = line.find("{")
            if json_start == -1:
                continue
            try:
                entry = json.loads(line[json_start:])
            except json.JSONDecodeError:
                continue
            if isinstance(entry, dict):
                yield entry


def _entry_matches(entry, user_id, activity_type, start_date, end_date):
    """Return True when ``entry`` passes every active filter."""
    if user_id and entry.get("user_id") != user_id:
        return False
    if activity_type and entry.get("activity_type") != activity_type:
        return False
    if start_date is None and end_date is None:
        return True
    try:
        log_date = _parse_iso_datetime(entry.get("timestamp", ""))
    except ValueError:
        # An entry without a usable timestamp cannot satisfy a date filter.
        return False
    if start_date is not None and log_date < start_date:
        return False
    if end_date is not None and log_date > end_date:
        return False
    return True


def get_audit_logs(filters=None, limit=100):
    """Return filtered audit records, most recent first.

    Reads ``audit.log`` from ``current_app.config["LOG_DIR"]``; when that key
    is unset it falls back to ``<STORAGE_PATH>/logs``, the directory used by
    ``setup_logger``. The ``limit`` most recent matching records are returned
    (the whole file is scanned, so the limit never hides newer entries).

    Args:
        filters (dict, optional): Filters to apply; falsy values are ignored
            - user_id: User ID
            - activity_type: Activity type
            - start_date: Inclusive start (ISO format; naive means UTC)
            - end_date: Inclusive end (ISO format; naive means UTC)
        limit (int, optional): Maximum number of records to return;
            ``None`` means no limit

    Returns:
        list: Audit records (dicts) sorted by timestamp, newest first. Empty
        when the log file does not exist or a date filter cannot be parsed.
    """
    config = current_app.config
    log_dir = config.get("LOG_DIR") or os.path.join(
        config.get("STORAGE_PATH", "storage"), "logs"
    )
    audit_log_file = os.path.join(log_dir, "audit.log")

    if not os.path.exists(audit_log_file):
        return []

    filters = filters or {}
    raw_start = filters.get("start_date")
    raw_end = filters.get("end_date")
    try:
        # Parse the filter bounds once instead of once per log line.
        start_date = _parse_iso_datetime(raw_start) if raw_start else None
        end_date = _parse_iso_datetime(raw_end) if raw_end else None
    except ValueError as exc:
        # An unparseable bound matches nothing; report it instead of
        # silently discarding every line.
        logging.getLogger("app").warning(
            "Filtro de fecha inválido en get_audit_logs: %s", exc
        )
        return []

    user_id = filters.get("user_id")
    activity_type = filters.get("activity_type")
    matching = (
        entry
        for entry in _iter_audit_entries(audit_log_file)
        if _entry_matches(entry, user_id, activity_type, start_date, end_date)
    )

    def sort_key(entry):
        return str(entry.get("timestamp", ""))

    if limit is None:
        return sorted(matching, key=sort_key, reverse=True)
    # nlargest keeps only ``limit`` records in memory and returns them in
    # descending order, i.e. newest first.
    return heapq.nlargest(max(limit, 0), matching, key=sort_key)