"""
Database Migration System

Handles schema versioning and migrations for the SQLite database.
"""

import sqlite3
import logging
import traceback
from datetime import datetime
from typing import List, Tuple

logger = logging.getLogger(__name__)


class Migration:
    """Represents a single database migration"""

    def __init__(
        self,
        version: int,
        description: str,
        up_sql: str,
        down_sql: str = ""
    ):
        """
        Initialize migration

        Args:
            version: Migration version number (sequential)
            description: Human-readable description
            up_sql: SQL to apply migration
            down_sql: SQL to rollback migration
        """
        self.version = version
        self.description = description
        self.up_sql = up_sql
        self.down_sql = down_sql


class MigrationManager:
    """
    Manages database schema migrations

    Tracks applied migrations and handles upgrades/downgrades
    """

    def __init__(self, db_path: str):
        """
        Initialize migration manager

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self.migrations: List[Migration] = []
        self._init_migrations_table()
        self._register_migrations()

    def _init_migrations_table(self):
        """Create migrations tracking table if it does not exist"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS schema_migrations (
                    version INTEGER PRIMARY KEY,
                    description TEXT NOT NULL,
                    applied_at TIMESTAMP NOT NULL,
                    execution_time_ms INTEGER
                )
            """)

            conn.commit()
            conn.close()

            logger.info("Migrations table initialized")

        except Exception as e:
            logger.error(f"Failed to initialize migrations table: {e}")
            raise

    def _register_migrations(self):
        """Register all migrations in order"""

        self.migrations.append(Migration(
            version=1,
            description="Add whale tracking table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS whale_transactions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    transaction_hash TEXT UNIQUE NOT NULL,
                    blockchain TEXT NOT NULL,
                    from_address TEXT NOT NULL,
                    to_address TEXT NOT NULL,
                    amount REAL NOT NULL,
                    token_symbol TEXT,
                    usd_value REAL,
                    timestamp TIMESTAMP NOT NULL,
                    detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE INDEX IF NOT EXISTS idx_whale_timestamp
                    ON whale_transactions(timestamp);

                CREATE INDEX IF NOT EXISTS idx_whale_blockchain
                    ON whale_transactions(blockchain);
            """,
            down_sql="DROP TABLE IF EXISTS whale_transactions;"
        ))

        self.migrations.append(Migration(
            version=2,
            description="Add performance indices",
            up_sql="""
                CREATE INDEX IF NOT EXISTS idx_prices_symbol_timestamp
                    ON prices(symbol, timestamp);

                CREATE INDEX IF NOT EXISTS idx_news_published_date
                    ON news(published_date DESC);

                CREATE INDEX IF NOT EXISTS idx_analysis_symbol_timestamp
                    ON market_analysis(symbol, timestamp DESC);
            """,
            down_sql="""
                DROP INDEX IF EXISTS idx_prices_symbol_timestamp;
                DROP INDEX IF EXISTS idx_news_published_date;
                DROP INDEX IF EXISTS idx_analysis_symbol_timestamp;
            """
        ))

        self.migrations.append(Migration(
            version=3,
            description="Add API key tracking table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS api_key_usage (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_key_hash TEXT NOT NULL,
                    endpoint TEXT NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    response_time_ms INTEGER,
                    status_code INTEGER,
                    ip_address TEXT
                );

                CREATE INDEX IF NOT EXISTS idx_api_usage_timestamp
                    ON api_key_usage(timestamp);

                CREATE INDEX IF NOT EXISTS idx_api_usage_key
                    ON api_key_usage(api_key_hash);
            """,
            down_sql="DROP TABLE IF EXISTS api_key_usage;"
        ))

        self.migrations.append(Migration(
            version=4,
            description="Enhance user queries table with metadata",
            up_sql="""
                CREATE TABLE IF NOT EXISTS user_queries_v2 (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query TEXT NOT NULL,
                    query_type TEXT,
                    result_count INTEGER,
                    execution_time_ms INTEGER,
                    user_id TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                -- Migrate old data if exists
                INSERT INTO user_queries_v2 (query, result_count, timestamp)
                SELECT query, result_count, timestamp
                FROM user_queries
                WHERE EXISTS (SELECT 1 FROM sqlite_master WHERE type='table' AND name='user_queries');

                DROP TABLE IF EXISTS user_queries;

                ALTER TABLE user_queries_v2 RENAME TO user_queries;

                CREATE INDEX IF NOT EXISTS idx_user_queries_timestamp
                    ON user_queries(timestamp);
            """,
            down_sql="-- Cannot rollback data migration"
        ))

        self.migrations.append(Migration(
            version=5,
            description="Add cache metadata table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS cache_metadata (
                    cache_key TEXT PRIMARY KEY,
                    data_type TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP NOT NULL,
                    hit_count INTEGER DEFAULT 0,
                    size_bytes INTEGER
                );

                CREATE INDEX IF NOT EXISTS idx_cache_expires
                    ON cache_metadata(expires_at);
            """,
            down_sql="DROP TABLE IF EXISTS cache_metadata;"
        ))

        logger.info(f"Registered {len(self.migrations)} migrations")

    def get_current_version(self) -> int:
        """
        Get current database schema version

        Returns:
            Current version number (0 if no migrations applied)
        """
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute("SELECT MAX(version) FROM schema_migrations")
            result = cursor.fetchone()

            conn.close()

            return result[0] if result[0] is not None else 0

        except Exception as e:
            logger.error(f"Failed to get current version: {e}")
            return 0

    def get_pending_migrations(self) -> List[Migration]:
        """
        Get list of pending migrations

        Returns:
            List of migrations not yet applied
        """
        current_version = self.get_current_version()

        return [
            migration for migration in self.migrations
            if migration.version > current_version
        ]

    def apply_migration(self, migration: Migration) -> bool:
        """
        Apply a single migration

        Args:
            migration: Migration to apply

        Returns:
            True if successful, False otherwise
        """
        try:
            start_time = datetime.now()

            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.executescript(migration.up_sql)

            execution_time = int((datetime.now() - start_time).total_seconds() * 1000)

            cursor.execute(
                """
                INSERT INTO schema_migrations
                    (version, description, applied_at, execution_time_ms)
                VALUES (?, ?, ?, ?)
                """,
                (
                    migration.version,
                    migration.description,
                    datetime.now(),
                    execution_time
                )
            )

            conn.commit()
            conn.close()

            logger.info(
                f"Applied migration {migration.version}: {migration.description} "
                f"({execution_time}ms)"
            )

            return True

        except Exception as e:
            logger.error(
                f"Failed to apply migration {migration.version}: {e}\n"
                f"{traceback.format_exc()}"
            )
            return False

    def migrate_to_latest(self) -> Tuple[bool, List[int]]:
        """
        Apply all pending migrations

        Returns:
            Tuple of (success: bool, applied_versions: List[int])
        """
        pending = self.get_pending_migrations()

        if not pending:
            logger.info("No pending migrations")
            return True, []

        logger.info(f"Applying {len(pending)} pending migrations...")

        applied = []
        for migration in pending:
            if self.apply_migration(migration):
                applied.append(migration.version)
            else:
                logger.error(f"Migration failed at version {migration.version}")
                return False, applied

        logger.info(f"Successfully applied {len(applied)} migrations")
        return True, applied

    def rollback_migration(self, version: int) -> bool:
        """
        Rollback a specific migration

        Args:
            version: Migration version to rollback

        Returns:
            True if successful, False otherwise
        """
        migration = next(
            (m for m in self.migrations if m.version == version),
            None
        )

        if not migration:
            logger.error(f"Migration {version} not found")
            return False

        if not migration.down_sql:
            logger.error(f"Migration {version} has no rollback SQL")
            return False

        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.executescript(migration.down_sql)

            cursor.execute(
                "DELETE FROM schema_migrations WHERE version = ?",
                (version,)
            )

            conn.commit()
            conn.close()

            logger.info(f"Rolled back migration {version}")
            return True

        except Exception as e:
            logger.error(f"Failed to rollback migration {version}: {e}")
            return False

    def get_migration_history(self) -> List[Tuple[int, str, str]]:
        """
        Get migration history

        Returns:
            List of (version, description, applied_at) tuples
        """
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute("""
                SELECT version, description, applied_at
                FROM schema_migrations
                ORDER BY version
            """)

            history = cursor.fetchall()
            conn.close()

            return history

        except Exception as e:
            logger.error(f"Failed to get migration history: {e}")
            return []


def auto_migrate(db_path: str) -> bool:
    """
    Automatically apply all pending migrations on startup

    Args:
        db_path: Path to database file

    Returns:
        True if all migrations applied successfully
    """
    try:
        manager = MigrationManager(db_path)
        current = manager.get_current_version()
        logger.info(f"Current schema version: {current}")

        success, applied = manager.migrate_to_latest()

        if success and applied:
            logger.info(f"Database migrated to version {max(applied)}")
        elif success:
            logger.info("Database already at latest version")
        else:
            logger.error("Migration failed")

        return success

    except Exception as e:
        logger.error(f"Auto-migration failed: {e}")
        return False
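

if __name__ == "__main__":
    # Illustrative usage sketch, not part of the original module: configure
    # basic logging, apply any pending migrations, and print the recorded
    # history. "example.db" is a placeholder path; substitute the database
    # file the application actually uses.
    logging.basicConfig(level=logging.INFO)

    if auto_migrate("example.db"):
        manager = MigrationManager("example.db")
        for version, description, applied_at in manager.get_migration_history():
            print(f"v{version}: {description} (applied {applied_at})")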