"""Async collectors that power the FastAPI endpoints."""

from __future__ import annotations

import asyncio
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx

from config import CACHE_TTL, COIN_SYMBOL_MAPPING, USER_AGENT, get_settings

logger = logging.getLogger(__name__)
settings = get_settings()


def _to_float(value: Any, default: float = 0.0) -> float:
    """Return *value* as a ``float``, or *default* when it is ``None`` or unparsable."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _to_int(value: Any, default: int = 0) -> int:
    """Return *value* as an ``int``, or *default* when it is ``None`` or unparsable."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


class CollectorError(RuntimeError):
    """Raised when a provider fails to return data.

    Attributes:
        provider: Key of the provider the failure relates to, if known.
        status_code: HTTP status code returned by the provider, if any.
    """

    def __init__(
        self,
        message: str,
        provider: Optional[str] = None,
        status_code: Optional[int] = None,
    ) -> None:
        super().__init__(message)
        self.provider = provider
        self.status_code = status_code


@dataclass
class CacheEntry:
    """A cached value together with its expiry time."""

    # The cached payload.
    value: Any
    # Absolute expiry time, on the same clock as ``time.time()``.
    expires_at: float


class TTLCache:
    """Simple in-memory TTL cache safe for async usage."""

    def __init__(self, ttl: int = CACHE_TTL) -> None:
        # A falsy ttl (``0`` / ``None``) falls back to the module default.
        self.ttl = ttl or CACHE_TTL
        self._store: Dict[str, CacheEntry] = {}
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> Any:
        """Return the value stored under *key*, or ``None`` if absent or expired.

        An expired entry is removed from the store when it is looked up.
        """
        async with self._lock:
            entry = self._store.get(key)
            if entry is None:
                return None
            if entry.expires_at < time.time():
                self._store.pop(key, None)
                return None
            return entry.value

    async def set(self, key: str, value: Any) -> None:
        """Store *value* under *key*, expiring ``self.ttl`` seconds from now."""
        async with self._lock:
            self._store[key] = CacheEntry(value=value, expires_at=time.time() + self.ttl)


class ProvidersRegistry:
    """Utility that loads provider definitions from disk."""

    def __init__(self, path: Optional[Path] = None) -> None:
        """Load providers from *path*, defaulting to ``settings.providers_config_path``."""
        self.path = Path(path or settings.providers_config_path)
        self._providers: Dict[str, Any] = {}
        self._load()

    def _load(self) -> None:
        """Read the JSON config file and keep its ``"providers"`` mapping.

        A missing file is logged as a warning and leaves the registry empty.
        """
        if not self.path.exists():
            logger.warning("Providers config not found at %s", self.path)
            self._providers = {}
            return
        with self.path.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
        self._providers = data.get("providers", {})

    @property
    def providers(self) -> Dict[str, Any]:
        """Mapping of provider key to its definition as loaded from disk."""
        return self._providers


class MarketDataCollector:
    """Fetch market data from public providers with caching and fallbacks."""

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        # COIN_SYMBOL_MAPPING is coin_id -> symbol; invert it for symbol lookups.
        self._symbol_map = {
            symbol.lower(): coin_id for coin_id, symbol in COIN_SYMBOL_MAPPING.items()
        }
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0

    async def _request(
        self,
        provider_key: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """GET ``<provider base_url><path>`` and return the decoded JSON body.

        Args:
            provider_key: Key of the provider in the registry.
            path: Path appended to the provider's ``base_url``.
            params: Optional query-string parameters.

        Raises:
            CollectorError: If the provider is not configured, has no
                ``base_url``, or responds with a status other than 200.

        Transport errors raised by ``httpx`` and JSON decoding errors are not
        caught here and propagate to the caller unchanged.
        """
        provider = self.registry.providers.get(provider_key)
        if not provider:
            raise CollectorError(f"Provider {provider_key} not configured", provider=provider_key)

        base_url = provider.get("base_url")
        if not base_url:
            # Report a config problem the same way as a missing provider
            # instead of leaking a bare KeyError.
            raise CollectorError(
                f"Provider {provider_key} has no base_url configured",
                provider=provider_key,
            )

        url = base_url.rstrip("/") + path
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)
            if response.status_code != 200:
                raise CollectorError(
                    f"{provider_key} request failed with HTTP {response.status_code}",
                    provider=provider_key,
                    status_code=response.status_code,
                )
            return response.json()

    async def _top_coins_from_coingecko(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch the top *limit* coins by market cap from CoinGecko."""
        data = await self._request(
            "coingecko",
            "/coins/markets",
            {
                "vs_currency": "usd",
                "order": "market_cap_desc",
                "per_page": limit,
                "page": 1,
                "sparkline": "false",
                "price_change_percentage": "24h",
            },
        )
        return [
            {
                "name": item.get("name"),
                "symbol": (item.get("symbol") or "").upper(),
                "price": item.get("current_price"),
                "change_24h": item.get("price_change_percentage_24h"),
                "market_cap": item.get("market_cap"),
                "volume_24h": item.get("total_volume"),
                "rank": item.get("market_cap_rank"),
                "last_updated": item.get("last_updated"),
            }
            for item in data
        ]

    async def _top_coins_from_coincap(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch the top *limit* coins from CoinCap.

        Numeric fields are converted with tolerant helpers so that a ``null``
        or malformed value on one asset yields ``0`` for that field instead of
        failing the whole provider.
        """
        data = await self._request("coincap", "/assets", {"limit": limit})
        return [
            {
                "name": item.get("name"),
                "symbol": (item.get("symbol") or "").upper(),
                "price": _to_float(item.get("priceUsd")),
                "change_24h": _to_float(item.get("changePercent24Hr")),
                "market_cap": _to_float(item.get("marketCapUsd")),
                "volume_24h": _to_float(item.get("volumeUsd24Hr")),
                "rank": _to_int(item.get("rank")),
            }
            for item in (data.get("data") or [])
        ]

    async def get_top_coins(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the top *limit* coins, trying CoinGecko first and then CoinCap.

        The first provider that succeeds has its result cached and returned.
        A falsy cached value (e.g. an empty list) is treated as a cache miss.

        Raises:
            CollectorError: If every provider fails. ``provider`` names the
                last provider tried, ``status_code`` is copied from the last
                failure when it has one, and the last failure is chained as
                ``__cause__``.
        """
        cache_key = f"top_coins:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        fetchers = (
            ("coingecko", self._top_coins_from_coingecko),
            ("coincap", self._top_coins_from_coincap),
        )
        last_error: Optional[Exception] = None
        last_provider: Optional[str] = None
        for provider, fetch in fetchers:
            try:
                coins = await fetch(limit)
            except Exception as exc:  # pragma: no cover - network heavy
                # Any failure (HTTP, transport, malformed payload) moves on to
                # the next provider.
                last_error = exc
                last_provider = provider
                logger.warning("Provider %s failed: %s", provider, exc)
                continue
            await self.cache.set(cache_key, coins)
            return coins

        raise CollectorError(
            "Unable to fetch top coins",
            provider=last_provider,
            status_code=getattr(last_error, "status_code", None),
        ) from last_error

    async def _coin_id(self, symbol: str) -> str:
        """Resolve a ticker *symbol* to a CoinGecko coin id.

        The static map built from ``COIN_SYMBOL_MAPPING`` is consulted first;
        otherwise the CoinGecko ``/coins/list`` result is fetched and cached.

        Raises:
            CollectorError: If the symbol is in neither mapping.
        """
        symbol_lower = symbol.lower()
        known_id = self._symbol_map.get(symbol_lower)
        if symbol_lower in self._symbol_map:
            return known_id

        cache_key = "coingecko:symbols"
        mapping = await self.cache.get(cache_key)
        if not mapping:
            data = await self._request("coingecko", "/coins/list")
            # When several listed coins share a symbol, the last one wins.
            mapping = {item["symbol"].lower(): item["id"] for item in data}
            await self.cache.set(cache_key, mapping)

        if symbol_lower not in mapping:
            raise CollectorError(f"Unknown symbol: {symbol}")
        return mapping[symbol_lower]

    async def get_coin_details(self, symbol: str) -> Dict[str, Any]:
        """Return a flattened summary of one coin from CoinGecko, cached by coin id.

        Nested objects that are missing or ``null`` in the response produce
        ``None`` for the fields derived from them.

        Raises:
            CollectorError: If the symbol is unknown or the request fails.
        """
        coin_id = await self._coin_id(symbol)
        cache_key = f"coin:{coin_id}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}",
            {"localization": "false", "tickers": "false", "market_data": "true"},
        )
        market_data = data.get("market_data") or {}

        def usd(field: str) -> Any:
            """Return the ``usd`` entry of a per-currency market_data field."""
            return (market_data.get(field) or {}).get("usd")

        # An empty homepage list must not raise IndexError.
        homepage_links = (data.get("links") or {}).get("homepage") or []

        coin = {
            "id": coin_id,
            "name": data.get("name"),
            "symbol": (data.get("symbol") or "").upper(),
            "description": (data.get("description") or {}).get("en"),
            "homepage": homepage_links[0] if homepage_links else None,
            "price": usd("current_price"),
            "market_cap": usd("market_cap"),
            "volume_24h": usd("total_volume"),
            "change_24h": market_data.get("price_change_percentage_24h"),
            "high_24h": usd("high_24h"),
            "low_24h": usd("low_24h"),
            "circulating_supply": market_data.get("circulating_supply"),
            "total_supply": market_data.get("total_supply"),
            "ath": usd("ath"),
            "atl": usd("atl"),
            "last_updated": data.get("last_updated"),
        }
        await self.cache.set(cache_key, coin)
        return coin

    async def get_market_stats(self) -> Dict[str, Any]:
        """Return global market statistics from CoinGecko ``/global``, cached."""
        cache_key = "market:stats"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        global_data = await self._request("coingecko", "/global")
        stats = global_data.get("data") or {}
        dominance = stats.get("market_cap_percentage") or {}
        market = {
            "total_market_cap": (stats.get("total_market_cap") or {}).get("usd"),
            "total_volume_24h": (stats.get("total_volume") or {}).get("usd"),
            "market_cap_change_percentage_24h": stats.get("market_cap_change_percentage_24h_usd"),
            "btc_dominance": dominance.get("btc"),
            "eth_dominance": dominance.get("eth"),
            "active_cryptocurrencies": stats.get("active_cryptocurrencies"),
            "markets": stats.get("markets"),
            "updated_at": stats.get("updated_at"),
        }
        await self.cache.set(cache_key, market)
        return market

    async def get_price_history(self, symbol: str, timeframe: str = "7d") -> List[Dict[str, Any]]:
        """Return USD price points for *symbol* from CoinGecko ``market_chart``.

        Args:
            symbol: Ticker symbol, resolved via ``_coin_id``.
            timeframe: One of ``"1d"``, ``"7d"``, ``"30d"``, ``"90d"``; any
                other value falls back to 7 days.

        Returns:
            A list of ``{"timestamp": <ISO-8601 UTC>, "price": <rounded to 4 dp>}``.
        """
        coin_id = await self._coin_id(symbol)
        days_by_timeframe = {"1d": 1, "7d": 7, "30d": 30, "90d": 90}
        days = days_by_timeframe.get(timeframe, 7)

        cache_key = f"history:{coin_id}:{days}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}/market_chart",
            {"vs_currency": "usd", "days": days},
        )
        prices = [
            {
                # Each point is indexed as [timestamp in milliseconds, price].
                "timestamp": datetime.fromtimestamp(point[0] / 1000, tz=timezone.utc).isoformat(),
                "price": round(point[1], 4),
            }
            for point in (data.get("prices") or [])
        ]
        await self.cache.set(cache_key, prices)
        return prices

    async def get_ohlcv(
        self,
        symbol: str,
        interval: str = "1h",
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """Return OHLCV data from Binance with caching and validation.

        Args:
            symbol: Trading pair symbol; upper-cased before use.
            interval: Kline interval passed through to the provider.
            limit: Number of candles requested, clamped to the range 1..1000.

        Raises:
            CollectorError: If the request fails or no row could be parsed.
        """
        pair = symbol.upper()
        # Clamp before building the cache key so that out-of-range limits share
        # the cache entry of the request actually sent.
        effective_limit = min(max(limit, 1), 1000)
        cache_key = f"ohlcv:{pair}:{interval}:{effective_limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        params = {"symbol": pair, "interval": interval, "limit": effective_limit}
        data = await self._request("binance", "/klines", params)

        candles: List[Dict[str, Any]] = []
        for item in data:
            try:
                candles.append(
                    {
                        "timestamp": datetime.fromtimestamp(
                            item[0] / 1000, tz=timezone.utc
                        ).isoformat(),
                        "open": float(item[1]),
                        "high": float(item[2]),
                        "low": float(item[3]),
                        "close": float(item[4]),
                        "volume": float(item[5]),
                    }
                )
            except (TypeError, ValueError, IndexError):  # pragma: no cover - defensive
                # Skip malformed or truncated rows rather than failing the batch.
                continue

        if not candles:
            raise CollectorError(f"No OHLCV data returned for {symbol}", provider="binance")

        await self.cache.set(cache_key, candles)
        return candles


class NewsCollector:
    """Fetch latest crypto news."""

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0

    async def get_latest_news(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to *limit* news items from the CryptoCompare news endpoint.

        Results are cached per *limit*. A falsy cached value (e.g. an empty
        list) is treated as a cache miss.

        Raises:
            CollectorError: If the provider responds with a status other than 200.
        """
        cache_key = f"news:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        url = "https://min-api.cryptocompare.com/data/v2/news/"
        params = {"lang": "EN"}
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)
            if response.status_code != 200:
                raise CollectorError(f"News provider error: HTTP {response.status_code}")
            payload = response.json()

        entries = payload.get("Data") or []
        if not isinstance(entries, list):
            # Anything other than a list cannot be sliced into news entries.
            entries = []

        items = []
        for entry in entries[:limit]:
            # A missing or null timestamp falls back to the Unix epoch.
            published = datetime.fromtimestamp(entry.get("published_on") or 0, tz=timezone.utc)
            items.append(
                {
                    "id": entry.get("id"),
                    "title": entry.get("title"),
                    "body": entry.get("body"),
                    "url": entry.get("url"),
                    "source": entry.get("source"),
                    "categories": entry.get("categories"),
                    "published_at": published.isoformat(),
                }
            )
        await self.cache.set(cache_key, items)
        return items


class ProviderStatusCollector:
    """Perform lightweight health checks against configured providers."""

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        self.registry = registry or ProvidersRegistry()
        # Status results are cached for at least 600 seconds.
        self.cache = TTLCache(max(settings.cache_ttl, 600))
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 8.0

    async def _check_provider(
        self,
        client: httpx.AsyncClient,
        provider_id: str,
        data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """GET the provider's ``health_check`` (or ``base_url``) and report its status.

        Returns a dict with ``status`` set to ``"online"`` for a status code
        below 400, ``"degraded"`` for any other response, and ``"offline"``
        (with an ``error`` string) when the request raises. This method never
        raises for ordinary exceptions.
        """
        url = data.get("health_check") or data.get("base_url")
        report: Dict[str, Any] = {
            "provider_id": provider_id,
            "name": data.get("name", provider_id),
            "category": data.get("category"),
        }
        start = time.perf_counter()
        try:
            response = await client.get(url, timeout=self.timeout)
        except Exception as exc:  # pragma: no cover - network heavy
            logger.warning("Provider %s health check failed: %s", provider_id, exc)
            report.update(
                {
                    "status": "offline",
                    "status_code": None,
                    "latency_ms": None,
                    "error": str(exc),
                }
            )
            return report

        latency = round((time.perf_counter() - start) * 1000, 2)
        report.update(
            {
                "status": "online" if response.status_code < 400 else "degraded",
                "status_code": response.status_code,
                "latency_ms": latency,
            }
        )
        return report

    async def get_providers_status(self) -> List[Dict[str, Any]]:
        """Return one health report per configured provider, cached.

        All providers are checked concurrently. Reports are returned in the
        registry's iteration order so the output is stable across refreshes.
        An empty registry yields an empty list, which is not cached.
        """
        cached = await self.cache.get("providers_status")
        if cached:
            return cached

        providers = self.registry.providers
        if not providers:
            return []

        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            # _check_provider handles its own exceptions, so gather() will not
            # abort on a single failing provider.
            checks = [self._check_provider(client, pid, data) for pid, data in providers.items()]
            results: List[Dict[str, Any]] = list(await asyncio.gather(*checks))

        await self.cache.set("providers_status", results)
        return results


__all__ = [
    "CollectorError",
    "MarketDataCollector",
    "NewsCollector",
    "ProviderStatusCollector",
]