File size: 3,954 Bytes
e4e4574 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
"""Base provider interface for data sources"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Optional
from datetime import datetime
import time
import httpx
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from core.models import OHLCV, Price, ProviderHealth
class CircuitBreaker:
"""Circuit breaker for provider failures"""
def __init__(self, threshold: int = 5, timeout: int = 60):
self.threshold = threshold
self.timeout = timeout
self.failures = 0
self.last_failure_time: Optional[float] = None
self.is_open = False
def record_success(self):
"""Record successful request"""
self.failures = 0
self.is_open = False
def record_failure(self):
"""Record failed request"""
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.threshold:
self.is_open = True
def can_attempt(self) -> bool:
"""Check if we can attempt a request"""
if not self.is_open:
return True
# Check if timeout has passed
if self.last_failure_time:
elapsed = time.time() - self.last_failure_time
if elapsed >= self.timeout:
self.is_open = False
self.failures = 0
return True
return False
class BaseProvider(ABC):
"""Base class for all data providers"""
def __init__(self, name: str, base_url: str, timeout: int = 10):
self.name = name
self.base_url = base_url
self.timeout = timeout
self.circuit_breaker = CircuitBreaker()
self.last_latency: Optional[int] = None
self.last_check: Optional[datetime] = None
self.last_error: Optional[str] = None
self.client: Optional[httpx.AsyncClient] = None
async def get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client"""
if self.client is None:
self.client = httpx.AsyncClient(timeout=self.timeout)
return self.client
async def close(self):
"""Close HTTP client"""
if self.client:
await self.client.aclose()
self.client = None
async def _make_request(self, url: str, params: Optional[dict] = None) -> dict:
"""Make HTTP request with timing and error handling"""
if not self.circuit_breaker.can_attempt():
raise Exception(f"Circuit breaker open for {self.name}")
client = await self.get_client()
start_time = time.time()
try:
response = await client.get(url, params=params)
response.raise_for_status()
self.last_latency = int((time.time() - start_time) * 1000)
self.last_check = datetime.now()
self.last_error = None
self.circuit_breaker.record_success()
return response.json()
except Exception as e:
self.last_error = str(e)
self.circuit_breaker.record_failure()
raise
@abstractmethod
async def fetch_ohlcv(self, symbol: str, interval: str, limit: int) -> List[OHLCV]:
"""Fetch OHLCV data"""
pass
@abstractmethod
async def fetch_prices(self, symbols: List[str]) -> List[Price]:
"""Fetch current prices"""
pass
async def get_health(self) -> ProviderHealth:
"""Get provider health status"""
if self.circuit_breaker.is_open:
status = "offline"
elif self.last_error:
status = "degraded"
else:
status = "online"
return ProviderHealth(
name=self.name,
status=status,
latency=self.last_latency,
lastCheck=self.last_check.isoformat() if self.last_check else datetime.now().isoformat(),
errorMessage=self.last_error
)
|