import fnmatch
import json
import logging
import os
import shutil
import tarfile
import threading
import time
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import schedule

from app.config.database import db

logger = logging.getLogger(__name__)


class BackupService:
    """Service for creating, listing, restoring, and pruning data-directory backups.

    On construction, a daemon thread is started (when enabled in config) that
    triggers a compressed backup of ``data_path`` once per day at the configured
    time; old backups are pruned after each scheduled run based on the
    retention policy.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialize paths from *config* (or built-in defaults) and start the
        background scheduler when backups are enabled.

        Args:
            config: Optional configuration dict; see ``_get_default_config``
                for the recognized keys and their defaults.
        """
        self.config = config or self._get_default_config()
        self.backup_path = Path(self.config.get("backup_path", "./backup"))
        self.data_path = Path(self.config.get("data_path", "./data"))
        # parents=True so a missing intermediate directory does not abort startup
        self.backup_path.mkdir(parents=True, exist_ok=True)
        self._setup_scheduler()

    def _get_default_config(self) -> Dict:
        """Return the default backup configuration."""
        return {
            "enabled": True,
            "schedule_time": "02:00",
            "retention_days": 30,
            "compression": "gzip",
            "backup_path": "./backup",
            "data_path": "./data",
            "exclude_patterns": ["*.tmp", "*.log", "__pycache__"],
            "max_backup_size_gb": 10,
        }

    def _setup_scheduler(self):
        """Register the daily backup job and start the scheduler daemon thread."""
        if self.config.get("enabled", True):
            schedule_time = self.config.get("schedule_time", "02:00")
            schedule.every().day.at(schedule_time).do(self._scheduled_backup)
            # Daemon thread: must not keep the process alive on shutdown.
            scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
            scheduler_thread.start()

    def _run_scheduler(self):
        """Poll the schedule forever; runs in the daemon thread."""
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

    def _scheduled_backup(self):
        """Run the scheduled backup and prune old backups; never raises."""
        try:
            backup_result = self.create_daily_backup()
            logger.info("Scheduled backup completed: %s", backup_result["backup_file"])
            self._cleanup_old_backups()
        except Exception:
            # Log-and-continue: the scheduler thread must survive failures.
            logger.exception("Scheduled backup failed")

    def create_daily_backup(self) -> Dict:
        """Create a compressed backup of the entire data directory.

        The archive is written under ``backup_path/YYYY-MM-DD/`` with a
        timestamped filename; a JSON log file describing the run is written
        alongside it (for both success and failure).

        Returns:
            Dict describing the backup (file path, size, compression, status).

        Raises:
            FileNotFoundError: If ``data_path`` does not exist.
            Exception: Any error raised during archive creation is re-raised
                after a failure log is written.
        """
        if not self.data_path.exists():
            raise FileNotFoundError(f"Data directory not found: {self.data_path}")

        # Create backup filename with current date and time
        now = datetime.now()
        backup_date = now.strftime("%Y-%m-%d")
        backup_time = now.strftime("%H%M%S")
        backup_dir = self.backup_path / backup_date
        backup_dir.mkdir(exist_ok=True)

        compression = self.config.get("compression", "gzip")
        if compression == "gzip":
            backup_filename = f"data_backup_{backup_date}_{backup_time}.tar.gz"
        elif compression == "zip":
            backup_filename = f"data_backup_{backup_date}_{backup_time}.zip"
        else:
            backup_filename = f"data_backup_{backup_date}_{backup_time}.tar"
        backup_file = backup_dir / backup_filename

        # Create backup
        try:
            if compression == "gzip":
                self._create_tar_backup(backup_file, compression="gz")
            elif compression == "zip":
                self._create_zip_backup(backup_file)
            else:
                self._create_tar_backup(backup_file)

            backup_size = backup_file.stat().st_size

            log_info = {
                "backup_date": backup_date,
                "backup_time": backup_time,
                "backup_file": str(backup_file),
                "backup_size": backup_size,
                "compression": compression,
                "data_path": str(self.data_path),
                "status": "success",
                "created_at": now.isoformat(),
            }
            self._save_backup_log(backup_dir, log_info)
            return log_info
        except Exception as e:
            # Record the failure next to where the archive would have been,
            # then propagate so callers see the original error.
            error_log = {
                "backup_date": backup_date,
                "backup_time": backup_time,
                "error": str(e),
                "status": "failed",
                "created_at": now.isoformat(),
            }
            self._save_backup_log(backup_dir, error_log)
            raise

    def _create_tar_backup(self, backup_file: Path, compression: Optional[str] = None):
        """Create a tar backup of ``data_path``.

        Args:
            backup_file: Destination archive path.
            compression: ``"gz"``, ``"bz2"``, or ``None`` for an uncompressed tar.
        """
        mode = "w"
        if compression == "gz":
            mode = "w:gz"
        elif compression == "bz2":
            mode = "w:bz2"

        exclude_patterns = self.config.get("exclude_patterns", [])
        with tarfile.open(backup_file, mode) as tar:
            # Only top-level entries are pattern-filtered here; tar.add recurses
            # into directories without further filtering (matches original behavior).
            for item in self.data_path.iterdir():
                if not self._should_exclude(item.name, exclude_patterns):
                    tar.add(item, arcname=item.name)

    def _create_zip_backup(self, backup_file: Path):
        """Create a deflate-compressed zip backup of ``data_path``."""
        exclude_patterns = self.config.get("exclude_patterns", [])
        with zipfile.ZipFile(backup_file, "w", zipfile.ZIP_DEFLATED) as zipf:
            for root, dirs, files in os.walk(self.data_path):
                # Prune excluded directories in place so os.walk skips them.
                dirs[:] = [
                    d for d in dirs if not self._should_exclude(d, exclude_patterns)
                ]
                for file in files:
                    if not self._should_exclude(file, exclude_patterns):
                        file_path = Path(root) / file
                        arcname = file_path.relative_to(self.data_path)
                        zipf.write(file_path, arcname)

    def _should_exclude(self, filename: str, patterns: List[str]) -> bool:
        """Return True if *filename* matches any shell-style glob in *patterns*."""
        return any(fnmatch.fnmatch(filename, pattern) for pattern in patterns)

    def _save_backup_log(self, backup_dir: Path, log_info: Dict):
        """Write *log_info* as JSON to ``backup_<date>.log`` in *backup_dir*."""
        log_file = backup_dir / f"backup_{log_info['backup_date']}.log"
        with open(log_file, "w") as f:
            json.dump(log_info, f, indent=2)

    def list_available_backups(self) -> List[Dict]:
        """List all available backups, newest first.

        Returns:
            List of dicts with at least ``backup_date``, ``backup_file``,
            ``backup_size`` and ``created_at``; fields from the backup log
            file are merged in when present.
        """
        backups = []
        if not self.backup_path.exists():
            return backups

        for backup_date_dir in self.backup_path.iterdir():
            if not backup_date_dir.is_dir():
                continue
            backup_date = backup_date_dir.name

            backup_files = (
                list(backup_date_dir.glob("data_backup_*.tar.gz"))
                + list(backup_date_dir.glob("data_backup_*.zip"))
                + list(backup_date_dir.glob("data_backup_*.tar"))
            )
            log_files = list(backup_date_dir.glob("backup_*.log"))

            for backup_file in backup_files:
                backup_info = {
                    "backup_date": backup_date,
                    "backup_file": str(backup_file),
                    "backup_size": backup_file.stat().st_size,
                    "created_at": datetime.fromtimestamp(
                        backup_file.stat().st_mtime
                    ).isoformat(),
                }
                # Merge extra metadata from the log; best-effort, a corrupt
                # log must not hide an otherwise valid backup.
                if log_files:
                    try:
                        with open(log_files[0], "r") as f:
                            backup_info.update(json.load(f))
                    except Exception:
                        pass
                backups.append(backup_info)

        # Sort by date (newest first)
        backups.sort(key=lambda x: x["created_at"], reverse=True)
        return backups

    def delete_backup(self, backup_date: str) -> bool:
        """Delete the backup directory for *backup_date* (``YYYY-MM-DD``).

        Returns:
            True on success, False if the directory is missing or removal failed.
        """
        backup_dir = self.backup_path / backup_date
        if backup_dir.exists() and backup_dir.is_dir():
            try:
                shutil.rmtree(backup_dir)
                return True
            except Exception:
                logger.exception("Error deleting backup %s", backup_date)
                return False
        return False

    def _cleanup_old_backups(self):
        """Remove backup directories older than the configured retention window."""
        retention_days = self.config.get("retention_days", 30)
        cutoff_date = datetime.now() - timedelta(days=retention_days)

        if not self.backup_path.exists():
            return

        for backup_date_dir in self.backup_path.iterdir():
            if not backup_date_dir.is_dir():
                continue
            try:
                # Directory names are expected to be YYYY-MM-DD.
                backup_date = datetime.strptime(backup_date_dir.name, "%Y-%m-%d")
                if backup_date < cutoff_date:
                    logger.info("Cleaning up old backup: %s", backup_date_dir.name)
                    shutil.rmtree(backup_date_dir)
            except ValueError:
                # Skip directories that don't match the date format.
                continue
            except Exception:
                logger.exception("Error cleaning up backup %s", backup_date_dir.name)

    def get_backup_status(self) -> Dict:
        """Return a summary of the backup service state.

        Includes total backup count/size, the most recent backup, and the
        next scheduled run time (None when scheduling is disabled).
        """
        backups = self.list_available_backups()
        total_size = sum(backup.get("backup_size", 0) for backup in backups)
        last_backup = backups[0] if backups else None

        next_backup = None
        if self.config.get("enabled", True):
            schedule_time = self.config.get("schedule_time", "02:00")
            today = datetime.now().date()
            next_backup_time = datetime.combine(
                today, datetime.strptime(schedule_time, "%H:%M").time()
            )
            # If today's backup time has passed, next backup is tomorrow.
            if datetime.now() > next_backup_time:
                next_backup_time += timedelta(days=1)
            next_backup = next_backup_time.isoformat()

        return {
            "enabled": self.config.get("enabled", True),
            "total_backups": len(backups),
            "total_size": total_size,
            "last_backup": last_backup,
            "next_scheduled_backup": next_backup,
            "retention_days": self.config.get("retention_days", 30),
            "backup_path": str(self.backup_path),
            "data_path": str(self.data_path),
        }

    def _safe_extract_tar(self, tar: tarfile.TarFile, dest: Path):
        """Extract *tar* into *dest*, refusing members that escape *dest*.

        Guards against tar path traversal ("../" members, absolute paths).
        Uses the PEP 706 ``filter="data"`` extraction filter on Python >= 3.12
        and falls back to a manual containment check on older versions.
        """
        try:
            tar.extractall(path=dest, filter="data")
        except TypeError:
            # Older Python: tarfile.extractall has no 'filter' parameter.
            dest_resolved = dest.resolve()
            for member in tar.getmembers():
                target = (dest / member.name).resolve()
                if target != dest_resolved and dest_resolved not in target.parents:
                    raise RuntimeError(
                        f"Unsafe path in backup archive: {member.name}"
                    )
            tar.extractall(path=dest)

    def restore_from_backup(
        self, backup_date: str, target_path: Optional[str] = None
    ) -> bool:
        """Restore data from a specific backup (Admin only operation).

        Args:
            backup_date: Backup directory name (``YYYY-MM-DD``).
            target_path: Destination directory; defaults to ``data_path``.

        Returns:
            True on success, False if extraction failed.

        Raises:
            FileNotFoundError: If the backup directory or archive is missing.
        """
        backup_dir = self.backup_path / backup_date
        if not backup_dir.exists():
            raise FileNotFoundError(f"Backup directory not found: {backup_dir}")

        backup_files = (
            list(backup_dir.glob("data_backup_*.tar.gz"))
            + list(backup_dir.glob("data_backup_*.zip"))
            + list(backup_dir.glob("data_backup_*.tar"))
        )
        if not backup_files:
            raise FileNotFoundError(f"No backup files found in {backup_dir}")

        # Filenames are timestamped, so several archives per day are possible;
        # restore from the most recently written one deterministically.
        backup_file = max(backup_files, key=lambda p: p.stat().st_mtime)
        restore_path = Path(target_path) if target_path else self.data_path
        restore_path.mkdir(parents=True, exist_ok=True)

        try:
            if backup_file.suffix == ".gz":
                with tarfile.open(backup_file, "r:gz") as tar:
                    self._safe_extract_tar(tar, restore_path)
            elif backup_file.suffix == ".zip":
                # zipfile.extract sanitizes member paths itself.
                with zipfile.ZipFile(backup_file, "r") as zipf:
                    zipf.extractall(path=restore_path)
            else:
                with tarfile.open(backup_file, "r") as tar:
                    self._safe_extract_tar(tar, restore_path)
            return True
        except Exception:
            logger.exception("Error restoring backup")
            return False

    def create_manual_backup(self, description: str = "Manual backup") -> Dict:
        """Create an immediate manual backup and tag the result accordingly."""
        backup_result = self.create_daily_backup()
        backup_result["description"] = description
        backup_result["type"] = "manual"
        return backup_result