|
|
"""Data aggregator with multi-provider fallback""" |
|
|
from __future__ import annotations |
|
|
from typing import List, Optional |
|
|
from datetime import datetime |
|
|
import time |
|
|
import logging |
|
|
import sys |
|
|
import os |
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) |
|
|
|
|
|
from providers import BinanceProvider, CoinGeckoProvider, KrakenProvider, CoinCapProvider |
|
|
from core.models import ( |
|
|
OHLCV, Price, SentimentData, FearGreedIndex, NewsSentiment, |
|
|
OverallSentiment, MarketOverview, ProviderHealth |
|
|
) |
|
|
from core.config import settings |
|
|
from core.cache import cache, cache_key, get_or_set |
|
|
import httpx |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class DataAggregator:
    """Aggregate market data from several providers, falling back on failure.

    OHLCV candles and spot prices are requested from an ordered list of
    providers; sentiment comes from the Alternative.me Fear & Greed endpoint
    and the market overview from a dedicated CoinGecko provider instance.
    """

    # Endpoint queried by fetch_fear_greed_index().
    _FEAR_GREED_URL = "https://api.alternative.me/fng/"

    # (inclusive lower bound, label) pairs, checked from highest to lowest.
    # Any value below the last bound is classified as "extreme_fear".
    _SENTIMENT_BANDS = (
        (75, "extreme_greed"),
        (55, "bullish"),
        (45, "neutral"),
        (25, "bearish"),
    )

    def __init__(self):
        """Instantiate the providers and record the start time for uptime."""
        # Tried in this order by fetch_ohlcv().
        self.ohlcv_providers = [
            BinanceProvider(),
            KrakenProvider(),
        ]

        # Queried in this order by fetch_prices(); earlier providers win.
        self.price_providers = [
            CoinGeckoProvider(api_key=settings.COINGECKO_API_KEY),
            CoinCapProvider(),
            BinanceProvider(),
        ]

        # Separate instance from the CoinGecko entry in price_providers.
        self.market_provider = CoinGeckoProvider(api_key=settings.COINGECKO_API_KEY)

        self.start_time = time.time()

    async def close(self):
        """Close all provider connections, including the market provider.

        Each distinct provider instance is closed once. A provider whose
        ``close()`` raises is logged and skipped so the remaining providers
        are still closed.
        """
        closed_ids = set()
        candidates = [
            *self.ohlcv_providers,
            *self.price_providers,
            self.market_provider,
        ]
        for provider in candidates:
            # De-duplicate by identity: every instance owns its own resources.
            if id(provider) in closed_ids:
                continue
            closed_ids.add(id(provider))
            try:
                await provider.close()
            except Exception as exc:
                logger.warning(
                    "Failed to close provider %s: %s",
                    getattr(provider, "name", provider),
                    exc,
                )

    async def fetch_ohlcv(
        self,
        symbol: str,
        interval: str = "1h",
        limit: int = 100
    ) -> tuple[List[OHLCV], str]:
        """Fetch OHLCV data with provider fallback.

        Providers in ``self.ohlcv_providers`` are tried in order; the first
        non-empty result wins.

        Returns:
            A ``(candles, provider_name)`` tuple.

        Raises:
            Exception: if every provider raised or returned no data. The last
                provider error, if any, is attached as ``__cause__``.
        """
        last_error = None

        for provider in self.ohlcv_providers:
            try:
                logger.info(
                    "Trying %s for OHLCV data: %s %s",
                    provider.name, symbol, interval,
                )
                data = await provider.fetch_ohlcv(symbol, interval, limit)

                if data:
                    logger.info(
                        "Successfully fetched %s candles from %s",
                        len(data), provider.name,
                    )
                    return data, provider.name

                # An empty result is not an error, but it must not go unnoticed.
                logger.warning(
                    "Provider %s returned no OHLCV data for %s %s",
                    provider.name, symbol, interval,
                )
            except Exception as e:
                last_error = e
                logger.warning("Provider %s failed: %s", provider.name, e)

        raise Exception("All OHLCV providers failed") from last_error

    async def fetch_prices(self, symbols: List[str]) -> tuple[List[Price], str]:
        """Fetch prices, keeping the first quote seen for each symbol.

        Every provider in ``self.price_providers`` is queried in order. For
        each symbol, the quote from the earliest provider that returned it is
        kept; later quotes for the same symbol are ignored.

        Returns:
            A ``(prices, source)`` tuple, where ``source`` is the ``"+"``-joined
            names of all providers whose fetch completed without raising.

        Raises:
            Exception: if no provider produced any price.
        """
        first_quote = {}
        sources_used = []

        for provider in self.price_providers:
            try:
                logger.info("Fetching prices from %s", provider.name)
                prices = await provider.fetch_prices(symbols)

                for price in prices:
                    # setdefault keeps the quote from the earliest provider.
                    first_quote.setdefault(price.symbol, price)

                sources_used.append(provider.name)
            except Exception as e:
                logger.warning(
                    "Provider %s failed for prices: %s", provider.name, e
                )

        if not first_quote:
            raise Exception("All price providers failed")

        aggregated = list(first_quote.values())
        source_str = "+".join(sources_used) if sources_used else "multi-provider"

        return aggregated, source_str

    async def fetch_fear_greed_index(self) -> FearGreedIndex:
        """Fetch the Fear & Greed Index from Alternative.me.

        Never raises: on any failure (network error, HTTP error status,
        malformed or empty payload) a neutral index with value 50 is returned.
        The timestamp is the local fetch time, not one taken from the payload.
        """
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get(self._FEAR_GREED_URL)
                # Treat 4xx/5xx as a failure instead of parsing an error body.
                response.raise_for_status()
                data = response.json()

            if "data" in data and len(data["data"]) > 0:
                fng_data = data["data"][0]
                return FearGreedIndex(
                    value=int(fng_data["value"]),
                    classification=fng_data["value_classification"],
                    timestamp=datetime.now().isoformat()
                )

            logger.warning("Fear & Greed Index response contained no data")
        except Exception as e:
            logger.error("Failed to fetch Fear & Greed Index: %s", e)

        # Neutral fallback so callers always receive a usable index.
        return FearGreedIndex(
            value=50,
            classification="Neutral",
            timestamp=datetime.now().isoformat()
        )

    @classmethod
    def _classify_sentiment(cls, value) -> str:
        """Map a Fear & Greed value to its sentiment label."""
        for lower_bound, label in cls._SENTIMENT_BANDS:
            if value >= lower_bound:
                return label
        return "extreme_fear"

    async def fetch_sentiment(self) -> SentimentData:
        """Build sentiment data from the Fear & Greed Index.

        The overall score is the raw index value, the label comes from
        ``_classify_sentiment``, the confidence is fixed at 0.8, and news
        sentiment is reported with a total of 0.
        """
        fear_greed = await self.fetch_fear_greed_index()

        return SentimentData(
            fearGreed=fear_greed,
            news=NewsSentiment(total=0),
            overall=OverallSentiment(
                sentiment=self._classify_sentiment(fear_greed.value),
                score=fear_greed.value,
                confidence=0.8
            )
        )

    async def fetch_market_overview(self) -> MarketOverview:
        """Fetch market overview data from the market provider.

        Never raises: missing fields default to 0, and any failure yields an
        overview with every field set to 0.
        """
        try:
            market_data = await self.market_provider.fetch_market_data()
            dominance = market_data.get("market_cap_percentage", {})

            return MarketOverview(
                totalMarketCap=market_data.get("total_market_cap", {}).get("usd", 0),
                totalVolume24h=market_data.get("total_volume", {}).get("usd", 0),
                btcDominance=dominance.get("btc", 0),
                ethDominance=dominance.get("eth", 0),
                activeCoins=market_data.get("active_cryptocurrencies", 0)
            )
        except Exception as e:
            logger.error("Failed to fetch market overview: %s", e)

            return MarketOverview(
                totalMarketCap=0,
                totalVolume24h=0,
                btcDominance=0,
                ethDominance=0,
                activeCoins=0
            )

    async def get_all_provider_health(self) -> List[ProviderHealth]:
        """Get health status of all providers, in a stable order.

        Providers are de-duplicated by hash/equality and reported in the order
        OHLCV providers, price providers, market provider.
        """
        # dict.fromkeys de-duplicates like a set but keeps insertion order.
        unique_providers = dict.fromkeys(
            [*self.ohlcv_providers, *self.price_providers, self.market_provider]
        )
        return [await provider.get_health() for provider in unique_providers]

    def get_uptime(self) -> int:
        """Get service uptime in whole seconds since construction."""
        return int(time.time() - self.start_time)
|
|
|
|
|
|
|
|
|
|
|
# Module-level shared instance; remains None until get_aggregator() creates it.
aggregator: Optional[DataAggregator] = None


def get_aggregator() -> DataAggregator:
    """Return the global DataAggregator, constructing it on first call."""
    global aggregator

    # Fast path: an instance has already been created.
    if aggregator is not None:
        return aggregator

    aggregator = DataAggregator()
    return aggregator
|
|
|