import os
import json
import re
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from app.models import ScriptGroup, Script
from app.config.database import db


class ScriptDiscoveryService:
    """Service for discovering and managing scripts.

    Scans a backend directory for "script groups" (one directory per group,
    each with a ``metadata.json``), mirrors what it finds into the database
    (``ScriptGroup`` / ``Script`` rows), and can optionally watch the
    directory tree for changes via watchdog.
    """

    def __init__(self, backend_path: str = "app/backend/script_groups"):
        """Create the service.

        Args:
            backend_path: Root directory containing one subdirectory per
                script group. Created on first scan if missing.
        """
        self.backend_path = Path(backend_path)
        # watchdog Observer; None until start_file_watcher() is called.
        self.observer = None

    def scan_script_groups(self) -> List[ScriptGroup]:
        """Scan for script groups and update database.

        Returns:
            The list of ScriptGroup rows discovered (existing or newly
            created). Hidden directories (leading ".") are skipped.
        """
        if not self.backend_path.exists():
            self.backend_path.mkdir(parents=True, exist_ok=True)
            return []

        discovered_groups = []
        for group_dir in self.backend_path.iterdir():
            if group_dir.is_dir() and not group_dir.name.startswith("."):
                group = self.process_script_group(group_dir)
                if group:
                    discovered_groups.append(group)

        return discovered_groups

    def process_script_group(self, group_path: Path) -> Optional[ScriptGroup]:
        """Process a single script group directory.

        Loads (or creates) the group's ``metadata.json``, upserts the
        ScriptGroup row, discovers its scripts, and commits.

        Args:
            group_path: Directory of the group.

        Returns:
            The ScriptGroup row for this directory.

        Raises:
            Exception: Database errors are re-raised after a session
                rollback so one bad group cannot poison later ones.
        """
        metadata_file = group_path / "metadata.json"

        # Load or create metadata. A corrupt/unreadable metadata.json must
        # not abort the whole scan: fall back to defaults but do NOT
        # overwrite the user's file (they may want to fix it by hand).
        if metadata_file.exists():
            try:
                with open(metadata_file, "r", encoding="utf-8") as f:
                    metadata = json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                print(f"WARNING: invalid metadata.json in {group_path}: {e}")
                metadata = self.create_default_group_metadata(group_path.name)
        else:
            metadata = self.create_default_group_metadata(group_path.name)
            self.save_group_metadata(group_path, metadata)

        # Check if group exists in database (keyed by directory path).
        existing_group = ScriptGroup.query.filter_by(
            directory_path=str(group_path)
        ).first()

        if existing_group:
            # Update existing group
            self.update_group_from_metadata(existing_group, metadata)
            script_group = existing_group
        else:
            # Create new group
            script_group = ScriptGroup(
                name=metadata.get("name", group_path.name),
                directory_path=str(group_path),
                description=json.dumps(metadata.get("description", {})),
                required_level=metadata.get("required_level", "viewer"),
                conda_environment=metadata.get("conda_environment", "base"),
            )
            db.session.add(script_group)
            db.session.flush()  # Get ID

        # Discover scripts in this group
        self.discover_scripts_in_group(script_group, group_path)

        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the next group, then surface
            # the error to the caller.
            db.session.rollback()
            raise
        return script_group

    def create_default_group_metadata(self, group_name: str) -> Dict:
        """Create default metadata for a script group.

        The display name is derived from the directory name
        (underscores -> spaces, title-cased); descriptions are seeded in
        four languages.
        """
        return {
            "name": group_name.replace("_", " ").title(),
            "description": {
                "en": f"Scripts in {group_name} group",
                "es": f"Scripts en el grupo {group_name}",
                "it": f"Script nel gruppo {group_name}",
                "fr": f"Scripts dans le groupe {group_name}",
            },
            "icon": "folder",
            "required_level": "operator",
            "category": "general",
            "conda_environment": "base",
            "auto_discovery": True,
            "execution_timeout": 300,
        }

    def save_group_metadata(self, group_path: Path, metadata: Dict):
        """Save group metadata to ``metadata.json`` (UTF-8, pretty-printed)."""
        metadata_file = group_path / "metadata.json"
        with open(metadata_file, "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

    def update_group_from_metadata(self, group: ScriptGroup, metadata: Dict):
        """Update a ScriptGroup row in place from parsed metadata.

        Missing keys keep the row's current value; ``description`` is
        always overwritten (empty dict when absent).
        """
        group.name = metadata.get("name", group.name)
        group.description = json.dumps(metadata.get("description", {}))
        group.required_level = metadata.get("required_level", group.required_level)
        group.conda_environment = metadata.get(
            "conda_environment", group.conda_environment
        )

    def discover_scripts_in_group(self, group: ScriptGroup, group_path: Path):
        """Discover all scripts in a group directory (recursive).

        Upserts a Script row per matching file and deletes rows whose file
        no longer exists on disk.
        """
        script_extensions = [".py"]  # Can be extended for other types

        # Track existing files to identify removed scripts.
        # NOTE(review): str(relative_path) is OS-dependent ("/" vs "\\");
        # DB rows written on one OS won't match on another — confirm the
        # deployment never mixes OSes, or normalize with as_posix().
        found_scripts = set()

        for script_file in group_path.rglob("*"):
            if (
                script_file.is_file()
                and script_file.suffix in script_extensions
                and not script_file.name.startswith(".")
            ):
                relative_path = script_file.relative_to(group_path)
                found_scripts.add(str(relative_path))
                self.process_script_file(group, script_file, group_path)

        # Remove scripts from database that no longer exist on disk
        self.cleanup_removed_scripts(group, found_scripts)

    def process_script_file(
        self, group: ScriptGroup, script_path: Path, group_path: Path
    ):
        """Process a single script file.

        Creates a Script row for new files; refreshes metadata for files
        whose mtime is newer than the stored ``last_modified``.
        """
        relative_path = script_path.relative_to(group_path)

        # Check if script exists in database
        existing_script = Script.query.filter_by(
            group_id=group.id, filename=str(relative_path)
        ).first()

        if existing_script:
            # Update existing script if file was modified.
            # NOTE(review): fromtimestamp()/timestamp() are both naive
            # local-time here, so the comparison is self-consistent —
            # but breaks if last_modified is ever stored tz-aware.
            if script_path.stat().st_mtime > existing_script.last_modified.timestamp():
                self.update_script_from_file(existing_script, script_path)
        else:
            # Create new script from the header metadata parsed out of
            # the file itself; group defaults fill the gaps.
            script_metadata = self.parse_script_header(script_path)
            new_script = Script(
                group_id=group.id,
                filename=str(relative_path),
                display_name=script_metadata.get("display_name", script_path.stem),
                description=script_metadata.get("description", ""),
                description_long_path=script_metadata.get("description_long_path"),
                required_level=script_metadata.get(
                    "required_level", group.required_level
                ),
                last_modified=datetime.fromtimestamp(script_path.stat().st_mtime),
            )

            # Set parameters and tags
            new_script.set_parameters(script_metadata.get("parameters", []))
            new_script.set_tags_list(script_metadata.get("tags", []))

            db.session.add(new_script)

    def cleanup_removed_scripts(self, group: ScriptGroup, found_scripts: set):
        """Remove scripts from database that no longer exist on disk.

        Args:
            group: Group whose rows are being reconciled.
            found_scripts: Relative filenames (as stored in the DB) that
                were found on disk during this scan.
        """
        # Get all scripts in the database for this group
        existing_scripts = Script.query.filter_by(group_id=group.id).all()

        for script in existing_scripts:
            if script.filename not in found_scripts:
                print(
                    f"Removing script from database: {script.filename} "
                    f"(file no longer exists)"
                )
                db.session.delete(script)

    def parse_script_header(self, script_path: Path) -> Dict:
        """Parse script header for metadata.

        Looks for a "ScriptsManager Metadata:" section anywhere in the
        file (typically a docstring) and parses its @-tagged fields.
        Returns an empty dict if absent or unreadable (best-effort).
        """
        metadata = {}

        try:
            with open(script_path, "r", encoding="utf-8") as f:
                content = f.read()

            # Section runs to the closing triple-quote or end of file.
            metadata_pattern = r"ScriptsManager Metadata:(.*?)(?:\n\"\"\"|$)"
            match = re.search(metadata_pattern, content, re.DOTALL | re.IGNORECASE)

            if match:
                metadata_content = match.group(1)
                metadata = self.parse_metadata_content(metadata_content)

        except Exception as e:
            # Best-effort: a malformed script must not break discovery.
            print(f"Error parsing script header {script_path}: {e}")

        return metadata

    def parse_metadata_content(self, content: str) -> Dict:
        """Parse metadata content from a script header section.

        Recognized fields: @description, @description_long,
        @required_level, @category, @tags (comma-separated list),
        @execution_timeout / @flask_port (ints), and @parameters
        (inline JSON array; silently ignored if invalid JSON).
        """
        metadata = {}

        # Parse individual metadata fields
        patterns = {
            "description": r"@description:\s*(.+)",
            "description_long": r"@description_long:\s*(.+)",
            "required_level": r"@required_level:\s*(.+)",
            "category": r"@category:\s*(.+)",
            "tags": r"@tags:\s*(.+)",
            "execution_timeout": r"@execution_timeout:\s*(\d+)",
            "flask_port": r"@flask_port:\s*(\d+)",
        }

        for key, pattern in patterns.items():
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                value = match.group(1).strip()
                if key == "tags":
                    metadata[key] = [tag.strip() for tag in value.split(",")]
                elif key in ["execution_timeout", "flask_port"]:
                    metadata[key] = int(value)
                else:
                    metadata[key] = value

        # Parse parameters (JSON array)
        params_pattern = r"@parameters:\s*(\[.*?\])"
        params_match = re.search(params_pattern, content, re.DOTALL | re.IGNORECASE)
        if params_match:
            try:
                metadata["parameters"] = json.loads(params_match.group(1))
            except json.JSONDecodeError:
                pass

        return metadata

    def update_script_from_file(self, script: Script, script_path: Path):
        """Update a Script row's metadata from its file header.

        Only fields present in the parsed header are overwritten;
        ``last_modified`` is always refreshed to the file's mtime.
        """
        metadata = self.parse_script_header(script_path)

        # Update fields that should be refreshed from file
        if "description" in metadata:
            script.description = metadata["description"]
        if "required_level" in metadata:
            script.required_level = metadata["required_level"]
        if "parameters" in metadata:
            script.set_parameters(metadata["parameters"])
        if "tags" in metadata:
            script.set_tags_list(metadata["tags"])

        script.last_modified = datetime.fromtimestamp(script_path.stat().st_mtime)

    def start_file_watcher(self):
        """Start file system watcher for real-time discovery.

        No-op if the backend path does not exist.

        NOTE(review): the handler calls scan_script_groups() from the
        watchdog observer thread; db.session use there assumes the
        session is thread-safe in this app's setup — confirm (Flask's
        scoped session is per-thread, which may mean a separate session
        with no app context on the watcher thread).
        """
        if not self.backend_path.exists():
            return

        class ScriptFileHandler(FileSystemEventHandler):
            def __init__(self, discovery_service):
                self.discovery_service = discovery_service

            def on_modified(self, event):
                if not event.is_directory:
                    # Re-scan when files are modified
                    self.discovery_service.scan_script_groups()

            def on_created(self, event):
                if not event.is_directory:
                    # Re-scan when new files are created
                    self.discovery_service.scan_script_groups()

        handler = ScriptFileHandler(self)
        self.observer = Observer()
        self.observer.schedule(handler, str(self.backend_path), recursive=True)
        self.observer.start()

    def stop_file_watcher(self):
        """Stop file system watcher (idempotent; waits for the thread)."""
        if self.observer:
            self.observer.stop()
            self.observer.join()
            self.observer = None

    def update_group_metadata_file(self, group: ScriptGroup, updates: Dict):
        """Update specific fields in the group's metadata.json file.

        Args:
            group: Group whose on-disk metadata should change.
            updates: Key/value pairs merged over the existing metadata.

        Returns:
            True on success, False if the file is missing or the
            read/merge/write cycle fails.
        """
        group_path = Path(group.directory_path)
        metadata_file = group_path / "metadata.json"

        if not metadata_file.exists():
            print(
                f"WARNING: metadata.json not found for group {group.name} at {metadata_file}"
            )
            return False

        try:
            # Read existing metadata
            with open(metadata_file, "r", encoding="utf-8") as f:
                metadata = json.load(f)

            # Update with new values
            metadata.update(updates)

            # Write back to file
            with open(metadata_file, "w", encoding="utf-8") as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            print(f"DEBUG: Successfully updated {metadata_file} with {updates}")
            return True

        except Exception as e:
            print(f"ERROR: Failed to update metadata.json for group {group.name}: {e}")
            return False