294 lines
7.7 KiB
Python
294 lines
7.7 KiB
Python
import os
|
|
import json
|
|
import shutil
|
|
import zipfile
|
|
from datetime import datetime
|
|
import pytz
|
|
from flask import current_app
|
|
from werkzeug.utils import secure_filename
|
|
import mimetypes
|
|
|
|
# Initialize mimetypes
|
|
mimetypes.init()
|
|
|
|
# Try to import magic library but don't fail if it's not available
|
|
try:
|
|
import magic
|
|
|
|
HAS_MAGIC = True
|
|
except ImportError:
|
|
HAS_MAGIC = False
|
|
|
|
|
|
def ensure_dir_exists(directory):
|
|
"""
|
|
Asegurar que un directorio existe, creándolo si es necesario.
|
|
|
|
Args:
|
|
directory (str): Ruta del directorio
|
|
"""
|
|
if not os.path.exists(directory):
|
|
os.makedirs(directory)
|
|
|
|
|
|
def load_json_file(file_path, default=None):
|
|
"""
|
|
Load JSON data from a file.
|
|
|
|
Args:
|
|
file_path (str): Path to the JSON file
|
|
default: Default value to return if file doesn't exist
|
|
|
|
Returns:
|
|
dict or list: Loaded JSON data or default value
|
|
"""
|
|
if default is None:
|
|
default = {}
|
|
|
|
if not os.path.exists(file_path):
|
|
return default
|
|
|
|
try:
|
|
with open(file_path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except json.JSONDecodeError:
|
|
current_app.logger.error(f"Error decoding JSON from {file_path}")
|
|
return default
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error loading file {file_path}: {str(e)}")
|
|
return default
|
|
|
|
|
|
def save_json_file(file_path, data):
|
|
"""
|
|
Save data to a JSON file.
|
|
|
|
Args:
|
|
file_path (str): Path to the JSON file
|
|
data (dict or list): Data to save
|
|
|
|
Returns:
|
|
bool: True if successful, False otherwise
|
|
"""
|
|
try:
|
|
# Ensure directory exists
|
|
directory = os.path.dirname(file_path)
|
|
if not os.path.exists(directory):
|
|
os.makedirs(directory)
|
|
|
|
with open(file_path, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
return True
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error saving file {file_path}: {str(e)}")
|
|
return False
|
|
|
|
|
|
def get_next_id(id_type):
|
|
"""
|
|
Obtener el siguiente ID disponible para proyectos o documentos.
|
|
|
|
Args:
|
|
id_type (str): Tipo de ID ('project' o 'document')
|
|
|
|
Returns:
|
|
int: Siguiente ID disponible
|
|
"""
|
|
storage_path = current_app.config["STORAGE_PATH"]
|
|
indices_file = os.path.join(storage_path, "indices.json")
|
|
|
|
# Cargar índices
|
|
indices = load_json_file(indices_file, {"max_project_id": 0, "max_document_id": 0})
|
|
|
|
# Incrementar y guardar
|
|
if id_type == "project":
|
|
indices["max_project_id"] += 1
|
|
new_id = indices["max_project_id"]
|
|
elif id_type == "document":
|
|
indices["max_document_id"] += 1
|
|
new_id = indices["max_document_id"]
|
|
else:
|
|
raise ValueError(f"Tipo de ID no válido: {id_type}")
|
|
|
|
save_json_file(indices_file, indices)
|
|
|
|
return new_id
|
|
|
|
|
|
def format_project_directory_name(project_id, project_name):
|
|
"""
|
|
Formatear nombre de directorio para un proyecto.
|
|
|
|
Args:
|
|
project_id (int): ID del proyecto
|
|
project_name (str): Nombre del proyecto
|
|
|
|
Returns:
|
|
str: Nombre de directorio formateado
|
|
"""
|
|
# Sanitizar nombre de proyecto
|
|
safe_name = secure_filename(project_name)
|
|
safe_name = safe_name.replace(" ", "_")
|
|
|
|
# Formatear como @id_num_@project_name
|
|
return f"@{project_id:03d}_@{safe_name}"
|
|
|
|
|
|
def format_document_directory_name(document_id, document_name):
|
|
"""
|
|
Formatear nombre de directorio para un documento.
|
|
|
|
Args:
|
|
document_id (int): ID del documento
|
|
document_name (str): Nombre del documento
|
|
|
|
Returns:
|
|
str: Nombre de directorio formateado
|
|
"""
|
|
# Sanitizar nombre de documento
|
|
safe_name = secure_filename(document_name)
|
|
safe_name = safe_name.replace(" ", "_")
|
|
|
|
# Formatear como @id_num_@doc_name
|
|
return f"@{document_id:03d}_@{safe_name}"
|
|
|
|
|
|
def format_version_filename(version, document_name, extension):
|
|
"""
|
|
Formatear nombre de archivo para una versión de documento.
|
|
|
|
Args:
|
|
version (int): Número de versión
|
|
document_name (str): Nombre base del documento
|
|
extension (str): Extensión del archivo
|
|
|
|
Returns:
|
|
str: Nombre de archivo formateado
|
|
"""
|
|
# Sanitizar nombre
|
|
safe_name = secure_filename(document_name)
|
|
safe_name = safe_name.replace(" ", "_")
|
|
|
|
# Asegurar que la extensión no tiene el punto
|
|
if extension.startswith("."):
|
|
extension = extension[1:]
|
|
|
|
# Formatear como v001_doc_name.ext
|
|
return f"v{version:03d}_{safe_name}.{extension}"
|
|
|
|
|
|
def create_zip_archive(source_dir, files_to_include, output_path):
|
|
"""
|
|
Crear un archivo ZIP con los documentos seleccionados.
|
|
|
|
Args:
|
|
source_dir (str): Directorio fuente
|
|
files_to_include (list): Lista de archivos a incluir
|
|
output_path (str): Ruta de salida para el ZIP
|
|
|
|
Returns:
|
|
str: Ruta al archivo ZIP creado
|
|
"""
|
|
with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
|
|
for file_info in files_to_include:
|
|
zipf.write(
|
|
os.path.join(source_dir, file_info["path"]),
|
|
arcname=file_info["arcname"],
|
|
)
|
|
|
|
return output_path
|
|
|
|
|
|
def get_directory_size(directory):
|
|
"""
|
|
Calcular el tamaño total de un directorio en bytes.
|
|
|
|
Args:
|
|
directory (str): Ruta al directorio
|
|
|
|
Returns:
|
|
int: Tamaño en bytes
|
|
"""
|
|
total_size = 0
|
|
|
|
for dirpath, dirnames, filenames in os.walk(directory):
|
|
for filename in filenames:
|
|
file_path = os.path.join(dirpath, filename)
|
|
total_size += os.path.getsize(file_path)
|
|
|
|
return total_size
|
|
|
|
|
|
def get_file_info(file_path):
|
|
"""
|
|
Obtener información básica sobre un archivo.
|
|
|
|
Args:
|
|
file_path (str): Ruta al archivo
|
|
|
|
Returns:
|
|
dict: Información del archivo
|
|
"""
|
|
stat_info = os.stat(file_path)
|
|
|
|
return {
|
|
"filename": os.path.basename(file_path),
|
|
"size": stat_info.st_size,
|
|
"created": datetime.fromtimestamp(stat_info.st_ctime, pytz.UTC).isoformat(),
|
|
"modified": datetime.fromtimestamp(stat_info.st_mtime, pytz.UTC).isoformat(),
|
|
"path": file_path,
|
|
}
|
|
|
|
|
|
def delete_directory_with_content(directory):
|
|
"""
|
|
Eliminar un directorio y todo su contenido.
|
|
|
|
Args:
|
|
directory (str): Ruta al directorio a eliminar
|
|
"""
|
|
if os.path.exists(directory):
|
|
shutil.rmtree(directory)
|
|
|
|
|
|
def detect_file_type(file_path=None, file_data=None):
|
|
"""
|
|
Detect file type using either libmagic (if available) or file extension.
|
|
|
|
Args:
|
|
file_path: Path to the file
|
|
file_data: Binary data of the file (used with magic)
|
|
|
|
Returns:
|
|
tuple: (mime_type, encoding)
|
|
"""
|
|
if HAS_MAGIC:
|
|
# Use libmagic for more accurate file type detection
|
|
try:
|
|
mime = magic.Magic(mime=True)
|
|
if file_data:
|
|
mime_type = mime.from_buffer(file_data)
|
|
return mime_type, None
|
|
elif file_path and os.path.exists(file_path):
|
|
mime_type = mime.from_file(file_path)
|
|
return mime_type, None
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error using magic library: {str(e)}")
|
|
# Fall back to extension-based detection
|
|
|
|
# Fallback method using file extensions
|
|
if file_path:
|
|
mime_type, encoding = mimetypes.guess_type(file_path)
|
|
return mime_type or "application/octet-stream", encoding
|
|
|
|
# If no path or data, return a default
|
|
return "application/octet-stream", None
|
|
|
|
|
|
def is_allowed_file(filename, allowed_extensions):
|
|
"""Check if a file has an allowed extension"""
|
|
if "." not in filename:
|
|
return False
|
|
ext = filename.rsplit(".", 1)[1].lower()
|
|
return ext in allowed_extensions
|