SIDEL_ScriptsManager/app/services/backup_service.py

341 lines
13 KiB
Python

import os
import shutil
import tarfile
import zipfile
import json
import threading
import schedule
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional
from app.config.database import db
class BackupService:
    """Service for managing system backups.

    Creates scheduled and manual archives of the data directory, prunes
    backups past the retention window, and restores from saved archives.
    """

    def __init__(self, config=None):
        """Initialize paths from config and start the backup scheduler.

        Args:
            config: Optional dict overriding the defaults returned by
                ``_get_default_config`` (backup_path, data_path, schedule
                settings, exclude patterns, ...).
        """
        self.config = config or self._get_default_config()
        self.backup_path = Path(self.config.get("backup_path", "./backup"))
        self.data_path = Path(self.config.get("data_path", "./data"))
        # parents=True: a nested backup path (e.g. ./var/backup) must not
        # raise FileNotFoundError when intermediate directories are missing.
        self.backup_path.mkdir(parents=True, exist_ok=True)
        self._setup_scheduler()
def _get_default_config(self) -> Dict:
"""Get default backup configuration."""
return {
"enabled": True,
"schedule_time": "02:00",
"retention_days": 30,
"compression": "gzip",
"backup_path": "./backup",
"data_path": "./data",
"exclude_patterns": ["*.tmp", "*.log", "__pycache__"],
"max_backup_size_gb": 10,
}
def _setup_scheduler(self):
    """Register the daily backup job and launch the scheduler thread.

    No-op when backups are disabled in the config.
    """
    if not self.config.get("enabled", True):
        return
    at_time = self.config.get("schedule_time", "02:00")
    schedule.every().day.at(at_time).do(self._scheduled_backup)
    # Daemon thread: the scheduler must never block interpreter shutdown.
    worker = threading.Thread(target=self._run_scheduler, daemon=True)
    worker.start()
def _run_scheduler(self):
    """Poll the schedule forever (intended to run in a daemon thread)."""
    poll_seconds = 60  # check for due jobs once a minute
    while True:
        schedule.run_pending()
        time.sleep(poll_seconds)
def _scheduled_backup(self):
    """Scheduler entry point: run a backup, then prune old ones.

    Any failure (backup or cleanup) is reported but never propagated,
    so the scheduler thread stays alive.
    """
    try:
        outcome = self.create_daily_backup()
        print(f"Scheduled backup completed: {outcome['backup_file']}")
        self._cleanup_old_backups()
    except Exception as e:
        print(f"Scheduled backup failed: {e}")
def create_daily_backup(self) -> Dict:
"""Create compressed backup of entire data directory."""
if not self.data_path.exists():
raise FileNotFoundError(f"Data directory not found: {self.data_path}")
# Create backup filename with current date and time
now = datetime.now()
backup_date = now.strftime("%Y-%m-%d")
backup_time = now.strftime("%H%M%S")
backup_dir = self.backup_path / backup_date
backup_dir.mkdir(exist_ok=True)
compression = self.config.get("compression", "gzip")
if compression == "gzip":
backup_filename = f"data_backup_{backup_date}_{backup_time}.tar.gz"
backup_file = backup_dir / backup_filename
elif compression == "zip":
backup_filename = f"data_backup_{backup_date}_{backup_time}.zip"
backup_file = backup_dir / backup_filename
else:
backup_filename = f"data_backup_{backup_date}_{backup_time}.tar"
backup_file = backup_dir / backup_filename
# Create backup
try:
if compression == "gzip":
self._create_tar_backup(backup_file, compression="gz")
elif compression == "zip":
self._create_zip_backup(backup_file)
else:
self._create_tar_backup(backup_file)
# Get backup stats
backup_size = backup_file.stat().st_size
# Create backup log
log_info = {
"backup_date": backup_date,
"backup_time": backup_time,
"backup_file": str(backup_file),
"backup_size": backup_size,
"compression": compression,
"data_path": str(self.data_path),
"status": "success",
"created_at": now.isoformat(),
}
self._save_backup_log(backup_dir, log_info)
return log_info
except Exception as e:
error_log = {
"backup_date": backup_date,
"backup_time": backup_time,
"error": str(e),
"status": "failed",
"created_at": now.isoformat(),
}
self._save_backup_log(backup_dir, error_log)
raise
def _create_tar_backup(self, backup_file: Path, compression: str = None):
"""Create tar backup with optional compression."""
mode = "w"
if compression == "gz":
mode = "w:gz"
elif compression == "bz2":
mode = "w:bz2"
exclude_patterns = self.config.get("exclude_patterns", [])
with tarfile.open(backup_file, mode) as tar:
for item in self.data_path.iterdir():
if not self._should_exclude(item.name, exclude_patterns):
tar.add(item, arcname=item.name)
def _create_zip_backup(self, backup_file: Path):
"""Create zip backup."""
exclude_patterns = self.config.get("exclude_patterns", [])
with zipfile.ZipFile(backup_file, "w", zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(self.data_path):
# Filter out excluded directories
dirs[:] = [
d for d in dirs if not self._should_exclude(d, exclude_patterns)
]
for file in files:
if not self._should_exclude(file, exclude_patterns):
file_path = Path(root) / file
arcname = file_path.relative_to(self.data_path)
zipf.write(file_path, arcname)
def _should_exclude(self, filename: str, patterns: List[str]) -> bool:
"""Check if file should be excluded based on patterns."""
import fnmatch
for pattern in patterns:
if fnmatch.fnmatch(filename, pattern):
return True
return False
def _save_backup_log(self, backup_dir: Path, log_info: Dict):
"""Save backup log information."""
log_file = backup_dir / f"backup_{log_info['backup_date']}.log"
with open(log_file, "w") as f:
json.dump(log_info, f, indent=2)
def list_available_backups(self) -> List[Dict]:
"""List all available backups."""
backups = []
if not self.backup_path.exists():
return backups
for backup_date_dir in self.backup_path.iterdir():
if backup_date_dir.is_dir():
backup_date = backup_date_dir.name
# Find backup files in this date directory
backup_files = (
list(backup_date_dir.glob("data_backup_*.tar.gz"))
+ list(backup_date_dir.glob("data_backup_*.zip"))
+ list(backup_date_dir.glob("data_backup_*.tar"))
)
# Find log file
log_files = list(backup_date_dir.glob("backup_*.log"))
for backup_file in backup_files:
backup_info = {
"backup_date": backup_date,
"backup_file": str(backup_file),
"backup_size": backup_file.stat().st_size,
"created_at": datetime.fromtimestamp(
backup_file.stat().st_mtime
).isoformat(),
}
# Load additional info from log if available
if log_files:
try:
with open(log_files[0], "r") as f:
log_data = json.load(f)
backup_info.update(log_data)
except Exception:
pass
backups.append(backup_info)
# Sort by date (newest first)
backups.sort(key=lambda x: x["created_at"], reverse=True)
return backups
def delete_backup(self, backup_date: str) -> bool:
"""Delete specific backup by date."""
backup_dir = self.backup_path / backup_date
if backup_dir.exists() and backup_dir.is_dir():
try:
shutil.rmtree(backup_dir)
return True
except Exception as e:
print(f"Error deleting backup {backup_date}: {e}")
return False
return False
def _cleanup_old_backups(self):
"""Clean up old backups based on retention policy."""
retention_days = self.config.get("retention_days", 30)
cutoff_date = datetime.now() - timedelta(days=retention_days)
if not self.backup_path.exists():
return
for backup_date_dir in self.backup_path.iterdir():
if backup_date_dir.is_dir():
try:
# Parse backup date from directory name (YYYY-MM-DD)
backup_date = datetime.strptime(backup_date_dir.name, "%Y-%m-%d")
if backup_date < cutoff_date:
print(f"Cleaning up old backup: {backup_date_dir.name}")
shutil.rmtree(backup_date_dir)
except ValueError:
# Skip directories that don't match date format
continue
except Exception as e:
print(f"Error cleaning up backup {backup_date_dir.name}: {e}")
def get_backup_status(self) -> Dict:
"""Get current backup service status."""
backups = self.list_available_backups()
# Calculate total backup size
total_size = sum(backup.get("backup_size", 0) for backup in backups)
# Get last backup info
last_backup = backups[0] if backups else None
# Calculate next scheduled backup
next_backup = None
if self.config.get("enabled", True):
schedule_time = self.config.get("schedule_time", "02:00")
today = datetime.now().date()
next_backup_time = datetime.combine(
today, datetime.strptime(schedule_time, "%H:%M").time()
)
# If today's backup time has passed, next backup is tomorrow
if datetime.now() > next_backup_time:
next_backup_time += timedelta(days=1)
next_backup = next_backup_time.isoformat()
return {
"enabled": self.config.get("enabled", True),
"total_backups": len(backups),
"total_size": total_size,
"last_backup": last_backup,
"next_scheduled_backup": next_backup,
"retention_days": self.config.get("retention_days", 30),
"backup_path": str(self.backup_path),
"data_path": str(self.data_path),
}
def restore_from_backup(
self, backup_date: str, target_path: Optional[str] = None
) -> bool:
"""Restore data from specific backup (Admin only operation)."""
backup_dir = self.backup_path / backup_date
if not backup_dir.exists():
raise FileNotFoundError(f"Backup directory not found: {backup_dir}")
# Find backup file
backup_files = (
list(backup_dir.glob("data_backup_*.tar.gz"))
+ list(backup_dir.glob("data_backup_*.zip"))
+ list(backup_dir.glob("data_backup_*.tar"))
)
if not backup_files:
raise FileNotFoundError(f"No backup files found in {backup_dir}")
backup_file = backup_files[0] # Use the first (should be only one)
restore_path = Path(target_path) if target_path else self.data_path
# Create restore directory if it doesn't exist
restore_path.mkdir(parents=True, exist_ok=True)
try:
if backup_file.suffix == ".gz":
with tarfile.open(backup_file, "r:gz") as tar:
tar.extractall(path=restore_path)
elif backup_file.suffix == ".zip":
with zipfile.ZipFile(backup_file, "r") as zipf:
zipf.extractall(path=restore_path)
else:
with tarfile.open(backup_file, "r") as tar:
tar.extractall(path=restore_path)
return True
except Exception as e:
print(f"Error restoring backup: {e}")
return False
def create_manual_backup(self, description: str = "Manual backup") -> Dict:
"""Create immediate manual backup."""
backup_result = self.create_daily_backup()
backup_result["description"] = description
backup_result["type"] = "manual"
return backup_result