# AutoBackups/autobackups/src/services/project_discovery_service.py
#
# 326 lines
# 13 KiB
# Python

"""
Project Discovery Service

Service for discovering S7 projects using the Everything API.
"""
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any, Optional
import logging
from ..models.project_model import Project, ProjectManager
from ..models.config_model import Config
from ..utils.everything_wrapper import EverythingSearcher, create_everything_searcher
class ProjectDiscoveryService:
    """Discover projects in the configured observation directories.

    Two kinds of observation directory are handled:

    * ``siemens_s7`` -- the directory is searched for ``.s7p`` files (through
      the Everything searcher when it reports itself available, otherwise by
      walking the filesystem) and one project is created per file found.
    * any other type -- the directory itself becomes one "manual" project.

    Discovered projects whose path is not yet known to the project manager
    are registered with it.
    """

    def __init__(self, config: Config, project_manager: ProjectManager):
        """Store collaborators and try to create the Everything searcher.

        Args:
            config: Application configuration. This class reads
                ``get_dll_path()``, ``observation_directories``,
                ``everything_api`` and ``global_settings`` from it.
            project_manager: Registry that receives newly discovered projects.
        """
        self.config = config
        self.project_manager = project_manager
        self.logger = logging.getLogger(__name__)

        # A falsy searcher is tolerated: S7 discovery then falls back to a
        # plain filesystem walk.
        dll_path = config.get_dll_path()
        self.everything_searcher = create_everything_searcher(dll_path)
        if not self.everything_searcher:
            self.logger.warning("Everything API no disponible, funcionalidad limitada")

    def discover_all_projects(self) -> List[Project]:
        """Scan every enabled observation directory and register new projects.

        Entries without a usable ``path`` or ``type`` are skipped with a
        warning instead of aborting the whole scan.

        Returns:
            All projects found in this run, including ones the project
            manager already knew about.
        """
        self.logger.info("Iniciando descubrimiento de proyectos...")
        discovered_projects: List[Project] = []

        # Entries are enabled unless they explicitly say otherwise.
        observation_dirs = [
            obs_dir for obs_dir in self.config.observation_directories
            if obs_dir.get("enabled", True)
        ]

        for obs_dir in observation_dirs:
            dir_path = obs_dir.get("path")
            dir_type = obs_dir.get("type")
            if not dir_path or not dir_type:
                # One malformed entry must not prevent the remaining
                # directories from being scanned.
                self.logger.warning(
                    f"Directorio de observación sin 'path' o 'type', omitido: {obs_dir}"
                )
                continue

            self.logger.info(f"Escaneando directorio: {dir_path} (tipo: {dir_type})")
            if dir_type == "siemens_s7":
                projects = self._discover_s7_projects(obs_dir)
            else:
                # Every non-S7 type is treated as a manual directory.
                projects = self._discover_manual_projects(obs_dir)
            discovered_projects.extend(projects)

        self.logger.info(
            f"Descubrimiento completado: {len(discovered_projects)} proyectos encontrados"
        )

        # Register the projects that the manager does not know yet.
        self._update_project_manager(discovered_projects)
        return discovered_projects

    def _discover_s7_projects(self, obs_dir: Dict[str, Any]) -> List[Project]:
        """Create one project per ``.s7p`` file found under ``obs_dir["path"]``.

        Errors during the search are logged; whatever was collected up to
        that point is returned.
        """
        projects: List[Project] = []
        dir_path = obs_dir["path"]
        try:
            if self.everything_searcher and self.everything_searcher.is_everything_available():
                # Fast path: indexed search through the Everything searcher.
                s7p_files = self.everything_searcher.search_s7p_files([dir_path]) or []
                discovery_method = "everything_api"
            else:
                # Fallback: walk the filesystem ourselves.
                s7p_files = self._manual_search_s7p_files(dir_path)
                discovery_method = "manual_search"

            self.logger.debug(f"Encontrados {len(s7p_files)} archivos .s7p en {dir_path}")

            for s7p_file in s7p_files:
                # Pass the method that was really used so the project record
                # does not claim "everything_api" after a fallback scan.
                project = self._create_project_from_s7p(
                    s7p_file, obs_dir, discovery_method=discovery_method
                )
                if project:
                    projects.append(project)
                    self.logger.debug(f"Proyecto creado: {project.name}")
        except Exception as e:
            self.logger.error(f"Error descubriendo proyectos S7 en {dir_path}: {e}")
        return projects

    def _discover_manual_projects(self, obs_dir: Dict[str, Any]) -> List[Project]:
        """Return a single manual project if ``obs_dir["path"]`` is a directory.

        Returns an empty list when the path is not an existing directory or
        the project could not be created.
        """
        projects: List[Project] = []
        dir_path = obs_dir["path"]
        try:
            # is_dir() is already False for a path that does not exist.
            if Path(dir_path).is_dir():
                project = self._create_manual_project(obs_dir)
                if project:
                    projects.append(project)
                    self.logger.debug(f"Proyecto manual creado: {project.name}")
        except Exception as e:
            self.logger.error(f"Error creando proyecto manual para {dir_path}: {e}")
        return projects

    def _manual_search_s7p_files(self, directory: str) -> List[str]:
        """Recursively collect ``.s7p`` file paths under ``directory``.

        When the ``skip_last_level_for_s7p`` option of
        ``config.everything_api`` is true (the default), files whose parent
        directory contains no sub-directories are ignored.

        Returns:
            The matching file paths as strings, in the order the filesystem
            walk yields them. A missing directory yields an empty list; an
            error during the walk is logged and the partial result returned.
        """
        s7p_files: List[str] = []
        try:
            root = Path(directory)
            if not root.exists():
                return s7p_files

            skip_last_level = self.config.everything_api.get("skip_last_level_for_s7p", True)
            # Several matches can share a parent; list each parent only once.
            parent_has_subdirs: Dict[Path, bool] = {}

            for file_path in root.rglob("*.s7p"):
                # rglob also matches directories that happen to be named *.s7p.
                if not file_path.is_file():
                    continue
                if skip_last_level:
                    parent_dir = file_path.parent
                    if parent_dir not in parent_has_subdirs:
                        parent_has_subdirs[parent_dir] = any(
                            item.is_dir() for item in parent_dir.iterdir()
                        )
                    if not parent_has_subdirs[parent_dir]:
                        continue  # parent is a leaf directory
                s7p_files.append(str(file_path))
        except Exception as e:
            self.logger.error(f"Error en búsqueda manual de S7P en {directory}: {e}")
        return s7p_files

    def _build_project_data(
        self,
        *,
        project_id: str,
        name: str,
        path: str,
        project_type: str,
        s7p_file: str,
        observation_directory: str,
        relative_path: str,
        backup_path: str,
        discovery_method: str,
    ) -> Dict[str, Any]:
        """Assemble the dictionary for a freshly discovered project.

        Schedule defaults come from ``config.global_settings``; history and
        hash sections start empty and the status starts as ``"ready"``. The
        status and discovery timestamps share one UTC ISO-8601 value.
        """
        now_iso = datetime.now(timezone.utc).isoformat()
        settings = self.config.global_settings
        return {
            "id": project_id,
            "name": name,
            "path": path,
            "type": project_type,
            "s7p_file": s7p_file,
            "observation_directory": observation_directory,
            "relative_path": relative_path,
            "backup_path": backup_path,
            "schedule_config": {
                "schedule": settings.get("default_schedule", "daily"),
                "schedule_time": settings.get("default_schedule_time", "02:00"),
                "enabled": True,
                "next_scheduled_backup": "",
            },
            "backup_history": {
                "last_backup_date": "",
                "last_backup_file": "",
                "backup_count": 0,
                "last_successful_backup": "",
            },
            "hash_info": {
                "last_s7p_hash": "",
                "last_full_hash": "",
                "last_s7p_timestamp": "",
                "last_s7p_size": 0,
                "last_scan_timestamp": "",
                "file_count": 0,
                "total_size_bytes": 0,
            },
            "status": {
                "current_status": "ready",
                "last_error": None,
                "retry_count": 0,
                "next_retry": None,
                "files_in_use": False,
                "exclusivity_check_passed": True,
                "last_status_update": now_iso,
            },
            "discovery_info": {
                "discovered_date": now_iso,
                "discovery_method": discovery_method,
                "auto_discovered": True,
            },
        }

    def _create_project_from_s7p(
        self,
        s7p_file_path: str,
        obs_dir: Dict[str, Any],
        discovery_method: Optional[str] = None,
    ) -> Optional[Project]:
        """Build a ``Project`` for the directory that contains an ``.s7p`` file.

        Args:
            s7p_file_path: Path of the ``.s7p`` file; its stem becomes the
                project name and its parent directory the project path.
            obs_dir: Observation-directory entry the file was found under.
            discovery_method: How the file was found (``"everything_api"`` or
                ``"manual_search"``). When omitted, it is guessed from whether
                a searcher object exists.

        Returns:
            The new project, or ``None`` if anything failed (the error is
            logged).
        """
        try:
            s7p_path = Path(s7p_file_path)
            project_dir = s7p_path.parent
            obs_path = Path(obs_dir["path"])

            project_id = self._generate_project_id(s7p_file_path)
            project_name = s7p_path.stem

            try:
                relative_path = project_dir.relative_to(obs_path)
                backup_path = str(relative_path)
            except ValueError:
                # Project is not located under the observation directory:
                # fall back to the bare directory name.
                backup_path = project_dir.name
                relative_path = Path(project_dir.name)

            if discovery_method is None:
                # Legacy guess for callers that do not say how the file was found.
                discovery_method = (
                    "everything_api" if self.everything_searcher else "manual_search"
                )

            project_data = self._build_project_data(
                project_id=project_id,
                name=project_name,
                path=str(project_dir),
                project_type="siemens_s7",
                s7p_file=s7p_file_path,
                observation_directory=obs_dir["path"],
                relative_path=str(relative_path),
                backup_path=backup_path,
                discovery_method=discovery_method,
            )
            return Project(project_data)
        except Exception as e:
            self.logger.error(f"Error creando proyecto desde {s7p_file_path}: {e}")
            return None

    def _create_manual_project(self, obs_dir: Dict[str, Any]) -> Optional[Project]:
        """Build a ``Project`` that represents a manual directory as a whole.

        The directory name is used as project name, relative path and backup
        path; its parent is recorded as the observation directory.

        Returns:
            The new project, or ``None`` if anything failed (the error is
            logged).
        """
        try:
            dir_path = Path(obs_dir["path"])
            project_data = self._build_project_data(
                project_id=self._generate_project_id(str(dir_path)),
                name=dir_path.name,
                path=str(dir_path),
                project_type="manual",
                s7p_file="",
                observation_directory=str(dir_path.parent),
                relative_path=dir_path.name,
                backup_path=dir_path.name,
                discovery_method="manual_directory",
            )
            return Project(project_data)
        except Exception as e:
            # .get() so that a missing "path" key cannot raise again from
            # inside the error handler.
            self.logger.error(f"Error creando proyecto manual para {obs_dir.get('path')}: {e}")
            return None

    def _generate_project_id(self, path: str) -> str:
        """Return ``project_<hash>_<timestamp>`` for ``path``.

        ``<hash>`` is the first 8 hex digits of the MD5 of the path and
        ``<timestamp>`` is the current local time as ``YYYYmmdd_HHMMSS``, so
        the ID differs between calls made in different seconds.
        """
        import hashlib

        # MD5 only serves as a short fingerprint of the path here.
        path_hash = hashlib.md5(path.encode()).hexdigest()[:8]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"project_{path_hash}_{timestamp}"

    def _update_project_manager(self, discovered_projects: List[Project]) -> None:
        """Register discovered projects whose path is not yet known.

        A path that appears more than once in ``discovered_projects`` (for
        example because observation directories overlap) is added only once.
        Errors are logged, not raised.
        """
        try:
            known_paths = {p.path for p in self.project_manager.get_all_projects()}

            new_projects_count = 0
            for project in discovered_projects:
                if project.path in known_paths:
                    continue
                self.project_manager.add_project(project.to_dict())
                # Remember the path so a later duplicate in this same run is skipped.
                known_paths.add(project.path)
                new_projects_count += 1
                self.logger.debug(f"Proyecto agregado: {project.name}")

            if new_projects_count > 0:
                self.logger.info(f"Se agregaron {new_projects_count} nuevos proyectos")
            else:
                self.logger.info("No se encontraron nuevos proyectos")

            self.project_manager.update_statistics()
        except Exception as e:
            self.logger.error(f"Error actualizando project manager: {e}")

    def rescan_observation_directories(self) -> int:
        """Run discovery again and report how many projects were added.

        Returns:
            The difference in the project manager's project count before and
            after the scan.
        """
        self.logger.info("Iniciando re-escaneo de directorios de observación...")
        initial_count = len(self.project_manager.get_all_projects())

        self.discover_all_projects()

        final_count = len(self.project_manager.get_all_projects())
        new_projects = final_count - initial_count
        self.logger.info(f"Re-escaneo completado: {new_projects} nuevos proyectos encontrados")
        return new_projects