|
|
""" |
|
|
WebSocket API for Data Collection Services |
|
|
|
|
|
This module provides WebSocket endpoints for real-time data streaming |
|
|
from all data collection services. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime |
|
|
from typing import Any, Dict, Optional |
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect |
|
|
import logging |
|
|
|
|
|
from backend.services.ws_service_manager import ws_manager, ServiceType |
|
|
from collectors.market_data import MarketDataCollector |
|
|
from collectors.explorers import ExplorerDataCollector |
|
|
from collectors.news import NewsCollector |
|
|
from collectors.sentiment import SentimentCollector |
|
|
from collectors.whale_tracking import WhaleTrackingCollector |
|
|
from collectors.rpc_nodes import RPCNodeCollector |
|
|
from collectors.onchain import OnChainCollector |
|
|
from config import Config |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
router = APIRouter() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DataCollectionStreamers: |
|
|
"""Handles data streaming for all collection services""" |
|
|
|
|
|
def __init__(self): |
|
|
self.config = Config() |
|
|
self.market_data_collector = MarketDataCollector(self.config) |
|
|
self.explorer_collector = ExplorerDataCollector(self.config) |
|
|
self.news_collector = NewsCollector(self.config) |
|
|
self.sentiment_collector = SentimentCollector(self.config) |
|
|
self.whale_collector = WhaleTrackingCollector(self.config) |
|
|
self.rpc_collector = RPCNodeCollector(self.config) |
|
|
self.onchain_collector = OnChainCollector(self.config) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_market_data(self): |
|
|
"""Stream real-time market data""" |
|
|
try: |
|
|
data = await self.market_data_collector.collect() |
|
|
if data: |
|
|
return { |
|
|
"prices": data.get("prices", {}), |
|
|
"volumes": data.get("volumes", {}), |
|
|
"market_caps": data.get("market_caps", {}), |
|
|
"price_changes": data.get("price_changes", {}), |
|
|
"source": data.get("source", "unknown"), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming market data: {e}") |
|
|
return None |
|
|
|
|
|
async def stream_order_books(self): |
|
|
"""Stream order book data""" |
|
|
try: |
|
|
|
|
|
data = await self.market_data_collector.collect() |
|
|
if data and "order_book" in data: |
|
|
return { |
|
|
"bids": data["order_book"].get("bids", []), |
|
|
"asks": data["order_book"].get("asks", []), |
|
|
"spread": data["order_book"].get("spread"), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming order books: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_explorer_data(self): |
|
|
"""Stream blockchain explorer data""" |
|
|
try: |
|
|
data = await self.explorer_collector.collect() |
|
|
if data: |
|
|
return { |
|
|
"latest_block": data.get("latest_block"), |
|
|
"network_hashrate": data.get("network_hashrate"), |
|
|
"difficulty": data.get("difficulty"), |
|
|
"mempool_size": data.get("mempool_size"), |
|
|
"transactions_count": data.get("transactions_count"), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming explorer data: {e}") |
|
|
return None |
|
|
|
|
|
async def stream_transactions(self): |
|
|
"""Stream recent transactions""" |
|
|
try: |
|
|
data = await self.explorer_collector.collect() |
|
|
if data and "recent_transactions" in data: |
|
|
return { |
|
|
"transactions": data["recent_transactions"], |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming transactions: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_news(self): |
|
|
"""Stream news updates""" |
|
|
try: |
|
|
data = await self.news_collector.collect() |
|
|
if data and "articles" in data: |
|
|
return { |
|
|
"articles": data["articles"][:10], |
|
|
"sources": data.get("sources", []), |
|
|
"categories": data.get("categories", []), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming news: {e}") |
|
|
return None |
|
|
|
|
|
async def stream_breaking_news(self): |
|
|
"""Stream breaking news alerts""" |
|
|
try: |
|
|
data = await self.news_collector.collect() |
|
|
if data and "breaking" in data: |
|
|
return { |
|
|
"breaking_news": data["breaking"], |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming breaking news: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_sentiment(self): |
|
|
"""Stream sentiment analysis data""" |
|
|
try: |
|
|
data = await self.sentiment_collector.collect() |
|
|
if data: |
|
|
return { |
|
|
"overall_sentiment": data.get("overall_sentiment"), |
|
|
"sentiment_score": data.get("sentiment_score"), |
|
|
"social_volume": data.get("social_volume"), |
|
|
"trending_topics": data.get("trending_topics", []), |
|
|
"sentiment_by_source": data.get("by_source", {}), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming sentiment: {e}") |
|
|
return None |
|
|
|
|
|
async def stream_social_trends(self): |
|
|
"""Stream social media trends""" |
|
|
try: |
|
|
data = await self.sentiment_collector.collect() |
|
|
if data and "social_trends" in data: |
|
|
return { |
|
|
"trends": data["social_trends"], |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming social trends: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_whale_activity(self): |
|
|
"""Stream whale transaction data""" |
|
|
try: |
|
|
data = await self.whale_collector.collect() |
|
|
if data: |
|
|
return { |
|
|
"large_transactions": data.get("large_transactions", []), |
|
|
"whale_wallets": data.get("whale_wallets", []), |
|
|
"total_volume": data.get("total_volume"), |
|
|
"alert_threshold": data.get("alert_threshold"), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming whale activity: {e}") |
|
|
return None |
|
|
|
|
|
async def stream_whale_alerts(self): |
|
|
"""Stream whale transaction alerts""" |
|
|
try: |
|
|
data = await self.whale_collector.collect() |
|
|
if data and "alerts" in data: |
|
|
return { |
|
|
"alerts": data["alerts"], |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming whale alerts: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_rpc_status(self): |
|
|
"""Stream RPC node status""" |
|
|
try: |
|
|
data = await self.rpc_collector.collect() |
|
|
if data: |
|
|
return { |
|
|
"nodes": data.get("nodes", []), |
|
|
"active_nodes": data.get("active_nodes"), |
|
|
"total_nodes": data.get("total_nodes"), |
|
|
"average_latency": data.get("average_latency"), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming RPC status: {e}") |
|
|
return None |
|
|
|
|
|
async def stream_blockchain_events(self): |
|
|
"""Stream blockchain events from RPC nodes""" |
|
|
try: |
|
|
data = await self.rpc_collector.collect() |
|
|
if data and "events" in data: |
|
|
return { |
|
|
"events": data["events"], |
|
|
"block_number": data.get("block_number"), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming blockchain events: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_onchain_metrics(self): |
|
|
"""Stream on-chain analytics""" |
|
|
try: |
|
|
data = await self.onchain_collector.collect() |
|
|
if data: |
|
|
return { |
|
|
"active_addresses": data.get("active_addresses"), |
|
|
"transaction_count": data.get("transaction_count"), |
|
|
"total_fees": data.get("total_fees"), |
|
|
"gas_price": data.get("gas_price"), |
|
|
"network_utilization": data.get("network_utilization"), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming on-chain metrics: {e}") |
|
|
return None |
|
|
|
|
|
async def stream_contract_events(self): |
|
|
"""Stream smart contract events""" |
|
|
try: |
|
|
data = await self.onchain_collector.collect() |
|
|
if data and "contract_events" in data: |
|
|
return { |
|
|
"events": data["contract_events"], |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error streaming contract events: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
data_streamers = DataCollectionStreamers() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def start_data_collection_streams(): |
|
|
"""Start all data collection stream tasks""" |
|
|
logger.info("Starting data collection WebSocket streams") |
|
|
|
|
|
tasks = [ |
|
|
|
|
|
asyncio.create_task(ws_manager.start_service_stream( |
|
|
ServiceType.MARKET_DATA, |
|
|
data_streamers.stream_market_data, |
|
|
interval=5.0 |
|
|
)), |
|
|
|
|
|
|
|
|
asyncio.create_task(ws_manager.start_service_stream( |
|
|
ServiceType.EXPLORERS, |
|
|
data_streamers.stream_explorer_data, |
|
|
interval=10.0 |
|
|
)), |
|
|
|
|
|
|
|
|
asyncio.create_task(ws_manager.start_service_stream( |
|
|
ServiceType.NEWS, |
|
|
data_streamers.stream_news, |
|
|
interval=60.0 |
|
|
)), |
|
|
|
|
|
|
|
|
asyncio.create_task(ws_manager.start_service_stream( |
|
|
ServiceType.SENTIMENT, |
|
|
data_streamers.stream_sentiment, |
|
|
interval=30.0 |
|
|
)), |
|
|
|
|
|
|
|
|
asyncio.create_task(ws_manager.start_service_stream( |
|
|
ServiceType.WHALE_TRACKING, |
|
|
data_streamers.stream_whale_activity, |
|
|
interval=15.0 |
|
|
)), |
|
|
|
|
|
|
|
|
asyncio.create_task(ws_manager.start_service_stream( |
|
|
ServiceType.RPC_NODES, |
|
|
data_streamers.stream_rpc_status, |
|
|
interval=20.0 |
|
|
)), |
|
|
|
|
|
|
|
|
asyncio.create_task(ws_manager.start_service_stream( |
|
|
ServiceType.ONCHAIN, |
|
|
data_streamers.stream_onchain_metrics, |
|
|
interval=30.0 |
|
|
)), |
|
|
] |
|
|
|
|
|
await asyncio.gather(*tasks, return_exceptions=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.websocket("/ws/data") |
|
|
async def websocket_data_endpoint(websocket: WebSocket): |
|
|
""" |
|
|
Unified WebSocket endpoint for all data collection services |
|
|
|
|
|
Connection URL: ws://host:port/ws/data |
|
|
|
|
|
After connecting, send subscription messages: |
|
|
{ |
|
|
"action": "subscribe", |
|
|
"service": "market_data" | "explorers" | "news" | "sentiment" | |
|
|
"whale_tracking" | "rpc_nodes" | "onchain" | "all" |
|
|
} |
|
|
|
|
|
To unsubscribe: |
|
|
{ |
|
|
"action": "unsubscribe", |
|
|
"service": "service_name" |
|
|
} |
|
|
|
|
|
To get status: |
|
|
{ |
|
|
"action": "get_status" |
|
|
} |
|
|
""" |
|
|
connection = await ws_manager.connect(websocket) |
|
|
|
|
|
try: |
|
|
while True: |
|
|
|
|
|
data = await websocket.receive_json() |
|
|
await ws_manager.handle_client_message(connection, data) |
|
|
|
|
|
except WebSocketDisconnect: |
|
|
logger.info(f"Client disconnected: {connection.client_id}") |
|
|
except Exception as e: |
|
|
logger.error(f"WebSocket error for client {connection.client_id}: {e}") |
|
|
finally: |
|
|
await ws_manager.disconnect(connection.client_id) |
|
|
|
|
|
|
|
|
@router.websocket("/ws/market_data") |
|
|
async def websocket_market_data(websocket: WebSocket): |
|
|
""" |
|
|
Dedicated WebSocket endpoint for market data |
|
|
|
|
|
Auto-subscribes to market_data service |
|
|
""" |
|
|
connection = await ws_manager.connect(websocket) |
|
|
connection.subscribe(ServiceType.MARKET_DATA) |
|
|
|
|
|
try: |
|
|
while True: |
|
|
data = await websocket.receive_json() |
|
|
await ws_manager.handle_client_message(connection, data) |
|
|
except WebSocketDisconnect: |
|
|
logger.info(f"Market data client disconnected: {connection.client_id}") |
|
|
except Exception as e: |
|
|
logger.error(f"Market data WebSocket error: {e}") |
|
|
finally: |
|
|
await ws_manager.disconnect(connection.client_id) |
|
|
|
|
|
|
|
|
@router.websocket("/ws/whale_tracking") |
|
|
async def websocket_whale_tracking(websocket: WebSocket): |
|
|
""" |
|
|
Dedicated WebSocket endpoint for whale tracking |
|
|
|
|
|
Auto-subscribes to whale_tracking service |
|
|
""" |
|
|
connection = await ws_manager.connect(websocket) |
|
|
connection.subscribe(ServiceType.WHALE_TRACKING) |
|
|
|
|
|
try: |
|
|
while True: |
|
|
data = await websocket.receive_json() |
|
|
await ws_manager.handle_client_message(connection, data) |
|
|
except WebSocketDisconnect: |
|
|
logger.info(f"Whale tracking client disconnected: {connection.client_id}") |
|
|
except Exception as e: |
|
|
logger.error(f"Whale tracking WebSocket error: {e}") |
|
|
finally: |
|
|
await ws_manager.disconnect(connection.client_id) |
|
|
|
|
|
|
|
|
@router.websocket("/ws/news") |
|
|
async def websocket_news(websocket: WebSocket): |
|
|
""" |
|
|
Dedicated WebSocket endpoint for news |
|
|
|
|
|
Auto-subscribes to news service |
|
|
""" |
|
|
connection = await ws_manager.connect(websocket) |
|
|
connection.subscribe(ServiceType.NEWS) |
|
|
|
|
|
try: |
|
|
while True: |
|
|
data = await websocket.receive_json() |
|
|
await ws_manager.handle_client_message(connection, data) |
|
|
except WebSocketDisconnect: |
|
|
logger.info(f"News client disconnected: {connection.client_id}") |
|
|
except Exception as e: |
|
|
logger.error(f"News WebSocket error: {e}") |
|
|
finally: |
|
|
await ws_manager.disconnect(connection.client_id) |
|
|
|
|
|
|
|
|
@router.websocket("/ws/sentiment") |
|
|
async def websocket_sentiment(websocket: WebSocket): |
|
|
""" |
|
|
Dedicated WebSocket endpoint for sentiment analysis |
|
|
|
|
|
Auto-subscribes to sentiment service |
|
|
""" |
|
|
connection = await ws_manager.connect(websocket) |
|
|
connection.subscribe(ServiceType.SENTIMENT) |
|
|
|
|
|
try: |
|
|
while True: |
|
|
data = await websocket.receive_json() |
|
|
await ws_manager.handle_client_message(connection, data) |
|
|
except WebSocketDisconnect: |
|
|
logger.info(f"Sentiment client disconnected: {connection.client_id}") |
|
|
except Exception as e: |
|
|
logger.error(f"Sentiment WebSocket error: {e}") |
|
|
finally: |
|
|
await ws_manager.disconnect(connection.client_id) |
|
|
|