"""
Database Manager Module

Provides comprehensive database operations for the crypto API monitoring system
"""

import os
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple
from pathlib import Path

from sqlalchemy import create_engine, func, case, and_, or_, desc, text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError, IntegrityError

from database.models import (
    Base,
    Provider,
    ConnectionAttempt,
    DataCollection,
    RateLimitUsage,
    ScheduleConfig,
    ScheduleCompliance,
    FailureLog,
    Alert,
    SystemMetrics,
    ConnectionStatus,
    ProviderCategory,

    MarketPrice,
    NewsArticle,
    WhaleTransaction,
    SentimentMetric,
    GasPrice,
    BlockchainStat
)
from database.data_access import DataAccessMixin
from utils.logger import setup_logger


logger = setup_logger("db_manager", level="INFO")


class DatabaseManager(DataAccessMixin):
    """
    Comprehensive database manager for API monitoring system
    Handles all database operations with proper error handling and logging
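
    Example (illustrative sketch; the path below is a placeholder):
        manager = DatabaseManager(db_path="data/api_monitor.db")
        manager.init_database()
        providers = manager.get_all_providers(enabled_only=True)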
    """

    def __init__(self, db_path: str = "data/api_monitor.db"):
        """
        Initialize database manager

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self._ensure_data_directory()

        db_url = f"sqlite:///{self.db_path}"
        self.engine = create_engine(
            db_url,
            echo=False,
            connect_args={"check_same_thread": False}
        )

        self.SessionLocal = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=self.engine,
            expire_on_commit=False
        )

        logger.info(f"Database manager initialized with database: {self.db_path}")

    def _ensure_data_directory(self):
        """Ensure the data directory exists"""
        data_dir = Path(self.db_path).parent
        data_dir.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def get_session(self) -> Session:
        """
        Context manager for database sessions
        Automatically handles commit/rollback and cleanup

        Yields:
            SQLAlchemy session

        Example:
            with db_manager.get_session() as session:
                provider = session.query(Provider).first()
        """
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Session error: {str(e)}", exc_info=True)
            raise
        finally:
            session.close()

    def init_database(self) -> bool:
        """
        Initialize database by creating all tables

        Returns:
            True if successful, False otherwise
        """
        try:
            Base.metadata.create_all(bind=self.engine)
            logger.info("Database tables created successfully")
            return True
        except SQLAlchemyError as e:
            logger.error(f"Failed to initialize database: {str(e)}", exc_info=True)
            return False

    def drop_all_tables(self) -> bool:
        """
        Drop all tables (use with caution!)

        Returns:
            True if successful, False otherwise
        """
        try:
            Base.metadata.drop_all(bind=self.engine)
            logger.warning("All database tables dropped")
            return True
        except SQLAlchemyError as e:
            logger.error(f"Failed to drop tables: {str(e)}", exc_info=True)
            return False

    def create_provider(
        self,
        name: str,
        category: str,
        endpoint_url: str,
        requires_key: bool = False,
        api_key_masked: Optional[str] = None,
        rate_limit_type: Optional[str] = None,
        rate_limit_value: Optional[int] = None,
        timeout_ms: int = 10000,
        priority_tier: int = 3
    ) -> Optional[Provider]:
        """
        Create a new provider

        Args:
            name: Provider name
            category: Provider category
            endpoint_url: API endpoint URL
            requires_key: Whether API key is required
            api_key_masked: Masked API key for display
            rate_limit_type: Rate limit type (per_minute, per_hour, per_day)
            rate_limit_value: Rate limit value
            timeout_ms: Timeout in milliseconds
            priority_tier: Priority tier (1-4, 1 is highest)

        Returns:
            Created Provider object or None if failed
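
        Example (illustrative values; the provider name and URL are placeholders):
            provider = db_manager.create_provider(
                name="coingecko",
                category="market_data",
                endpoint_url="https://api.coingecko.com/api/v3",
                rate_limit_type="per_minute",
                rate_limit_value=50,
                priority_tier=1
            )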
        """
        try:
            with self.get_session() as session:
                provider = Provider(
                    name=name,
                    category=category,
                    endpoint_url=endpoint_url,
                    requires_key=requires_key,
                    api_key_masked=api_key_masked,
                    rate_limit_type=rate_limit_type,
                    rate_limit_value=rate_limit_value,
                    timeout_ms=timeout_ms,
                    priority_tier=priority_tier
                )
                session.add(provider)
                session.commit()
                session.refresh(provider)
                logger.info(f"Created provider: {name}")
                return provider
        except IntegrityError:
            logger.error(f"Provider already exists: {name}")
            return None
        except SQLAlchemyError as e:
            logger.error(f"Failed to create provider {name}: {str(e)}", exc_info=True)
            return None

    def get_provider(self, provider_id: Optional[int] = None, name: Optional[str] = None) -> Optional[Provider]:
        """
        Get a provider by ID or name

        Args:
            provider_id: Provider ID
            name: Provider name

        Returns:
            Provider object or None if not found
        """
        try:
            with self.get_session() as session:
                if provider_id:
                    provider = session.query(Provider).filter(Provider.id == provider_id).first()
                elif name:
                    provider = session.query(Provider).filter(Provider.name == name).first()
                else:
                    logger.warning("Either provider_id or name must be provided")
                    return None

                if provider:
                    session.refresh(provider)
                return provider
        except SQLAlchemyError as e:
            logger.error(f"Failed to get provider: {str(e)}", exc_info=True)
            return None

    def get_all_providers(self, category: Optional[str] = None, enabled_only: bool = False) -> List[Provider]:
        """
        Get all providers with optional filtering

        Args:
            category: Filter by category
            enabled_only: Only return enabled providers (based on schedule_config)

        Returns:
            List of Provider objects
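
        Example (illustrative; category names depend on your seeded data):
            market_providers = db_manager.get_all_providers(
                category="market_data",
                enabled_only=True
            )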
        """
        try:
            with self.get_session() as session:
                query = session.query(Provider)

                if category:
                    query = query.filter(Provider.category == category)

                if enabled_only:
                    query = query.join(ScheduleConfig).filter(ScheduleConfig.enabled == True)

                providers = query.order_by(Provider.priority_tier, Provider.name).all()

                for provider in providers:
                    session.refresh(provider)

                return providers
        except SQLAlchemyError as e:
            logger.error(f"Failed to get providers: {str(e)}", exc_info=True)
            return []

    def update_provider(self, provider_id: int, **kwargs) -> bool:
        """
        Update a provider's attributes

        Args:
            provider_id: Provider ID
            **kwargs: Attributes to update

        Returns:
            True if successful, False otherwise
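
        Example (illustrative; any existing Provider column can be passed as a keyword):
            db_manager.update_provider(provider_id=1, timeout_ms=15000, priority_tier=2)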
        """
        try:
            with self.get_session() as session:
                provider = session.query(Provider).filter(Provider.id == provider_id).first()
                if not provider:
                    logger.warning(f"Provider not found: {provider_id}")
                    return False

                for key, value in kwargs.items():
                    if hasattr(provider, key):
                        setattr(provider, key, value)

                provider.updated_at = datetime.utcnow()
                session.commit()
                logger.info(f"Updated provider: {provider.name}")
                return True
        except SQLAlchemyError as e:
            logger.error(f"Failed to update provider {provider_id}: {str(e)}", exc_info=True)
            return False

    def delete_provider(self, provider_id: int) -> bool:
        """
        Delete a provider and all related records

        Args:
            provider_id: Provider ID

        Returns:
            True if successful, False otherwise
        """
        try:
            with self.get_session() as session:
                provider = session.query(Provider).filter(Provider.id == provider_id).first()
                if not provider:
                    logger.warning(f"Provider not found: {provider_id}")
                    return False

                provider_name = provider.name
                session.delete(provider)
                session.commit()
                logger.info(f"Deleted provider: {provider_name}")
                return True
        except SQLAlchemyError as e:
            logger.error(f"Failed to delete provider {provider_id}: {str(e)}", exc_info=True)
            return False

    def save_connection_attempt(
        self,
        provider_id: int,
        endpoint: str,
        status: str,
        response_time_ms: Optional[int] = None,
        http_status_code: Optional[int] = None,
        error_type: Optional[str] = None,
        error_message: Optional[str] = None,
        retry_count: int = 0,
        retry_result: Optional[str] = None
    ) -> Optional[ConnectionAttempt]:
        """
        Save a connection attempt log

        Args:
            provider_id: Provider ID
            endpoint: API endpoint
            status: Connection status
            response_time_ms: Response time in milliseconds
            http_status_code: HTTP status code
            error_type: Error type if failed
            error_message: Error message if failed
            retry_count: Number of retries
            retry_result: Result of retry attempt

        Returns:
            Created ConnectionAttempt object or None if failed
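
        Example (illustrative values for a successful probe):
            db_manager.save_connection_attempt(
                provider_id=1,
                endpoint="/api/v3/ping",
                status="success",
                response_time_ms=142,
                http_status_code=200
            )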
        """
        try:
            with self.get_session() as session:
                attempt = ConnectionAttempt(
                    provider_id=provider_id,
                    endpoint=endpoint,
                    status=status,
                    response_time_ms=response_time_ms,
                    http_status_code=http_status_code,
                    error_type=error_type,
                    error_message=error_message,
                    retry_count=retry_count,
                    retry_result=retry_result
                )
                session.add(attempt)
                session.commit()
                session.refresh(attempt)
                return attempt
        except SQLAlchemyError as e:
            logger.error(f"Failed to save connection attempt: {str(e)}", exc_info=True)
            return None

    def get_connection_attempts(
        self,
        provider_id: Optional[int] = None,
        status: Optional[str] = None,
        hours: int = 24,
        limit: int = 1000
    ) -> List[ConnectionAttempt]:
        """
        Get connection attempts with filtering

        Args:
            provider_id: Filter by provider ID
            status: Filter by status
            hours: Get attempts from last N hours
            limit: Maximum number of records to return

        Returns:
            List of ConnectionAttempt objects
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)
                query = session.query(ConnectionAttempt).filter(
                    ConnectionAttempt.timestamp >= cutoff_time
                )

                if provider_id:
                    query = query.filter(ConnectionAttempt.provider_id == provider_id)

                if status:
                    query = query.filter(ConnectionAttempt.status == status)

                attempts = query.order_by(desc(ConnectionAttempt.timestamp)).limit(limit).all()

                for attempt in attempts:
                    session.refresh(attempt)

                return attempts
        except SQLAlchemyError as e:
            logger.error(f"Failed to get connection attempts: {str(e)}", exc_info=True)
            return []

    def save_data_collection(
        self,
        provider_id: int,
        category: str,
        scheduled_time: datetime,
        actual_fetch_time: datetime,
        data_timestamp: Optional[datetime] = None,
        staleness_minutes: Optional[float] = None,
        record_count: int = 0,
        payload_size_bytes: int = 0,
        data_quality_score: float = 1.0,
        on_schedule: bool = True,
        skip_reason: Optional[str] = None
    ) -> Optional[DataCollection]:
        """
        Save a data collection record

        Args:
            provider_id: Provider ID
            category: Data category
            scheduled_time: Scheduled collection time
            actual_fetch_time: Actual fetch time
            data_timestamp: Timestamp from API response
            staleness_minutes: Data staleness in minutes
            record_count: Number of records collected
            payload_size_bytes: Payload size in bytes
            data_quality_score: Data quality score (0-1)
            on_schedule: Whether collection was on schedule
            skip_reason: Reason if skipped

        Returns:
            Created DataCollection object or None if failed
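
        Example (illustrative; timestamps would normally come from the scheduler):
            now = datetime.utcnow()
            db_manager.save_data_collection(
                provider_id=1,
                category="market_data",
                scheduled_time=now,
                actual_fetch_time=now,
                record_count=250,
                payload_size_bytes=48000
            )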
        """
        try:
            with self.get_session() as session:
                collection = DataCollection(
                    provider_id=provider_id,
                    category=category,
                    scheduled_time=scheduled_time,
                    actual_fetch_time=actual_fetch_time,
                    data_timestamp=data_timestamp,
                    staleness_minutes=staleness_minutes,
                    record_count=record_count,
                    payload_size_bytes=payload_size_bytes,
                    data_quality_score=data_quality_score,
                    on_schedule=on_schedule,
                    skip_reason=skip_reason
                )
                session.add(collection)
                session.commit()
                session.refresh(collection)
                return collection
        except SQLAlchemyError as e:
            logger.error(f"Failed to save data collection: {str(e)}", exc_info=True)
            return None

    def get_data_collections(
        self,
        provider_id: Optional[int] = None,
        category: Optional[str] = None,
        hours: int = 24,
        limit: int = 1000
    ) -> List[DataCollection]:
        """
        Get data collections with filtering

        Args:
            provider_id: Filter by provider ID
            category: Filter by category
            hours: Get collections from last N hours
            limit: Maximum number of records to return

        Returns:
            List of DataCollection objects
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)
                query = session.query(DataCollection).filter(
                    DataCollection.actual_fetch_time >= cutoff_time
                )

                if provider_id:
                    query = query.filter(DataCollection.provider_id == provider_id)

                if category:
                    query = query.filter(DataCollection.category == category)

                collections = query.order_by(desc(DataCollection.actual_fetch_time)).limit(limit).all()

                for collection in collections:
                    session.refresh(collection)

                return collections
        except SQLAlchemyError as e:
            logger.error(f"Failed to get data collections: {str(e)}", exc_info=True)
            return []

    def save_rate_limit_usage(
        self,
        provider_id: int,
        limit_type: str,
        limit_value: int,
        current_usage: int,
        reset_time: datetime
    ) -> Optional[RateLimitUsage]:
        """
        Save rate limit usage record

        Args:
            provider_id: Provider ID
            limit_type: Limit type (per_minute, per_hour, per_day)
            limit_value: Rate limit value
            current_usage: Current usage count
            reset_time: When the limit resets

        Returns:
            Created RateLimitUsage object or None if failed
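
        Example (illustrative; usage numbers are placeholders):
            db_manager.save_rate_limit_usage(
                provider_id=1,
                limit_type="per_minute",
                limit_value=50,
                current_usage=37,
                reset_time=datetime.utcnow() + timedelta(minutes=1)
            )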
        """
        try:
            with self.get_session() as session:
                percentage = (current_usage / limit_value * 100) if limit_value > 0 else 0

                usage = RateLimitUsage(
                    provider_id=provider_id,
                    limit_type=limit_type,
                    limit_value=limit_value,
                    current_usage=current_usage,
                    percentage=percentage,
                    reset_time=reset_time
                )
                session.add(usage)
                session.commit()
                session.refresh(usage)
                return usage
        except SQLAlchemyError as e:
            logger.error(f"Failed to save rate limit usage: {str(e)}", exc_info=True)
            return None

    def get_rate_limit_usage(
        self,
        provider_id: Optional[int] = None,
        hours: int = 24,
        high_usage_only: bool = False,
        threshold: float = 80.0
    ) -> List[RateLimitUsage]:
        """
        Get rate limit usage records

        Args:
            provider_id: Filter by provider ID
            hours: Get usage from last N hours
            high_usage_only: Only return high usage records
            threshold: Percentage threshold for high usage

        Returns:
            List of RateLimitUsage objects
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)
                query = session.query(RateLimitUsage).filter(
                    RateLimitUsage.timestamp >= cutoff_time
                )

                if provider_id:
                    query = query.filter(RateLimitUsage.provider_id == provider_id)

                if high_usage_only:
                    query = query.filter(RateLimitUsage.percentage >= threshold)

                usage_records = query.order_by(desc(RateLimitUsage.timestamp)).all()

                for record in usage_records:
                    session.refresh(record)

                return usage_records
        except SQLAlchemyError as e:
            logger.error(f"Failed to get rate limit usage: {str(e)}", exc_info=True)
            return []

    def create_schedule_config(
        self,
        provider_id: int,
        schedule_interval: str,
        enabled: bool = True,
        next_run: Optional[datetime] = None
    ) -> Optional[ScheduleConfig]:
        """
        Create schedule configuration for a provider

        Args:
            provider_id: Provider ID
            schedule_interval: Schedule interval (e.g., "every_1_min")
            enabled: Whether schedule is enabled
            next_run: Next scheduled run time

        Returns:
            Created ScheduleConfig object or None if failed
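
        Example (illustrative; interval strings follow the scheduler's conventions):
            db_manager.create_schedule_config(
                provider_id=1,
                schedule_interval="every_1_min",
                enabled=True
            )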
        """
        try:
            with self.get_session() as session:
                config = ScheduleConfig(
                    provider_id=provider_id,
                    schedule_interval=schedule_interval,
                    enabled=enabled,
                    next_run=next_run
                )
                session.add(config)
                session.commit()
                session.refresh(config)
                logger.info(f"Created schedule config for provider {provider_id}")
                return config
        except IntegrityError:
            logger.error(f"Schedule config already exists for provider {provider_id}")
            return None
        except SQLAlchemyError as e:
            logger.error(f"Failed to create schedule config: {str(e)}", exc_info=True)
            return None

    def get_schedule_config(self, provider_id: int) -> Optional[ScheduleConfig]:
        """
        Get schedule configuration for a provider

        Args:
            provider_id: Provider ID

        Returns:
            ScheduleConfig object or None if not found
        """
        try:
            with self.get_session() as session:
                config = session.query(ScheduleConfig).filter(
                    ScheduleConfig.provider_id == provider_id
                ).first()

                if config:
                    session.refresh(config)
                return config
        except SQLAlchemyError as e:
            logger.error(f"Failed to get schedule config: {str(e)}", exc_info=True)
            return None

    def update_schedule_config(self, provider_id: int, **kwargs) -> bool:
        """
        Update schedule configuration

        Args:
            provider_id: Provider ID
            **kwargs: Attributes to update

        Returns:
            True if successful, False otherwise
        """
        try:
            with self.get_session() as session:
                config = session.query(ScheduleConfig).filter(
                    ScheduleConfig.provider_id == provider_id
                ).first()

                if not config:
                    logger.warning(f"Schedule config not found for provider {provider_id}")
                    return False

                for key, value in kwargs.items():
                    if hasattr(config, key):
                        setattr(config, key, value)

                session.commit()
                logger.info(f"Updated schedule config for provider {provider_id}")
                return True
        except SQLAlchemyError as e:
            logger.error(f"Failed to update schedule config: {str(e)}", exc_info=True)
            return False

    def get_all_schedule_configs(self, enabled_only: bool = True) -> List[ScheduleConfig]:
        """
        Get all schedule configurations

        Args:
            enabled_only: Only return enabled schedules

        Returns:
            List of ScheduleConfig objects
        """
        try:
            with self.get_session() as session:
                query = session.query(ScheduleConfig)

                if enabled_only:
                    query = query.filter(ScheduleConfig.enabled == True)

                configs = query.all()

                for config in configs:
                    session.refresh(config)

                return configs
        except SQLAlchemyError as e:
            logger.error(f"Failed to get schedule configs: {str(e)}", exc_info=True)
            return []

    def save_schedule_compliance(
        self,
        provider_id: int,
        expected_time: datetime,
        actual_time: Optional[datetime] = None,
        delay_seconds: Optional[int] = None,
        on_time: bool = True,
        skip_reason: Optional[str] = None
    ) -> Optional[ScheduleCompliance]:
        """
        Save schedule compliance record

        Args:
            provider_id: Provider ID
            expected_time: Expected execution time
            actual_time: Actual execution time
            delay_seconds: Delay in seconds
            on_time: Whether execution was on time
            skip_reason: Reason if skipped

        Returns:
            Created ScheduleCompliance object or None if failed
        """
        try:
            with self.get_session() as session:
                compliance = ScheduleCompliance(
                    provider_id=provider_id,
                    expected_time=expected_time,
                    actual_time=actual_time,
                    delay_seconds=delay_seconds,
                    on_time=on_time,
                    skip_reason=skip_reason
                )
                session.add(compliance)
                session.commit()
                session.refresh(compliance)
                return compliance
        except SQLAlchemyError as e:
            logger.error(f"Failed to save schedule compliance: {str(e)}", exc_info=True)
            return None

    def get_schedule_compliance(
        self,
        provider_id: Optional[int] = None,
        hours: int = 24,
        late_only: bool = False
    ) -> List[ScheduleCompliance]:
        """
        Get schedule compliance records

        Args:
            provider_id: Filter by provider ID
            hours: Get records from last N hours
            late_only: Only return late executions

        Returns:
            List of ScheduleCompliance objects
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)
                query = session.query(ScheduleCompliance).filter(
                    ScheduleCompliance.timestamp >= cutoff_time
                )

                if provider_id:
                    query = query.filter(ScheduleCompliance.provider_id == provider_id)

                if late_only:
                    query = query.filter(ScheduleCompliance.on_time == False)

                compliance_records = query.order_by(desc(ScheduleCompliance.timestamp)).all()

                for record in compliance_records:
                    session.refresh(record)

                return compliance_records
        except SQLAlchemyError as e:
            logger.error(f"Failed to get schedule compliance: {str(e)}", exc_info=True)
            return []

    def save_failure_log(
        self,
        provider_id: int,
        endpoint: str,
        error_type: str,
        error_message: Optional[str] = None,
        http_status: Optional[int] = None,
        retry_attempted: bool = False,
        retry_result: Optional[str] = None,
        remediation_applied: Optional[str] = None
    ) -> Optional[FailureLog]:
        """
        Save failure log record

        Args:
            provider_id: Provider ID
            endpoint: API endpoint
            error_type: Type of error
            error_message: Error message
            http_status: HTTP status code
            retry_attempted: Whether retry was attempted
            retry_result: Result of retry
            remediation_applied: Remediation action taken

        Returns:
            Created FailureLog object or None if failed
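
        Example (illustrative values for a rate-limited request):
            db_manager.save_failure_log(
                provider_id=1,
                endpoint="/api/v3/simple/price",
                error_type="rate_limit",
                error_message="429 Too Many Requests",
                http_status=429,
                retry_attempted=True,
                retry_result="success"
            )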
        """
        try:
            with self.get_session() as session:
                failure = FailureLog(
                    provider_id=provider_id,
                    endpoint=endpoint,
                    error_type=error_type,
                    error_message=error_message,
                    http_status=http_status,
                    retry_attempted=retry_attempted,
                    retry_result=retry_result,
                    remediation_applied=remediation_applied
                )
                session.add(failure)
                session.commit()
                session.refresh(failure)
                return failure
        except SQLAlchemyError as e:
            logger.error(f"Failed to save failure log: {str(e)}", exc_info=True)
            return None

    def get_failure_logs(
        self,
        provider_id: Optional[int] = None,
        error_type: Optional[str] = None,
        hours: int = 24,
        limit: int = 1000
    ) -> List[FailureLog]:
        """
        Get failure logs with filtering

        Args:
            provider_id: Filter by provider ID
            error_type: Filter by error type
            hours: Get logs from last N hours
            limit: Maximum number of records to return

        Returns:
            List of FailureLog objects
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)
                query = session.query(FailureLog).filter(
                    FailureLog.timestamp >= cutoff_time
                )

                if provider_id:
                    query = query.filter(FailureLog.provider_id == provider_id)

                if error_type:
                    query = query.filter(FailureLog.error_type == error_type)

                failures = query.order_by(desc(FailureLog.timestamp)).limit(limit).all()

                for failure in failures:
                    session.refresh(failure)

                return failures
        except SQLAlchemyError as e:
            logger.error(f"Failed to get failure logs: {str(e)}", exc_info=True)
            return []

    def create_alert(
        self,
        provider_id: int,
        alert_type: str,
        message: str,
        severity: str = "medium"
    ) -> Optional[Alert]:
        """
        Create an alert

        Args:
            provider_id: Provider ID
            alert_type: Type of alert
            message: Alert message
            severity: Alert severity (low, medium, high, critical)

        Returns:
            Created Alert object or None if failed
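
        Example (illustrative; alert types are project conventions, not enforced here):
            alert = db_manager.create_alert(
                provider_id=1,
                alert_type="provider_offline",
                message="3 consecutive connection failures",
                severity="high"
            )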
        """
        try:
            with self.get_session() as session:
                alert = Alert(
                    provider_id=provider_id,
                    alert_type=alert_type,
                    message=message,
                    severity=severity
                )
                session.add(alert)
                session.commit()
                session.refresh(alert)
                logger.warning(f"Alert created: {alert_type} - {message}")
                return alert
        except SQLAlchemyError as e:
            logger.error(f"Failed to create alert: {str(e)}", exc_info=True)
            return None

    def get_alerts(
        self,
        provider_id: Optional[int] = None,
        alert_type: Optional[str] = None,
        severity: Optional[str] = None,
        acknowledged: Optional[bool] = None,
        hours: int = 24
    ) -> List[Alert]:
        """
        Get alerts with filtering

        Args:
            provider_id: Filter by provider ID
            alert_type: Filter by alert type
            severity: Filter by severity
            acknowledged: Filter by acknowledgment status
            hours: Get alerts from last N hours

        Returns:
            List of Alert objects
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)
                query = session.query(Alert).filter(
                    Alert.timestamp >= cutoff_time
                )

                if provider_id:
                    query = query.filter(Alert.provider_id == provider_id)

                if alert_type:
                    query = query.filter(Alert.alert_type == alert_type)

                if severity:
                    query = query.filter(Alert.severity == severity)

                if acknowledged is not None:
                    query = query.filter(Alert.acknowledged == acknowledged)

                alerts = query.order_by(desc(Alert.timestamp)).all()

                for alert in alerts:
                    session.refresh(alert)

                return alerts
        except SQLAlchemyError as e:
            logger.error(f"Failed to get alerts: {str(e)}", exc_info=True)
            return []

    def acknowledge_alert(self, alert_id: int) -> bool:
        """
        Acknowledge an alert

        Args:
            alert_id: Alert ID

        Returns:
            True if successful, False otherwise
        """
        try:
            with self.get_session() as session:
                alert = session.query(Alert).filter(Alert.id == alert_id).first()
                if not alert:
                    logger.warning(f"Alert not found: {alert_id}")
                    return False

                alert.acknowledged = True
                alert.acknowledged_at = datetime.utcnow()
                session.commit()
                logger.info(f"Alert acknowledged: {alert_id}")
                return True
        except SQLAlchemyError as e:
            logger.error(f"Failed to acknowledge alert: {str(e)}", exc_info=True)
            return False

    def save_system_metrics(
        self,
        total_providers: int,
        online_count: int,
        degraded_count: int,
        offline_count: int,
        avg_response_time_ms: float,
        total_requests_hour: int,
        total_failures_hour: int,
        system_health: str = "healthy"
    ) -> Optional[SystemMetrics]:
        """
        Save system metrics snapshot

        Args:
            total_providers: Total number of providers
            online_count: Number of online providers
            degraded_count: Number of degraded providers
            offline_count: Number of offline providers
            avg_response_time_ms: Average response time
            total_requests_hour: Total requests in last hour
            total_failures_hour: Total failures in last hour
            system_health: Overall system health

        Returns:
            Created SystemMetrics object or None if failed
        """
        try:
            with self.get_session() as session:
                metrics = SystemMetrics(
                    total_providers=total_providers,
                    online_count=online_count,
                    degraded_count=degraded_count,
                    offline_count=offline_count,
                    avg_response_time_ms=avg_response_time_ms,
                    total_requests_hour=total_requests_hour,
                    total_failures_hour=total_failures_hour,
                    system_health=system_health
                )
                session.add(metrics)
                session.commit()
                session.refresh(metrics)
                return metrics
        except SQLAlchemyError as e:
            logger.error(f"Failed to save system metrics: {str(e)}", exc_info=True)
            return None

    def get_system_metrics(self, hours: int = 24, limit: int = 1000) -> List[SystemMetrics]:
        """
        Get system metrics history

        Args:
            hours: Get metrics from last N hours
            limit: Maximum number of records to return

        Returns:
            List of SystemMetrics objects
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)
                metrics = session.query(SystemMetrics).filter(
                    SystemMetrics.timestamp >= cutoff_time
                ).order_by(desc(SystemMetrics.timestamp)).limit(limit).all()

                for metric in metrics:
                    session.refresh(metric)

                return metrics
        except SQLAlchemyError as e:
            logger.error(f"Failed to get system metrics: {str(e)}", exc_info=True)
            return []

    def get_latest_system_metrics(self) -> Optional[SystemMetrics]:
        """
        Get the most recent system metrics

        Returns:
            Latest SystemMetrics object or None
        """
        try:
            with self.get_session() as session:
                metrics = session.query(SystemMetrics).order_by(
                    desc(SystemMetrics.timestamp)
                ).first()

                if metrics:
                    session.refresh(metrics)
                return metrics
        except SQLAlchemyError as e:
            logger.error(f"Failed to get latest system metrics: {str(e)}", exc_info=True)
            return None

    def get_provider_stats(self, provider_id: int, hours: int = 24) -> Dict[str, Any]:
        """
        Get comprehensive statistics for a provider

        Args:
            provider_id: Provider ID
            hours: Time window in hours

        Returns:
            Dictionary with provider statistics
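
        Example (illustrative; keys shown match the dictionary built below):
            stats = db_manager.get_provider_stats(provider_id=1, hours=24)
            success_rate = stats["connection_stats"]["success_rate"]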
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)

                provider = session.query(Provider).filter(Provider.id == provider_id).first()
                if not provider:
                    return {}

                connection_stats = session.query(
                    func.count(ConnectionAttempt.id).label('total_attempts'),
                    func.sum(case((ConnectionAttempt.status == 'success', 1), else_=0)).label('successful'),
                    func.sum(case((ConnectionAttempt.status == 'failed', 1), else_=0)).label('failed'),
                    func.sum(case((ConnectionAttempt.status == 'timeout', 1), else_=0)).label('timeout'),
                    func.sum(case((ConnectionAttempt.status == 'rate_limited', 1), else_=0)).label('rate_limited'),
                    func.avg(ConnectionAttempt.response_time_ms).label('avg_response_time')
                ).filter(
                    ConnectionAttempt.provider_id == provider_id,
                    ConnectionAttempt.timestamp >= cutoff_time
                ).first()

                collection_stats = session.query(
                    func.count(DataCollection.id).label('total_collections'),
                    func.sum(DataCollection.record_count).label('total_records'),
                    func.sum(DataCollection.payload_size_bytes).label('total_bytes'),
                    func.avg(DataCollection.data_quality_score).label('avg_quality'),
                    func.avg(DataCollection.staleness_minutes).label('avg_staleness')
                ).filter(
                    DataCollection.provider_id == provider_id,
                    DataCollection.actual_fetch_time >= cutoff_time
                ).first()

                failure_count = session.query(func.count(FailureLog.id)).filter(
                    FailureLog.provider_id == provider_id,
                    FailureLog.timestamp >= cutoff_time
                ).scalar()

                total_attempts = connection_stats.total_attempts or 0
                successful = connection_stats.successful or 0
                success_rate = (successful / total_attempts * 100) if total_attempts > 0 else 0

                return {
                    'provider_name': provider.name,
                    'provider_id': provider_id,
                    'time_window_hours': hours,
                    'connection_stats': {
                        'total_attempts': total_attempts,
                        'successful': successful,
                        'failed': connection_stats.failed or 0,
                        'timeout': connection_stats.timeout or 0,
                        'rate_limited': connection_stats.rate_limited or 0,
                        'success_rate': round(success_rate, 2),
                        'avg_response_time_ms': round(connection_stats.avg_response_time or 0, 2)
                    },
                    'data_collection_stats': {
                        'total_collections': collection_stats.total_collections or 0,
                        'total_records': collection_stats.total_records or 0,
                        'total_bytes': collection_stats.total_bytes or 0,
                        'avg_quality_score': round(collection_stats.avg_quality or 0, 2),
                        'avg_staleness_minutes': round(collection_stats.avg_staleness or 0, 2)
                    },
                    'failure_count': failure_count or 0
                }
        except SQLAlchemyError as e:
            logger.error(f"Failed to get provider stats: {str(e)}", exc_info=True)
            return {}

    def get_failure_analysis(self, hours: int = 24) -> Dict[str, Any]:
        """
        Get comprehensive failure analysis across all providers

        Args:
            hours: Time window in hours

        Returns:
            Dictionary with failure analysis
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(hours=hours)

                error_type_stats = session.query(
                    FailureLog.error_type,
                    func.count(FailureLog.id).label('count')
                ).filter(
                    FailureLog.timestamp >= cutoff_time
                ).group_by(FailureLog.error_type).all()

                provider_stats = session.query(
                    Provider.name,
                    func.count(FailureLog.id).label('count')
                ).join(
                    FailureLog, Provider.id == FailureLog.provider_id
                ).filter(
                    FailureLog.timestamp >= cutoff_time
                ).group_by(Provider.name).order_by(desc('count')).limit(10).all()

                retry_stats = session.query(
                    func.sum(case((FailureLog.retry_attempted == True, 1), else_=0)).label('total_retries'),
                    func.sum(case((FailureLog.retry_result == 'success', 1), else_=0)).label('successful_retries')
                ).filter(
                    FailureLog.timestamp >= cutoff_time
                ).first()

                total_retries = retry_stats.total_retries or 0
                successful_retries = retry_stats.successful_retries or 0
                retry_success_rate = (successful_retries / total_retries * 100) if total_retries > 0 else 0

                return {
                    'time_window_hours': hours,
                    'failures_by_error_type': [
                        {'error_type': stat.error_type, 'count': stat.count}
                        for stat in error_type_stats
                    ],
                    'top_failing_providers': [
                        {'provider': stat.name, 'failure_count': stat.count}
                        for stat in provider_stats
                    ],
                    'retry_statistics': {
                        'total_retries': total_retries,
                        'successful_retries': successful_retries,
                        'retry_success_rate': round(retry_success_rate, 2)
                    }
                }
        except SQLAlchemyError as e:
            logger.error(f"Failed to get failure analysis: {str(e)}", exc_info=True)
            return {}

    def get_recent_logs(
        self,
        log_type: str,
        provider_id: Optional[int] = None,
        hours: int = 1,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get recent logs of specified type with filtering

        Args:
            log_type: Type of logs (connection, failure, collection, rate_limit)
            provider_id: Filter by provider ID
            hours: Get logs from last N hours
            limit: Maximum number of records

        Returns:
            List of log dictionaries
        """
        try:
            if log_type == 'connection':
                attempts = self.get_connection_attempts(provider_id=provider_id, hours=hours, limit=limit)
                return [
                    {
                        'id': a.id,
                        'timestamp': a.timestamp.isoformat(),
                        'provider_id': a.provider_id,
                        'endpoint': a.endpoint,
                        'status': a.status,
                        'response_time_ms': a.response_time_ms,
                        'http_status_code': a.http_status_code,
                        'error_type': a.error_type,
                        'error_message': a.error_message
                    }
                    for a in attempts
                ]

            elif log_type == 'failure':
                failures = self.get_failure_logs(provider_id=provider_id, hours=hours, limit=limit)
                return [
                    {
                        'id': f.id,
                        'timestamp': f.timestamp.isoformat(),
                        'provider_id': f.provider_id,
                        'endpoint': f.endpoint,
                        'error_type': f.error_type,
                        'error_message': f.error_message,
                        'http_status': f.http_status,
                        'retry_attempted': f.retry_attempted,
                        'retry_result': f.retry_result
                    }
                    for f in failures
                ]

            elif log_type == 'collection':
                collections = self.get_data_collections(provider_id=provider_id, hours=hours, limit=limit)
                return [
                    {
                        'id': c.id,
                        'provider_id': c.provider_id,
                        'category': c.category,
                        'scheduled_time': c.scheduled_time.isoformat(),
                        'actual_fetch_time': c.actual_fetch_time.isoformat(),
                        'record_count': c.record_count,
                        'payload_size_bytes': c.payload_size_bytes,
                        'data_quality_score': c.data_quality_score,
                        'on_schedule': c.on_schedule
                    }
                    for c in collections
                ]

            elif log_type == 'rate_limit':
                usage = self.get_rate_limit_usage(provider_id=provider_id, hours=hours)
                return [
                    {
                        'id': u.id,
                        'timestamp': u.timestamp.isoformat(),
                        'provider_id': u.provider_id,
                        'limit_type': u.limit_type,
                        'limit_value': u.limit_value,
                        'current_usage': u.current_usage,
                        'percentage': u.percentage,
                        'reset_time': u.reset_time.isoformat()
                    }
                    for u in usage[:limit]
                ]

            else:
                logger.warning(f"Unknown log type: {log_type}")
                return []

        except Exception as e:
            logger.error(f"Failed to get recent logs: {str(e)}", exc_info=True)
            return []

    def cleanup_old_data(self, days: int = 30) -> Dict[str, int]:
        """
        Remove old records from the database to manage storage

        Args:
            days: Remove records older than N days

        Returns:
            Dictionary with count of deleted records per table
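
        Example (illustrative; note that only acknowledged alerts are removed):
            deleted = db_manager.cleanup_old_data(days=90)
            total = sum(deleted.values())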
        """
        try:
            with self.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(days=days)
                deleted_counts = {}

                deleted = session.query(ConnectionAttempt).filter(
                    ConnectionAttempt.timestamp < cutoff_time
                ).delete()
                deleted_counts['connection_attempts'] = deleted

                deleted = session.query(DataCollection).filter(
                    DataCollection.actual_fetch_time < cutoff_time
                ).delete()
                deleted_counts['data_collections'] = deleted

                deleted = session.query(RateLimitUsage).filter(
                    RateLimitUsage.timestamp < cutoff_time
                ).delete()
                deleted_counts['rate_limit_usage'] = deleted

                deleted = session.query(ScheduleCompliance).filter(
                    ScheduleCompliance.timestamp < cutoff_time
                ).delete()
                deleted_counts['schedule_compliance'] = deleted

                deleted = session.query(FailureLog).filter(
                    FailureLog.timestamp < cutoff_time
                ).delete()
                deleted_counts['failure_logs'] = deleted

                deleted = session.query(Alert).filter(
                    and_(
                        Alert.timestamp < cutoff_time,
                        Alert.acknowledged == True
                    )
                ).delete()
                deleted_counts['alerts'] = deleted

                deleted = session.query(SystemMetrics).filter(
                    SystemMetrics.timestamp < cutoff_time
                ).delete()
                deleted_counts['system_metrics'] = deleted

                session.commit()

                total_deleted = sum(deleted_counts.values())
                logger.info(f"Cleaned up {total_deleted} old records (older than {days} days)")

                return deleted_counts
        except SQLAlchemyError as e:
            logger.error(f"Failed to cleanup old data: {str(e)}", exc_info=True)
            return {}

    def get_database_stats(self) -> Dict[str, Any]:
        """
        Get database statistics

        Returns:
            Dictionary with database statistics
        """
        try:
            with self.get_session() as session:
                stats = {
                    'providers': session.query(func.count(Provider.id)).scalar(),
                    'connection_attempts': session.query(func.count(ConnectionAttempt.id)).scalar(),
                    'data_collections': session.query(func.count(DataCollection.id)).scalar(),
                    'rate_limit_usage': session.query(func.count(RateLimitUsage.id)).scalar(),
                    'schedule_configs': session.query(func.count(ScheduleConfig.id)).scalar(),
                    'schedule_compliance': session.query(func.count(ScheduleCompliance.id)).scalar(),
                    'failure_logs': session.query(func.count(FailureLog.id)).scalar(),
                    'alerts': session.query(func.count(Alert.id)).scalar(),
                    'system_metrics': session.query(func.count(SystemMetrics.id)).scalar(),
                }

                if os.path.exists(self.db_path):
                    stats['database_size_mb'] = round(os.path.getsize(self.db_path) / (1024 * 1024), 2)
                else:
                    stats['database_size_mb'] = 0

                return stats
        except SQLAlchemyError as e:
            logger.error(f"Failed to get database stats: {str(e)}", exc_info=True)
            return {}

    def health_check(self) -> Dict[str, Any]:
        """
        Perform database health check

        Returns:
            Dictionary with health check results
        """
        try:
            with self.get_session() as session:
                result = session.execute(text("SELECT 1")).scalar()

                stats = self.get_database_stats()

                return {
                    'status': 'healthy' if result == 1 else 'unhealthy',
                    'database_path': self.db_path,
                    'database_exists': os.path.exists(self.db_path),
                    'stats': stats,
                    'timestamp': datetime.utcnow().isoformat()
                }
        except Exception as e:
            logger.error(f"Health check failed: {str(e)}", exc_info=True)
            return {
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }


db_manager = DatabaseManager()


def init_db(db_path: str = "data/api_monitor.db") -> DatabaseManager:
    """
    Initialize database and return manager instance

    Args:
        db_path: Path to database file

    Returns:
        DatabaseManager instance
    """
    manager = DatabaseManager(db_path=db_path)
    manager.init_database()
    logger.info("Database initialized successfully")
    return manager


if __name__ == "__main__":
    print("Database Manager Module")
    print("=" * 80)

    manager = init_db()

    health = manager.health_check()
    print(f"\nHealth Check: {health['status']}")
    print(f"Database Stats: {health.get('stats', {})}")

    stats = manager.get_database_stats()
    print(f"\nDatabase Statistics:")
    for table, count in stats.items():
        if table != 'database_size_mb':
            print(f"  {table}: {count}")
    print(f"  Database Size: {stats.get('database_size_mb', 0)} MB")