# SIDEL_ScriptsManager/app/services/data_manager.py
#
# Source listing metadata: 290 lines, 9.9 KiB, Python

import os
import json
import shutil
from pathlib import Path
from typing import Dict, Optional, List
from datetime import datetime
from app.models import UserProject, User, ScriptGroup
from app.config.database import db
class DataManager:
    """Service for managing user data and projects.

    Project files live on disk under
    ``<base_path>/script_groups/group_<group_id>/user_<user_id>/<project_name>``;
    project records are read and written through ``UserProject`` and
    ``db.session``.
    """

    def __init__(self, base_data_path: str = "data"):
        """Create the manager and make sure the base data directory exists.

        Args:
            base_data_path: Root directory under which all data is stored.
        """
        self.base_path = Path(base_data_path)
        # parents=True so a nested base path (e.g. "var/app/data") is created
        # on first use instead of raising FileNotFoundError.
        self.base_path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _is_safe_path_component(name: str) -> bool:
        """Return True if *name* is usable as a single directory name.

        Rejects empty names, "." and "..", and anything containing a path
        separator (forward or back slash) or a NUL byte, so the name cannot
        make a project directory resolve outside the user's directory.
        """
        if not isinstance(name, str) or name in ("", ".", ".."):
            return False
        return not any(ch in name for ch in ("/", "\\", "\x00"))

    @staticmethod
    def _check_contained(project_path: Path, file_path: Path) -> None:
        """Raise PermissionError unless *file_path* lies inside *project_path*.

        Both paths are resolved first, so ".." segments, absolute file names
        and symlinks are taken into account. PermissionError is an OSError
        (IOError) subclass, so callers' existing IOError handling covers it.
        """
        root = project_path.resolve()
        if root not in file_path.resolve().parents:
            raise PermissionError(
                f"Path escapes project directory: {file_path}"
            )

    def get_user_project_path(
        self, user_id: int, group_id: int, project_name: str
    ) -> Path:
        """Get path to user project data directory.

        The directory is not created by this method.
        """
        return (
            self.base_path
            / "script_groups"
            / f"group_{group_id}"
            / f"user_{user_id}"
            / project_name
        )

    def ensure_project_directory(self, user_id: int, group_id: int, project_name: str):
        """Create project directory structure if it doesn't exist.

        Also writes a default ``config.json`` when the project has none yet.
        """
        project_path = self.get_user_project_path(user_id, group_id, project_name)
        project_path.mkdir(parents=True, exist_ok=True)

        # Create default configuration files
        config_file = project_path / "config.json"
        if not config_file.exists():
            # Take the timestamp once so both fields are identical at creation.
            now = datetime.utcnow().isoformat()
            default_config = {
                "created_at": now,
                "last_modified": now,
                "project_settings": {},
                "user_preferences": {},
            }
            self.save_config_file(
                user_id, group_id, project_name, "config.json", default_config
            )

    def get_config_file(
        self, user_id: int, group_id: int, project_name: str, filename: str
    ) -> Dict:
        """Load JSON configuration file.

        Returns:
            The parsed JSON content, or an empty dict when the file is
            missing, unreadable, not valid UTF-8/JSON, or when *filename*
            points outside the project directory.
        """
        project_path = self.get_user_project_path(user_id, group_id, project_name)
        config_file = project_path / filename
        try:
            self._check_contained(project_path, config_file)
            if not config_file.exists():
                return {}
            with open(config_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except (ValueError, IOError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error loading config file {config_file}: {e}")
            return {}

    def save_config_file(
        self, user_id: int, group_id: int, project_name: str, filename: str, data: Dict
    ):
        """Save JSON data to configuration file.

        Raises:
            IOError: If the file cannot be written, or (as PermissionError)
                if *filename* points outside the project directory.
            TypeError: If *data* is not JSON-serializable; an existing file
                is left untouched in that case.
        """
        project_path = self.get_user_project_path(user_id, group_id, project_name)
        project_path.mkdir(parents=True, exist_ok=True)
        config_file = project_path / filename

        # Serialize before opening the file: opening with "w" truncates it, so
        # a serialization error mid-dump would corrupt an existing config.
        serialized = json.dumps(data, indent=2, ensure_ascii=False)
        try:
            self._check_contained(project_path, config_file)
            with open(config_file, "w", encoding="utf-8") as f:
                f.write(serialized)
        except IOError as e:
            print(f"Error saving config file {config_file}: {e}")
            raise

    def list_user_projects(self, user_id: int, group_id: int) -> List[Dict]:
        """List all projects for user in specific group.

        Each entry is ``project.to_dict()`` extended with ``files_count``
        (number of ``*.json`` files directly in the project directory) and
        ``disk_usage`` (bytes); both are 0 when the directory does not exist.
        """
        projects = UserProject.query.filter_by(user_id=user_id, group_id=group_id).all()
        result = []
        for project in projects:
            project_data = project.to_dict()

            # Add file system information
            project_path = self.get_user_project_path(
                user_id, group_id, project.project_name
            )
            if project_path.exists():
                project_data["files_count"] = sum(
                    1 for _ in project_path.glob("*.json")
                )
                project_data["disk_usage"] = self.get_directory_size(project_path)
            else:
                project_data["files_count"] = 0
                project_data["disk_usage"] = 0
            result.append(project_data)
        return result

    def get_directory_size(self, path: Path) -> int:
        """Get total size of directory in bytes.

        Best effort: files that cannot be inspected are skipped rather than
        aborting the whole walk.
        """
        total_size = 0
        try:
            for file_path in path.rglob("*"):
                try:
                    if file_path.is_file():
                        total_size += file_path.stat().st_size
                except OSError:
                    # e.g. the file vanished between listing and stat().
                    continue
        except OSError:
            # Traversal itself failed; report what was counted so far.
            pass
        return total_size

    def create_project(
        self, user_id: int, group_id: int, project_name: str, description: str = ""
    ) -> "UserProject":
        """Create a new project for user.

        Raises:
            ValueError: If *project_name* is not a single path component, or
                if the user already has a project with that name in the group.
        """
        # The name becomes a directory name; reject anything (e.g. "..") that
        # would map the project onto a directory outside the user's folder.
        if not self._is_safe_path_component(project_name):
            raise ValueError(f"Invalid project name: {project_name!r}")

        # Check if project already exists
        existing = UserProject.query.filter_by(
            user_id=user_id, group_id=group_id, project_name=project_name
        ).first()
        if existing:
            raise ValueError(f"Project '{project_name}' already exists")

        # Create project record
        project = UserProject(
            user_id=user_id,
            group_id=group_id,
            project_name=project_name,
            description=description,
        )
        db.session.add(project)
        db.session.commit()

        # Create directory structure
        self.ensure_project_directory(user_id, group_id, project_name)
        return project

    def get_or_create_default_project(self, user_id: int, group_id: int) -> "UserProject":
        """Get or create default project for user.

        The default project is named ``project_default``; its directory is
        ensured on every call.
        """
        default_project = UserProject.query.filter_by(
            user_id=user_id, group_id=group_id, project_name="project_default"
        ).first()
        if not default_project:
            default_project = UserProject(
                user_id=user_id,
                group_id=group_id,
                project_name="project_default",
                description="Default project",
                is_default=True,
            )
            db.session.add(default_project)
            db.session.commit()

        # Create directory structure
        self.ensure_project_directory(user_id, group_id, "project_default")
        return default_project

    def delete_project(self, user_id: int, project_id: int) -> bool:
        """Delete a project and its data.

        Returns:
            True on success; False if the project does not exist for this
            user or if deletion failed (the DB session is rolled back).

        Raises:
            ValueError: If the project is the default project.
        """
        project = UserProject.query.filter_by(id=project_id, user_id=user_id).first()
        if not project:
            return False

        # Don't delete default project
        if project.is_default:
            raise ValueError("Cannot delete default project")

        try:
            # Remove directory. Only done for names that are a single path
            # component, so a stored name such as ".." can never make rmtree
            # wipe a parent directory shared with other projects or users.
            if self._is_safe_path_component(project.project_name):
                project_path = self.get_user_project_path(
                    user_id, project.group_id, project.project_name
                )
                if project_path.exists():
                    shutil.rmtree(project_path)
            else:
                print(
                    f"Skipping directory removal for project {project_id}: "
                    "unsafe project name"
                )

            # Remove database record
            db.session.delete(project)
            db.session.commit()
            return True
        except Exception as e:
            db.session.rollback()
            print(f"Error deleting project {project_id}: {e}")
            return False

    def backup_project_data(
        self, user_id: int, group_id: int, project_name: str
    ) -> Optional[Path]:
        """Create timestamped backup of project data.

        Returns:
            Path of the zip archive under ``<base_path>/backups``, or None if
            the project directory does not exist or archiving failed.
        """
        project_path = self.get_user_project_path(user_id, group_id, project_name)
        if not project_path.exists():
            return None

        # Create backup directory
        backup_dir = self.base_path / "backups"
        backup_dir.mkdir(parents=True, exist_ok=True)

        # Create backup filename
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        backup_name = f"user_{user_id}_project_{project_name}_{timestamp}.zip"
        backup_path = backup_dir / backup_name
        try:
            # Create zip archive; make_archive appends ".zip" itself, so it is
            # given the target path without its suffix.
            shutil.make_archive(
                str(backup_path.with_suffix("")), "zip", str(project_path)
            )
            return backup_path
        except Exception as e:
            print(f"Error creating backup for project {project_name}: {e}")
            return None

    def list_project_files(
        self, user_id: int, group_id: int, project_name: str
    ) -> List[Dict]:
        """List all files in a project directory.

        Only regular files directly inside the directory are listed, sorted
        by name. Each entry has ``name``, ``size``, ``modified`` (ISO string)
        and ``type`` (extension without the dot, or "file" if none).
        """
        project_path = self.get_user_project_path(user_id, group_id, project_name)
        if not project_path.exists():
            return []

        files = []
        for file_path in project_path.iterdir():
            if file_path.is_file():
                stat = file_path.stat()
                files.append(
                    {
                        "name": file_path.name,
                        "size": stat.st_size,
                        "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                        "type": file_path.suffix[1:] if file_path.suffix else "file",
                    }
                )
        return sorted(files, key=lambda x: x["name"])

    def get_project_file_content(
        self, user_id: int, group_id: int, project_name: str, filename: str
    ) -> Optional[str]:
        """Get content of a specific project file.

        Returns:
            The file's text, or None when it is missing, not a regular file,
            unreadable, not valid UTF-8, or when *filename* points outside
            the project directory.
        """
        project_path = self.get_user_project_path(user_id, group_id, project_name)
        file_path = project_path / filename
        try:
            self._check_contained(project_path, file_path)
            if not file_path.is_file():
                return None
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()
        except (IOError, ValueError) as e:
            # ValueError covers UnicodeDecodeError for non-text files.
            print(f"Error reading file {file_path}: {e}")
            return None

    def save_project_file_content(
        self,
        user_id: int,
        group_id: int,
        project_name: str,
        filename: str,
        content: str,
    ) -> bool:
        """Save content to a project file.

        Returns:
            True if the file was written; False on an I/O error, an invalid
            file name, or when *filename* points outside the project
            directory (nothing is written in that case).
        """
        project_path = self.get_user_project_path(user_id, group_id, project_name)
        project_path.mkdir(parents=True, exist_ok=True)
        file_path = project_path / filename
        try:
            self._check_contained(project_path, file_path)
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(content)
            return True
        except (IOError, ValueError) as e:
            print(f"Error writing file {file_path}: {e}")
            return False