import json
import re
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from app.models import ScriptGroup, Script
from app.config.database import db


class ScriptDiscoveryService:
    """Service for discovering and managing scripts."""

    def __init__(self, backend_path: str = "app/backend/script_groups"):
        self.backend_path = Path(backend_path)
        self.observer = None

    def scan_script_groups(self) -> List[ScriptGroup]:
        """Scan for script groups and update the database."""
        if not self.backend_path.exists():
            self.backend_path.mkdir(parents=True, exist_ok=True)
            return []

        discovered_groups = []

        for group_dir in self.backend_path.iterdir():
            if group_dir.is_dir() and not group_dir.name.startswith("."):
                group = self.process_script_group(group_dir)
                if group:
                    discovered_groups.append(group)

        return discovered_groups

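    # Illustrative on-disk layout this scan expects (group and script names
    # below are hypothetical; only metadata.json and the *.py files matter):
    #
    #     app/backend/script_groups/
    #         reporting/
    #             metadata.json
    #             export_daily.py
    #         maintenance/
    #             metadata.json
    #             cleanup.py
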
    def process_script_group(self, group_path: Path) -> Optional[ScriptGroup]:
        """Process a single script group directory."""
        metadata_file = group_path / "metadata.json"

        # Load existing metadata, or create and persist defaults
        if metadata_file.exists():
            with open(metadata_file, "r", encoding="utf-8") as f:
                metadata = json.load(f)
        else:
            metadata = self.create_default_group_metadata(group_path.name)
            self.save_group_metadata(group_path, metadata)

        # Check if the group already exists in the database
        existing_group = ScriptGroup.query.filter_by(
            directory_path=str(group_path)
        ).first()

        if existing_group:
            # Update existing group
            self.update_group_from_metadata(existing_group, metadata)
            script_group = existing_group
        else:
            # Create new group
            script_group = ScriptGroup(
                name=metadata.get("name", group_path.name),
                directory_path=str(group_path),
                description=json.dumps(metadata.get("description", {})),
                required_level=metadata.get("required_level", "viewer"),
                conda_environment=metadata.get("conda_environment", "base"),
            )
            db.session.add(script_group)
            db.session.flush()  # Assign an ID before scripts reference it

        # Discover scripts in this group
        self.discover_scripts_in_group(script_group, group_path)

        db.session.commit()
        return script_group

    def create_default_group_metadata(self, group_name: str) -> Dict:
        """Create default metadata for a script group."""
        return {
            "name": group_name.replace("_", " ").title(),
            "description": {
                "en": f"Scripts in {group_name} group",
                "es": f"Scripts en el grupo {group_name}",
                "it": f"Script nel gruppo {group_name}",
                "fr": f"Scripts dans le groupe {group_name}",
            },
            "icon": "folder",
            "required_level": "operator",
            "category": "general",
            "conda_environment": "base",
            "auto_discovery": True,
            "execution_timeout": 300,
        }

    def save_group_metadata(self, group_path: Path, metadata: Dict):
        """Save group metadata to file."""
        metadata_file = group_path / "metadata.json"
        with open(metadata_file, "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

    def update_group_from_metadata(self, group: ScriptGroup, metadata: Dict):
        """Update script group from metadata."""
        group.name = metadata.get("name", group.name)
        group.description = json.dumps(metadata.get("description", {}))
        group.required_level = metadata.get("required_level", group.required_level)
        group.conda_environment = metadata.get(
            "conda_environment", group.conda_environment
        )

    def discover_scripts_in_group(self, group: ScriptGroup, group_path: Path):
        """Discover all scripts in a group directory."""
        script_extensions = [".py"]  # Can be extended for other types

        # Track existing files to identify removed scripts
        found_scripts = set()

        for script_file in group_path.rglob("*"):
            if (
                script_file.is_file()
                and script_file.suffix in script_extensions
                and not script_file.name.startswith(".")
            ):
                relative_path = script_file.relative_to(group_path)
                found_scripts.add(str(relative_path))
                self.process_script_file(group, script_file, group_path)

        # Remove scripts from the database that no longer exist on disk
        self.cleanup_removed_scripts(group, found_scripts)

    def process_script_file(
        self, group: ScriptGroup, script_path: Path, group_path: Path
    ):
        """Process a single script file."""
        relative_path = script_path.relative_to(group_path)

        # Check if the script exists in the database
        existing_script = Script.query.filter_by(
            group_id=group.id, filename=str(relative_path)
        ).first()

        if existing_script:
            # Update existing script if the file was modified
            if script_path.stat().st_mtime > existing_script.last_modified.timestamp():
                self.update_script_from_file(existing_script, script_path)
        else:
            # Create new script
            script_metadata = self.parse_script_header(script_path)
            new_script = Script(
                group_id=group.id,
                filename=str(relative_path),
                display_name=script_metadata.get("display_name", script_path.stem),
                description=script_metadata.get("description", ""),
                # parse_metadata_content() stores the @description_long value
                # under the "description_long" key, so read that key here
                # (the old "description_long_path" lookup could never match)
                description_long_path=script_metadata.get("description_long"),
                required_level=script_metadata.get(
                    "required_level", group.required_level
                ),
                last_modified=datetime.fromtimestamp(script_path.stat().st_mtime),
            )

            # Set parameters and tags
            new_script.set_parameters(script_metadata.get("parameters", []))
            new_script.set_tags_list(script_metadata.get("tags", []))

            db.session.add(new_script)

    def cleanup_removed_scripts(self, group: ScriptGroup, found_scripts: set):
        """Remove scripts from the database that no longer exist on disk."""
        # Get all scripts in the database for this group
        existing_scripts = Script.query.filter_by(group_id=group.id).all()

        for script in existing_scripts:
            if script.filename not in found_scripts:
                print(
                    f"Removing script from database: {script.filename} "
                    "(file no longer exists)"
                )
                db.session.delete(script)

    def parse_script_header(self, script_path: Path) -> Dict:
        """Parse script header for metadata."""
        metadata = {}

        try:
            with open(script_path, "r", encoding="utf-8") as f:
                content = f.read()

            # Look for ScriptsManager metadata in docstring or comments
            metadata_pattern = r"ScriptsManager Metadata:(.*?)(?:\n\"\"\"|$)"
            match = re.search(metadata_pattern, content, re.DOTALL | re.IGNORECASE)

            if match:
                metadata_content = match.group(1)
                metadata = self.parse_metadata_content(metadata_content)

        except Exception as e:
            print(f"Error parsing script header {script_path}: {e}")

        return metadata

    def parse_metadata_content(self, content: str) -> Dict:
        """Parse metadata content from script header."""
        metadata = {}

        # Parse individual metadata fields
        patterns = {
            "description": r"@description:\s*(.+)",
            "description_long": r"@description_long:\s*(.+)",
            "required_level": r"@required_level:\s*(.+)",
            "category": r"@category:\s*(.+)",
            "tags": r"@tags:\s*(.+)",
            "execution_timeout": r"@execution_timeout:\s*(\d+)",
            "flask_port": r"@flask_port:\s*(\d+)",
        }

        for key, pattern in patterns.items():
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                value = match.group(1).strip()
                if key == "tags":
                    metadata[key] = [tag.strip() for tag in value.split(",")]
                elif key in ["execution_timeout", "flask_port"]:
                    metadata[key] = int(value)
                else:
                    metadata[key] = value

        # Parse parameters (JSON array)
        params_pattern = r"@parameters:\s*(\[.*?\])"
        params_match = re.search(params_pattern, content, re.DOTALL | re.IGNORECASE)
        if params_match:
            try:
                metadata["parameters"] = json.loads(params_match.group(1))
            except json.JSONDecodeError:
                pass

        return metadata

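    # For reference, a hypothetical script header that parse_script_header()
    # and parse_metadata_content() would accept, derived from the regexes
    # above (all values are illustrative):
    #
    #     """
    #     ScriptsManager Metadata:
    #     @description: Export the daily report
    #     @required_level: operator
    #     @tags: reports, export
    #     @execution_timeout: 120
    #     @parameters: [{"name": "date", "type": "string"}]
    #     """
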
    def update_script_from_file(self, script: Script, script_path: Path):
        """Update script metadata from file."""
        metadata = self.parse_script_header(script_path)

        # Update fields that should be refreshed from file
        if "description" in metadata:
            script.description = metadata["description"]
        if "required_level" in metadata:
            script.required_level = metadata["required_level"]
        if "parameters" in metadata:
            script.set_parameters(metadata["parameters"])
        if "tags" in metadata:
            script.set_tags_list(metadata["tags"])

        script.last_modified = datetime.fromtimestamp(script_path.stat().st_mtime)

    def start_file_watcher(self):
        """Start file system watcher for real-time discovery."""
        if not self.backend_path.exists():
            return

        class ScriptFileHandler(FileSystemEventHandler):
            def __init__(self, discovery_service):
                self.discovery_service = discovery_service

            def on_modified(self, event):
                if not event.is_directory:
                    # Re-scan when files are modified
                    self.discovery_service.scan_script_groups()

            def on_created(self, event):
                if not event.is_directory:
                    # Re-scan when new files are created
                    self.discovery_service.scan_script_groups()

        # Note: watchdog delivers events on the observer's own thread, and the
        # re-scan issues database queries, so the caller must ensure a
        # database/application context is available on that thread.
        handler = ScriptFileHandler(self)
        self.observer = Observer()
        self.observer.schedule(handler, str(self.backend_path), recursive=True)
        self.observer.start()

    def stop_file_watcher(self):
        """Stop file system watcher."""
        if self.observer:
            self.observer.stop()
            self.observer.join()
            self.observer = None

    def update_group_metadata_file(self, group: ScriptGroup, updates: Dict):
        """Update specific fields in the group's metadata.json file."""
        group_path = Path(group.directory_path)
        metadata_file = group_path / "metadata.json"

        if not metadata_file.exists():
            print(
                f"WARNING: metadata.json not found for group {group.name} at {metadata_file}"
            )
            return False

        try:
            # Read existing metadata
            with open(metadata_file, "r", encoding="utf-8") as f:
                metadata = json.load(f)

            # Update with new values
            metadata.update(updates)

            # Write back to file
            with open(metadata_file, "w", encoding="utf-8") as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            print(f"DEBUG: Successfully updated {metadata_file} with {updates}")
            return True

        except Exception as e:
            print(f"ERROR: Failed to update metadata.json for group {group.name}: {e}")
            return False
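

# Minimal usage sketch (hypothetical; assumes the service runs inside an
# active Flask application context, since it issues SQLAlchemy queries):
#
#     service = ScriptDiscoveryService()
#     groups = service.scan_script_groups()   # one-off scan
#     service.start_file_watcher()            # keep the DB in sync afterwards
#     ...
#     service.stop_file_watcher()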