SIDEL_ScriptsManager/scripts/init_db.py

589 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Database initialization script for ScriptsManager
Creates the database tables and initial data
"""
import os
import sys
import json
from pathlib import Path
# Add the project root to the Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from app import create_app
from app.config.database import db
from app.models.user import User, UserRole
from app.models.script import Script, ScriptGroup
from app.services.script_discovery import ScriptDiscoveryService
from sqlalchemy.exc import IntegrityError
def init_database():
    """Bootstrap a fresh database, or refresh discovery on an existing one.

    Builds the application with ``create_app`` and works inside its
    application context. If the engine already reports at least one table,
    table creation and seeding are skipped and only script-group discovery
    is re-run. Otherwise the tables are created, the default admin user is
    added, the example backend tree is written, script groups are
    discovered, and the default admin credentials are printed.

    Raises:
        Exception: any failure is reported on stdout and re-raised unchanged.
    """
    print("Initializing ScriptsManager database...")

    flask_app = create_app()
    with flask_app.app_context():
        try:
            # Imported inside the try so an import failure is reported like
            # any other initialization error.
            from sqlalchemy import inspect

            existing_tables = inspect(db.engine).get_table_names()

            if not existing_tables:
                # Empty database: announce each bootstrap step, then run it.
                bootstrap_steps = (
                    ("Creating database tables...", db.create_all),
                    ("Creating default admin user...", create_admin_user),
                    ("Setting up backend directory...", setup_backend_structure),
                    ("Discovering script groups...", discover_script_groups),
                )
                for announcement, run_step in bootstrap_steps:
                    print(announcement)
                    run_step()

                closing_lines = (
                    "Database initialization completed successfully!",
                    "\nDefault admin credentials:",
                    "Username: admin",
                    "Password: admin123",
                    "\nIMPORTANT: Change admin password after first login!",
                )
                for line in closing_lines:
                    print(line)
            else:
                # Tables already present: leave schema and seed data alone,
                # but still pick up script groups added since the last run.
                print(f"Database already initialized with {len(existing_tables)} tables.")
                print("Skipping table creation and data initialization.")
                print("Updating script group discovery...")
                discover_script_groups()
                print("Database update completed successfully!")
        except Exception as e:
            print(f"Error during database initialization: {e}")
            raise
def create_admin_user():
    """Insert the default ``admin`` account unless one is already stored.

    Looks up a user named ``admin``; when one is found nothing is written.
    Otherwise a new ``User`` with the admin role, English language and light
    theme is created, given the default password, and committed.

    Raises:
        IntegrityError: re-raised after the session has been rolled back and
            the error has been printed.
    """
    try:
        # Nothing to do when an account named "admin" is already present.
        if User.query.filter_by(username="admin").first():
            print("Admin user already exists, skipping creation.")
            return

        account_fields = {
            "username": "admin",
            "email": "admin@scriptsmanager.local",
            "user_level": UserRole.ADMIN.value,
            "preferred_language": "en",
            "preferred_theme": "light",
        }
        new_admin = User(**account_fields)
        new_admin.set_password("admin123")

        session = db.session
        session.add(new_admin)
        session.commit()
        print("✓ Admin user created successfully")
    except IntegrityError as e:
        db.session.rollback()
        print(f"Error creating admin user: {e}")
        raise
def setup_backend_structure():
    """Create ``backend/script_groups`` under the project root and fill it.

    The directory (and any missing parents) is created if needed, then each
    example group builder is invoked with that directory.
    """
    script_groups_root = project_root.joinpath("backend", "script_groups")
    script_groups_root.mkdir(parents=True, exist_ok=True)

    # Example groups, built in this fixed order.
    group_builders = (
        create_data_processing_group,
        create_system_utilities_group,
    )
    for build_group in group_builders:
        build_group(script_groups_root)

    print("✓ Backend directory structure created")
def create_data_processing_group(backend_dir):
    """Write the ``data_processing`` example group below ``backend_dir``.

    Creates the group directory (its parent must already exist), writes a
    ``metadata.json`` describing the group, then generates the three example
    scripts listed in that metadata.

    Args:
        backend_dir: path object of the directory that holds script groups.
    """
    group_dir = backend_dir / "data_processing"
    group_dir.mkdir(exist_ok=True)

    # Group manifest; key order is kept so the emitted JSON is unchanged.
    manifest = dict(
        name="data_processing",
        display_name="Data Processing Tools",
        description="Scripts for data analysis and processing tasks",
        conda_env="data_science",
        scripts=["data_analyzer.py", "csv_processor.py", "report_generator.py"],
    )
    (group_dir / "metadata.json").write_text(json.dumps(manifest, indent=2))

    # One writer per script named in the manifest.
    script_writers = (
        create_data_analyzer_script,
        create_csv_processor_script,
        create_report_generator_script,
    )
    for write_script in script_writers:
        write_script(group_dir)
def create_system_utilities_group(backend_dir):
    """Write the ``system_utilities`` example group below ``backend_dir``.

    Creates the group directory (its parent must already exist), writes a
    ``metadata.json`` describing the group, then generates the three example
    scripts listed in that metadata.

    Args:
        backend_dir: path object of the directory that holds script groups.
    """
    group_dir = backend_dir / "system_utilities"
    group_dir.mkdir(exist_ok=True)

    # Group manifest; key order is kept so the emitted JSON is unchanged.
    manifest = dict(
        name="system_utilities",
        display_name="System Utilities",
        description="System administration and utility scripts",
        conda_env="system_tools",
        scripts=["disk_cleanup.py", "log_analyzer.py", "backup_manager.py"],
    )
    (group_dir / "metadata.json").write_text(json.dumps(manifest, indent=2))

    # One writer per script named in the manifest.
    script_writers = (
        create_disk_cleanup_script,
        create_log_analyzer_script,
        create_backup_manager_script,
    )
    for write_script in script_writers:
        write_script(group_dir)
def create_data_analyzer_script(group_dir):
    """Write the example ``data_analyzer.py`` script into ``group_dir``.

    The generated file is a stand-alone argparse program that requires
    ``--input_file``, accepts ``--output_format`` (default ``"json"``),
    runs five one-second simulated steps and prints a JSON summary.
    An existing file with the same name is overwritten.

    Args:
        group_dir: path object of the existing group directory.
    """
    # The template starts at column 0 because it is written out verbatim;
    # its internal indentation is what makes the generated file valid Python.
    script_content = '''#!/usr/bin/env python3
"""
Data Analyzer
Analyzes data files and generates statistics
Parameters:
- input_file: Input File Path
- output_format: Output Format
Interface: No
"""
import sys
import json
import time
import argparse
from pathlib import Path
def main():
    """Main execution function"""
    parser = argparse.ArgumentParser(description="Analyzes data files and generates statistics")
    parser.add_argument("--input_file", help="Input File Path", required=True)
    parser.add_argument("--output_format", help="Output Format", default="json")
    args = parser.parse_args()
    print(f"Starting Data Analyzer...")
    print(f"Parameters: {args}")
    # Simulate some work
    for i in range(5):
        print(f"Processing step {i+1}/5...")
        time.sleep(1)
    print(f"Data Analyzer completed successfully!")
    # Example result
    result = {
        "status": "completed",
        "message": "Data Analyzer executed successfully",
        "parameters": vars(args),
        "timestamp": time.time()
    }
    print(json.dumps(result, indent=2))
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\\nScript interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
'''
    # Explicit encoding so the output does not depend on the platform locale.
    with open(group_dir / "data_analyzer.py", "w", encoding="utf-8") as f:
        f.write(script_content)
def create_csv_processor_script(group_dir):
    """Write the example ``csv_processor.py`` script into ``group_dir``.

    The generated file is a stand-alone argparse program that requires
    ``--csv_file``, accepts ``--remove_duplicates`` (default ``"true"``) and
    ``--max_rows`` (default ``"10000"``), runs five one-second simulated
    steps and prints a JSON summary. An existing file with the same name is
    overwritten.

    Args:
        group_dir: path object of the existing group directory.
    """
    # The template starts at column 0 because it is written out verbatim;
    # its internal indentation is what makes the generated file valid Python.
    script_content = '''#!/usr/bin/env python3
"""
CSV Processor
Processes and cleans CSV files
Parameters:
- csv_file: CSV File Path
- remove_duplicates: Remove Duplicates
- max_rows: Max Rows to Process
Interface: No
"""
import sys
import json
import time
import argparse
from pathlib import Path
def main():
    """Main execution function"""
    parser = argparse.ArgumentParser(description="Processes and cleans CSV files")
    parser.add_argument("--csv_file", help="CSV File Path", required=True)
    parser.add_argument("--remove_duplicates", help="Remove Duplicates", default="true")
    parser.add_argument("--max_rows", help="Max Rows to Process", default="10000")
    args = parser.parse_args()
    print(f"Starting CSV Processor...")
    print(f"Parameters: {args}")
    # Simulate some work
    for i in range(5):
        print(f"Processing step {i+1}/5...")
        time.sleep(1)
    print(f"CSV Processor completed successfully!")
    # Example result
    result = {
        "status": "completed",
        "message": "CSV Processor executed successfully",
        "parameters": vars(args),
        "timestamp": time.time()
    }
    print(json.dumps(result, indent=2))
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\\nScript interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
'''
    # Explicit encoding so the output does not depend on the platform locale.
    with open(group_dir / "csv_processor.py", "w", encoding="utf-8") as f:
        f.write(script_content)
def create_report_generator_script(group_dir):
    """Write the example ``report_generator.py`` script into ``group_dir``.

    The generated file is a stand-alone argparse program that requires
    ``--data_source``, accepts ``--report_type`` (default ``"summary"``),
    runs five one-second simulated steps and prints a JSON summary.
    An existing file with the same name is overwritten.

    Args:
        group_dir: path object of the existing group directory.
    """
    # The template starts at column 0 because it is written out verbatim;
    # its internal indentation is what makes the generated file valid Python.
    script_content = '''#!/usr/bin/env python3
"""
Report Generator
Generates formatted reports from processed data
Parameters:
- data_source: Data Source
- report_type: Report Type
Interface: Yes
"""
import sys
import json
import time
import argparse
from pathlib import Path
def main():
    """Main execution function"""
    parser = argparse.ArgumentParser(description="Generates formatted reports from processed data")
    parser.add_argument("--data_source", help="Data Source", required=True)
    parser.add_argument("--report_type", help="Report Type", default="summary")
    args = parser.parse_args()
    print(f"Starting Report Generator...")
    print(f"Parameters: {args}")
    # Simulate some work
    for i in range(5):
        print(f"Processing step {i+1}/5...")
        time.sleep(1)
    print(f"Report Generator completed successfully!")
    # Example result
    result = {
        "status": "completed",
        "message": "Report Generator executed successfully",
        "parameters": vars(args),
        "timestamp": time.time()
    }
    print(json.dumps(result, indent=2))
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\\nScript interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
'''
    # Explicit encoding so the output does not depend on the platform locale.
    with open(group_dir / "report_generator.py", "w", encoding="utf-8") as f:
        f.write(script_content)
def create_disk_cleanup_script(group_dir):
    """Write the example ``disk_cleanup.py`` script into ``group_dir``.

    The generated file is a stand-alone argparse program that requires
    ``--target_directory``, accepts ``--file_age_days`` (default ``"30"``)
    and ``--dry_run`` (default ``"true"``), runs five one-second simulated
    steps and prints a JSON summary. The generated script only simulates
    work; it deletes nothing. An existing file with the same name is
    overwritten.

    Args:
        group_dir: path object of the existing group directory.
    """
    # The template starts at column 0 because it is written out verbatim;
    # its internal indentation is what makes the generated file valid Python.
    script_content = '''#!/usr/bin/env python3
"""
Disk Cleanup
Cleans up temporary files and free disk space
Parameters:
- target_directory: Target Directory
- file_age_days: Delete files older than (days)
- dry_run: Dry Run (don't actually delete)
Interface: No
"""
import sys
import json
import time
import argparse
from pathlib import Path
def main():
    """Main execution function"""
    parser = argparse.ArgumentParser(description="Cleans up temporary files and free disk space")
    parser.add_argument("--target_directory", help="Target Directory", required=True)
    parser.add_argument("--file_age_days", help="Delete files older than (days)", default="30")
    parser.add_argument("--dry_run", help="Dry Run (don't actually delete)", default="true")
    args = parser.parse_args()
    print(f"Starting Disk Cleanup...")
    print(f"Parameters: {args}")
    # Simulate some work
    for i in range(5):
        print(f"Processing step {i+1}/5...")
        time.sleep(1)
    print(f"Disk Cleanup completed successfully!")
    # Example result
    result = {
        "status": "completed",
        "message": "Disk Cleanup executed successfully",
        "parameters": vars(args),
        "timestamp": time.time()
    }
    print(json.dumps(result, indent=2))
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\\nScript interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
'''
    # Explicit encoding so the output does not depend on the platform locale.
    with open(group_dir / "disk_cleanup.py", "w", encoding="utf-8") as f:
        f.write(script_content)
def create_log_analyzer_script(group_dir):
    """Write the example ``log_analyzer.py`` script into ``group_dir``.

    The generated file is a stand-alone argparse program that requires
    ``--log_file``, accepts ``--error_level`` (default ``"error"``),
    runs five one-second simulated steps and prints a JSON summary.
    An existing file with the same name is overwritten.

    Args:
        group_dir: path object of the existing group directory.
    """
    # The template starts at column 0 because it is written out verbatim;
    # its internal indentation is what makes the generated file valid Python.
    script_content = '''#!/usr/bin/env python3
"""
Log Analyzer
Analyzes system log files for errors and patterns
Parameters:
- log_file: Log File Path
- error_level: Minimum Level
Interface: Yes
"""
import sys
import json
import time
import argparse
from pathlib import Path
def main():
    """Main execution function"""
    parser = argparse.ArgumentParser(description="Analyzes system log files for errors and patterns")
    parser.add_argument("--log_file", help="Log File Path", required=True)
    parser.add_argument("--error_level", help="Minimum Level", default="error")
    args = parser.parse_args()
    print(f"Starting Log Analyzer...")
    print(f"Parameters: {args}")
    # Simulate some work
    for i in range(5):
        print(f"Processing step {i+1}/5...")
        time.sleep(1)
    print(f"Log Analyzer completed successfully!")
    # Example result
    result = {
        "status": "completed",
        "message": "Log Analyzer executed successfully",
        "parameters": vars(args),
        "timestamp": time.time()
    }
    print(json.dumps(result, indent=2))
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\\nScript interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
'''
    # Explicit encoding so the output does not depend on the platform locale.
    with open(group_dir / "log_analyzer.py", "w", encoding="utf-8") as f:
        f.write(script_content)
def create_backup_manager_script(group_dir):
    """Write the example ``backup_manager.py`` script into ``group_dir``.

    The generated file is a stand-alone argparse program that requires
    ``--source_path`` and ``--backup_path``, accepts ``--compression``
    (default ``"zip"``), runs five one-second simulated steps and prints a
    JSON summary. An existing file with the same name is overwritten.

    Args:
        group_dir: path object of the existing group directory.
    """
    # The template starts at column 0 because it is written out verbatim;
    # its internal indentation is what makes the generated file valid Python.
    script_content = '''#!/usr/bin/env python3
"""
Backup Manager
Creates and manages system backups
Parameters:
- source_path: Source Path
- backup_path: Backup Destination
- compression: Compression Type
Interface: No
"""
import sys
import json
import time
import argparse
from pathlib import Path
def main():
    """Main execution function"""
    parser = argparse.ArgumentParser(description="Creates and manages system backups")
    parser.add_argument("--source_path", help="Source Path", required=True)
    parser.add_argument("--backup_path", help="Backup Destination", required=True)
    parser.add_argument("--compression", help="Compression Type", default="zip")
    args = parser.parse_args()
    print(f"Starting Backup Manager...")
    print(f"Parameters: {args}")
    # Simulate some work
    for i in range(5):
        print(f"Processing step {i+1}/5...")
        time.sleep(1)
    print(f"Backup Manager completed successfully!")
    # Example result
    result = {
        "status": "completed",
        "message": "Backup Manager executed successfully",
        "parameters": vars(args),
        "timestamp": time.time()
    }
    print(json.dumps(result, indent=2))
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\\nScript interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
'''
    # Explicit encoding so the output does not depend on the platform locale.
    with open(group_dir / "backup_manager.py", "w", encoding="utf-8") as f:
        f.write(script_content)
def discover_script_groups():
    """Scan ``backend/script_groups`` under the project root for script groups.

    Instantiates ``ScriptDiscoveryService``, points its ``backend_path`` at
    the script-groups directory, runs ``scan_script_groups`` and prints how
    many groups it returned.
    NOTE(review): the scan presumably writes the groups to the database,
    since the failure path rolls the session back - confirm in the service.

    Raises:
        Exception: any failure is printed and re-raised after the database
            session has been rolled back.
    """
    try:
        scanner = ScriptDiscoveryService()
        # Override the service's backend location before scanning.
        scanner.backend_path = project_root / "backend" / "script_groups"
        groups = scanner.scan_script_groups()
        print(f"✓ Discovered and registered {len(groups)} script groups")
    except Exception as e:
        db.session.rollback()
        print(f"Error discovering script groups: {e}")
        raise
# Script entry point: initialize the database only when run directly,
# never as a side effect of importing this module.
if __name__ == "__main__":
    init_database()