2025-03-03 17:50:11 -03:00
|
|
|
import os
|
|
|
|
import json
|
|
|
|
from flask import (
|
|
|
|
Blueprint,
|
|
|
|
render_template,
|
|
|
|
redirect,
|
|
|
|
url_for,
|
|
|
|
flash,
|
|
|
|
request,
|
|
|
|
jsonify,
|
|
|
|
current_app,
|
|
|
|
)
|
|
|
|
from flask_login import login_required, current_user
|
|
|
|
from flask_wtf import FlaskForm
|
|
|
|
from wtforms import StringField, IntegerField, TextAreaField, SubmitField
|
|
|
|
from wtforms.validators import DataRequired, Length, NumberRange
|
|
|
|
|
|
|
|
from services.index_service import build_search_index, get_index_stats
|
|
|
|
from services.user_service import get_user_stats
|
|
|
|
from utils.security import permission_required
|
|
|
|
from utils.logger import get_audit_logs
|
|
|
|
from utils.file_utils import get_directory_size
|
|
|
|
|
|
|
|
# Definir Blueprint
|
|
|
|
admin_bp = Blueprint("admin", __name__, url_prefix="/admin")
|
|
|
|
|
|
|
|
|
|
|
|
# Formularios
|
|
|
|
class FileTypeForm(FlaskForm):
|
|
|
|
"""Formulario para tipo de archivo."""
|
|
|
|
|
|
|
|
extension = StringField("Extensión", validators=[DataRequired(), Length(1, 10)])
|
|
|
|
descripcion = StringField(
|
|
|
|
"Descripción", validators=[DataRequired(), Length(1, 100)]
|
|
|
|
)
|
|
|
|
mime_type = StringField("MIME Type", validators=[DataRequired(), Length(1, 100)])
|
|
|
|
tamano_maximo = IntegerField(
|
|
|
|
"Tamaño máximo (bytes)", validators=[NumberRange(min=1)], default=10485760
|
|
|
|
)
|
|
|
|
submit = SubmitField("Guardar")
|
|
|
|
|
|
|
|
|
|
|
|
# Rutas
|
|
|
|
@admin_bp.route("/")
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def dashboard():
|
|
|
|
"""Panel de administración."""
|
|
|
|
# Obtener estadísticas
|
|
|
|
user_stats = get_user_stats()
|
|
|
|
index_stats = get_index_stats()
|
|
|
|
|
|
|
|
# Obtener tamaño del almacenamiento
|
|
|
|
storage_path = os.path.join(os.getcwd(), "storage")
|
|
|
|
storage_size = get_directory_size(storage_path) // (1024 * 1024) # Convertir a MB
|
|
|
|
|
|
|
|
return render_template(
|
|
|
|
"admin/dashboard.html",
|
|
|
|
user_stats=user_stats,
|
|
|
|
index_stats=index_stats,
|
|
|
|
storage_size=storage_size,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/filetypes")
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def filetypes():
|
|
|
|
"""Gestión de tipos de archivo."""
|
|
|
|
# Cargar tipos de archivo
|
|
|
|
storage_path = os.path.join(os.getcwd(), "storage")
|
|
|
|
filetypes_file = os.path.join(storage_path, "filetypes", "filetypes.json")
|
|
|
|
|
|
|
|
with open(filetypes_file, "r", encoding="utf-8") as f:
|
|
|
|
filetypes = json.load(f)
|
|
|
|
|
|
|
|
form = FileTypeForm()
|
|
|
|
|
|
|
|
return render_template("admin/filetypes.html", filetypes=filetypes, form=form)
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/filetypes/add", methods=["POST"])
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def add_filetype():
|
|
|
|
"""Añadir nuevo tipo de archivo."""
|
|
|
|
form = FileTypeForm()
|
|
|
|
|
|
|
|
if form.validate_on_submit():
|
|
|
|
# Cargar tipos de archivo actuales
|
|
|
|
storage_path = os.path.join(os.getcwd(), "storage")
|
|
|
|
filetypes_file = os.path.join(storage_path, "filetypes", "filetypes.json")
|
|
|
|
|
|
|
|
with open(filetypes_file, "r", encoding="utf-8") as f:
|
|
|
|
filetypes = json.load(f)
|
|
|
|
|
|
|
|
# Verificar si ya existe
|
|
|
|
extension = form.extension.data.lower()
|
|
|
|
if extension in filetypes:
|
|
|
|
flash(f"El tipo de archivo '{extension}' ya existe.", "danger")
|
|
|
|
return redirect(url_for("admin.filetypes"))
|
|
|
|
|
|
|
|
# Añadir nuevo tipo
|
|
|
|
filetypes[extension] = {
|
|
|
|
"extension": extension,
|
|
|
|
"descripcion": form.descripcion.data,
|
|
|
|
"mime_type": form.mime_type.data,
|
|
|
|
"tamano_maximo": form.tamano_maximo.data,
|
|
|
|
}
|
|
|
|
|
|
|
|
# Guardar cambios
|
|
|
|
with open(filetypes_file, "w", encoding="utf-8") as f:
|
|
|
|
json.dump(filetypes, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
|
|
flash(f"Tipo de archivo '{extension}' añadido correctamente.", "success")
|
|
|
|
else:
|
|
|
|
for field, errors in form.errors.items():
|
|
|
|
for error in errors:
|
|
|
|
flash(f"{getattr(form, field).label.text}: {error}", "danger")
|
|
|
|
|
|
|
|
return redirect(url_for("admin.filetypes"))
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/filetypes/delete/<extension>", methods=["POST"])
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def delete_filetype(extension):
|
|
|
|
"""Eliminar tipo de archivo."""
|
|
|
|
# Cargar tipos de archivo actuales
|
|
|
|
storage_path = os.path.join(os.getcwd(), "storage")
|
|
|
|
filetypes_file = os.path.join(storage_path, "filetypes", "filetypes.json")
|
|
|
|
|
|
|
|
with open(filetypes_file, "r", encoding="utf-8") as f:
|
|
|
|
filetypes = json.load(f)
|
|
|
|
|
|
|
|
# Verificar si existe
|
|
|
|
if extension not in filetypes:
|
|
|
|
flash(f"El tipo de archivo '{extension}' no existe.", "danger")
|
|
|
|
return redirect(url_for("admin.filetypes"))
|
|
|
|
|
|
|
|
# Eliminar tipo
|
|
|
|
del filetypes[extension]
|
|
|
|
|
|
|
|
# Guardar cambios
|
|
|
|
with open(filetypes_file, "w", encoding="utf-8") as f:
|
|
|
|
json.dump(filetypes, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
|
|
flash(f"Tipo de archivo '{extension}' eliminado correctamente.", "success")
|
|
|
|
return redirect(url_for("admin.filetypes"))
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/system")
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def system():
|
|
|
|
"""Estado del sistema."""
|
|
|
|
import sys
|
|
|
|
|
|
|
|
storage_path = current_app.config["STORAGE_PATH"]
|
|
|
|
|
|
|
|
# Define a helper function to get directory size
|
|
|
|
def get_directory_size(path):
|
|
|
|
total_size = 0
|
|
|
|
if os.path.exists(path):
|
|
|
|
for dirpath, dirnames, filenames in os.walk(path):
|
|
|
|
for f in filenames:
|
|
|
|
fp = os.path.join(dirpath, f)
|
|
|
|
# skip if it is symbolic link
|
|
|
|
if not os.path.islink(fp):
|
|
|
|
total_size += os.path.getsize(fp)
|
|
|
|
return total_size
|
|
|
|
|
|
|
|
# Recopilar estadísticas del sistema
|
|
|
|
stats = {
|
|
|
|
"storage_size": get_directory_size(storage_path),
|
|
|
|
"projects_count": (
|
|
|
|
len(os.listdir(os.path.join(storage_path, "projects")))
|
|
|
|
if os.path.exists(os.path.join(storage_path, "projects"))
|
|
|
|
else 0
|
|
|
|
),
|
|
|
|
"log_size": get_directory_size(os.path.join(storage_path, "logs")),
|
|
|
|
"python_version": sys.version,
|
|
|
|
"platform": sys.platform,
|
|
|
|
}
|
|
|
|
|
|
|
|
return render_template("admin/system.html", stats=stats)
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/system/clear_logs", methods=["POST"])
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def clear_logs():
|
|
|
|
"""Limpiar logs del sistema."""
|
|
|
|
storage_path = current_app.config["STORAGE_PATH"]
|
|
|
|
logs_path = os.path.join(storage_path, "logs")
|
|
|
|
|
|
|
|
try:
|
|
|
|
for filename in os.listdir(logs_path):
|
|
|
|
file_path = os.path.join(logs_path, filename)
|
|
|
|
if os.path.isfile(file_path) and not filename.endswith(".log"):
|
|
|
|
os.unlink(file_path)
|
|
|
|
|
|
|
|
flash("Archivos de log rotados eliminados correctamente.", "success")
|
|
|
|
except Exception as e:
|
|
|
|
flash(f"Error al limpiar logs: {str(e)}", "danger")
|
|
|
|
|
|
|
|
return redirect(url_for("admin.system"))
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/initialize")
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def initialize():
|
|
|
|
"""Inicializar o reiniciar el sistema."""
|
|
|
|
from services.filetype_service import initialize_default_filetypes
|
|
|
|
from services.schema_service import initialize_default_schemas
|
|
|
|
|
|
|
|
try:
|
|
|
|
initialize_default_filetypes()
|
|
|
|
initialize_default_schemas()
|
|
|
|
|
|
|
|
flash("Sistema inicializado correctamente.", "success")
|
|
|
|
except Exception as e:
|
|
|
|
flash(f"Error al inicializar el sistema: {str(e)}", "danger")
|
|
|
|
|
|
|
|
return redirect(url_for("admin.system"))
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/system/rebuild-index", methods=["POST"])
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def rebuild_index():
|
|
|
|
"""Reconstruir índice de búsqueda."""
|
|
|
|
success, message = build_search_index()
|
|
|
|
|
|
|
|
if success:
|
|
|
|
flash(message, "success")
|
|
|
|
else:
|
|
|
|
flash(message, "danger")
|
|
|
|
|
|
|
|
return redirect(url_for("admin.system"))
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/audit-logs")
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def audit_logs():
|
|
|
|
"""Ver logs de auditoría."""
|
|
|
|
# Obtener parámetros de filtrado
|
|
|
|
filters = {
|
|
|
|
"user_id": request.args.get("user_id"),
|
|
|
|
"activity_type": request.args.get("activity_type"),
|
|
|
|
"start_date": request.args.get("start_date"),
|
|
|
|
"end_date": request.args.get("end_date"),
|
|
|
|
}
|
|
|
|
|
|
|
|
# Filtrar logs vacíos
|
|
|
|
filters = {k: v for k, v in filters.items() if v}
|
|
|
|
|
|
|
|
# Obtener logs
|
|
|
|
logs = get_audit_logs(filters, limit=100)
|
|
|
|
|
|
|
|
return render_template("admin/audit_logs.html", logs=logs, filters=filters)
|
|
|
|
|
|
|
|
|
|
|
|
@admin_bp.route("/api/system-info")
|
|
|
|
@login_required
|
|
|
|
@permission_required(9000) # Solo administradores
|
|
|
|
def api_system_info():
|
|
|
|
"""API para obtener información del sistema."""
|
|
|
|
# Obtener información del sistema
|
|
|
|
storage_path = os.path.join(os.getcwd(), "storage")
|
|
|
|
|
|
|
|
# Tamaños de directorios
|
|
|
|
storage_info = {
|
|
|
|
"total": get_directory_size(storage_path) // (1024 * 1024), # MB
|
|
|
|
"logs": get_directory_size(os.path.join(storage_path, "logs")) // (1024 * 1024),
|
|
|
|
"projects": get_directory_size(os.path.join(storage_path, "projects"))
|
|
|
|
// (1024 * 1024),
|
|
|
|
}
|
|
|
|
|
|
|
|
# Estadísticas de usuarios
|
|
|
|
user_stats = get_user_stats()
|
|
|
|
|
|
|
|
# Estadísticas de índice
|
|
|
|
index_stats = get_index_stats()
|
|
|
|
|
|
|
|
return jsonify({"storage": storage_info, "users": user_stats, "index": index_stats})
|