145 lines
5.1 KiB
Python
145 lines
5.1 KiB
Python
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import LoginManager
|
|
from datetime import datetime
|
|
import os
|
|
import logging
|
|
from sqlalchemy.engine import Engine
|
|
from sqlalchemy import event, text
|
|
import sqlite3
|
|
|
|
# Shared Flask-SQLAlchemy handle; it is bound to an application in init_db().
db = SQLAlchemy()
# Shared Flask-Login manager; it is bound to an application in init_db().
login_manager = LoginManager()

# Configure logging for database operations
# NOTE(review): basicConfig() runs at import time and targets the root logger,
# so it affects the whole process, not only this module — confirm this is
# intended rather than leaving logging setup to the application entry point.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger used by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Set SQLite-specific pragmas for better performance and foreign key support.

    Registered as a listener for the ``connect`` event on ``Engine``, so it
    receives the raw DBAPI connection of whichever backend is in use and must
    ignore everything that is not SQLite.

    Args:
        dbapi_connection: The raw DBAPI connection that was just opened.
        connection_record: Record object supplied by the event; unused here.
    """
    # Identify SQLite by the connection's type, not by its string form:
    # str() of another driver's connection can embed the DSN (psycopg2 does),
    # so a database or host name containing "sqlite" would wrongly trigger
    # PRAGMA statements on a non-SQLite backend. The module-name check keeps
    # SQLite-compatible drivers/wrappers other than stdlib sqlite3 working.
    is_sqlite = isinstance(dbapi_connection, sqlite3.Connection) or (
        "sqlite" in type(dbapi_connection).__module__
    )
    if not is_sqlite:
        return

    cursor = dbapi_connection.cursor()
    try:
        # SQLite does not enforce foreign keys unless asked to, per connection.
        cursor.execute("PRAGMA foreign_keys=ON")
        # Write-ahead logging plus NORMAL sync: the performance-oriented
        # journal/durability settings chosen by the original code.
        cursor.execute("PRAGMA journal_mode=WAL")
        cursor.execute("PRAGMA synchronous=NORMAL")
        cursor.execute("PRAGMA cache_size=10000")
        cursor.execute("PRAGMA temp_store=MEMORY")
    finally:
        # Always release the cursor, even if one of the PRAGMAs fails.
        cursor.close()
|
|
|
|
|
|
def _probe_postgresql():
    """Run ``SELECT 1`` against the engine; log and re-raise on failure."""
    try:
        with db.engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        logger.info("PostgreSQL connection successful")
    except Exception as exc:
        logger.error(f"PostgreSQL connection failed: {exc}")
        raise


def init_db(app):
    """Attach the database and login manager to ``app`` and create all tables.

    Inside the application context the database configuration is read from
    ``Config.get_database_config()``; for PostgreSQL a connectivity probe is
    run first, for SQLite the target file is logged. Tables are then created
    with ``db.create_all()``.

    Args:
        app: The Flask application to initialise.

    Returns:
        The module-level ``db`` object.

    Raises:
        Exception: Any error raised during configuration lookup, the
            connectivity probe, or table creation is logged and re-raised.
    """
    db.init_app(app)

    # Login manager wiring: unauthenticated users are sent to the "login" view.
    login_manager.init_app(app)
    login_manager.login_view = "login"
    login_manager.login_message = "Please log in to access this page."

    with app.app_context():
        try:
            # Imported lazily, as in the original, rather than at module top.
            from .config import Config

            settings = Config.get_database_config()

            if settings['engine'] == 'postgresql':
                logger.info(f"Connecting to PostgreSQL database: {settings['host']}:{settings['port']}/{settings['database']}")
                _probe_postgresql()
            elif settings['engine'] == 'sqlite':
                logger.info(f"Using SQLite database: {settings.get('file', ':memory:')}")

            # Schema creation happens for every engine.
            db.create_all()
            logger.info("Database tables created successfully")
        except Exception as exc:
            logger.error(f"Database initialization failed: {exc}")
            raise

    return db
|
|
|
|
|
|
def get_db_info():
    """Get database connection information.

    Returns:
        dict: For PostgreSQL: ``engine``, ``version``, ``host``, ``port``,
        ``database``, ``connection_pool_size`` and
        ``checked_out_connections``. For SQLite: ``engine``, ``version`` and
        ``file``. On any failure, or when the configured engine is neither
        of those, a dict with a single ``error`` key.
    """
    try:
        from .config import Config

        db_config = Config.get_database_config()
        engine_name = db_config['engine']

        if engine_name == 'postgresql':
            # Get PostgreSQL version and connection info
            with db.engine.connect() as connection:
                result = connection.execute(text("SELECT version()"))
                version = result.fetchone()[0]

            # The banner looks like "PostgreSQL <number> on ..."; keep only
            # the number, falling back to the full banner if it has no
            # second token instead of failing the whole lookup.
            version_parts = version.split()
            short_version = version_parts[1] if len(version_parts) > 1 else version

            return {
                'engine': 'PostgreSQL',
                'version': short_version,
                'host': db_config['host'],
                'port': db_config['port'],
                'database': db_config['database'],
                'connection_pool_size': db.engine.pool.size(),
                'checked_out_connections': db.engine.pool.checkedout(),
            }

        if engine_name == 'sqlite':
            # Get SQLite version
            with db.engine.connect() as connection:
                result = connection.execute(text("SELECT sqlite_version()"))
                version = result.fetchone()[0]

            return {
                'engine': 'SQLite',
                'version': version,
                'file': db_config.get('file', ':memory:'),
            }

        # Any other engine used to fall through and return None; report it
        # in the same shape as the other failure path so callers always
        # receive a dict.
        message = f"Unsupported database engine: {engine_name}"
        logger.error(f"Failed to get database info: {message}")
        return {'error': message}
    except Exception as e:
        logger.error(f"Failed to get database info: {e}")
        return {'error': str(e)}
|
|
|
|
|
|
def check_db_health():
    """Check database health and connectivity.

    Runs ``SELECT 1`` against the configured engine.

    Returns:
        dict: ``status`` is ``'healthy'`` when the probe query succeeds; the
        PostgreSQL result additionally carries connection-pool counters
        (``pool_size``, ``checked_out``, ``overflow``, ``checked_in``).
        On any failure, or when the configured engine is neither PostgreSQL
        nor SQLite, ``status`` is ``'unhealthy'`` and ``error`` holds the
        reason.
    """
    try:
        from .config import Config

        db_config = Config.get_database_config()
        engine_name = db_config['engine']

        if engine_name == 'postgresql':
            # Check PostgreSQL health
            with db.engine.connect() as connection:
                result = connection.execute(text("SELECT 1"))
                result.fetchone()

            # Check connection pool status
            pool = db.engine.pool
            return {
                'status': 'healthy',
                'engine': 'PostgreSQL',
                'pool_size': pool.size(),
                'checked_out': pool.checkedout(),
                'overflow': pool.overflow(),
                'checked_in': pool.checkedin(),
            }

        if engine_name == 'sqlite':
            # Check SQLite health
            with db.engine.connect() as connection:
                result = connection.execute(text("SELECT 1"))
                result.fetchone()

            return {
                'status': 'healthy',
                'engine': 'SQLite',
            }

        # Any other engine used to fall through and return None, which a
        # caller reading result['status'] cannot handle; report it as an
        # explicit unhealthy result instead.
        message = f"Unsupported database engine: {engine_name}"
        logger.error(f"Database health check failed: {message}")
        return {
            'status': 'unhealthy',
            'error': message,
        }
    except Exception as e:
        logger.error(f"Database health check failed: {e}")
        return {
            'status': 'unhealthy',
            'error': str(e)
        }
|