200 lines
6.2 KiB
Python
200 lines
6.2 KiB
Python
|
from flask import current_app
|
||
|
from flask_login import UserMixin
|
||
|
import json
|
||
|
import os
|
||
|
from datetime import datetime
|
||
|
import pytz
|
||
|
from app import bcrypt
|
||
|
|
||
|
class User(UserMixin):
|
||
|
"""Clase de usuario para Flask-Login."""
|
||
|
|
||
|
def __init__(self, username, data):
|
||
|
self.id = username
|
||
|
self.username = username
|
||
|
self.nombre = data.get('nombre', '')
|
||
|
self.email = data.get('email', '')
|
||
|
self.password_hash = data.get('password_hash', '')
|
||
|
self.nivel = data.get('nivel', 0)
|
||
|
self.idioma = data.get('idioma', 'es')
|
||
|
self.fecha_caducidad = data.get('fecha_caducidad')
|
||
|
self.empresa = data.get('empresa', '')
|
||
|
self.estado = data.get('estado', 'inactivo')
|
||
|
self.ultimo_acceso = data.get('ultimo_acceso')
|
||
|
|
||
|
def verify_password(self, password):
|
||
|
"""Verificar contraseña."""
|
||
|
return bcrypt.check_password_hash(self.password_hash, password)
|
||
|
|
||
|
def is_active(self):
|
||
|
"""Verificar si el usuario está activo."""
|
||
|
return self.estado == 'activo'
|
||
|
|
||
|
def is_expired(self):
|
||
|
"""Verificar si la cuenta del usuario ha expirado."""
|
||
|
if not self.fecha_caducidad:
|
||
|
return False
|
||
|
|
||
|
now = datetime.now(pytz.UTC)
|
||
|
expiry_date = datetime.fromisoformat(self.fecha_caducidad.replace('Z', '+00:00'))
|
||
|
return now > expiry_date
|
||
|
|
||
|
def has_permission(self, required_level):
|
||
|
"""Verificar si el usuario tiene el nivel de permiso requerido."""
|
||
|
return self.nivel >= required_level
|
||
|
|
||
|
def to_dict(self):
|
||
|
"""Convertir usuario a diccionario."""
|
||
|
return {
|
||
|
'nombre': self.nombre,
|
||
|
'username': self.username,
|
||
|
'email': self.email,
|
||
|
'password_hash': self.password_hash,
|
||
|
'nivel': self.nivel,
|
||
|
'idioma': self.idioma,
|
||
|
'fecha_caducidad': self.fecha_caducidad,
|
||
|
'empresa': self.empresa,
|
||
|
'estado': self.estado,
|
||
|
'ultimo_acceso': self.ultimo_acceso
|
||
|
}
|
||
|
|
||
|
def authenticate_user(username, password):
|
||
|
"""Autenticar usuario con nombre de usuario y contraseña."""
|
||
|
user = get_user_by_username(username)
|
||
|
|
||
|
if user and user.verify_password(password) and user.is_active() and not user.is_expired():
|
||
|
# Actualizar último acceso
|
||
|
update_last_access(username)
|
||
|
return user
|
||
|
|
||
|
return None
|
||
|
|
||
|
def get_user_by_username(username):
|
||
|
"""Obtener usuario por nombre de usuario."""
|
||
|
users = load_users()
|
||
|
|
||
|
if username in users:
|
||
|
return User(username, users[username])
|
||
|
|
||
|
return None
|
||
|
|
||
|
def get_user_by_id(user_id):
|
||
|
"""Cargar usuario por ID (username) para Flask-Login."""
|
||
|
return get_user_by_username(user_id)
|
||
|
|
||
|
def update_last_access(username):
|
||
|
"""Actualizar el timestamp de último acceso."""
|
||
|
users = load_users()
|
||
|
|
||
|
if username in users:
|
||
|
users[username]['ultimo_acceso'] = datetime.now(pytz.UTC).isoformat()
|
||
|
save_users(users)
|
||
|
|
||
|
def create_user(username, nombre, email, password, nivel=0, idioma='es',
|
||
|
fecha_caducidad=None, empresa='', estado='activo'):
|
||
|
"""Crear un nuevo usuario."""
|
||
|
users = load_users()
|
||
|
|
||
|
# Verificar si ya existe el usuario
|
||
|
if username in users:
|
||
|
return False, "El nombre de usuario ya existe."
|
||
|
|
||
|
# Generar hash para la contraseña
|
||
|
password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
|
||
|
|
||
|
# Crear entrada de usuario
|
||
|
users[username] = {
|
||
|
'nombre': nombre,
|
||
|
'username': username,
|
||
|
'email': email,
|
||
|
'password_hash': password_hash,
|
||
|
'nivel': nivel,
|
||
|
'idioma': idioma,
|
||
|
'fecha_caducidad': fecha_caducidad,
|
||
|
'empresa': empresa,
|
||
|
'estado': estado,
|
||
|
'ultimo_acceso': datetime.now(pytz.UTC).isoformat()
|
||
|
}
|
||
|
|
||
|
# Guardar usuarios
|
||
|
save_users(users)
|
||
|
|
||
|
return True, "Usuario creado correctamente."
|
||
|
|
||
|
def update_user(username, data):
|
||
|
"""Actualizar datos de usuario."""
|
||
|
users = load_users()
|
||
|
|
||
|
# Verificar si existe el usuario
|
||
|
if username not in users:
|
||
|
return False, "El usuario no existe."
|
||
|
|
||
|
# Actualizar campos (excepto password si no se especifica)
|
||
|
for key, value in data.items():
|
||
|
if key != 'password': # Contraseña se maneja separado
|
||
|
users[username][key] = value
|
||
|
|
||
|
# Actualizar contraseña si se proporciona
|
||
|
if 'password' in data and data['password']:
|
||
|
users[username]['password_hash'] = bcrypt.generate_password_hash(
|
||
|
data['password']).decode('utf-8')
|
||
|
|
||
|
# Guardar usuarios
|
||
|
save_users(users)
|
||
|
|
||
|
return True, "Usuario actualizado correctamente."
|
||
|
|
||
|
def delete_user(username):
|
||
|
"""Eliminar usuario."""
|
||
|
users = load_users()
|
||
|
|
||
|
# Verificar si existe el usuario
|
||
|
if username not in users:
|
||
|
return False, "El usuario no existe."
|
||
|
|
||
|
# Eliminar usuario
|
||
|
del users[username]
|
||
|
|
||
|
# Guardar usuarios
|
||
|
save_users(users)
|
||
|
|
||
|
return True, "Usuario eliminado correctamente."
|
||
|
|
||
|
def load_users():
|
||
|
"""Cargar usuarios desde archivo JSON."""
|
||
|
storage_path = current_app.config['STORAGE_PATH']
|
||
|
users_file = os.path.join(storage_path, 'users', 'users.json')
|
||
|
|
||
|
try:
|
||
|
with open(users_file, 'r', encoding='utf-8') as f:
|
||
|
return json.load(f)
|
||
|
except (FileNotFoundError, json.JSONDecodeError):
|
||
|
return {}
|
||
|
|
||
|
def save_users(users):
|
||
|
"""Guardar usuarios en archivo JSON."""
|
||
|
storage_path = current_app.config['STORAGE_PATH']
|
||
|
users_file = os.path.join(storage_path, 'users', 'users.json')
|
||
|
|
||
|
with open(users_file, 'w', encoding='utf-8') as f:
|
||
|
json.dump(users, f, ensure_ascii=False, indent=2)
|
||
|
|
||
|
def get_all_users():
|
||
|
"""Obtener lista de todos los usuarios."""
|
||
|
users = load_users()
|
||
|
return [User(username, data) for username, data in users.items()]
|
||
|
|
||
|
def initialize_admin_user():
|
||
|
"""Crear usuario administrador si no existe."""
|
||
|
users = load_users()
|
||
|
|
||
|
if 'admin' not in users:
|
||
|
create_user(
|
||
|
username='admin',
|
||
|
nombre='Administrador',
|
||
|
email='admin@example.com',
|
||
|
password='admin123', # Cambiar en producción
|
||
|
nivel=9999,
|
||
|
estado='activo'
|
||
|
)
|
||
|
current_app.logger.info('Usuario administrador creado.')
|