Really-amin's picture
Upload 143 files
5cd2b89 verified
"""
Health Monitoring System for API Providers
"""
import asyncio
from datetime import datetime
from sqlalchemy.orm import Session
from database.db import get_db
from database.models import Provider, ConnectionAttempt, StatusEnum, ProviderStatusEnum
from utils.http_client import APIClient
from config import config
import logging
logger = logging.getLogger(__name__)
class HealthMonitor:
def __init__(self):
self.running = False
async def start(self):
"""Start health monitoring loop"""
self.running = True
logger.info("Health monitoring started")
while self.running:
try:
await self.check_all_providers()
await asyncio.sleep(config.HEALTH_CHECK_INTERVAL)
except Exception as e:
logger.error(f"Health monitoring error: {e}")
await asyncio.sleep(10)
async def check_all_providers(self):
"""Check health of all providers"""
with get_db() as db:
providers = db.query(Provider).filter(Provider.priority_tier <= 2).all()
async with APIClient() as client:
tasks = [self.check_provider(client, provider, db) for provider in providers]
await asyncio.gather(*tasks, return_exceptions=True)
async def check_provider(self, client: APIClient, provider: Provider, db: Session):
"""Check health of a single provider"""
try:
# Build health check endpoint
endpoint = self.get_health_endpoint(provider)
headers = self.get_headers(provider)
# Make request
result = await client.get(endpoint, headers=headers)
# Determine status
status = StatusEnum.SUCCESS if result["success"] and result["status_code"] == 200 else StatusEnum.FAILED
# Log attempt
attempt = ConnectionAttempt(
provider_id=provider.id,
timestamp=datetime.utcnow(),
endpoint=endpoint,
status=status,
response_time_ms=result["response_time_ms"],
http_status_code=result["status_code"],
error_type=result["error"]["type"] if result["error"] else None,
error_message=result["error"]["message"] if result["error"] else None,
retry_count=0
)
db.add(attempt)
# Update provider status
provider.last_response_time_ms = result["response_time_ms"]
provider.last_check_at = datetime.utcnow()
# Calculate overall status
recent_attempts = db.query(ConnectionAttempt).filter(
ConnectionAttempt.provider_id == provider.id
).order_by(ConnectionAttempt.timestamp.desc()).limit(5).all()
success_count = sum(1 for a in recent_attempts if a.status == StatusEnum.SUCCESS)
if success_count == 5:
provider.status = ProviderStatusEnum.ONLINE
elif success_count >= 3:
provider.status = ProviderStatusEnum.DEGRADED
else:
provider.status = ProviderStatusEnum.OFFLINE
db.commit()
logger.info(f"Health check for {provider.name}: {status.value} ({result['response_time_ms']}ms)")
except Exception as e:
logger.error(f"Health check failed for {provider.name}: {e}")
def get_health_endpoint(self, provider: Provider) -> str:
"""Get health check endpoint for provider"""
endpoints = {
"CoinGecko": f"{provider.endpoint_url}/ping",
"CoinMarketCap": f"{provider.endpoint_url}/cryptocurrency/map?limit=1",
"Etherscan": f"{provider.endpoint_url}?module=stats&action=ethsupply&apikey={config.API_KEYS['etherscan'][0] if config.API_KEYS['etherscan'] else ''}",
"BscScan": f"{provider.endpoint_url}?module=stats&action=bnbsupply&apikey={config.API_KEYS['bscscan'][0] if config.API_KEYS['bscscan'] else ''}",
"TronScan": f"{provider.endpoint_url}/system/status",
"CryptoPanic": f"{provider.endpoint_url}/posts/?auth_token=free&public=true",
"Alternative.me": f"{provider.endpoint_url}/fng/",
"CryptoCompare": f"{provider.endpoint_url}/price?fsym=BTC&tsyms=USD",
"Binance": f"{provider.endpoint_url}/ping",
"NewsAPI": f"{provider.endpoint_url}/news?language=en&category=technology",
"The Graph": "https://api.thegraph.com/index-node/graphql",
"Blockchair": f"{provider.endpoint_url}/bitcoin/stats"
}
return endpoints.get(provider.name, provider.endpoint_url)
def get_headers(self, provider: Provider) -> dict:
"""Get headers for provider"""
headers = {"User-Agent": "CryptoMonitor/1.0"}
if provider.name == "CoinMarketCap" and config.API_KEYS["coinmarketcap"]:
headers["X-CMC_PRO_API_KEY"] = config.API_KEYS["coinmarketcap"][0]
elif provider.name == "TronScan" and config.API_KEYS["tronscan"]:
headers["TRON-PRO-API-KEY"] = config.API_KEYS["tronscan"][0]
elif provider.name == "CryptoCompare" and config.API_KEYS["cryptocompare"]:
headers["authorization"] = f"Apikey {config.API_KEYS['cryptocompare'][0]}"
elif provider.name == "NewsAPI" and config.API_KEYS["newsapi"]:
headers["X-ACCESS-KEY"] = config.API_KEYS["newsapi"][0]
return headers
def stop(self):
"""Stop health monitoring"""
self.running = False
logger.info("Health monitoring stopped")
# Global instance
health_monitor = HealthMonitor()