|
|
""" |
|
|
On-Chain Analytics Collectors |
|
|
Placeholder implementations for The Graph and Blockchair data collection |
|
|
|
|
|
These collectors are designed to be extended with actual implementations |
|
|
when on-chain data sources are integrated. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timezone |
|
|
from typing import Dict, List, Optional, Any |
|
|
from utils.api_client import get_client |
|
|
from utils.logger import setup_logger, log_api_request, log_error |
|
|
|
|
|
logger = setup_logger("onchain_collector") |
|
|
|
|
|
|
|
|
def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]: |
|
|
""" |
|
|
Calculate staleness in minutes from data timestamp to now |
|
|
|
|
|
Args: |
|
|
data_timestamp: Timestamp of the data |
|
|
|
|
|
Returns: |
|
|
Staleness in minutes or None if timestamp not available |
|
|
""" |
|
|
if not data_timestamp: |
|
|
return None |
|
|
|
|
|
now = datetime.now(timezone.utc) |
|
|
if data_timestamp.tzinfo is None: |
|
|
data_timestamp = data_timestamp.replace(tzinfo=timezone.utc) |
|
|
|
|
|
delta = now - data_timestamp |
|
|
return delta.total_seconds() / 60.0 |
|
|
|
|
|
|
|
|
async def get_the_graph_data() -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch on-chain data from The Graph protocol - Uniswap V3 subgraph |
|
|
|
|
|
The Graph is a decentralized protocol for indexing and querying blockchain data. |
|
|
This implementation queries the Uniswap V3 subgraph for DEX metrics. |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, staleness, success, error |
|
|
""" |
|
|
provider = "TheGraph" |
|
|
category = "onchain_analytics" |
|
|
endpoint = "/subgraphs/uniswap-v3" |
|
|
|
|
|
logger.info(f"Fetching on-chain data from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
|
|
|
|
|
|
url = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3" |
|
|
|
|
|
|
|
|
query = """ |
|
|
{ |
|
|
factories(first: 1) { |
|
|
totalVolumeUSD |
|
|
totalValueLockedUSD |
|
|
txCount |
|
|
} |
|
|
pools(first: 10, orderBy: totalValueLockedUSD, orderDirection: desc) { |
|
|
id |
|
|
token0 { |
|
|
symbol |
|
|
} |
|
|
token1 { |
|
|
symbol |
|
|
} |
|
|
totalValueLockedUSD |
|
|
volumeUSD |
|
|
txCount |
|
|
} |
|
|
} |
|
|
""" |
|
|
|
|
|
payload = {"query": query} |
|
|
headers = {"Content-Type": "application/json"} |
|
|
|
|
|
|
|
|
response = await client.post(url, json=payload, headers=headers, timeout=15) |
|
|
|
|
|
|
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
endpoint, |
|
|
response.get("response_time_ms", 0), |
|
|
"success" if response["success"] else "error", |
|
|
response.get("status_code") |
|
|
) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"staleness_minutes": None, |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": response.get("error_type") |
|
|
} |
|
|
|
|
|
|
|
|
raw_data = response["data"] |
|
|
|
|
|
graph_data = None |
|
|
if isinstance(raw_data, dict) and "data" in raw_data: |
|
|
data = raw_data["data"] |
|
|
factories = data.get("factories", []) |
|
|
pools = data.get("pools", []) |
|
|
|
|
|
if factories: |
|
|
factory = factories[0] |
|
|
graph_data = { |
|
|
"protocol": "Uniswap V3", |
|
|
"total_volume_usd": float(factory.get("totalVolumeUSD", 0)), |
|
|
"total_tvl_usd": float(factory.get("totalValueLockedUSD", 0)), |
|
|
"total_transactions": int(factory.get("txCount", 0)), |
|
|
"top_pools": [ |
|
|
{ |
|
|
"pair": f"{pool.get('token0', {}).get('symbol', '?')}/{pool.get('token1', {}).get('symbol', '?')}", |
|
|
"tvl_usd": float(pool.get("totalValueLockedUSD", 0)), |
|
|
"volume_usd": float(pool.get("volumeUSD", 0)), |
|
|
"tx_count": int(pool.get("txCount", 0)) |
|
|
} |
|
|
for pool in pools |
|
|
] |
|
|
} |
|
|
|
|
|
data_timestamp = datetime.now(timezone.utc) |
|
|
staleness = calculate_staleness_minutes(data_timestamp) |
|
|
|
|
|
logger.info( |
|
|
f"{provider} - {endpoint} - TVL: ${graph_data.get('total_tvl_usd', 0):,.0f}" |
|
|
if graph_data else f"{provider} - {endpoint} - No data" |
|
|
) |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": graph_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"data_timestamp": data_timestamp.isoformat(), |
|
|
"staleness_minutes": staleness, |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"staleness_minutes": None, |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def get_blockchair_data() -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch blockchain statistics from Blockchair |
|
|
|
|
|
Blockchair is a blockchain explorer and analytics platform. |
|
|
This implementation fetches Bitcoin and Ethereum network statistics. |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, staleness, success, error |
|
|
""" |
|
|
provider = "Blockchair" |
|
|
category = "onchain_analytics" |
|
|
endpoint = "/stats" |
|
|
|
|
|
logger.info(f"Fetching blockchain stats from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
|
|
|
|
|
|
btc_url = "https://api.blockchair.com/bitcoin/stats" |
|
|
eth_url = "https://api.blockchair.com/ethereum/stats" |
|
|
|
|
|
|
|
|
btc_response, eth_response = await asyncio.gather( |
|
|
client.get(btc_url, timeout=10), |
|
|
client.get(eth_url, timeout=10), |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
|
|
|
if not isinstance(btc_response, Exception): |
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
f"{endpoint}/bitcoin", |
|
|
btc_response.get("response_time_ms", 0), |
|
|
"success" if btc_response["success"] else "error", |
|
|
btc_response.get("status_code") |
|
|
) |
|
|
|
|
|
if not isinstance(eth_response, Exception): |
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
f"{endpoint}/ethereum", |
|
|
eth_response.get("response_time_ms", 0), |
|
|
"success" if eth_response["success"] else "error", |
|
|
eth_response.get("status_code") |
|
|
) |
|
|
|
|
|
|
|
|
btc_data = None |
|
|
if not isinstance(btc_response, Exception) and btc_response.get("success"): |
|
|
raw_btc = btc_response.get("data", {}) |
|
|
if isinstance(raw_btc, dict) and "data" in raw_btc: |
|
|
btc_stats = raw_btc["data"] |
|
|
btc_data = { |
|
|
"blocks": btc_stats.get("blocks"), |
|
|
"transactions": btc_stats.get("transactions"), |
|
|
"market_price_usd": btc_stats.get("market_price_usd"), |
|
|
"hashrate_24h": btc_stats.get("hashrate_24h"), |
|
|
"difficulty": btc_stats.get("difficulty"), |
|
|
"mempool_size": btc_stats.get("mempool_size"), |
|
|
"mempool_transactions": btc_stats.get("mempool_transactions") |
|
|
} |
|
|
|
|
|
|
|
|
eth_data = None |
|
|
if not isinstance(eth_response, Exception) and eth_response.get("success"): |
|
|
raw_eth = eth_response.get("data", {}) |
|
|
if isinstance(raw_eth, dict) and "data" in raw_eth: |
|
|
eth_stats = raw_eth["data"] |
|
|
eth_data = { |
|
|
"blocks": eth_stats.get("blocks"), |
|
|
"transactions": eth_stats.get("transactions"), |
|
|
"market_price_usd": eth_stats.get("market_price_usd"), |
|
|
"hashrate_24h": eth_stats.get("hashrate_24h"), |
|
|
"difficulty": eth_stats.get("difficulty"), |
|
|
"mempool_size": eth_stats.get("mempool_tps") |
|
|
} |
|
|
|
|
|
blockchair_data = { |
|
|
"bitcoin": btc_data, |
|
|
"ethereum": eth_data |
|
|
} |
|
|
|
|
|
data_timestamp = datetime.now(timezone.utc) |
|
|
staleness = calculate_staleness_minutes(data_timestamp) |
|
|
|
|
|
logger.info( |
|
|
f"{provider} - {endpoint} - BTC blocks: {btc_data.get('blocks', 'N/A') if btc_data else 'N/A'}, " |
|
|
f"ETH blocks: {eth_data.get('blocks', 'N/A') if eth_data else 'N/A'}" |
|
|
) |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": blockchair_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"data_timestamp": data_timestamp.isoformat(), |
|
|
"staleness_minutes": staleness, |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": (btc_response.get("response_time_ms", 0) if not isinstance(btc_response, Exception) else 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"staleness_minutes": None, |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def get_glassnode_metrics() -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch advanced on-chain metrics from Glassnode (placeholder) |
|
|
|
|
|
Glassnode provides advanced on-chain analytics and metrics. |
|
|
This is a placeholder implementation that should be extended with: |
|
|
- NUPL (Net Unrealized Profit/Loss) |
|
|
- SOPR (Spent Output Profit Ratio) |
|
|
- Exchange flows |
|
|
- Whale transactions |
|
|
- Active addresses |
|
|
- Realized cap |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, staleness, success, error |
|
|
""" |
|
|
provider = "Glassnode" |
|
|
category = "onchain_analytics" |
|
|
endpoint = "/metrics" |
|
|
|
|
|
logger.info(f"Fetching on-chain metrics from {provider} (placeholder)") |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
placeholder_data = { |
|
|
"status": "placeholder", |
|
|
"message": "Glassnode integration not yet implemented", |
|
|
"planned_metrics": [ |
|
|
"NUPL - Net Unrealized Profit/Loss", |
|
|
"SOPR - Spent Output Profit Ratio", |
|
|
"Exchange Net Flows", |
|
|
"Whale Transaction Count", |
|
|
"Active Addresses", |
|
|
"Realized Cap", |
|
|
"MVRV Ratio", |
|
|
"Supply in Profit", |
|
|
"Long/Short Term Holder Supply" |
|
|
], |
|
|
"note": "Requires Glassnode API key for access" |
|
|
} |
|
|
|
|
|
data_timestamp = datetime.now(timezone.utc) |
|
|
staleness = 0.0 |
|
|
|
|
|
logger.info(f"{provider} - {endpoint} - Placeholder data returned") |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": placeholder_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"data_timestamp": data_timestamp.isoformat(), |
|
|
"staleness_minutes": staleness, |
|
|
"success": True, |
|
|
"error": None, |
|
|
"is_placeholder": True |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"staleness_minutes": None, |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def collect_onchain_data() -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Main function to collect on-chain analytics data from all sources |
|
|
|
|
|
Currently returns placeholder implementations for: |
|
|
- The Graph (GraphQL-based blockchain data) |
|
|
- Blockchair (blockchain explorer and stats) |
|
|
- Glassnode (advanced on-chain metrics) |
|
|
|
|
|
Returns: |
|
|
List of results from all on-chain collectors |
|
|
""" |
|
|
logger.info("Starting on-chain data collection from all sources (placeholder)") |
|
|
|
|
|
|
|
|
results = await asyncio.gather( |
|
|
get_the_graph_data(), |
|
|
get_blockchair_data(), |
|
|
get_glassnode_metrics(), |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
|
|
|
processed_results = [] |
|
|
for result in results: |
|
|
if isinstance(result, Exception): |
|
|
logger.error(f"Collector failed with exception: {str(result)}") |
|
|
processed_results.append({ |
|
|
"provider": "Unknown", |
|
|
"category": "onchain_analytics", |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"staleness_minutes": None, |
|
|
"success": False, |
|
|
"error": str(result), |
|
|
"error_type": "exception" |
|
|
}) |
|
|
else: |
|
|
processed_results.append(result) |
|
|
|
|
|
|
|
|
successful = sum(1 for r in processed_results if r.get("success", False)) |
|
|
placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False)) |
|
|
|
|
|
logger.info( |
|
|
f"On-chain data collection complete: {successful}/{len(processed_results)} successful " |
|
|
f"({placeholder_count} placeholders)" |
|
|
) |
|
|
|
|
|
return processed_results |
|
|
|
|
|
|
|
|
class OnChainCollector: |
|
|
""" |
|
|
On-Chain Analytics Collector class for WebSocket streaming interface |
|
|
Wraps the standalone on-chain data collection functions |
|
|
""" |
|
|
|
|
|
def __init__(self, config: Any = None): |
|
|
""" |
|
|
Initialize the on-chain collector |
|
|
|
|
|
Args: |
|
|
config: Configuration object (optional, for compatibility) |
|
|
""" |
|
|
self.config = config |
|
|
self.logger = logger |
|
|
|
|
|
async def collect(self) -> Dict[str, Any]: |
|
|
""" |
|
|
Collect on-chain analytics data from all sources |
|
|
|
|
|
Returns: |
|
|
Dict with aggregated on-chain data |
|
|
""" |
|
|
results = await collect_onchain_data() |
|
|
|
|
|
|
|
|
aggregated = { |
|
|
"active_addresses": None, |
|
|
"transaction_count": None, |
|
|
"total_fees": None, |
|
|
"gas_price": None, |
|
|
"network_utilization": None, |
|
|
"contract_events": [], |
|
|
"timestamp": datetime.now(timezone.utc).isoformat() |
|
|
} |
|
|
|
|
|
for result in results: |
|
|
if result.get("success") and result.get("data"): |
|
|
provider = result.get("provider", "unknown") |
|
|
data = result["data"] |
|
|
|
|
|
|
|
|
if isinstance(data, dict) and data.get("status") == "placeholder": |
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
return aggregated |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
async def main(): |
|
|
results = await collect_onchain_data() |
|
|
|
|
|
print("\n=== On-Chain Data Collection Results ===") |
|
|
print("Note: These are placeholder implementations") |
|
|
print() |
|
|
|
|
|
for result in results: |
|
|
print(f"\nProvider: {result['provider']}") |
|
|
print(f"Success: {result['success']}") |
|
|
print(f"Is Placeholder: {result.get('is_placeholder', False)}") |
|
|
if result['success']: |
|
|
data = result.get('data', {}) |
|
|
if isinstance(data, dict): |
|
|
print(f"Status: {data.get('status', 'N/A')}") |
|
|
print(f"Message: {data.get('message', 'N/A')}") |
|
|
if 'planned_features' in data: |
|
|
print(f"Planned Features: {len(data['planned_features'])}") |
|
|
else: |
|
|
print(f"Error: {result.get('error', 'Unknown')}") |
|
|
|
|
|
print("\n" + "="*50) |
|
|
print("To implement these collectors:") |
|
|
print("1. The Graph: Add GraphQL queries for specific subgraphs") |
|
|
print("2. Blockchair: Add API key and implement endpoint calls") |
|
|
print("3. Glassnode: Add API key and implement metrics fetching") |
|
|
print("="*50) |
|
|
|
|
|
asyncio.run(main()) |
|
|
|