File size: 13,195 Bytes
e4e4574
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
"""
Database Migration System
Handles schema versioning and migrations for SQLite database
"""

import sqlite3
import logging
from typing import List, Callable, Tuple
from datetime import datetime
from pathlib import Path
import traceback

logger = logging.getLogger(__name__)


class Migration:
    """
    Represents a single database migration

    A migration is a plain value object: a version number, a description,
    the SQL that applies the change and (optionally) the SQL that undoes it.
    """

    def __init__(
        self,
        version: int,
        description: str,
        up_sql: str,
        down_sql: str = ""
    ):
        """
        Initialize migration

        Args:
            version: Migration version number (sequential)
            description: Human-readable description
            up_sql: SQL to apply migration
            down_sql: SQL to rollback migration; defaults to an empty string
        """
        self.version = version
        self.description = description
        self.up_sql = up_sql
        self.down_sql = down_sql

    def __repr__(self) -> str:
        """Return a short representation for log output and debugging."""
        # The SQL bodies are deliberately omitted: they can span many lines
        # and would make log messages unreadable.
        return (
            f"{type(self).__name__}(version={self.version!r}, "
            f"description={self.description!r})"
        )


class MigrationManager:
    """
    Manages database schema migrations
    Tracks applied migrations and handles upgrades/downgrades
    """

    def __init__(self, db_path: str):
        """
        Initialize migration manager

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self.migrations: List[Migration] = []
        self._init_migrations_table()
        self._register_migrations()

    def _init_migrations_table(self):
        """Create migrations tracking table if not exists"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS schema_migrations (
                    version INTEGER PRIMARY KEY,
                    description TEXT NOT NULL,
                    applied_at TIMESTAMP NOT NULL,
                    execution_time_ms INTEGER
                )
            """)

            conn.commit()
            conn.close()

            logger.info("Migrations table initialized")

        except Exception as e:
            logger.error(f"Failed to initialize migrations table: {e}")
            raise

    def _register_migrations(self):
        """Register all migrations in order"""

        # Migration 1: Add whale tracking table
        self.migrations.append(Migration(
            version=1,
            description="Add whale tracking table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS whale_transactions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    transaction_hash TEXT UNIQUE NOT NULL,
                    blockchain TEXT NOT NULL,
                    from_address TEXT NOT NULL,
                    to_address TEXT NOT NULL,
                    amount REAL NOT NULL,
                    token_symbol TEXT,
                    usd_value REAL,
                    timestamp TIMESTAMP NOT NULL,
                    detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE INDEX IF NOT EXISTS idx_whale_timestamp
                ON whale_transactions(timestamp);

                CREATE INDEX IF NOT EXISTS idx_whale_blockchain
                ON whale_transactions(blockchain);
            """,
            down_sql="DROP TABLE IF EXISTS whale_transactions;"
        ))

        # Migration 2: Add indices for performance
        self.migrations.append(Migration(
            version=2,
            description="Add performance indices",
            up_sql="""
                CREATE INDEX IF NOT EXISTS idx_prices_symbol_timestamp
                ON prices(symbol, timestamp);

                CREATE INDEX IF NOT EXISTS idx_news_published_date
                ON news(published_date DESC);

                CREATE INDEX IF NOT EXISTS idx_analysis_symbol_timestamp
                ON market_analysis(symbol, timestamp DESC);
            """,
            down_sql="""
                DROP INDEX IF EXISTS idx_prices_symbol_timestamp;
                DROP INDEX IF EXISTS idx_news_published_date;
                DROP INDEX IF EXISTS idx_analysis_symbol_timestamp;
            """
        ))

        # Migration 3: Add API key tracking
        self.migrations.append(Migration(
            version=3,
            description="Add API key tracking table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS api_key_usage (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_key_hash TEXT NOT NULL,
                    endpoint TEXT NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    response_time_ms INTEGER,
                    status_code INTEGER,
                    ip_address TEXT
                );

                CREATE INDEX IF NOT EXISTS idx_api_usage_timestamp
                ON api_key_usage(timestamp);

                CREATE INDEX IF NOT EXISTS idx_api_usage_key
                ON api_key_usage(api_key_hash);
            """,
            down_sql="DROP TABLE IF EXISTS api_key_usage;"
        ))

        # Migration 4: Add user queries metadata
        self.migrations.append(Migration(
            version=4,
            description="Enhance user queries table with metadata",
            up_sql="""
                CREATE TABLE IF NOT EXISTS user_queries_v2 (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query TEXT NOT NULL,
                    query_type TEXT,
                    result_count INTEGER,
                    execution_time_ms INTEGER,
                    user_id TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                -- Migrate old data if exists
                INSERT INTO user_queries_v2 (query, result_count, timestamp)
                SELECT query, result_count, timestamp
                FROM user_queries
                WHERE EXISTS (SELECT 1 FROM sqlite_master WHERE type='table' AND name='user_queries');

                DROP TABLE IF EXISTS user_queries;

                ALTER TABLE user_queries_v2 RENAME TO user_queries;

                CREATE INDEX IF NOT EXISTS idx_user_queries_timestamp
                ON user_queries(timestamp);
            """,
            down_sql="-- Cannot rollback data migration"
        ))

        # Migration 5: Add caching metadata table
        self.migrations.append(Migration(
            version=5,
            description="Add cache metadata table",
            up_sql="""
                CREATE TABLE IF NOT EXISTS cache_metadata (
                    cache_key TEXT PRIMARY KEY,
                    data_type TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP NOT NULL,
                    hit_count INTEGER DEFAULT 0,
                    size_bytes INTEGER
                );

                CREATE INDEX IF NOT EXISTS idx_cache_expires
                ON cache_metadata(expires_at);
            """,
            down_sql="DROP TABLE IF EXISTS cache_metadata;"
        ))

        logger.info(f"Registered {len(self.migrations)} migrations")

    def get_current_version(self) -> int:
        """
        Get current database schema version

        Returns:
            Current version number (0 if no migrations applied)
        """
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute(
                "SELECT MAX(version) FROM schema_migrations"
            )
            result = cursor.fetchone()

            conn.close()

            return result[0] if result[0] is not None else 0

        except Exception as e:
            logger.error(f"Failed to get current version: {e}")
            return 0

    def get_pending_migrations(self) -> List[Migration]:
        """
        Get list of pending migrations

        Returns:
            List of migrations not yet applied
        """
        current_version = self.get_current_version()

        return [
            migration for migration in self.migrations
            if migration.version > current_version
        ]

    def apply_migration(self, migration: Migration) -> bool:
        """
        Apply a single migration

        Args:
            migration: Migration to apply

        Returns:
            True if successful, False otherwise
        """
        try:
            start_time = datetime.now()

            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # Execute migration SQL
            cursor.executescript(migration.up_sql)

            # Record migration
            execution_time = int((datetime.now() - start_time).total_seconds() * 1000)

            cursor.execute(
                """
                INSERT INTO schema_migrations
                (version, description, applied_at, execution_time_ms)
                VALUES (?, ?, ?, ?)
                """,
                (
                    migration.version,
                    migration.description,
                    datetime.now(),
                    execution_time
                )
            )

            conn.commit()
            conn.close()

            logger.info(
                f"Applied migration {migration.version}: {migration.description} "
                f"({execution_time}ms)"
            )

            return True

        except Exception as e:
            logger.error(
                f"Failed to apply migration {migration.version}: {e}\n"
                f"{traceback.format_exc()}"
            )
            return False

    def migrate_to_latest(self) -> Tuple[bool, List[int]]:
        """
        Apply all pending migrations

        Returns:
            Tuple of (success: bool, applied_versions: List[int])
        """
        pending = self.get_pending_migrations()

        if not pending:
            logger.info("No pending migrations")
            return True, []

        logger.info(f"Applying {len(pending)} pending migrations...")

        applied = []
        for migration in pending:
            if self.apply_migration(migration):
                applied.append(migration.version)
            else:
                logger.error(f"Migration failed at version {migration.version}")
                return False, applied

        logger.info(f"Successfully applied {len(applied)} migrations")
        return True, applied

    def rollback_migration(self, version: int) -> bool:
        """
        Rollback a specific migration

        Args:
            version: Migration version to rollback

        Returns:
            True if successful, False otherwise
        """
        migration = next(
            (m for m in self.migrations if m.version == version),
            None
        )

        if not migration:
            logger.error(f"Migration {version} not found")
            return False

        if not migration.down_sql:
            logger.error(f"Migration {version} has no rollback SQL")
            return False

        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # Execute rollback SQL
            cursor.executescript(migration.down_sql)

            # Remove migration record
            cursor.execute(
                "DELETE FROM schema_migrations WHERE version = ?",
                (version,)
            )

            conn.commit()
            conn.close()

            logger.info(f"Rolled back migration {version}")
            return True

        except Exception as e:
            logger.error(f"Failed to rollback migration {version}: {e}")
            return False

    def get_migration_history(self) -> List[Tuple[int, str, str]]:
        """
        Get migration history

        Returns:
            List of (version, description, applied_at) tuples
        """
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute("""
                SELECT version, description, applied_at
                FROM schema_migrations
                ORDER BY version
            """)

            history = cursor.fetchall()
            conn.close()

            return history

        except Exception as e:
            logger.error(f"Failed to get migration history: {e}")
            return []


# ==================== CONVENIENCE FUNCTIONS ====================


def auto_migrate(db_path: str) -> bool:
    """
    Bring the database at ``db_path`` up to the newest schema version

    Intended as a one-call startup helper: it builds a MigrationManager,
    logs the current version, applies whatever is pending and logs the
    outcome. Any exception is logged and reported as failure instead of
    being raised.

    Args:
        db_path: Path to database file

    Returns:
        True if all migrations applied successfully
    """
    try:
        migrator = MigrationManager(db_path)
        logger.info(f"Current schema version: {migrator.get_current_version()}")

        ok, versions = migrator.migrate_to_latest()

        if not ok:
            logger.error("Migration failed")
        elif versions:
            # At least one migration ran; report the highest one applied.
            logger.info(f"Database migrated to version {max(versions)}")
        else:
            logger.info("Database already at latest version")

        return ok

    except Exception as exc:
        logger.error(f"Auto-migration failed: {exc}")
        return False