File size: 5,683 Bytes
5cd2b89 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
"""
Health Monitoring System for API Providers
"""
import asyncio
from datetime import datetime
from sqlalchemy.orm import Session
from database.db import get_db
from database.models import Provider, ConnectionAttempt, StatusEnum, ProviderStatusEnum
from utils.http_client import APIClient
from config import config
import logging
logger = logging.getLogger(__name__)
class HealthMonitor:
    """Periodically probe API providers and record their health.

    Each cycle loads every provider with ``priority_tier <= 2``, issues one
    HTTP GET per provider against a provider-specific health endpoint, stores
    a ``ConnectionAttempt`` row with the outcome, and re-rates the provider as
    ONLINE / DEGRADED / OFFLINE from its most recent attempts.
    """

    # Number of most recent attempts considered when rating a provider.
    STATUS_WINDOW = 5
    # Seconds to wait before the next cycle after a cycle raised.
    ERROR_RETRY_DELAY = 10

    # Provider name -> suffix appended to ``provider.endpoint_url``.
    _HEALTH_PATHS = {
        "CoinGecko": "/ping",
        "CoinMarketCap": "/cryptocurrency/map?limit=1",
        "TronScan": "/system/status",
        "CryptoPanic": "/posts/?auth_token=free&public=true",
        "Alternative.me": "/fng/",
        "CryptoCompare": "/price?fsym=BTC&tsyms=USD",
        "Binance": "/ping",
        "NewsAPI": "/news?language=en&category=technology",
        "Blockchair": "/bitcoin/stats",
    }
    # Provider name -> (config.API_KEYS entry, ``action`` query value) for
    # providers whose probe carries the API key in the query string.
    _KEYED_QUERY_PROBES = {
        "Etherscan": ("etherscan", "ethsupply"),
        "BscScan": ("bscscan", "bnbsupply"),
    }
    # Provider name -> absolute URL used instead of ``provider.endpoint_url``.
    _FIXED_ENDPOINTS = {
        "The Graph": "https://api.thegraph.com/index-node/graphql",
    }
    # Provider name -> (config.API_KEYS entry, header name, value template).
    _AUTH_HEADERS = {
        "CoinMarketCap": ("coinmarketcap", "X-CMC_PRO_API_KEY", "{}"),
        "TronScan": ("tronscan", "TRON-PRO-API-KEY", "{}"),
        "CryptoCompare": ("cryptocompare", "authorization", "Apikey {}"),
        "NewsAPI": ("newsapi", "X-ACCESS-KEY", "{}"),
    }

    def __init__(self):
        # Loop flag: set by start(), cleared by stop().
        self.running = False

    async def start(self):
        """Run the monitoring loop until ``stop()`` clears ``self.running``.

        After each successful cycle the loop sleeps for
        ``config.HEALTH_CHECK_INTERVAL``; after a cycle that raised it logs
        the error and sleeps for ``ERROR_RETRY_DELAY`` seconds instead.
        """
        self.running = True
        logger.info("Health monitoring started")

        while self.running:
            try:
                await self.check_all_providers()
                await asyncio.sleep(config.HEALTH_CHECK_INTERVAL)
            except Exception as e:
                # Loop boundary: keep the monitor alive, but keep the traceback.
                logger.exception("Health monitoring error: %s", e)
                await asyncio.sleep(self.ERROR_RETRY_DELAY)

    async def check_all_providers(self):
        """Check every provider with ``priority_tier <= 2`` concurrently.

        All checks share one database session and one ``APIClient``.
        Exceptions that escape an individual check are logged here rather
        than being silently discarded.
        """
        with get_db() as db:
            providers = db.query(Provider).filter(Provider.priority_tier <= 2).all()
            if not providers:
                # Nothing to probe; skip opening an HTTP client.
                return

            async with APIClient() as client:
                outcomes = await asyncio.gather(
                    *(self.check_provider(client, provider, db) for provider in providers),
                    return_exceptions=True,
                )

            # return_exceptions=True hands failures back as values; surface them.
            for outcome in outcomes:
                if isinstance(outcome, BaseException):
                    logger.error(
                        "Unhandled error during provider health check: %r",
                        outcome,
                        exc_info=outcome,
                    )

    async def check_provider(self, client: "APIClient", provider: "Provider", db: "Session"):
        """Probe one provider, record the attempt, and update its status.

        Args:
            client: HTTP client; ``client.get(url, headers=...)`` is awaited
                and its result is read as a mapping with the keys
                ``success``, ``status_code``, ``response_time_ms`` and
                ``error``.
            provider: Provider row to probe and update.
            db: Session used to store the attempt and commit the update.

        Any exception is logged and swallowed so one failing provider does
        not abort the others; the session is rolled back first so it stays
        usable for the remaining checks.
        """
        # Read the name up front: rollback expires ORM attributes, and the
        # error path must not depend on reloading them.
        provider_name = provider.name
        try:
            # NOTE(review): for Etherscan/BscScan the endpoint embeds the API
            # key, and the endpoint is persisted with the attempt below.
            endpoint = self.get_health_endpoint(provider)
            headers = self.get_headers(provider)

            result = await client.get(endpoint, headers=headers)

            succeeded = result["success"] and result["status_code"] == 200
            status = StatusEnum.SUCCESS if succeeded else StatusEnum.FAILED

            error_info = result["error"]
            # One timestamp for both the attempt row and the provider row.
            checked_at = datetime.utcnow()

            attempt = ConnectionAttempt(
                provider_id=provider.id,
                timestamp=checked_at,
                endpoint=endpoint,
                status=status,
                response_time_ms=result["response_time_ms"],
                http_status_code=result["status_code"],
                error_type=error_info["type"] if error_info else None,
                error_message=error_info["message"] if error_info else None,
                retry_count=0,
            )
            db.add(attempt)

            provider.last_response_time_ms = result["response_time_ms"]
            provider.last_check_at = checked_at

            # Flush explicitly so the query below sees the attempt just added
            # even when the session is configured without autoflush.
            db.flush()
            recent_attempts = (
                db.query(ConnectionAttempt)
                .filter(ConnectionAttempt.provider_id == provider.id)
                .order_by(ConnectionAttempt.timestamp.desc())
                .limit(self.STATUS_WINDOW)
                .all()
            )
            success_count = sum(
                1 for a in recent_attempts if a.status == StatusEnum.SUCCESS
            )

            status_by_rating = {
                "online": ProviderStatusEnum.ONLINE,
                "degraded": ProviderStatusEnum.DEGRADED,
                "offline": ProviderStatusEnum.OFFLINE,
            }
            rating = self._rate_attempts(success_count, len(recent_attempts))
            provider.status = status_by_rating[rating]

            db.commit()

            logger.info(
                "Health check for %s: %s (%sms)",
                provider_name,
                status.value,
                result["response_time_ms"],
            )
        except Exception as e:
            # Discard the failed transaction so the shared session can still
            # be used by the remaining provider checks.
            try:
                db.rollback()
            except Exception:
                logger.exception(
                    "Rollback failed after health check error for %s", provider_name
                )
            logger.error(f"Health check failed for {provider_name}: {e}")

    @staticmethod
    def _rate_attempts(success_count: int, total: int) -> str:
        """Rate a window of attempts as "online", "degraded" or "offline".

        "online" requires every attempt in the window to have succeeded,
        "degraded" requires at least 3/5 of them. For a full window of five
        this is 5 / 3-4 / 0-2 successes; for a shorter window (a provider
        with little history) the same proportions apply, so a provider is
        not rated offline merely for having fewer than five attempts.
        An empty window is rated "offline".
        """
        if total <= 0:
            return "offline"
        if success_count >= total:
            return "online"
        # success_count / total >= 3 / 5, kept in integer arithmetic.
        if success_count * 5 >= total * 3:
            return "degraded"
        return "offline"

    @staticmethod
    def _first_api_key(service: str):
        """Return the first key in ``config.API_KEYS[service]``.

        Returns ``None`` when the entry is missing or empty.
        """
        try:
            keys = config.API_KEYS[service]
        except KeyError:
            return None
        return keys[0] if keys else None

    def get_health_endpoint(self, provider: "Provider") -> str:
        """Return the URL to probe for ``provider``.

        Known provider names map to a fixed URL, to ``endpoint_url`` plus a
        path suffix, or to ``endpoint_url`` plus a keyed ``module=stats``
        query. Unknown names fall back to ``provider.endpoint_url``. Only the
        matching provider's URL is built, so API-key configuration is read
        only for providers that need it.
        """
        name = provider.name
        base_url = provider.endpoint_url

        if name in self._FIXED_ENDPOINTS:
            return self._FIXED_ENDPOINTS[name]

        if name in self._KEYED_QUERY_PROBES:
            service, action = self._KEYED_QUERY_PROBES[name]
            # A missing key yields an empty ``apikey=`` value.
            api_key = self._first_api_key(service)
            if api_key is None:
                api_key = ""
            return f"{base_url}?module=stats&action={action}&apikey={api_key}"

        if name in self._HEALTH_PATHS:
            return f"{base_url}{self._HEALTH_PATHS[name]}"

        return base_url

    def get_headers(self, provider: "Provider") -> dict:
        """Return request headers for ``provider``.

        Always includes the ``User-Agent`` header. For providers listed in
        ``_AUTH_HEADERS`` the provider's auth header is added when a key is
        configured for it.
        """
        headers = {"User-Agent": "CryptoMonitor/1.0"}

        auth_spec = self._AUTH_HEADERS.get(provider.name)
        if auth_spec is not None:
            service, header_name, template = auth_spec
            api_key = self._first_api_key(service)
            if api_key is not None:
                headers[header_name] = template.format(api_key)

        return headers

    def stop(self):
        """Ask the monitoring loop to exit.

        Only clears the flag; the loop in ``start()`` observes it the next
        time it evaluates its ``while`` condition.
        """
        self.running = False
        logger.info("Health monitoring stopped")
# Module-level HealthMonitor instance, created at import time. Its loop
# stays idle (running is False) until start() is awaited.
health_monitor = HealthMonitor()
|