""" Health Monitoring System for API Providers """ import asyncio from datetime import datetime from sqlalchemy.orm import Session from database.db import get_db from database.models import Provider, ConnectionAttempt, StatusEnum, ProviderStatusEnum from utils.http_client import APIClient from config import config import logging logger = logging.getLogger(__name__) class HealthMonitor: def __init__(self): self.running = False async def start(self): """Start health monitoring loop""" self.running = True logger.info("Health monitoring started") while self.running: try: await self.check_all_providers() await asyncio.sleep(config.HEALTH_CHECK_INTERVAL) except Exception as e: logger.error(f"Health monitoring error: {e}") await asyncio.sleep(10) async def check_all_providers(self): """Check health of all providers""" with get_db() as db: providers = db.query(Provider).filter(Provider.priority_tier <= 2).all() async with APIClient() as client: tasks = [self.check_provider(client, provider, db) for provider in providers] await asyncio.gather(*tasks, return_exceptions=True) async def check_provider(self, client: APIClient, provider: Provider, db: Session): """Check health of a single provider""" try: # Build health check endpoint endpoint = self.get_health_endpoint(provider) headers = self.get_headers(provider) # Make request result = await client.get(endpoint, headers=headers) # Determine status status = StatusEnum.SUCCESS if result["success"] and result["status_code"] == 200 else StatusEnum.FAILED # Log attempt attempt = ConnectionAttempt( provider_id=provider.id, timestamp=datetime.utcnow(), endpoint=endpoint, status=status, response_time_ms=result["response_time_ms"], http_status_code=result["status_code"], error_type=result["error"]["type"] if result["error"] else None, error_message=result["error"]["message"] if result["error"] else None, retry_count=0 ) db.add(attempt) # Update provider status provider.last_response_time_ms = result["response_time_ms"] provider.last_check_at = datetime.utcnow() # Calculate overall status recent_attempts = db.query(ConnectionAttempt).filter( ConnectionAttempt.provider_id == provider.id ).order_by(ConnectionAttempt.timestamp.desc()).limit(5).all() success_count = sum(1 for a in recent_attempts if a.status == StatusEnum.SUCCESS) if success_count == 5: provider.status = ProviderStatusEnum.ONLINE elif success_count >= 3: provider.status = ProviderStatusEnum.DEGRADED else: provider.status = ProviderStatusEnum.OFFLINE db.commit() logger.info(f"Health check for {provider.name}: {status.value} ({result['response_time_ms']}ms)") except Exception as e: logger.error(f"Health check failed for {provider.name}: {e}") def get_health_endpoint(self, provider: Provider) -> str: """Get health check endpoint for provider""" endpoints = { "CoinGecko": f"{provider.endpoint_url}/ping", "CoinMarketCap": f"{provider.endpoint_url}/cryptocurrency/map?limit=1", "Etherscan": f"{provider.endpoint_url}?module=stats&action=ethsupply&apikey={config.API_KEYS['etherscan'][0] if config.API_KEYS['etherscan'] else ''}", "BscScan": f"{provider.endpoint_url}?module=stats&action=bnbsupply&apikey={config.API_KEYS['bscscan'][0] if config.API_KEYS['bscscan'] else ''}", "TronScan": f"{provider.endpoint_url}/system/status", "CryptoPanic": f"{provider.endpoint_url}/posts/?auth_token=free&public=true", "Alternative.me": f"{provider.endpoint_url}/fng/", "CryptoCompare": f"{provider.endpoint_url}/price?fsym=BTC&tsyms=USD", "Binance": f"{provider.endpoint_url}/ping", "NewsAPI": f"{provider.endpoint_url}/news?language=en&category=technology", "The Graph": "https://api.thegraph.com/index-node/graphql", "Blockchair": f"{provider.endpoint_url}/bitcoin/stats" } return endpoints.get(provider.name, provider.endpoint_url) def get_headers(self, provider: Provider) -> dict: """Get headers for provider""" headers = {"User-Agent": "CryptoMonitor/1.0"} if provider.name == "CoinMarketCap" and config.API_KEYS["coinmarketcap"]: headers["X-CMC_PRO_API_KEY"] = config.API_KEYS["coinmarketcap"][0] elif provider.name == "TronScan" and config.API_KEYS["tronscan"]: headers["TRON-PRO-API-KEY"] = config.API_KEYS["tronscan"][0] elif provider.name == "CryptoCompare" and config.API_KEYS["cryptocompare"]: headers["authorization"] = f"Apikey {config.API_KEYS['cryptocompare'][0]}" elif provider.name == "NewsAPI" and config.API_KEYS["newsapi"]: headers["X-ACCESS-KEY"] = config.API_KEYS["newsapi"][0] return headers def stop(self): """Stop health monitoring""" self.running = False logger.info("Health monitoring stopped") # Global instance health_monitor = HealthMonitor()