|
|
""" |
|
|
Real-time API Health Monitoring Module |
|
|
Implements comprehensive health checks with rate limiting, failure tracking, and database persistence |
|
|
""" |
|
|
|
|
|
import asyncio
import math
import time
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

from utils.api_client import APIClient
from config import config
from monitoring.rate_limiter import rate_limiter
from utils.logger import setup_logger, log_api_request, log_error
from monitor import HealthCheckResult, HealthStatus
from database import Database
|
|
|
|
|
|
|
|
# Module-level logger named "health_checker"; HealthChecker and its helpers log through it.
logger = setup_logger("health_checker")
|
|
|
|
|
|
|
|
class HealthChecker:
    """
    Real-time API health monitoring with rate limiting and failure tracking.

    Every check result (including rate-limited and exception-derived ones) is
    persisted through ``Database.save_health_check``. Providers whose checks
    come back OFFLINE are tracked in an in-memory consecutive-failure counter.
    """

    # Number of consecutive OFFLINE results after which the result's error
    # message is rewritten to flag the failure streak.
    CONSECUTIVE_FAILURE_THRESHOLD = 3
    # Response-time thresholds (milliseconds) used by _determine_health_status.
    DEGRADED_LATENCY_MS = 2000
    OFFLINE_LATENCY_MS = 5000
    # Delay (seconds) inserted before starting each provider task in a batch,
    # so batch requests are not all issued at the same instant.
    STAGGER_DELAY_S = 0.1
    # Timeout used when a provider has no (or a zero) timeout_ms configured.
    DEFAULT_TIMEOUT_MS = 10000

    def __init__(self, db_path: str = "data/health_metrics.db"):
        """
        Initialize health checker.

        Args:
            db_path: Path to SQLite database
        """
        self.api_client = APIClient(
            default_timeout=10,
            max_connections=50,
            retry_attempts=1,
            retry_delay=1.0,
        )
        self.db = Database(db_path)
        # Provider name -> number of consecutive OFFLINE results.
        self.consecutive_failures: Dict[str, int] = defaultdict(int)

        self._initialize_rate_limiters()

        logger.info("HealthChecker initialized")

    def _initialize_rate_limiters(self):
        """Configure the shared rate limiter for every provider that declares a limit."""
        for provider in config.get_all_providers():
            if provider.rate_limit_type and provider.rate_limit_value:
                rate_limiter.configure_limit(
                    provider=provider.name,
                    limit_type=provider.rate_limit_type,
                    limit_value=provider.rate_limit_value,
                )
                logger.info(
                    f"Configured rate limit for {provider.name}: "
                    f"{provider.rate_limit_value} {provider.rate_limit_type}"
                )

    async def check_provider(self, provider_name: str) -> Optional[HealthCheckResult]:
        """
        Check a single provider's health and persist the result.

        Args:
            provider_name: Name of the provider to check

        Returns:
            HealthCheckResult object, or None if the provider is not configured.
            A request blocked by the rate limiter yields a DEGRADED result
            without contacting the provider.
        """
        provider = config.get_provider(provider_name)
        if not provider:
            logger.error(f"Provider not found: {provider_name}")
            return None

        can_proceed, reason = rate_limiter.can_make_request(provider.name)
        if not can_proceed:
            logger.warning(f"Rate limit blocked request to {provider.name}: {reason}")
            result = HealthCheckResult(
                provider_name=provider.name,
                category=provider.category,
                status=HealthStatus.DEGRADED,
                response_time=0,
                status_code=None,
                error_message=f"Rate limited: {reason}",
                timestamp=time.time(),
                endpoint_tested=provider.health_check_endpoint,
            )
            self.db.save_health_check(result)
            return result

        result = await self._perform_health_check(provider)
        rate_limiter.record_request(provider.name)

        # Only OFFLINE results extend the streak; any other status resets it.
        if result.status == HealthStatus.OFFLINE:
            self.consecutive_failures[provider.name] += 1
            logger.warning(
                f"{provider.name} offline - consecutive failures: "
                f"{self.consecutive_failures[provider.name]}"
            )
        else:
            self.consecutive_failures[provider.name] = 0

        failures = self.consecutive_failures[provider.name]
        if failures >= self.CONSECUTIVE_FAILURE_THRESHOLD:
            # The streak only grows on OFFLINE results, so the status is
            # already OFFLINE here. Flag the streak, but keep the underlying
            # error so the persisted record is still diagnosable.
            message = (
                f"{self.CONSECUTIVE_FAILURE_THRESHOLD}+ consecutive failures "
                f"(count: {failures})"
            )
            if result.error_message:
                message = f"{message}; last error: {result.error_message}"
            result = HealthCheckResult(
                provider_name=result.provider_name,
                category=result.category,
                status=HealthStatus.OFFLINE,
                response_time=result.response_time,
                status_code=result.status_code,
                error_message=message,
                timestamp=result.timestamp,
                endpoint_tested=result.endpoint_tested,
            )

        self.db.save_health_check(result)

        log_api_request(
            logger=logger,
            provider=provider.name,
            endpoint=provider.health_check_endpoint,
            duration_ms=result.response_time,
            status=result.status.value,
            http_code=result.status_code,
            level="INFO" if result.status == HealthStatus.ONLINE else "WARNING",
        )

        return result

    async def check_all_providers(self) -> List[HealthCheckResult]:
        """
        Check all configured providers concurrently.

        Returns:
            List of HealthCheckResult objects. Checks that raised are reported
            as OFFLINE results; unknown providers are omitted.
        """
        providers = config.get_all_providers()
        logger.info(f"Starting health check for {len(providers)} providers")

        valid_results = await self._run_checks(providers, "Health check")

        logger.info(f"Completed health check: {len(valid_results)} results")
        self._log_summary_stats(valid_results)
        return valid_results

    async def check_category(self, category: str) -> List[HealthCheckResult]:
        """
        Check all providers in a specific category concurrently.

        Args:
            category: Category name (e.g., 'market_data', 'blockchain_explorers')

        Returns:
            List of HealthCheckResult objects (empty if the category has no
            providers). Checks that raised are reported as OFFLINE results,
            matching check_all_providers.
        """
        providers = config.get_providers_by_category(category)
        if not providers:
            logger.warning(f"No providers found for category: {category}")
            return []

        logger.info(f"Starting health check for category '{category}': {len(providers)} providers")

        valid_results = await self._run_checks(providers, "Category check")

        logger.info(f"Completed category '{category}' check: {len(valid_results)} results")
        return valid_results

    async def _run_checks(self, providers, label: str) -> List[HealthCheckResult]:
        """
        Run check_provider for each provider concurrently and collect results.

        Task starts are staggered by STAGGER_DELAY_S. A check that raised an
        Exception is logged (with its traceback) and converted into a persisted
        OFFLINE result; a check that returned None is dropped.

        Args:
            providers: Sequence of provider configs (name, category,
                health_check_endpoint are read)
            label: Prefix for the exception log message

        Returns:
            Results in provider order
        """
        tasks = []
        for provider in providers:
            await asyncio.sleep(self.STAGGER_DELAY_S)
            tasks.append(asyncio.create_task(self.check_provider(provider.name)))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        valid_results: List[HealthCheckResult] = []
        # gather preserves order, so each result pairs with its provider.
        for provider, result in zip(providers, results):
            if isinstance(result, HealthCheckResult):
                valid_results.append(result)
            elif isinstance(result, Exception):
                # Outside an except block exc_info=True has nothing to attach;
                # passing the exception itself logs its traceback.
                logger.error(f"{label} failed with exception: {result}", exc_info=result)
                failed_result = HealthCheckResult(
                    provider_name=provider.name,
                    category=provider.category,
                    status=HealthStatus.OFFLINE,
                    response_time=0,
                    status_code=None,
                    error_message=f"Exception: {str(result)[:200]}",
                    timestamp=time.time(),
                    endpoint_tested=provider.health_check_endpoint,
                )
                self.db.save_health_check(failed_result)
                valid_results.append(failed_result)
            # result is None: provider not found (already logged) -> skip.

        return valid_results

    @staticmethod
    def _build_auth(provider, endpoint: str) -> Tuple[Dict[str, str], Dict[str, str]]:
        """
        Build the (headers, params) used to authenticate a health-check request.

        Only applies when the provider requires a key and one is configured.
        The key placement is chosen by provider name (and, for newsdata, by
        endpoint). Otherwise it is sent as an ``apikey`` query parameter.

        Returns:
            Tuple of (headers dict, query params dict); both may be empty
        """
        headers: Dict[str, str] = {}
        params: Dict[str, str] = {}
        if not (provider.requires_key and provider.api_key):
            return headers, params

        name = provider.name.lower()
        key = provider.api_key
        if 'coinmarketcap' in name:
            headers['X-CMC_PRO_API_KEY'] = key
        elif 'cryptocompare' in name:
            headers['authorization'] = f'Apikey {key}'
        elif 'newsapi' in name or 'newsdata' in endpoint.lower():
            params['apikey'] = key
        elif 'etherscan' in name or 'bscscan' in name:
            params['apikey'] = key
        elif 'tronscan' in name:
            headers['TRON-PRO-API-KEY'] = key
        else:
            params['apikey'] = key
        return headers, params

    async def _perform_health_check(self, provider) -> HealthCheckResult:
        """
        Perform the actual health-check HTTP request (single attempt, no retry).

        Args:
            provider: ProviderConfig object

        Returns:
            HealthCheckResult object (not persisted here)
        """
        endpoint = provider.health_check_endpoint
        headers, params = self._build_auth(provider, endpoint)

        # The client receives whole seconds. Round up rather than truncate so a
        # sub-second timeout never becomes 0 and no configured timeout is shortened.
        timeout_ms = provider.timeout_ms or self.DEFAULT_TIMEOUT_MS
        timeout_s = max(1, math.ceil(timeout_ms / 1000.0))

        response = await self.api_client.request(
            method='GET',
            url=endpoint,
            headers=headers or None,
            params=params or None,
            timeout=timeout_s,
            retry=False,
        )

        success = response.get('success', False)
        # A key present with value None would otherwise break the numeric
        # comparisons below; treat it like "missing".
        status_code = response.get('status_code') or 0
        response_time_ms = response.get('response_time_ms') or 0
        error_type = response.get('error_type')
        error_message = response.get('error_message')

        status = self._determine_health_status(
            success=success,
            status_code=status_code,
            response_time_ms=response_time_ms,
            error_type=error_type,
        )

        final_error_message = None
        if not success:
            if error_message:
                final_error_message = error_message
            elif error_type:
                final_error_message = f"{error_type}: HTTP {status_code}" if status_code else error_type
            else:
                final_error_message = f"Request failed with status {status_code}"

        return HealthCheckResult(
            provider_name=provider.name,
            category=provider.category,
            status=status,
            response_time=response_time_ms,
            status_code=status_code if status_code > 0 else None,
            error_message=final_error_message,
            timestamp=time.time(),
            endpoint_tested=endpoint,
        )

    def _determine_health_status(
        self,
        success: bool,
        status_code: int,
        response_time_ms: float,
        error_type: Optional[str],
    ) -> HealthStatus:
        """
        Determine health status based on response metrics.

        Rules, in order:
            - OFFLINE: timeout error OR status 0 (network error)
            - DEGRADED: status >= 400
            - OFFLINE: response >= OFFLINE_LATENCY_MS (5000ms)
            - DEGRADED: response >= DEGRADED_LATENCY_MS (2000ms)
            - ONLINE: response < 2000ms AND (status 200 OR successful 2xx)
            - DEGRADED: anything else (e.g. 3xx, non-success non-200 2xx)

        Args:
            success: Whether request was successful
            status_code: HTTP status code (None treated as 0)
            response_time_ms: Response time in milliseconds (None treated as 0)
            error_type: Type of error if any

        Returns:
            HealthStatus enum value
        """
        status_code = status_code or 0
        response_time_ms = response_time_ms or 0

        if error_type == 'timeout' or status_code == 0:
            return HealthStatus.OFFLINE

        if status_code >= 400:
            return HealthStatus.DEGRADED

        if response_time_ms >= self.OFFLINE_LATENCY_MS:
            return HealthStatus.OFFLINE
        if response_time_ms >= self.DEGRADED_LATENCY_MS:
            return HealthStatus.DEGRADED

        # Explicit check (not implied by the branches above) so a NaN latency
        # still falls through to DEGRADED.
        fast = response_time_ms < self.DEGRADED_LATENCY_MS
        if fast and (status_code == 200 or (success and 200 <= status_code < 300)):
            return HealthStatus.ONLINE

        return HealthStatus.DEGRADED

    def _log_summary_stats(self, results: List[HealthCheckResult]):
        """
        Log counts/percentages per status and the mean response time.

        Args:
            results: List of HealthCheckResult objects (no-op when empty)
        """
        if not results:
            return

        total = len(results)
        online = sum(1 for r in results if r.status == HealthStatus.ONLINE)
        degraded = sum(1 for r in results if r.status == HealthStatus.DEGRADED)
        offline = sum(1 for r in results if r.status == HealthStatus.OFFLINE)
        avg_response_time = sum(r.response_time for r in results) / total

        logger.info(
            f"Health Check Summary - Total: {total}, "
            f"Online: {online} ({online/total*100:.1f}%), "
            f"Degraded: {degraded} ({degraded/total*100:.1f}%), "
            f"Offline: {offline} ({offline/total*100:.1f}%), "
            f"Avg Response Time: {avg_response_time:.2f}ms"
        )

    def get_consecutive_failures(self, provider_name: str) -> int:
        """
        Get consecutive failure count for a provider.

        Args:
            provider_name: Provider name

        Returns:
            Number of consecutive OFFLINE results (0 if never failed)
        """
        # .get avoids inserting a key into the defaultdict on lookup.
        return self.consecutive_failures.get(provider_name, 0)

    def reset_consecutive_failures(self, provider_name: str):
        """
        Reset consecutive failure count for a provider (no-op if untracked).

        Args:
            provider_name: Provider name
        """
        if provider_name in self.consecutive_failures:
            self.consecutive_failures[provider_name] = 0
            logger.info(f"Reset consecutive failures for {provider_name}")

    def get_all_consecutive_failures(self) -> Dict[str, int]:
        """
        Get all consecutive failure counts.

        Returns:
            A copy of the provider-name -> failure-count mapping
        """
        return dict(self.consecutive_failures)

    async def close(self):
        """Close the underlying API client."""
        await self.api_client.close()
        logger.info("HealthChecker closed")
|
|
|
|
|
|
|
|
|
|
|
def check_provider_sync(provider_name: str) -> Optional[HealthCheckResult]:
    """
    Synchronous wrapper for checking a single provider.

    The check and the checker's cleanup run inside one event loop, so the API
    client is closed on the same loop it was used on. It is closed even if the
    check raises.

    Args:
        provider_name: Provider name

    Returns:
        HealthCheckResult object or None
    """
    async def _run() -> Optional[HealthCheckResult]:
        checker = HealthChecker()
        try:
            return await checker.check_provider(provider_name)
        finally:
            await checker.close()

    return asyncio.run(_run())
|
|
|
|
|
|
|
|
def check_all_providers_sync() -> List[HealthCheckResult]:
    """
    Synchronous wrapper for checking all providers.

    The checks and the checker's cleanup run inside one event loop, so the API
    client is closed on the same loop it was used on. It is closed even if a
    check raises.

    Returns:
        List of HealthCheckResult objects
    """
    async def _run() -> List[HealthCheckResult]:
        checker = HealthChecker()
        try:
            return await checker.check_all_providers()
        finally:
            await checker.close()

    return asyncio.run(_run())
|
|
|
|
|
|
|
|
def check_category_sync(category: str) -> List[HealthCheckResult]:
    """
    Synchronous wrapper for checking a category.

    The checks and the checker's cleanup run inside one event loop, so the API
    client is closed on the same loop it was used on. It is closed even if a
    check raises.

    Args:
        category: Category name

    Returns:
        List of HealthCheckResult objects
    """
    async def _run() -> List[HealthCheckResult]:
        checker = HealthChecker()
        try:
            return await checker.check_category(category)
        finally:
            await checker.close()

    return asyncio.run(_run())
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    async def main():
        """Example usage of HealthChecker; always closes the checker."""
        checker = HealthChecker()
        try:
            print("\n=== Checking single provider: CoinGecko ===")
            result = await checker.check_provider('CoinGecko')
            if result:
                print(f"Status: {result.status.value}")
                print(f"Response Time: {result.response_time:.2f}ms")
                print(f"HTTP Code: {result.status_code}")
                print(f"Error: {result.error_message}")

            print("\n=== Checking all providers ===")
            results = await checker.check_all_providers()
            for r in results:
                print(f"{r.provider_name}: {r.status.value} ({r.response_time:.2f}ms)")

            print("\n=== Checking market_data category ===")
            market_results = await checker.check_category('market_data')
            for r in market_results:
                print(f"{r.provider_name}: {r.status.value} ({r.response_time:.2f}ms)")
        finally:
            # Release the HTTP client even if one of the checks above raised.
            await checker.close()

    asyncio.run(main())
|
|
|