Really-amin's picture
Upload 17 files
6b51475 verified
"""
Market Data Collectors
Fetches cryptocurrency market data from CoinGecko, CoinMarketCap, and Binance
"""
import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from utils.api_client import get_client
from utils.logger import setup_logger, log_api_request, log_error
from config import config
logger = setup_logger("market_data_collector")
def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]:
"""
Calculate staleness in minutes from data timestamp to now
Args:
data_timestamp: Timestamp of the data
Returns:
Staleness in minutes or None if timestamp not available
"""
if not data_timestamp:
return None
now = datetime.now(timezone.utc)
if data_timestamp.tzinfo is None:
data_timestamp = data_timestamp.replace(tzinfo=timezone.utc)
delta = now - data_timestamp
return delta.total_seconds() / 60.0
async def get_coingecko_simple_price() -> Dict[str, Any]:
"""
Fetch BTC, ETH, BNB prices from CoinGecko simple/price endpoint
Returns:
Dict with provider, category, data, timestamp, staleness, success, error
"""
provider = "CoinGecko"
category = "market_data"
endpoint = "/simple/price"
logger.info(f"Fetching simple price from {provider}")
try:
client = get_client()
provider_config = config.get_provider(provider)
if not provider_config:
error_msg = f"Provider {provider} not configured"
log_error(logger, provider, "config_error", error_msg, endpoint)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg
}
# Build request URL
url = f"{provider_config.endpoint_url}{endpoint}"
params = {
"ids": "bitcoin,ethereum,binancecoin",
"vs_currencies": "usd",
"include_market_cap": "true",
"include_24hr_vol": "true",
"include_24hr_change": "true",
"include_last_updated_at": "true"
}
# Make request
response = await client.get(url, params=params, timeout=provider_config.timeout_ms // 1000)
# Log request
log_api_request(
logger,
provider,
endpoint,
response.get("response_time_ms", 0),
"success" if response["success"] else "error",
response.get("status_code")
)
if not response["success"]:
error_msg = response.get("error_message", "Unknown error")
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg,
"error_type": response.get("error_type")
}
# Extract data
data = response["data"]
# Parse timestamps from response
data_timestamp = None
if isinstance(data, dict):
# CoinGecko returns last_updated_at as Unix timestamp
for coin_data in data.values():
if isinstance(coin_data, dict) and "last_updated_at" in coin_data:
data_timestamp = datetime.fromtimestamp(
coin_data["last_updated_at"],
tz=timezone.utc
)
break
staleness = calculate_staleness_minutes(data_timestamp)
logger.info(
f"{provider} - {endpoint} - Retrieved {len(data) if isinstance(data, dict) else 0} coins, "
f"staleness: {staleness:.2f}m" if staleness else "staleness: N/A"
)
return {
"provider": provider,
"category": category,
"data": data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
"staleness_minutes": staleness,
"success": True,
"error": None,
"response_time_ms": response.get("response_time_ms", 0)
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg,
"error_type": "exception"
}
async def get_coinmarketcap_quotes() -> Dict[str, Any]:
"""
Fetch BTC, ETH, BNB market data from CoinMarketCap quotes endpoint
Returns:
Dict with provider, category, data, timestamp, staleness, success, error
"""
provider = "CoinMarketCap"
category = "market_data"
endpoint = "/cryptocurrency/quotes/latest"
logger.info(f"Fetching quotes from {provider}")
try:
client = get_client()
provider_config = config.get_provider(provider)
if not provider_config:
error_msg = f"Provider {provider} not configured"
log_error(logger, provider, "config_error", error_msg, endpoint)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg
}
# Check if API key is available
if provider_config.requires_key and not provider_config.api_key:
error_msg = f"API key required but not configured for {provider}"
log_error(logger, provider, "auth_error", error_msg, endpoint)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg,
"error_type": "missing_api_key"
}
# Build request
url = f"{provider_config.endpoint_url}{endpoint}"
headers = {
"X-CMC_PRO_API_KEY": provider_config.api_key,
"Accept": "application/json"
}
params = {
"symbol": "BTC,ETH,BNB",
"convert": "USD"
}
# Make request
response = await client.get(
url,
headers=headers,
params=params,
timeout=provider_config.timeout_ms // 1000
)
# Log request
log_api_request(
logger,
provider,
endpoint,
response.get("response_time_ms", 0),
"success" if response["success"] else "error",
response.get("status_code")
)
if not response["success"]:
error_msg = response.get("error_message", "Unknown error")
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg,
"error_type": response.get("error_type")
}
# Extract data
data = response["data"]
# Parse timestamp from response
data_timestamp = None
if isinstance(data, dict) and "data" in data:
# CoinMarketCap response structure
for coin_data in data["data"].values():
if isinstance(coin_data, dict) and "quote" in coin_data:
quote = coin_data.get("quote", {}).get("USD", {})
if "last_updated" in quote:
try:
data_timestamp = datetime.fromisoformat(
quote["last_updated"].replace("Z", "+00:00")
)
break
except:
pass
staleness = calculate_staleness_minutes(data_timestamp)
coin_count = len(data.get("data", {})) if isinstance(data, dict) else 0
logger.info(
f"{provider} - {endpoint} - Retrieved {coin_count} coins, "
f"staleness: {staleness:.2f}m" if staleness else "staleness: N/A"
)
return {
"provider": provider,
"category": category,
"data": data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
"staleness_minutes": staleness,
"success": True,
"error": None,
"response_time_ms": response.get("response_time_ms", 0)
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg,
"error_type": "exception"
}
async def get_binance_ticker() -> Dict[str, Any]:
"""
Fetch ticker data from Binance public API (24hr ticker)
Returns:
Dict with provider, category, data, timestamp, staleness, success, error
"""
provider = "Binance"
category = "market_data"
endpoint = "/api/v3/ticker/24hr"
logger.info(f"Fetching 24hr ticker from {provider}")
try:
client = get_client()
# Binance API base URL
url = f"https://api.binance.com{endpoint}"
params = {
"symbols": '["BTCUSDT","ETHUSDT","BNBUSDT"]'
}
# Make request
response = await client.get(url, params=params, timeout=10)
# Log request
log_api_request(
logger,
provider,
endpoint,
response.get("response_time_ms", 0),
"success" if response["success"] else "error",
response.get("status_code")
)
if not response["success"]:
error_msg = response.get("error_message", "Unknown error")
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg,
"error_type": response.get("error_type")
}
# Extract data
data = response["data"]
# Parse timestamp from response
# Binance returns closeTime as Unix timestamp in milliseconds
data_timestamp = None
if isinstance(data, list) and len(data) > 0:
first_ticker = data[0]
if isinstance(first_ticker, dict) and "closeTime" in first_ticker:
try:
data_timestamp = datetime.fromtimestamp(
first_ticker["closeTime"] / 1000,
tz=timezone.utc
)
except:
pass
staleness = calculate_staleness_minutes(data_timestamp)
ticker_count = len(data) if isinstance(data, list) else 0
logger.info(
f"{provider} - {endpoint} - Retrieved {ticker_count} tickers, "
f"staleness: {staleness:.2f}m" if staleness else "staleness: N/A"
)
return {
"provider": provider,
"category": category,
"data": data,
"timestamp": datetime.now(timezone.utc).isoformat(),
"data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
"staleness_minutes": staleness,
"success": True,
"error": None,
"response_time_ms": response.get("response_time_ms", 0)
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
return {
"provider": provider,
"category": category,
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": error_msg,
"error_type": "exception"
}
async def collect_market_data() -> List[Dict[str, Any]]:
"""
Main function to collect market data from all sources
Returns:
List of results from all market data collectors
"""
logger.info("Starting market data collection from all sources")
# Run all collectors concurrently
results = await asyncio.gather(
get_coingecko_simple_price(),
get_coinmarketcap_quotes(),
get_binance_ticker(),
return_exceptions=True
)
# Process results
processed_results = []
for result in results:
if isinstance(result, Exception):
logger.error(f"Collector failed with exception: {str(result)}")
processed_results.append({
"provider": "Unknown",
"category": "market_data",
"data": None,
"timestamp": datetime.now(timezone.utc).isoformat(),
"staleness_minutes": None,
"success": False,
"error": str(result),
"error_type": "exception"
})
else:
processed_results.append(result)
# Log summary
successful = sum(1 for r in processed_results if r.get("success", False))
logger.info(f"Market data collection complete: {successful}/{len(processed_results)} successful")
return processed_results
class MarketDataCollector:
"""
Market Data Collector class for WebSocket streaming interface
Wraps the standalone market data collection functions
"""
def __init__(self, config: Any = None):
"""
Initialize the market data collector
Args:
config: Configuration object (optional, for compatibility)
"""
self.config = config
self.logger = logger
async def collect(self) -> Dict[str, Any]:
"""
Collect market data from all sources
Returns:
Dict with aggregated market data
"""
results = await collect_market_data()
# Aggregate data for WebSocket streaming
aggregated = {
"prices": {},
"volumes": {},
"market_caps": {},
"price_changes": {},
"sources": [],
"timestamp": datetime.now(timezone.utc).isoformat()
}
for result in results:
if result.get("success") and result.get("data"):
provider = result.get("provider", "unknown")
aggregated["sources"].append(provider)
data = result["data"]
# Parse CoinGecko data
if provider == "CoinGecko" and isinstance(data, dict):
for coin_id, coin_data in data.items():
if isinstance(coin_data, dict):
symbol = coin_id.upper()
if "usd" in coin_data:
aggregated["prices"][symbol] = coin_data["usd"]
if "usd_market_cap" in coin_data:
aggregated["market_caps"][symbol] = coin_data["usd_market_cap"]
if "usd_24h_vol" in coin_data:
aggregated["volumes"][symbol] = coin_data["usd_24h_vol"]
if "usd_24h_change" in coin_data:
aggregated["price_changes"][symbol] = coin_data["usd_24h_change"]
# Parse CoinMarketCap data
elif provider == "CoinMarketCap" and isinstance(data, dict):
if "data" in data:
for symbol, coin_data in data["data"].items():
if isinstance(coin_data, dict) and "quote" in coin_data:
quote = coin_data.get("quote", {}).get("USD", {})
if "price" in quote:
aggregated["prices"][symbol] = quote["price"]
if "market_cap" in quote:
aggregated["market_caps"][symbol] = quote["market_cap"]
if "volume_24h" in quote:
aggregated["volumes"][symbol] = quote["volume_24h"]
if "percent_change_24h" in quote:
aggregated["price_changes"][symbol] = quote["percent_change_24h"]
# Parse Binance data
elif provider == "Binance" and isinstance(data, list):
for ticker in data:
if isinstance(ticker, dict):
symbol = ticker.get("symbol", "").replace("USDT", "")
if "lastPrice" in ticker:
aggregated["prices"][symbol] = float(ticker["lastPrice"])
if "volume" in ticker:
aggregated["volumes"][symbol] = float(ticker["volume"])
if "priceChangePercent" in ticker:
aggregated["price_changes"][symbol] = float(ticker["priceChangePercent"])
return aggregated
# Example usage
if __name__ == "__main__":
async def main():
results = await collect_market_data()
print("\n=== Market Data Collection Results ===")
for result in results:
print(f"\nProvider: {result['provider']}")
print(f"Success: {result['success']}")
print(f"Staleness: {result.get('staleness_minutes', 'N/A')} minutes")
if result['success']:
print(f"Response Time: {result.get('response_time_ms', 0):.2f}ms")
else:
print(f"Error: {result.get('error', 'Unknown')}")
asyncio.run(main())