# Arch/utils/file_utils.py
#
# 294 lines
# 7.7 KiB
# Python

import os
import json
import shutil
import zipfile
from datetime import datetime
import pytz
from flask import current_app
from werkzeug.utils import secure_filename
import mimetypes
# Initialize mimetypes
mimetypes.init()
# python-magic is optional: record whether it could be imported so that
# callers can fall back to extension-based detection when it is missing.
try:
    import magic
except ImportError:
    HAS_MAGIC = False
else:
    HAS_MAGIC = True
def ensure_dir_exists(directory):
    """
    Ensure that a directory exists, creating it (and any parents) if needed.

    Args:
        directory (str): Path of the directory.

    Raises:
        FileExistsError: If the path already exists but is not a directory.
        OSError: If the directory cannot be created.
    """
    # exist_ok=True removes the check-then-create race: with a separate
    # os.path.exists() test, another worker could create the directory in
    # between and make os.makedirs() fail.
    os.makedirs(directory, exist_ok=True)
def load_json_file(file_path, default=None):
    """
    Read a JSON document from disk, falling back to a default on failure.

    Args:
        file_path (str): Location of the JSON file.
        default: Value returned when the file does not exist or cannot be
            read or parsed. ``None`` is treated as an empty dict.

    Returns:
        dict or list: The parsed JSON content, or the fallback value.
    """
    fallback = {} if default is None else default

    if not os.path.exists(file_path):
        return fallback

    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except json.JSONDecodeError:
        # Malformed JSON is reported separately from other read errors.
        current_app.logger.error(f"Error decoding JSON from {file_path}")
        return fallback
    except Exception as e:
        current_app.logger.error(f"Error loading file {file_path}: {str(e)}")
        return fallback
    return parsed
def save_json_file(file_path, data):
    """
    Serialize data as indented UTF-8 JSON and write it to a file.

    Missing parent directories are created. Any failure is logged through
    the Flask application logger and reported via the return value rather
    than raised.

    Args:
        file_path (str): Path to the JSON file. May be a bare filename, in
            which case the file is written to the current directory.
        data (dict or list): Data to save.

    Returns:
        bool: True if successful, False otherwise.
    """
    try:
        # Serialize before opening the target: opening with "w" truncates
        # the file, so a non-serializable payload would otherwise wipe out
        # whatever was stored there previously.
        payload = json.dumps(data, ensure_ascii=False, indent=2)

        # A bare filename has an empty dirname, and os.makedirs("") raises,
        # so only create the directory when there is one to create.
        directory = os.path.dirname(file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(file_path, "w", encoding="utf-8") as f:
            f.write(payload)
        return True
    except Exception as e:
        current_app.logger.error(f"Error saving file {file_path}: {str(e)}")
        return False
def get_next_id(id_type):
    """
    Get the next available ID for projects or documents.

    The counters live in ``indices.json`` under the configured
    ``STORAGE_PATH``. The selected counter is incremented, written back,
    and its new value returned.

    Args:
        id_type (str): Kind of ID to allocate ('project' or 'document').

    Returns:
        int: Next available ID.

    Raises:
        ValueError: If ``id_type`` is neither 'project' nor 'document'.
    """
    # Validate first so an invalid request never touches the storage file.
    if id_type == "project":
        counter_key = "max_project_id"
    elif id_type == "document":
        counter_key = "max_document_id"
    else:
        raise ValueError(f"Tipo de ID no válido: {id_type}")

    storage_path = current_app.config["STORAGE_PATH"]
    indices_file = os.path.join(storage_path, "indices.json")

    # Load the stored counters (both start at 0 when the file is missing).
    indices = load_json_file(indices_file, {"max_project_id": 0, "max_document_id": 0})

    # An existing file may hold only one of the two counters; treat a
    # missing counter as 0 instead of failing with KeyError.
    new_id = indices.get(counter_key, 0) + 1
    indices[counter_key] = new_id

    if not save_json_file(indices_file, indices):
        # The ID is still returned (as before), but if the counter was not
        # persisted the same ID can be handed out again on the next call.
        current_app.logger.error(
            f"Could not persist {counter_key}={new_id} to {indices_file}"
        )
    return new_id
def format_project_directory_name(project_id, project_name):
    """
    Build the directory name used for a project.

    Args:
        project_id (int): Project ID; rendered zero-padded to three digits.
        project_name (str): Project name; sanitized before use.

    Returns:
        str: Directory name of the form ``@<id>_@<sanitized name>``.
    """
    # Sanitize the name, then replace any remaining spaces with underscores.
    safe_name = secure_filename(project_name).replace(" ", "_")
    return f"@{project_id:03d}_@{safe_name}"
def format_document_directory_name(document_id, document_name):
    """
    Build the directory name used for a document.

    Args:
        document_id (int): Document ID; rendered zero-padded to three digits.
        document_name (str): Document name; sanitized before use.

    Returns:
        str: Directory name of the form ``@<id>_@<sanitized name>``.
    """
    # Sanitize the name, then replace any remaining spaces with underscores.
    safe_name = secure_filename(document_name).replace(" ", "_")
    return f"@{document_id:03d}_@{safe_name}"
def format_version_filename(version, document_name, extension):
    """
    Build the file name used for one version of a document.

    Args:
        version (int): Version number; rendered zero-padded to three digits.
        document_name (str): Base name of the document; sanitized before use.
        extension (str): File extension, with or without a leading dot.

    Returns:
        str: File name of the form ``v<version>_<sanitized name>.<extension>``.
    """
    # Sanitize the name, then replace any remaining spaces with underscores.
    safe_name = secure_filename(document_name).replace(" ", "_")
    # Drop one leading dot so the template below supplies the only separator.
    extension = extension[1:] if extension.startswith(".") else extension
    return f"v{version:03d}_{safe_name}.{extension}"
def create_zip_archive(source_dir, files_to_include, output_path):
    """
    Create a deflate-compressed ZIP archive from the selected files.

    Args:
        source_dir (str): Directory that the entries' paths are joined onto.
        files_to_include (list): Entries to add; each is a mapping with a
            "path" key (file location under ``source_dir``) and an
            "arcname" key (name stored inside the archive).
        output_path (str): Where the ZIP file is written.

    Returns:
        str: ``output_path``, the location of the created archive.
    """
    archive = zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED)
    with archive:
        for entry in files_to_include:
            source_file = os.path.join(source_dir, entry["path"])
            archive.write(source_file, arcname=entry["arcname"])
    return output_path
def get_directory_size(directory):
    """
    Compute the total size in bytes of all files under a directory.

    The tree is walked recursively. Entries whose size cannot be read (for
    example a dangling symlink, or a file removed while the walk is in
    progress) are skipped instead of aborting the whole computation.

    Args:
        directory (str): Path to the directory.

    Returns:
        int: Size in bytes; 0 if the directory yields no readable files.
    """
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(directory):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            try:
                total_size += os.path.getsize(file_path)
            except OSError:
                # os.walk lists dangling symlinks as files, and a file can
                # vanish between listing and stat; neither has a size to add.
                continue
    return total_size
def get_file_info(file_path):
    """
    Get basic information about a file.

    Args:
        file_path (str): Path to the file.

    Returns:
        dict: Mapping with the keys "filename" (base name), "size" (bytes),
        "created" and "modified" (ISO 8601 timestamps in UTC) and "path"
        (the path exactly as given).

    Raises:
        OSError: If the file cannot be stat'ed (e.g. it does not exist).
    """
    # Function-scope import: the module itself only imports `datetime`.
    from datetime import timezone

    stat_info = os.stat(file_path)

    # st_ctime is the metadata-change time on Unix, not the creation time.
    # Prefer the real creation time where the platform exposes it and fall
    # back to st_ctime elsewhere.
    created_ts = getattr(stat_info, "st_birthtime", stat_info.st_ctime)

    def _utc_iso(timestamp):
        # Render a POSIX timestamp as a timezone-aware ISO 8601 string (UTC).
        return datetime.fromtimestamp(timestamp, timezone.utc).isoformat()

    return {
        "filename": os.path.basename(file_path),
        "size": stat_info.st_size,
        "created": _utc_iso(created_ts),
        "modified": _utc_iso(stat_info.st_mtime),
        "path": file_path,
    }
def delete_directory_with_content(directory):
    """
    Remove a directory together with everything inside it.

    Does nothing when the path does not exist.

    Args:
        directory (str): Path of the directory to remove.
    """
    if not os.path.exists(directory):
        return
    shutil.rmtree(directory)
def detect_file_type(file_path=None, file_data=None):
    """
    Detect a file's MIME type via libmagic when available, else by extension.

    When the magic library was imported, non-empty ``file_data`` is inspected
    first, then the file at ``file_path`` if it exists. If neither applies,
    or libmagic raises, detection falls back to guessing from the extension
    of ``file_path``.

    Args:
        file_path: Path to the file.
        file_data: Binary data of the file (only used with magic).

    Returns:
        tuple: (mime_type, encoding). The encoding is always None for
        libmagic results; "application/octet-stream" is used when no type
        can be determined.
    """
    if HAS_MAGIC:
        try:
            detector = magic.Magic(mime=True)
            if file_data:
                return detector.from_buffer(file_data), None
            if file_path and os.path.exists(file_path):
                return detector.from_file(file_path), None
        except Exception as e:
            # Log and continue with the extension-based fallback below.
            current_app.logger.error(f"Error using magic library: {str(e)}")

    # With no path there is nothing to guess from.
    if not file_path:
        return "application/octet-stream", None

    guessed_type, encoding = mimetypes.guess_type(file_path)
    return guessed_type or "application/octet-stream", encoding
def is_allowed_file(filename, allowed_extensions):
    """
    Check whether a filename carries one of the allowed extensions.

    The extension is the text after the last dot, compared in lower case.

    Args:
        filename (str): Name of the file to check.
        allowed_extensions: Container of permitted extensions, without dots.

    Returns:
        bool: False when the name has no dot; otherwise whether the
        lower-cased extension is in ``allowed_extensions``.
    """
    _stem, dot, suffix = filename.rpartition(".")
    if not dot:
        return False
    return suffix.lower() in allowed_extensions