348 lines
11 KiB
Python
348 lines
11 KiB
Python
import os
|
|
import json
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
|
|
from datetime import datetime
|
|
import pytz
|
|
from flask import current_app, request, has_request_context
|
|
|
|
|
|
class RequestFormatter(logging.Formatter):
|
|
"""Formatter that adds request-specific info to logs when available."""
|
|
|
|
def format(self, record):
|
|
if has_request_context():
|
|
record.url = request.url
|
|
record.method = request.method
|
|
record.remote_addr = request.remote_addr
|
|
record.user_agent = request.user_agent
|
|
else:
|
|
record.url = None
|
|
record.method = None
|
|
record.remote_addr = None
|
|
record.user_agent = None
|
|
|
|
return super().format(record)
|
|
|
|
|
|
def setup_logger(app):
|
|
"""Configure application logging.
|
|
|
|
Args:
|
|
app: Flask application instance
|
|
"""
|
|
log_dir = os.path.join(app.config.get("STORAGE_PATH", "storage"), "logs")
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
# Determine environment mode (using debug instead of ENV)
|
|
env_mode = "development" if app.debug else "production"
|
|
|
|
# Configure log level based on environment
|
|
log_level = logging.DEBUG if app.debug else logging.INFO
|
|
|
|
# Base file names
|
|
log_file = os.path.join(log_dir, f"app.log")
|
|
error_log_file = os.path.join(log_dir, f"error.log")
|
|
access_log_file = os.path.join(log_dir, f"access.log")
|
|
|
|
# Configure formatters
|
|
standard_formatter = logging.Formatter(
|
|
"%(asctime)s [%(levelname)s] %(module)s: %(message)s"
|
|
)
|
|
|
|
detailed_formatter = logging.Formatter(
|
|
"%(asctime)s [%(levelname)s] %(module)s.%(funcName)s:%(lineno)d: %(message)s"
|
|
)
|
|
|
|
access_formatter = RequestFormatter(
|
|
"%(asctime)s - %(remote_addr)s - %(method)s %(url)s - "
|
|
"%(user_agent)s - %(message)s"
|
|
)
|
|
|
|
# Create system logger (general application logs)
|
|
system_logger = logging.getLogger("app")
|
|
system_logger.setLevel(log_level)
|
|
|
|
# Rotating file handler for general logs (10 MB files, keep 10 backups)
|
|
rfh = RotatingFileHandler(
|
|
log_file, maxBytes=10 * 1024 * 1024, backupCount=10, encoding="utf-8"
|
|
)
|
|
rfh.setFormatter(standard_formatter)
|
|
rfh.setLevel(log_level)
|
|
system_logger.addHandler(rfh)
|
|
|
|
# Error logger with more detailed output
|
|
error_logger = logging.getLogger("app.error")
|
|
error_logger.setLevel(logging.ERROR)
|
|
|
|
error_handler = RotatingFileHandler(
|
|
error_log_file, maxBytes=10 * 1024 * 1024, backupCount=10, encoding="utf-8"
|
|
)
|
|
error_handler.setFormatter(detailed_formatter)
|
|
error_handler.setLevel(logging.ERROR)
|
|
error_logger.addHandler(error_handler)
|
|
|
|
# Access logger for HTTP requests
|
|
access_logger = logging.getLogger("app.access")
|
|
access_logger.setLevel(logging.INFO)
|
|
|
|
# Daily rotating access logs
|
|
access_handler = TimedRotatingFileHandler(
|
|
access_log_file, when="midnight", interval=1, backupCount=30, encoding="utf-8"
|
|
)
|
|
access_handler.setFormatter(access_formatter)
|
|
access_handler.setLevel(logging.INFO)
|
|
access_logger.addHandler(access_handler)
|
|
|
|
# Add handlers to Flask logger
|
|
for handler in [rfh, error_handler]:
|
|
app.logger.addHandler(handler)
|
|
|
|
app.logger.setLevel(log_level)
|
|
|
|
# Initial startup log entries
|
|
system_logger.info(f"Aplicación iniciada en modo: {env_mode}")
|
|
system_logger.info(
|
|
f"Nivel de logging establecido a: {logging.getLevelName(log_level)}"
|
|
)
|
|
|
|
# Register access log handler for requests
|
|
@app.after_request
|
|
def log_request(response):
|
|
if not request.path.startswith("/static/"):
|
|
access_logger.info(
|
|
f"Status: {response.status_code} - Size: {response.calculate_content_length()}"
|
|
)
|
|
return response
|
|
|
|
return system_logger
|
|
|
|
|
|
def log_access(username, endpoint, method, ip_address, status_code):
|
|
"""
|
|
Registrar acceso a la aplicación.
|
|
|
|
Args:
|
|
username (str): Nombre de usuario
|
|
endpoint (str): Endpoint accedido
|
|
method (str): Método HTTP
|
|
ip_address (str): Dirección IP
|
|
status_code (int): Código de estado HTTP
|
|
"""
|
|
logger = logging.getLogger("app.access")
|
|
logger.info(
|
|
f"Usuario: {username} | Endpoint: {endpoint} | Método: {method} | IP: {ip_address} | Estado: {status_code}"
|
|
)
|
|
|
|
|
|
def log_error(error, username=None):
|
|
"""
|
|
Registrar error en la aplicación.
|
|
|
|
Args:
|
|
error: Excepción o mensaje de error
|
|
username (str, optional): Nombre de usuario
|
|
"""
|
|
logger = logging.getLogger("app.error")
|
|
|
|
if username:
|
|
logger.error(f"Usuario: {username} | Error: {str(error)}")
|
|
else:
|
|
logger.error(str(error))
|
|
|
|
|
|
def log_system_event(event_type, message):
|
|
"""
|
|
Registrar evento del sistema.
|
|
|
|
Args:
|
|
event_type (str): Tipo de evento
|
|
message (str): Mensaje descriptivo
|
|
"""
|
|
logger = logging.getLogger("app")
|
|
logger.info(f"Evento: {event_type} | {message}")
|
|
|
|
|
|
def log_user_activity(
|
|
user_id, activity_type, ip_address=None, user_agent=None, details=None
|
|
):
|
|
"""
|
|
Registrar actividad de usuario para auditoría.
|
|
|
|
Args:
|
|
user_id (str): ID del usuario
|
|
activity_type (str): Tipo de actividad
|
|
ip_address (str, optional): Dirección IP
|
|
user_agent (str, optional): User-Agent del navegador
|
|
details (dict, optional): Detalles adicionales
|
|
"""
|
|
logger = logging.getLogger("audit")
|
|
|
|
# Crear registro de actividad
|
|
activity = {
|
|
"timestamp": datetime.now(pytz.UTC).isoformat(),
|
|
"user_id": user_id,
|
|
"activity_type": activity_type,
|
|
"ip_address": ip_address or request.remote_addr if request else "unknown",
|
|
"user_agent": user_agent
|
|
or (request.user_agent.string if request and request.user_agent else "unknown"),
|
|
}
|
|
|
|
# Añadir detalles si se proporcionan
|
|
if details:
|
|
activity["details"] = details
|
|
|
|
# Registrar como JSON
|
|
logger.info(json.dumps(activity))
|
|
|
|
|
|
def log_document_access(user_id, project_id, document_id, version, action):
|
|
"""
|
|
Registrar acceso a documentos.
|
|
|
|
Args:
|
|
user_id (str): ID del usuario
|
|
project_id (int): ID del proyecto
|
|
document_id (int): ID del documento
|
|
version (int): Versión del documento
|
|
action (str): Acción realizada ('view', 'download', 'upload')
|
|
"""
|
|
log_user_activity(
|
|
user_id=user_id,
|
|
activity_type=f"document_{action}",
|
|
details={
|
|
"project_id": project_id,
|
|
"document_id": document_id,
|
|
"version": version,
|
|
},
|
|
)
|
|
|
|
|
|
def log_project_activity(user_id, project_id, action, details=None):
|
|
"""
|
|
Registrar actividad en proyectos.
|
|
|
|
Args:
|
|
user_id (str): ID del usuario
|
|
project_id (int): ID del proyecto
|
|
action (str): Acción realizada ('create', 'update', 'delete')
|
|
details (dict, optional): Detalles adicionales
|
|
"""
|
|
activity_details = {"project_id": project_id}
|
|
|
|
if details:
|
|
activity_details.update(details)
|
|
|
|
log_user_activity(
|
|
user_id=user_id, activity_type=f"project_{action}", details=activity_details
|
|
)
|
|
|
|
|
|
def log_user_management(admin_id, target_user_id, action, details=None):
|
|
"""
|
|
Registrar actividad de gestión de usuarios.
|
|
|
|
Args:
|
|
admin_id (str): ID del administrador
|
|
target_user_id (str): ID del usuario objetivo
|
|
action (str): Acción realizada ('create', 'update', 'delete')
|
|
details (dict, optional): Detalles adicionales
|
|
"""
|
|
activity_details = {"target_user_id": target_user_id}
|
|
|
|
if details:
|
|
activity_details.update(details)
|
|
|
|
log_user_activity(
|
|
user_id=admin_id, activity_type=f"user_{action}", details=activity_details
|
|
)
|
|
|
|
|
|
def get_audit_logs(filters=None, limit=100):
|
|
"""
|
|
Obtener registros de auditoría filtrados.
|
|
|
|
Args:
|
|
filters (dict, optional): Filtros a aplicar
|
|
- user_id: ID de usuario
|
|
- activity_type: Tipo de actividad
|
|
- start_date: Fecha de inicio (ISO format)
|
|
- end_date: Fecha de fin (ISO format)
|
|
limit (int, optional): Número máximo de registros a devolver
|
|
|
|
Returns:
|
|
list: Lista de registros de auditoría
|
|
"""
|
|
log_dir = current_app.config["LOG_DIR"]
|
|
audit_log_file = os.path.join(log_dir, "audit.log")
|
|
|
|
if not os.path.exists(audit_log_file):
|
|
return []
|
|
|
|
logs = []
|
|
|
|
with open(audit_log_file, "r") as f:
|
|
for line in f:
|
|
try:
|
|
# Extraer la parte JSON del log
|
|
json_start = line.find("{")
|
|
if json_start == -1:
|
|
continue
|
|
|
|
json_str = line[json_start:]
|
|
log_entry = json.loads(json_str)
|
|
|
|
# Aplicar filtros
|
|
if filters:
|
|
# Filtrar por usuario
|
|
if (
|
|
"user_id" in filters
|
|
and filters["user_id"]
|
|
and log_entry.get("user_id") != filters["user_id"]
|
|
):
|
|
continue
|
|
|
|
# Filtrar por tipo de actividad
|
|
if (
|
|
"activity_type" in filters
|
|
and filters["activity_type"]
|
|
and log_entry.get("activity_type") != filters["activity_type"]
|
|
):
|
|
continue
|
|
|
|
# Filtrar por fecha de inicio
|
|
if "start_date" in filters and filters["start_date"]:
|
|
start_date = datetime.fromisoformat(
|
|
filters["start_date"].replace("Z", "+00:00")
|
|
)
|
|
log_date = datetime.fromisoformat(
|
|
log_entry.get("timestamp", "").replace("Z", "+00:00")
|
|
)
|
|
if log_date < start_date:
|
|
continue
|
|
|
|
# Filtrar por fecha de fin
|
|
if "end_date" in filters and filters["end_date"]:
|
|
end_date = datetime.fromisoformat(
|
|
filters["end_date"].replace("Z", "+00:00")
|
|
)
|
|
log_date = datetime.fromisoformat(
|
|
log_entry.get("timestamp", "").replace("Z", "+00:00")
|
|
)
|
|
if log_date > end_date:
|
|
continue
|
|
|
|
logs.append(log_entry)
|
|
|
|
# Limitar número de registros
|
|
if len(logs) >= limit:
|
|
break
|
|
|
|
except (json.JSONDecodeError, ValueError):
|
|
continue
|
|
|
|
# Ordenar por timestamp (más reciente primero)
|
|
logs.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
|
|
|
|
return logs
|