"""Filesystem helpers: JSON persistence, ID counters, naming, ZIP and file info."""

import json
import mimetypes
import os
import shutil
import zipfile
from datetime import datetime

import pytz
from flask import current_app
from werkzeug.utils import secure_filename

# Initialize the mimetypes tables once at import time.
mimetypes.init()

# python-magic is optional: detect_file_type() falls back to file extensions
# when the library cannot be imported.
try:
    import magic

    HAS_MAGIC = True
except ImportError:
    HAS_MAGIC = False


def ensure_dir_exists(directory):
    """
    Make sure a directory exists, creating it (and its parents) if needed.

    Args:
        directory (str): Directory path
    """
    if not os.path.exists(directory):
        # exist_ok=True: another process may create the directory between
        # the existence check and this call; that must not be an error.
        os.makedirs(directory, exist_ok=True)


def load_json_file(file_path, default=None):
    """
    Load JSON data from a file.

    Decoding and I/O errors are logged through the Flask application logger
    and the default value is returned instead of raising.

    Args:
        file_path (str): Path to the JSON file
        default: Value to return if the file doesn't exist or can't be read;
            ``None`` means an empty dict

    Returns:
        dict or list: Loaded JSON data or default value
    """
    if default is None:
        default = {}

    if not os.path.exists(file_path):
        return default

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError:
        current_app.logger.error(f"Error decoding JSON from {file_path}")
    except Exception as e:
        current_app.logger.error(f"Error loading file {file_path}: {str(e)}")
    return default


def save_json_file(file_path, data):
    """
    Save data to a JSON file.

    Any failure is logged through the Flask application logger and reported
    via the return value rather than raised.

    Args:
        file_path (str): Path to the JSON file
        data (dict or list): Data to save

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        # Serialize before opening the target: opening with "w" truncates,
        # so a non-serializable payload must fail before the file is touched.
        payload = json.dumps(data, ensure_ascii=False, indent=2)

        # A bare filename has an empty dirname; os.makedirs("") would raise,
        # so only create the directory when there is one.
        directory = os.path.dirname(file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(file_path, "w", encoding="utf-8") as f:
            f.write(payload)
        return True
    except Exception as e:
        current_app.logger.error(f"Error saving file {file_path}: {str(e)}")
        return False


def get_next_id(id_type):
    """
    Get the next available ID for projects or documents.

    The counters live in ``indices.json`` under the configured STORAGE_PATH;
    the selected counter is incremented and the file is written back.

    Args:
        id_type (str): Kind of ID ('project' or 'document')

    Returns:
        int: Next available ID

    Raises:
        ValueError: If id_type is neither 'project' nor 'document'
    """
    # Validate first so an invalid request never touches the index file.
    if id_type == "project":
        counter_key = "max_project_id"
    elif id_type == "document":
        counter_key = "max_document_id"
    else:
        raise ValueError(f"Tipo de ID no válido: {id_type}")

    storage_path = current_app.config["STORAGE_PATH"]
    indices_file = os.path.join(storage_path, "indices.json")

    # Load the counters
    indices = load_json_file(indices_file, {"max_project_id": 0, "max_document_id": 0})

    # Increment and persist; a counter missing from an existing file starts at 0.
    new_id = indices.get(counter_key, 0) + 1
    indices[counter_key] = new_id

    # NOTE(review): the result of save_json_file is ignored here (it logs its
    # own errors); if the save fails, the same ID will be handed out again.
    save_json_file(indices_file, indices)

    return new_id


def _sanitize_name(name):
    """Return a filesystem-safe version of name with spaces as underscores."""
    return secure_filename(name).replace(" ", "_")


def format_project_directory_name(project_id, project_name):
    """
    Format the directory name for a project.

    Args:
        project_id (int): Project ID
        project_name (str): Project name

    Returns:
        str: Formatted directory name
    """
    # Format as @id_num_@project_name
    return f"@{project_id:03d}_@{_sanitize_name(project_name)}"


def format_document_directory_name(document_id, document_name):
    """
    Format the directory name for a document.

    Args:
        document_id (int): Document ID
        document_name (str): Document name

    Returns:
        str: Formatted directory name
    """
    # Format as @id_num_@doc_name
    return f"@{document_id:03d}_@{_sanitize_name(document_name)}"


def format_version_filename(version, document_name, extension):
    """
    Format the file name for a document version.

    Args:
        version (int): Version number
        document_name (str): Base name of the document
        extension (str): File extension, with or without a leading dot

    Returns:
        str: Formatted file name
    """
    safe_name = _sanitize_name(document_name)

    # Make sure the extension does not carry its leading dot
    if extension.startswith("."):
        extension = extension[1:]

    # Format as v001_doc_name.ext
    return f"v{version:03d}_{safe_name}.{extension}"


def create_zip_archive(source_dir, files_to_include, output_path):
    """
    Create a ZIP archive with the selected documents.

    Args:
        source_dir (str): Source directory
        files_to_include (list): Dicts with a "path" key (joined onto
            source_dir) and an "arcname" key (name stored inside the archive)
        output_path (str): Output path for the ZIP

    Returns:
        str: Path to the created ZIP file
    """
    with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for file_info in files_to_include:
            zipf.write(
                os.path.join(source_dir, file_info["path"]),
                arcname=file_info["arcname"],
            )

    return output_path


def get_directory_size(directory):
    """
    Compute the total size of a directory in bytes.

    Entries whose size cannot be read (for example a dangling symlink or a
    file removed while walking) are skipped instead of aborting the total.

    Args:
        directory (str): Path to the directory

    Returns:
        int: Size in bytes
    """
    total_size = 0

    for dirpath, _dirnames, filenames in os.walk(directory):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            try:
                total_size += os.path.getsize(file_path)
            except OSError:
                continue

    return total_size


def get_file_info(file_path):
    """
    Get basic information about a file.

    Args:
        file_path (str): Path to the file

    Returns:
        dict: File information with keys "filename", "size", "created",
            "modified" (ISO 8601 strings in UTC) and "path"
    """
    stat_info = os.stat(file_path)

    return {
        "filename": os.path.basename(file_path),
        "size": stat_info.st_size,
        # NOTE: "created" comes from st_ctime, whose meaning is platform
        # dependent (metadata-change time on Unix, creation time on Windows).
        "created": datetime.fromtimestamp(stat_info.st_ctime, pytz.UTC).isoformat(),
        "modified": datetime.fromtimestamp(stat_info.st_mtime, pytz.UTC).isoformat(),
        "path": file_path,
    }


def delete_directory_with_content(directory):
    """
    Delete a directory and everything inside it.

    Does nothing when the path does not exist.

    Args:
        directory (str): Path of the directory to delete
    """
    if os.path.exists(directory):
        shutil.rmtree(directory)


def detect_file_type(file_path=None, file_data=None):
    """
    Detect file type using either libmagic (if available) or file extension.

    Args:
        file_path: Path to the file
        file_data: Binary data of the file (used with magic)

    Returns:
        tuple: (mime_type, encoding)
    """
    if HAS_MAGIC:
        # Use libmagic for more accurate file type detection
        try:
            mime = magic.Magic(mime=True)
            if file_data:
                mime_type = mime.from_buffer(file_data)
                return mime_type, None
            elif file_path and os.path.exists(file_path):
                mime_type = mime.from_file(file_path)
                return mime_type, None
        except Exception as e:
            current_app.logger.error(f"Error using magic library: {str(e)}")
            # Fall back to extension-based detection

    # Fallback method using file extensions
    if file_path:
        mime_type, encoding = mimetypes.guess_type(file_path)
        return mime_type or "application/octet-stream", encoding

    # If no path or data, return a default
    return "application/octet-stream", None


def is_allowed_file(filename, allowed_extensions):
    """
    Check if a file has an allowed extension.

    Args:
        filename (str): File name to check
        allowed_extensions: Collection of lowercase extensions without the dot

    Returns:
        bool: True if the text after the last dot, lowercased, is allowed
    """
    if "." not in filename:
        return False

    ext = filename.rsplit(".", 1)[1].lower()
    return ext in allowed_extensions