SIDEL_ScriptsManager/app/services/script_discovery.py

312 lines
12 KiB
Python

import os
import json
import re
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from app.models import ScriptGroup, Script
from app.config.database import db
class ScriptDiscoveryService:
    """Service for discovering and managing scripts.

    Scans a backend directory tree (one sub-directory per script group),
    syncs groups and scripts into the database, and can watch the tree
    for live changes via a watchdog observer.
    """

    def __init__(self, backend_path: str = "app/backend/script_groups"):
        # Root directory that holds one sub-directory per script group.
        self.backend_path = Path(backend_path)
        # watchdog Observer; created lazily by start_file_watcher().
        self.observer = None
def scan_script_groups(self) -> List[ScriptGroup]:
    """Scan the backend directory and sync every script group found.

    On first run the backend directory is created and an empty list is
    returned; hidden directories (name starting with ".") are skipped.
    """
    if not self.backend_path.exists():
        # First run: create the tree, nothing to sync yet.
        self.backend_path.mkdir(parents=True, exist_ok=True)
        return []

    groups = []
    for entry in self.backend_path.iterdir():
        # Only visible sub-directories are script groups.
        if not entry.is_dir() or entry.name.startswith("."):
            continue
        synced = self.process_script_group(entry)
        if synced:
            groups.append(synced)
    return groups
def process_script_group(self, group_path: Path) -> Optional[ScriptGroup]:
    """Load metadata for one group directory and sync it to the database.

    Reads metadata.json (creating it with defaults when absent), then
    creates or updates the ScriptGroup row and discovers the scripts
    inside the directory.

    Args:
        group_path: Directory containing the script group.

    Returns:
        The created or updated ScriptGroup row.
    """
    metadata_file = group_path / "metadata.json"
    # Load or create metadata
    if metadata_file.exists():
        try:
            with open(metadata_file, "r", encoding="utf-8") as f:
                metadata = json.load(f)
            if not isinstance(metadata, dict):
                raise ValueError("metadata.json must contain a JSON object")
        except (OSError, ValueError) as e:
            # Fix: a corrupt or unreadable metadata.json used to propagate
            # and abort the whole scan. Fall back to defaults in memory,
            # but leave the broken file on disk for inspection/repair.
            # (json.JSONDecodeError is a ValueError subclass.)
            print(f"WARNING: invalid metadata.json in {group_path}: {e}")
            metadata = self.create_default_group_metadata(group_path.name)
    else:
        metadata = self.create_default_group_metadata(group_path.name)
        self.save_group_metadata(group_path, metadata)
    # Check if group exists in database
    existing_group = ScriptGroup.query.filter_by(
        directory_path=str(group_path)
    ).first()
    if existing_group:
        # Update existing group
        self.update_group_from_metadata(existing_group, metadata)
        script_group = existing_group
    else:
        # Create new group
        script_group = ScriptGroup(
            name=metadata.get("name", group_path.name),
            directory_path=str(group_path),
            description=json.dumps(metadata.get("description", {})),
            required_level=metadata.get("required_level", "viewer"),
            conda_environment=metadata.get("conda_environment", "base"),
        )
        db.session.add(script_group)
        db.session.flush()  # Get ID so scripts can reference it
    # Discover scripts in this group
    self.discover_scripts_in_group(script_group, group_path)
    db.session.commit()
    return script_group
def create_default_group_metadata(self, group_name: str) -> Dict:
    """Build the default metadata dict for a newly discovered group.

    The description is pre-filled for every supported UI language.
    """
    description_templates = {
        "en": "Scripts in {} group",
        "es": "Scripts en el grupo {}",
        "it": "Script nel gruppo {}",
        "fr": "Scripts dans le groupe {}",
    }
    return {
        "name": group_name.replace("_", " ").title(),
        "description": {
            lang: template.format(group_name)
            for lang, template in description_templates.items()
        },
        "icon": "folder",
        "required_level": "operator",
        "category": "general",
        "conda_environment": "base",
        "auto_discovery": True,
        "execution_timeout": 300,  # seconds
    }
def save_group_metadata(self, group_path: Path, metadata: Dict):
    """Serialize *metadata* to <group_path>/metadata.json (UTF-8, pretty)."""
    target = group_path / "metadata.json"
    payload = json.dumps(metadata, indent=2, ensure_ascii=False)
    target.write_text(payload, encoding="utf-8")
def update_group_from_metadata(self, group: ScriptGroup, metadata: Dict):
    """Refresh the mutable fields of a ScriptGroup row from its metadata.

    Fields absent from *metadata* keep their current DB value.
    """
    for field in ("name", "required_level", "conda_environment"):
        setattr(group, field, metadata.get(field, getattr(group, field)))
    # Description is stored as a JSON-encoded language map.
    group.description = json.dumps(metadata.get("description", {}))
def discover_scripts_in_group(self, group: ScriptGroup, group_path: Path):
    """Walk a group directory, sync each script file, drop stale rows."""
    allowed_suffixes = {".py"}  # extend here for other script types
    seen_files = set()
    for candidate in group_path.rglob("*"):
        if not candidate.is_file():
            continue
        if candidate.suffix not in allowed_suffixes:
            continue
        if candidate.name.startswith("."):
            continue
        rel = candidate.relative_to(group_path)
        seen_files.add(str(rel))
        self.process_script_file(group, candidate, group_path)
    # Remove DB rows whose backing file disappeared from disk.
    self.cleanup_removed_scripts(group, seen_files)
def process_script_file(
    self, group: ScriptGroup, script_path: Path, group_path: Path
):
    """Create or refresh the DB record for a single script file.

    Existing rows are updated only when the file's mtime is newer than
    the stored last_modified; new files get a fresh Script row built
    from the parsed header metadata.
    """
    # Filename is stored relative to the group directory (matches the
    # keys used by cleanup_removed_scripts).
    relative_path = script_path.relative_to(group_path)
    # Check if script exists in database
    existing_script = Script.query.filter_by(
        group_id=group.id, filename=str(relative_path)
    ).first()
    if existing_script:
        # Update existing script if file was modified
        # NOTE(review): naive-datetime comparison; assumes last_modified
        # was stored from the same local clock as st_mtime — confirm.
        if script_path.stat().st_mtime > existing_script.last_modified.timestamp():
            self.update_script_from_file(existing_script, script_path)
    else:
        # Create new script
        script_metadata = self.parse_script_header(script_path)
        # NOTE(review): parse_metadata_content never emits "display_name"
        # or "description_long_path" keys (it emits "description_long"),
        # so those two .get() calls always fall back — confirm intended.
        new_script = Script(
            group_id=group.id,
            filename=str(relative_path),
            display_name=script_metadata.get("display_name", script_path.stem),
            description=script_metadata.get("description", ""),
            description_long_path=script_metadata.get("description_long_path"),
            required_level=script_metadata.get(
                "required_level", group.required_level
            ),
            last_modified=datetime.fromtimestamp(script_path.stat().st_mtime),
        )
        # Set parameters and tags
        new_script.set_parameters(script_metadata.get("parameters", []))
        new_script.set_tags_list(script_metadata.get("tags", []))
        db.session.add(new_script)
def cleanup_removed_scripts(self, group: ScriptGroup, found_scripts: set):
    """Delete DB script rows whose files no longer exist on disk.

    *found_scripts* holds group-relative filenames seen in the last walk.
    """
    stale_rows = (
        row
        for row in Script.query.filter_by(group_id=group.id).all()
        if row.filename not in found_scripts
    )
    for stale in stale_rows:
        print(
            f"Removing script from database: {stale.filename} "
            f"(file no longer exists)"
        )
        db.session.delete(stale)
def parse_script_header(self, script_path: Path) -> Dict:
    """Parse the ScriptsManager metadata header of a script file.

    Returns an empty dict when the file has no header; read/parse errors
    are reported to stdout rather than raised (best-effort).
    """
    parsed: Dict = {}
    try:
        source = script_path.read_text(encoding="utf-8")
        # The header lives between the "ScriptsManager Metadata:" marker
        # and the closing triple quote (or end of file).
        header = re.search(
            r"ScriptsManager Metadata:(.*?)(?:\n\"\"\"|$)",
            source,
            re.DOTALL | re.IGNORECASE,
        )
        if header:
            parsed = self.parse_metadata_content(header.group(1))
    except Exception as e:
        print(f"Error parsing script header {script_path}: {e}")
    return parsed
def parse_metadata_content(self, content: str) -> Dict:
    """Parse @field lines from a metadata header into a dict.

    Tags become a list, timeout/port become ints, @parameters is decoded
    as a JSON array (silently skipped when malformed).
    """
    field_patterns = (
        ("description", r"@description:\s*(.+)"),
        ("description_long", r"@description_long:\s*(.+)"),
        ("required_level", r"@required_level:\s*(.+)"),
        ("category", r"@category:\s*(.+)"),
        ("tags", r"@tags:\s*(.+)"),
        ("execution_timeout", r"@execution_timeout:\s*(\d+)"),
        ("flask_port", r"@flask_port:\s*(\d+)"),
    )
    parsed: Dict = {}
    for field, pattern in field_patterns:
        hit = re.search(pattern, content, re.IGNORECASE)
        if hit is None:
            continue
        raw = hit.group(1).strip()
        if field == "tags":
            parsed[field] = [tag.strip() for tag in raw.split(",")]
        elif field in ("execution_timeout", "flask_port"):
            parsed[field] = int(raw)
        else:
            parsed[field] = raw
    # @parameters carries a JSON array (may span several lines).
    params_hit = re.search(
        r"@parameters:\s*(\[.*?\])", content, re.DOTALL | re.IGNORECASE
    )
    if params_hit is not None:
        try:
            parsed["parameters"] = json.loads(params_hit.group(1))
        except json.JSONDecodeError:
            pass
    return parsed
def update_script_from_file(self, script: Script, script_path: Path):
    """Refresh a Script row from the metadata header of its file.

    Only fields present in the header are overwritten; last_modified is
    always bumped to the file's current mtime.
    """
    header = self.parse_script_header(script_path)
    # Plain attributes map one-to-one onto header keys.
    for attr in ("description", "required_level"):
        if attr in header:
            setattr(script, attr, header[attr])
    # Structured fields go through the model's setters.
    if "parameters" in header:
        script.set_parameters(header["parameters"])
    if "tags" in header:
        script.set_tags_list(header["tags"])
    script.last_modified = datetime.fromtimestamp(script_path.stat().st_mtime)
def start_file_watcher(self):
    """Start a recursive watchdog observer over the backend directory.

    Any file creation or modification triggers a full re-scan. No-op
    when the backend directory does not exist yet.
    """
    if not self.backend_path.exists():
        return

    service = self  # captured by the handler closure below

    class _RescanHandler(FileSystemEventHandler):
        def on_modified(self, event):
            if not event.is_directory:
                service.scan_script_groups()

        def on_created(self, event):
            if not event.is_directory:
                service.scan_script_groups()

    self.observer = Observer()
    self.observer.schedule(
        _RescanHandler(), str(self.backend_path), recursive=True
    )
    self.observer.start()
def stop_file_watcher(self):
    """Stop and join the file system watcher, if one is running."""
    observer = self.observer
    if not observer:
        return
    observer.stop()
    observer.join()
    self.observer = None
def update_group_metadata_file(self, group: ScriptGroup, updates: Dict):
    """Merge *updates* into the group's metadata.json on disk.

    Returns True on success, False when the file is missing or the
    read/write fails (errors are reported to stdout, not raised).
    """
    metadata_file = Path(group.directory_path) / "metadata.json"
    if not metadata_file.exists():
        print(
            f"WARNING: metadata.json not found for group {group.name} at {metadata_file}"
        )
        return False
    try:
        # Read existing metadata
        with open(metadata_file, "r", encoding="utf-8") as fh:
            merged = json.load(fh)
        # Update with new values
        merged.update(updates)
        # Write back to file
        with open(metadata_file, "w", encoding="utf-8") as fh:
            json.dump(merged, fh, indent=2, ensure_ascii=False)
        print(f"DEBUG: Successfully updated {metadata_file} with {updates}")
        return True
    except Exception as e:
        print(f"ERROR: Failed to update metadata.json for group {group.name}: {e}")
        return False