|
|
""" |
|
|
Health Monitoring System for API Providers |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime |
|
|
from sqlalchemy.orm import Session |
|
|
from database.db import get_db |
|
|
from database.models import Provider, ConnectionAttempt, StatusEnum, ProviderStatusEnum |
|
|
from utils.http_client import APIClient |
|
|
from config import config |
|
|
import logging |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class HealthMonitor: |
|
|
def __init__(self): |
|
|
self.running = False |
|
|
|
|
|
async def start(self): |
|
|
"""Start health monitoring loop""" |
|
|
self.running = True |
|
|
logger.info("Health monitoring started") |
|
|
|
|
|
while self.running: |
|
|
try: |
|
|
await self.check_all_providers() |
|
|
await asyncio.sleep(config.HEALTH_CHECK_INTERVAL) |
|
|
except Exception as e: |
|
|
logger.error(f"Health monitoring error: {e}") |
|
|
await asyncio.sleep(10) |
|
|
|
|
|
async def check_all_providers(self): |
|
|
"""Check health of all providers""" |
|
|
with get_db() as db: |
|
|
providers = db.query(Provider).filter(Provider.priority_tier <= 2).all() |
|
|
|
|
|
async with APIClient() as client: |
|
|
tasks = [self.check_provider(client, provider, db) for provider in providers] |
|
|
await asyncio.gather(*tasks, return_exceptions=True) |
|
|
|
|
|
async def check_provider(self, client: APIClient, provider: Provider, db: Session): |
|
|
"""Check health of a single provider""" |
|
|
try: |
|
|
|
|
|
endpoint = self.get_health_endpoint(provider) |
|
|
headers = self.get_headers(provider) |
|
|
|
|
|
|
|
|
result = await client.get(endpoint, headers=headers) |
|
|
|
|
|
|
|
|
status = StatusEnum.SUCCESS if result["success"] and result["status_code"] == 200 else StatusEnum.FAILED |
|
|
|
|
|
|
|
|
attempt = ConnectionAttempt( |
|
|
provider_id=provider.id, |
|
|
timestamp=datetime.utcnow(), |
|
|
endpoint=endpoint, |
|
|
status=status, |
|
|
response_time_ms=result["response_time_ms"], |
|
|
http_status_code=result["status_code"], |
|
|
error_type=result["error"]["type"] if result["error"] else None, |
|
|
error_message=result["error"]["message"] if result["error"] else None, |
|
|
retry_count=0 |
|
|
) |
|
|
db.add(attempt) |
|
|
|
|
|
|
|
|
provider.last_response_time_ms = result["response_time_ms"] |
|
|
provider.last_check_at = datetime.utcnow() |
|
|
|
|
|
|
|
|
recent_attempts = db.query(ConnectionAttempt).filter( |
|
|
ConnectionAttempt.provider_id == provider.id |
|
|
).order_by(ConnectionAttempt.timestamp.desc()).limit(5).all() |
|
|
|
|
|
success_count = sum(1 for a in recent_attempts if a.status == StatusEnum.SUCCESS) |
|
|
|
|
|
if success_count == 5: |
|
|
provider.status = ProviderStatusEnum.ONLINE |
|
|
elif success_count >= 3: |
|
|
provider.status = ProviderStatusEnum.DEGRADED |
|
|
else: |
|
|
provider.status = ProviderStatusEnum.OFFLINE |
|
|
|
|
|
db.commit() |
|
|
|
|
|
logger.info(f"Health check for {provider.name}: {status.value} ({result['response_time_ms']}ms)") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Health check failed for {provider.name}: {e}") |
|
|
|
|
|
def get_health_endpoint(self, provider: Provider) -> str: |
|
|
"""Get health check endpoint for provider""" |
|
|
endpoints = { |
|
|
"CoinGecko": f"{provider.endpoint_url}/ping", |
|
|
"CoinMarketCap": f"{provider.endpoint_url}/cryptocurrency/map?limit=1", |
|
|
"Etherscan": f"{provider.endpoint_url}?module=stats&action=ethsupply&apikey={config.API_KEYS['etherscan'][0] if config.API_KEYS['etherscan'] else ''}", |
|
|
"BscScan": f"{provider.endpoint_url}?module=stats&action=bnbsupply&apikey={config.API_KEYS['bscscan'][0] if config.API_KEYS['bscscan'] else ''}", |
|
|
"TronScan": f"{provider.endpoint_url}/system/status", |
|
|
"CryptoPanic": f"{provider.endpoint_url}/posts/?auth_token=free&public=true", |
|
|
"Alternative.me": f"{provider.endpoint_url}/fng/", |
|
|
"CryptoCompare": f"{provider.endpoint_url}/price?fsym=BTC&tsyms=USD", |
|
|
"Binance": f"{provider.endpoint_url}/ping", |
|
|
"NewsAPI": f"{provider.endpoint_url}/news?language=en&category=technology", |
|
|
"The Graph": "https://api.thegraph.com/index-node/graphql", |
|
|
"Blockchair": f"{provider.endpoint_url}/bitcoin/stats" |
|
|
} |
|
|
|
|
|
return endpoints.get(provider.name, provider.endpoint_url) |
|
|
|
|
|
def get_headers(self, provider: Provider) -> dict: |
|
|
"""Get headers for provider""" |
|
|
headers = {"User-Agent": "CryptoMonitor/1.0"} |
|
|
|
|
|
if provider.name == "CoinMarketCap" and config.API_KEYS["coinmarketcap"]: |
|
|
headers["X-CMC_PRO_API_KEY"] = config.API_KEYS["coinmarketcap"][0] |
|
|
elif provider.name == "TronScan" and config.API_KEYS["tronscan"]: |
|
|
headers["TRON-PRO-API-KEY"] = config.API_KEYS["tronscan"][0] |
|
|
elif provider.name == "CryptoCompare" and config.API_KEYS["cryptocompare"]: |
|
|
headers["authorization"] = f"Apikey {config.API_KEYS['cryptocompare'][0]}" |
|
|
elif provider.name == "NewsAPI" and config.API_KEYS["newsapi"]: |
|
|
headers["X-ACCESS-KEY"] = config.API_KEYS["newsapi"][0] |
|
|
|
|
|
return headers |
|
|
|
|
|
def stop(self): |
|
|
"""Stop health monitoring""" |
|
|
self.running = False |
|
|
logger.info("Health monitoring stopped") |
|
|
|
|
|
|
|
|
|
|
|
health_monitor = HealthMonitor() |
|
|
|