192 lines
5.5 KiB
Python
192 lines
5.5 KiB
Python
import os
|
|
import json
|
|
import uuid
|
|
from datetime import datetime
|
|
from flask import current_app
|
|
from flask_login import UserMixin
|
|
from app import bcrypt
|
|
from utils.file_utils import load_json_file, save_json_file
|
|
from services.auth_service import (
|
|
get_user_by_username,
|
|
get_user_by_id,
|
|
get_all_users,
|
|
create_user,
|
|
update_user,
|
|
delete_user,
|
|
)
|
|
|
|
|
|
class User(UserMixin):
|
|
"""Clase de usuario para Flask-Login."""
|
|
|
|
def __init__(self, user_id, user_data):
|
|
self.id = user_id
|
|
self.username = user_data.get("username", "")
|
|
self.nombre = user_data.get("nombre", "")
|
|
self.email = user_data.get("email", "")
|
|
self.nivel = user_data.get("nivel", 0)
|
|
self.idioma = user_data.get("idioma", "es")
|
|
self.fecha_caducidad = user_data.get("fecha_caducidad")
|
|
self.empresa = user_data.get("empresa", "")
|
|
self.estado = user_data.get("estado", "activo")
|
|
self.ultimo_acceso = user_data.get("ultimo_acceso")
|
|
|
|
def has_permission(self, required_level):
|
|
"""Verificar si el usuario tiene el nivel de permiso requerido."""
|
|
return self.nivel >= required_level and self.estado == "activo"
|
|
|
|
def to_dict(self):
|
|
"""Convertir el usuario a un diccionario para almacenamiento."""
|
|
return {
|
|
"username": self.username,
|
|
"nombre": self.nombre,
|
|
"email": self.email,
|
|
"nivel": self.nivel,
|
|
"idioma": self.idioma,
|
|
"fecha_caducidad": self.fecha_caducidad,
|
|
"empresa": self.empresa,
|
|
"estado": self.estado,
|
|
"ultimo_acceso": self.ultimo_acceso,
|
|
}
|
|
|
|
|
|
def get_users_file_path():
|
|
"""Obtener la ruta al archivo de usuarios."""
|
|
storage_path = current_app.config["STORAGE_PATH"]
|
|
return os.path.join(storage_path, "users", "users.json")
|
|
|
|
|
|
def get_all_users():
|
|
"""Obtener todos los usuarios del sistema."""
|
|
users_data = load_json_file(get_users_file_path(), {})
|
|
return users_data
|
|
|
|
|
|
def get_user_by_id(user_id):
|
|
"""Obtener un usuario por su ID."""
|
|
if not user_id:
|
|
return None
|
|
|
|
users_data = load_json_file(get_users_file_path(), {})
|
|
|
|
if user_id in users_data:
|
|
return User(user_id, users_data[user_id])
|
|
|
|
return None
|
|
|
|
|
|
# Este archivo importa funcionalidades de auth_service.py para mantener
|
|
# consistencia en la estructura del proyecto, y agrega funcionalidades específicas
|
|
# de gestión de usuarios que no están relacionadas con la autenticación.
|
|
|
|
|
|
def filter_users(filter_params):
|
|
"""
|
|
Filtrar usuarios según los parámetros proporcionados.
|
|
|
|
Args:
|
|
filter_params (dict): Diccionario con parámetros de filtrado
|
|
- empresa: Nombre de empresa
|
|
- estado: Estado del usuario ('activo', 'inactivo')
|
|
- nivel_min: Nivel mínimo de permiso
|
|
- nivel_max: Nivel máximo de permiso
|
|
|
|
Returns:
|
|
list: Lista de objetos User que cumplen los criterios
|
|
"""
|
|
users = get_all_users()
|
|
filtered_users = []
|
|
|
|
for user in users:
|
|
# Filtrar por empresa
|
|
if "empresa" in filter_params and filter_params["empresa"]:
|
|
if user.empresa != filter_params["empresa"]:
|
|
continue
|
|
|
|
# Filtrar por estado
|
|
if "estado" in filter_params and filter_params["estado"]:
|
|
if user.estado != filter_params["estado"]:
|
|
continue
|
|
|
|
# Filtrar por nivel mínimo
|
|
if "nivel_min" in filter_params and filter_params["nivel_min"] is not None:
|
|
if user.nivel < int(filter_params["nivel_min"]):
|
|
continue
|
|
|
|
# Filtrar por nivel máximo
|
|
if "nivel_max" in filter_params and filter_params["nivel_max"] is not None:
|
|
if user.nivel > int(filter_params["nivel_max"]):
|
|
continue
|
|
|
|
# Si pasó todos los filtros, agregar a la lista
|
|
filtered_users.append(user)
|
|
|
|
return filtered_users
|
|
|
|
|
|
def get_user_stats():
|
|
"""
|
|
Obtener estadísticas sobre los usuarios.
|
|
|
|
Returns:
|
|
dict: Diccionario con estadísticas
|
|
"""
|
|
users = get_all_users()
|
|
|
|
# Inicializar estadísticas
|
|
stats = {
|
|
"total": len(users),
|
|
"activos": 0,
|
|
"inactivos": 0,
|
|
"expirados": 0,
|
|
"por_empresa": {},
|
|
"por_nivel": {
|
|
"admin": 0, # Nivel 9000+
|
|
"gestor": 0, # Nivel 5000-8999
|
|
"editor": 0, # Nivel 1000-4999
|
|
"lector": 0, # Nivel 0-999
|
|
},
|
|
}
|
|
|
|
# Calcular estadísticas
|
|
for user in users:
|
|
# Estado
|
|
if user.estado == "activo":
|
|
stats["activos"] += 1
|
|
else:
|
|
stats["inactivos"] += 1
|
|
|
|
# Expirados
|
|
if user.is_expired():
|
|
stats["expirados"] += 1
|
|
|
|
# Por empresa
|
|
empresa = user.empresa or "Sin empresa"
|
|
stats["por_empresa"][empresa] = stats["por_empresa"].get(empresa, 0) + 1
|
|
|
|
# Por nivel
|
|
if user.nivel >= 9000:
|
|
stats["por_nivel"]["admin"] += 1
|
|
elif user.nivel >= 5000:
|
|
stats["por_nivel"]["gestor"] += 1
|
|
elif user.nivel >= 1000:
|
|
stats["por_nivel"]["editor"] += 1
|
|
else:
|
|
stats["por_nivel"]["lector"] += 1
|
|
|
|
return stats
|
|
|
|
|
|
def check_username_availability(username):
|
|
"""
|
|
Verificar si un nombre de usuario está disponible.
|
|
|
|
Args:
|
|
username (str): Nombre de usuario a verificar
|
|
|
|
Returns:
|
|
bool: True si está disponible, False si ya existe
|
|
"""
|
|
user = get_user_by_username(username)
|
|
return user is None
|