""" On-Chain Analytics Collectors Placeholder implementations for The Graph and Blockchair data collection These collectors are designed to be extended with actual implementations when on-chain data sources are integrated. """ import asyncio from datetime import datetime, timezone from typing import Dict, List, Optional, Any from utils.api_client import get_client from utils.logger import setup_logger, log_api_request, log_error logger = setup_logger("onchain_collector") def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]: """ Calculate staleness in minutes from data timestamp to now Args: data_timestamp: Timestamp of the data Returns: Staleness in minutes or None if timestamp not available """ if not data_timestamp: return None now = datetime.now(timezone.utc) if data_timestamp.tzinfo is None: data_timestamp = data_timestamp.replace(tzinfo=timezone.utc) delta = now - data_timestamp return delta.total_seconds() / 60.0 async def get_the_graph_data() -> Dict[str, Any]: """ Fetch on-chain data from The Graph protocol - Uniswap V3 subgraph The Graph is a decentralized protocol for indexing and querying blockchain data. This implementation queries the Uniswap V3 subgraph for DEX metrics. Returns: Dict with provider, category, data, timestamp, staleness, success, error """ provider = "TheGraph" category = "onchain_analytics" endpoint = "/subgraphs/uniswap-v3" logger.info(f"Fetching on-chain data from {provider}") try: client = get_client() # Uniswap V3 subgraph endpoint url = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3" # GraphQL query to get top pools and overall stats query = """ { factories(first: 1) { totalVolumeUSD totalValueLockedUSD txCount } pools(first: 10, orderBy: totalValueLockedUSD, orderDirection: desc) { id token0 { symbol } token1 { symbol } totalValueLockedUSD volumeUSD txCount } } """ payload = {"query": query} headers = {"Content-Type": "application/json"} # Make request response = await client.post(url, json=payload, headers=headers, timeout=15) # Log request log_api_request( logger, provider, endpoint, response.get("response_time_ms", 0), "success" if response["success"] else "error", response.get("status_code") ) if not response["success"]: error_msg = response.get("error_message", "Unknown error") log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) return { "provider": provider, "category": category, "data": None, "timestamp": datetime.now(timezone.utc).isoformat(), "staleness_minutes": None, "success": False, "error": error_msg, "error_type": response.get("error_type") } # Extract data raw_data = response["data"] graph_data = None if isinstance(raw_data, dict) and "data" in raw_data: data = raw_data["data"] factories = data.get("factories", []) pools = data.get("pools", []) if factories: factory = factories[0] graph_data = { "protocol": "Uniswap V3", "total_volume_usd": float(factory.get("totalVolumeUSD", 0)), "total_tvl_usd": float(factory.get("totalValueLockedUSD", 0)), "total_transactions": int(factory.get("txCount", 0)), "top_pools": [ { "pair": f"{pool.get('token0', {}).get('symbol', '?')}/{pool.get('token1', {}).get('symbol', '?')}", "tvl_usd": float(pool.get("totalValueLockedUSD", 0)), "volume_usd": float(pool.get("volumeUSD", 0)), "tx_count": int(pool.get("txCount", 0)) } for pool in pools ] } data_timestamp = datetime.now(timezone.utc) staleness = calculate_staleness_minutes(data_timestamp) logger.info( f"{provider} - {endpoint} - TVL: ${graph_data.get('total_tvl_usd', 0):,.0f}" if graph_data else f"{provider} - {endpoint} - No data" ) return { "provider": provider, "category": category, "data": graph_data, "timestamp": datetime.now(timezone.utc).isoformat(), "data_timestamp": data_timestamp.isoformat(), "staleness_minutes": staleness, "success": True, "error": None, "response_time_ms": response.get("response_time_ms", 0) } except Exception as e: error_msg = f"Unexpected error: {str(e)}" log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) return { "provider": provider, "category": category, "data": None, "timestamp": datetime.now(timezone.utc).isoformat(), "staleness_minutes": None, "success": False, "error": error_msg, "error_type": "exception" } async def get_blockchair_data() -> Dict[str, Any]: """ Fetch blockchain statistics from Blockchair Blockchair is a blockchain explorer and analytics platform. This implementation fetches Bitcoin and Ethereum network statistics. Returns: Dict with provider, category, data, timestamp, staleness, success, error """ provider = "Blockchair" category = "onchain_analytics" endpoint = "/stats" logger.info(f"Fetching blockchain stats from {provider}") try: client = get_client() # Fetch stats for BTC and ETH btc_url = "https://api.blockchair.com/bitcoin/stats" eth_url = "https://api.blockchair.com/ethereum/stats" # Make concurrent requests btc_response, eth_response = await asyncio.gather( client.get(btc_url, timeout=10), client.get(eth_url, timeout=10), return_exceptions=True ) # Log requests if not isinstance(btc_response, Exception): log_api_request( logger, provider, f"{endpoint}/bitcoin", btc_response.get("response_time_ms", 0), "success" if btc_response["success"] else "error", btc_response.get("status_code") ) if not isinstance(eth_response, Exception): log_api_request( logger, provider, f"{endpoint}/ethereum", eth_response.get("response_time_ms", 0), "success" if eth_response["success"] else "error", eth_response.get("status_code") ) # Process Bitcoin data btc_data = None if not isinstance(btc_response, Exception) and btc_response.get("success"): raw_btc = btc_response.get("data", {}) if isinstance(raw_btc, dict) and "data" in raw_btc: btc_stats = raw_btc["data"] btc_data = { "blocks": btc_stats.get("blocks"), "transactions": btc_stats.get("transactions"), "market_price_usd": btc_stats.get("market_price_usd"), "hashrate_24h": btc_stats.get("hashrate_24h"), "difficulty": btc_stats.get("difficulty"), "mempool_size": btc_stats.get("mempool_size"), "mempool_transactions": btc_stats.get("mempool_transactions") } # Process Ethereum data eth_data = None if not isinstance(eth_response, Exception) and eth_response.get("success"): raw_eth = eth_response.get("data", {}) if isinstance(raw_eth, dict) and "data" in raw_eth: eth_stats = raw_eth["data"] eth_data = { "blocks": eth_stats.get("blocks"), "transactions": eth_stats.get("transactions"), "market_price_usd": eth_stats.get("market_price_usd"), "hashrate_24h": eth_stats.get("hashrate_24h"), "difficulty": eth_stats.get("difficulty"), "mempool_size": eth_stats.get("mempool_tps") } blockchair_data = { "bitcoin": btc_data, "ethereum": eth_data } data_timestamp = datetime.now(timezone.utc) staleness = calculate_staleness_minutes(data_timestamp) logger.info( f"{provider} - {endpoint} - BTC blocks: {btc_data.get('blocks', 'N/A') if btc_data else 'N/A'}, " f"ETH blocks: {eth_data.get('blocks', 'N/A') if eth_data else 'N/A'}" ) return { "provider": provider, "category": category, "data": blockchair_data, "timestamp": datetime.now(timezone.utc).isoformat(), "data_timestamp": data_timestamp.isoformat(), "staleness_minutes": staleness, "success": True, "error": None, "response_time_ms": (btc_response.get("response_time_ms", 0) if not isinstance(btc_response, Exception) else 0) } except Exception as e: error_msg = f"Unexpected error: {str(e)}" log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) return { "provider": provider, "category": category, "data": None, "timestamp": datetime.now(timezone.utc).isoformat(), "staleness_minutes": None, "success": False, "error": error_msg, "error_type": "exception" } async def get_glassnode_metrics() -> Dict[str, Any]: """ Fetch advanced on-chain metrics from Glassnode (placeholder) Glassnode provides advanced on-chain analytics and metrics. This is a placeholder implementation that should be extended with: - NUPL (Net Unrealized Profit/Loss) - SOPR (Spent Output Profit Ratio) - Exchange flows - Whale transactions - Active addresses - Realized cap Returns: Dict with provider, category, data, timestamp, staleness, success, error """ provider = "Glassnode" category = "onchain_analytics" endpoint = "/metrics" logger.info(f"Fetching on-chain metrics from {provider} (placeholder)") try: # Placeholder implementation # Glassnode API requires API key and has extensive metrics # Example metrics: NUPL, SOPR, Exchange Flows, Miner Revenue, etc. placeholder_data = { "status": "placeholder", "message": "Glassnode integration not yet implemented", "planned_metrics": [ "NUPL - Net Unrealized Profit/Loss", "SOPR - Spent Output Profit Ratio", "Exchange Net Flows", "Whale Transaction Count", "Active Addresses", "Realized Cap", "MVRV Ratio", "Supply in Profit", "Long/Short Term Holder Supply" ], "note": "Requires Glassnode API key for access" } data_timestamp = datetime.now(timezone.utc) staleness = 0.0 logger.info(f"{provider} - {endpoint} - Placeholder data returned") return { "provider": provider, "category": category, "data": placeholder_data, "timestamp": datetime.now(timezone.utc).isoformat(), "data_timestamp": data_timestamp.isoformat(), "staleness_minutes": staleness, "success": True, "error": None, "is_placeholder": True } except Exception as e: error_msg = f"Unexpected error: {str(e)}" log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) return { "provider": provider, "category": category, "data": None, "timestamp": datetime.now(timezone.utc).isoformat(), "staleness_minutes": None, "success": False, "error": error_msg, "error_type": "exception" } async def collect_onchain_data() -> List[Dict[str, Any]]: """ Main function to collect on-chain analytics data from all sources Currently returns placeholder implementations for: - The Graph (GraphQL-based blockchain data) - Blockchair (blockchain explorer and stats) - Glassnode (advanced on-chain metrics) Returns: List of results from all on-chain collectors """ logger.info("Starting on-chain data collection from all sources (placeholder)") # Run all collectors concurrently results = await asyncio.gather( get_the_graph_data(), get_blockchair_data(), get_glassnode_metrics(), return_exceptions=True ) # Process results processed_results = [] for result in results: if isinstance(result, Exception): logger.error(f"Collector failed with exception: {str(result)}") processed_results.append({ "provider": "Unknown", "category": "onchain_analytics", "data": None, "timestamp": datetime.now(timezone.utc).isoformat(), "staleness_minutes": None, "success": False, "error": str(result), "error_type": "exception" }) else: processed_results.append(result) # Log summary successful = sum(1 for r in processed_results if r.get("success", False)) placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False)) logger.info( f"On-chain data collection complete: {successful}/{len(processed_results)} successful " f"({placeholder_count} placeholders)" ) return processed_results class OnChainCollector: """ On-Chain Analytics Collector class for WebSocket streaming interface Wraps the standalone on-chain data collection functions """ def __init__(self, config: Any = None): """ Initialize the on-chain collector Args: config: Configuration object (optional, for compatibility) """ self.config = config self.logger = logger async def collect(self) -> Dict[str, Any]: """ Collect on-chain analytics data from all sources Returns: Dict with aggregated on-chain data """ results = await collect_onchain_data() # Aggregate data for WebSocket streaming aggregated = { "active_addresses": None, "transaction_count": None, "total_fees": None, "gas_price": None, "network_utilization": None, "contract_events": [], "timestamp": datetime.now(timezone.utc).isoformat() } for result in results: if result.get("success") and result.get("data"): provider = result.get("provider", "unknown") data = result["data"] # Skip placeholders but still return basic structure if isinstance(data, dict) and data.get("status") == "placeholder": continue # Parse data from various providers (when implemented) # Currently all are placeholders, so this will be empty pass return aggregated # Example usage if __name__ == "__main__": async def main(): results = await collect_onchain_data() print("\n=== On-Chain Data Collection Results ===") print("Note: These are placeholder implementations") print() for result in results: print(f"\nProvider: {result['provider']}") print(f"Success: {result['success']}") print(f"Is Placeholder: {result.get('is_placeholder', False)}") if result['success']: data = result.get('data', {}) if isinstance(data, dict): print(f"Status: {data.get('status', 'N/A')}") print(f"Message: {data.get('message', 'N/A')}") if 'planned_features' in data: print(f"Planned Features: {len(data['planned_features'])}") else: print(f"Error: {result.get('error', 'Unknown')}") print("\n" + "="*50) print("To implement these collectors:") print("1. The Graph: Add GraphQL queries for specific subgraphs") print("2. Blockchair: Add API key and implement endpoint calls") print("3. Glassnode: Add API key and implement metrics fetching") print("="*50) asyncio.run(main())