Arch/utils/logger.py

348 lines
11 KiB
Python

import os
import json
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from datetime import datetime
import pytz
from flask import current_app, request, has_request_context
class RequestFormatter(logging.Formatter):
"""Formatter that adds request-specific info to logs when available."""
def format(self, record):
if has_request_context():
record.url = request.url
record.method = request.method
record.remote_addr = request.remote_addr
record.user_agent = request.user_agent
else:
record.url = None
record.method = None
record.remote_addr = None
record.user_agent = None
return super().format(record)
def setup_logger(app):
"""Configure application logging.
Args:
app: Flask application instance
"""
log_dir = os.path.join(app.config.get("STORAGE_PATH", "storage"), "logs")
os.makedirs(log_dir, exist_ok=True)
# Determine environment mode (using debug instead of ENV)
env_mode = "development" if app.debug else "production"
# Configure log level based on environment
log_level = logging.DEBUG if app.debug else logging.INFO
# Base file names
log_file = os.path.join(log_dir, f"app.log")
error_log_file = os.path.join(log_dir, f"error.log")
access_log_file = os.path.join(log_dir, f"access.log")
# Configure formatters
standard_formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(module)s: %(message)s"
)
detailed_formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(module)s.%(funcName)s:%(lineno)d: %(message)s"
)
access_formatter = RequestFormatter(
"%(asctime)s - %(remote_addr)s - %(method)s %(url)s - "
"%(user_agent)s - %(message)s"
)
# Create system logger (general application logs)
system_logger = logging.getLogger("app")
system_logger.setLevel(log_level)
# Rotating file handler for general logs (10 MB files, keep 10 backups)
rfh = RotatingFileHandler(
log_file, maxBytes=10 * 1024 * 1024, backupCount=10, encoding="utf-8"
)
rfh.setFormatter(standard_formatter)
rfh.setLevel(log_level)
system_logger.addHandler(rfh)
# Error logger with more detailed output
error_logger = logging.getLogger("app.error")
error_logger.setLevel(logging.ERROR)
error_handler = RotatingFileHandler(
error_log_file, maxBytes=10 * 1024 * 1024, backupCount=10, encoding="utf-8"
)
error_handler.setFormatter(detailed_formatter)
error_handler.setLevel(logging.ERROR)
error_logger.addHandler(error_handler)
# Access logger for HTTP requests
access_logger = logging.getLogger("app.access")
access_logger.setLevel(logging.INFO)
# Daily rotating access logs
access_handler = TimedRotatingFileHandler(
access_log_file, when="midnight", interval=1, backupCount=30, encoding="utf-8"
)
access_handler.setFormatter(access_formatter)
access_handler.setLevel(logging.INFO)
access_logger.addHandler(access_handler)
# Add handlers to Flask logger
for handler in [rfh, error_handler]:
app.logger.addHandler(handler)
app.logger.setLevel(log_level)
# Initial startup log entries
system_logger.info(f"Aplicación iniciada en modo: {env_mode}")
system_logger.info(
f"Nivel de logging establecido a: {logging.getLevelName(log_level)}"
)
# Register access log handler for requests
@app.after_request
def log_request(response):
if not request.path.startswith("/static/"):
access_logger.info(
f"Status: {response.status_code} - Size: {response.calculate_content_length()}"
)
return response
return system_logger
def log_access(username, endpoint, method, ip_address, status_code):
"""
Registrar acceso a la aplicación.
Args:
username (str): Nombre de usuario
endpoint (str): Endpoint accedido
method (str): Método HTTP
ip_address (str): Dirección IP
status_code (int): Código de estado HTTP
"""
logger = logging.getLogger("app.access")
logger.info(
f"Usuario: {username} | Endpoint: {endpoint} | Método: {method} | IP: {ip_address} | Estado: {status_code}"
)
def log_error(error, username=None):
"""
Registrar error en la aplicación.
Args:
error: Excepción o mensaje de error
username (str, optional): Nombre de usuario
"""
logger = logging.getLogger("app.error")
if username:
logger.error(f"Usuario: {username} | Error: {str(error)}")
else:
logger.error(str(error))
def log_system_event(event_type, message):
"""
Registrar evento del sistema.
Args:
event_type (str): Tipo de evento
message (str): Mensaje descriptivo
"""
logger = logging.getLogger("app")
logger.info(f"Evento: {event_type} | {message}")
def log_user_activity(
user_id, activity_type, ip_address=None, user_agent=None, details=None
):
"""
Registrar actividad de usuario para auditoría.
Args:
user_id (str): ID del usuario
activity_type (str): Tipo de actividad
ip_address (str, optional): Dirección IP
user_agent (str, optional): User-Agent del navegador
details (dict, optional): Detalles adicionales
"""
logger = logging.getLogger("audit")
# Crear registro de actividad
activity = {
"timestamp": datetime.now(pytz.UTC).isoformat(),
"user_id": user_id,
"activity_type": activity_type,
"ip_address": ip_address or request.remote_addr if request else "unknown",
"user_agent": user_agent
or (request.user_agent.string if request and request.user_agent else "unknown"),
}
# Añadir detalles si se proporcionan
if details:
activity["details"] = details
# Registrar como JSON
logger.info(json.dumps(activity))
def log_document_access(user_id, project_id, document_id, version, action):
"""
Registrar acceso a documentos.
Args:
user_id (str): ID del usuario
project_id (int): ID del proyecto
document_id (int): ID del documento
version (int): Versión del documento
action (str): Acción realizada ('view', 'download', 'upload')
"""
log_user_activity(
user_id=user_id,
activity_type=f"document_{action}",
details={
"project_id": project_id,
"document_id": document_id,
"version": version,
},
)
def log_project_activity(user_id, project_id, action, details=None):
"""
Registrar actividad en proyectos.
Args:
user_id (str): ID del usuario
project_id (int): ID del proyecto
action (str): Acción realizada ('create', 'update', 'delete')
details (dict, optional): Detalles adicionales
"""
activity_details = {"project_id": project_id}
if details:
activity_details.update(details)
log_user_activity(
user_id=user_id, activity_type=f"project_{action}", details=activity_details
)
def log_user_management(admin_id, target_user_id, action, details=None):
"""
Registrar actividad de gestión de usuarios.
Args:
admin_id (str): ID del administrador
target_user_id (str): ID del usuario objetivo
action (str): Acción realizada ('create', 'update', 'delete')
details (dict, optional): Detalles adicionales
"""
activity_details = {"target_user_id": target_user_id}
if details:
activity_details.update(details)
log_user_activity(
user_id=admin_id, activity_type=f"user_{action}", details=activity_details
)
def get_audit_logs(filters=None, limit=100):
"""
Obtener registros de auditoría filtrados.
Args:
filters (dict, optional): Filtros a aplicar
- user_id: ID de usuario
- activity_type: Tipo de actividad
- start_date: Fecha de inicio (ISO format)
- end_date: Fecha de fin (ISO format)
limit (int, optional): Número máximo de registros a devolver
Returns:
list: Lista de registros de auditoría
"""
log_dir = current_app.config["LOG_DIR"]
audit_log_file = os.path.join(log_dir, "audit.log")
if not os.path.exists(audit_log_file):
return []
logs = []
with open(audit_log_file, "r") as f:
for line in f:
try:
# Extraer la parte JSON del log
json_start = line.find("{")
if json_start == -1:
continue
json_str = line[json_start:]
log_entry = json.loads(json_str)
# Aplicar filtros
if filters:
# Filtrar por usuario
if (
"user_id" in filters
and filters["user_id"]
and log_entry.get("user_id") != filters["user_id"]
):
continue
# Filtrar por tipo de actividad
if (
"activity_type" in filters
and filters["activity_type"]
and log_entry.get("activity_type") != filters["activity_type"]
):
continue
# Filtrar por fecha de inicio
if "start_date" in filters and filters["start_date"]:
start_date = datetime.fromisoformat(
filters["start_date"].replace("Z", "+00:00")
)
log_date = datetime.fromisoformat(
log_entry.get("timestamp", "").replace("Z", "+00:00")
)
if log_date < start_date:
continue
# Filtrar por fecha de fin
if "end_date" in filters and filters["end_date"]:
end_date = datetime.fromisoformat(
filters["end_date"].replace("Z", "+00:00")
)
log_date = datetime.fromisoformat(
log_entry.get("timestamp", "").replace("Z", "+00:00")
)
if log_date > end_date:
continue
logs.append(log_entry)
# Limitar número de registros
if len(logs) >= limit:
break
except (json.JSONDecodeError, ValueError):
continue
# Ordenar por timestamp (más reciente primero)
logs.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
return logs