|
|
""" |
|
|
Unified WebSocket Router |
|
|
|
|
|
This module provides a master WebSocket endpoint that can access all services |
|
|
and manage subscriptions across data collection, monitoring, and integration services. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime |
|
|
from typing import Any, Dict |
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query |
|
|
import logging |
|
|
|
|
|
from backend.services.ws_service_manager import ws_manager, ServiceType |
|
|
from api.ws_data_services import start_data_collection_streams |
|
|
from api.ws_monitoring_services import start_monitoring_streams |
|
|
from api.ws_integration_services import start_integration_streams |
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__) |
|
|
|
|
|
# Router object that the route decorators in this file attach their handlers to.
router = APIRouter() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.websocket("/ws/master")
async def websocket_master_endpoint(websocket: WebSocket):
    """
    Master WebSocket endpoint with access to ALL services

    Connection URL: ws://host:port/ws/master

    After connecting, send subscription messages:
    {
        "action": "subscribe",
        "service": "market_data" | "explorers" | "news" | "sentiment" |
                   "whale_tracking" | "rpc_nodes" | "onchain" |
                   "health_checker" | "pool_manager" | "scheduler" |
                   "huggingface" | "persistence" | "system" | "all"
    }

    To unsubscribe:
    {
        "action": "unsubscribe",
        "service": "service_name"
    }

    To get status:
    {
        "action": "get_status"
    }

    To ping:
    {
        "action": "ping",
        "data": {"your": "data"}
    }

    Flow: the socket is registered with ``ws_manager``, a welcome message
    listing the available services is sent, and every JSON message received
    afterwards is forwarded to ``ws_manager.handle_client_message``. The
    connection is always unregistered when the handler exits.

    Args:
        websocket: The incoming WebSocket connection supplied by FastAPI.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # The welcome message is sent inside the try block on purpose: if the
        # client drops during the handshake, the `finally` clause below still
        # unregisters the connection instead of leaking it in the manager.
        await connection.send_message({
            "service": "system",
            "type": "welcome",
            "data": {
                "message": "Connected to master WebSocket endpoint",
                "available_services": {
                    "data_collection": [
                        ServiceType.MARKET_DATA.value,
                        ServiceType.EXPLORERS.value,
                        ServiceType.NEWS.value,
                        ServiceType.SENTIMENT.value,
                        ServiceType.WHALE_TRACKING.value,
                        ServiceType.RPC_NODES.value,
                        ServiceType.ONCHAIN.value,
                    ],
                    "monitoring": [
                        ServiceType.HEALTH_CHECKER.value,
                        ServiceType.POOL_MANAGER.value,
                        ServiceType.SCHEDULER.value,
                    ],
                    "integration": [
                        ServiceType.HUGGINGFACE.value,
                        ServiceType.PERSISTENCE.value,
                    ],
                    "system": [
                        ServiceType.SYSTEM.value,
                        ServiceType.ALL.value,
                    ],
                },
                "usage": {
                    "subscribe": {"action": "subscribe", "service": "service_name"},
                    "unsubscribe": {"action": "unsubscribe", "service": "service_name"},
                    "get_status": {"action": "get_status"},
                    "ping": {"action": "ping"},
                },
            },
            "timestamp": datetime.utcnow().isoformat(),
        })

        # Message pump: runs until the client disconnects or an error occurs.
        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)

    except WebSocketDisconnect:
        logger.info("Master client disconnected: %s", connection.client_id)
    except Exception as e:
        # Top-level boundary for this connection: record the traceback so the
        # failure can be diagnosed, then fall through to cleanup.
        logger.exception("Master WebSocket error: %s", e)
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
|
|
|
|
@router.websocket("/ws/all")
async def websocket_all_services(websocket: WebSocket):
    """
    WebSocket endpoint with automatic subscription to ALL services

    Connection URL: ws://host:port/ws/all

    The connection is subscribed to ``ServiceType.ALL`` immediately after it
    is registered, and an ``auto_subscribed`` confirmation is sent. Every JSON
    message received afterwards is forwarded to
    ``ws_manager.handle_client_message``. The connection is always
    unregistered when the handler exits.

    Args:
        websocket: The incoming WebSocket connection supplied by FastAPI.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Subscription and confirmation happen inside the try block so that a
        # failure during setup still reaches the `finally` cleanup below.
        connection.subscribe(ServiceType.ALL)

        await connection.send_message({
            "service": "system",
            "type": "auto_subscribed",
            "data": {
                "message": "Automatically subscribed to all services",
                "subscription": ServiceType.ALL.value,
            },
            "timestamp": datetime.utcnow().isoformat(),
        })

        # Message pump: runs until the client disconnects or an error occurs.
        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)

    except WebSocketDisconnect:
        logger.info("All-services client disconnected: %s", connection.client_id)
    except Exception as e:
        # Top-level boundary for this connection: keep the traceback.
        logger.exception("All-services WebSocket error: %s", e)
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
|
|
|
|
@router.websocket("/ws")
async def websocket_default_endpoint(websocket: WebSocket):
    """
    Default WebSocket endpoint (alias for master endpoint)

    Connection URL: ws://host:port/ws

    Registers the socket with ``ws_manager``, sends a short welcome message
    containing a subscription example, then forwards every received JSON
    message to ``ws_manager.handle_client_message``. The connection is always
    unregistered when the handler exits.

    Args:
        websocket: The incoming WebSocket connection supplied by FastAPI.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Sent inside the try block so a client that disappears during the
        # handshake is still unregistered by the `finally` clause.
        await connection.send_message({
            "service": "system",
            "type": "welcome",
            "data": {
                "message": "Connected to default WebSocket endpoint",
                "hint": "Send subscription messages to receive updates",
                "example": {"action": "subscribe", "service": "market_data"},
            },
            "timestamp": datetime.utcnow().isoformat(),
        })

        # Message pump: runs until the client disconnects or an error occurs.
        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)

    except WebSocketDisconnect:
        logger.info("Default client disconnected: %s", connection.client_id)
    except Exception as e:
        # Top-level boundary for this connection: keep the traceback.
        logger.exception("Default WebSocket error: %s", e)
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/ws/stats")
async def get_websocket_stats():
    """
    Get WebSocket statistics

    Wraps the value returned by ``ws_manager.get_stats()`` in a response
    envelope with ``status``, ``data`` and a UTC ISO-8601 ``timestamp``.
    The shape of ``data`` is whatever the manager reports.
    """
    return {
        "status": "success",
        "data": ws_manager.get_stats(),
        "timestamp": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
|
|
@router.get("/ws/services")
async def get_available_services():
    """
    Get list of all available WebSocket services

    Returns a static catalog of subscribable services grouped by category
    (data collection, monitoring, integration, system), together with the
    master endpoints and a UTC ISO-8601 timestamp.
    """

    def describe(name, description, endpoints, update_interval=None):
        """Build one catalog entry; ``update_interval`` is omitted when None."""
        entry = {"name": name, "description": description}
        if update_interval is not None:
            entry["update_interval"] = update_interval
        entry["endpoints"] = endpoints
        return entry

    data_collection = {
        "market_data": describe(
            name="Market Data",
            description="Real-time cryptocurrency prices, volumes, and market caps",
            update_interval="5 seconds",
            endpoints=["/ws/data", "/ws/market_data"],
        ),
        "explorers": describe(
            name="Blockchain Explorers",
            description="Blockchain data, transactions, and network stats",
            update_interval="10 seconds",
            endpoints=["/ws/data"],
        ),
        "news": describe(
            name="News Aggregation",
            description="Cryptocurrency news from multiple sources",
            update_interval="60 seconds",
            endpoints=["/ws/data", "/ws/news"],
        ),
        "sentiment": describe(
            name="Sentiment Analysis",
            description="Market sentiment and social media trends",
            update_interval="30 seconds",
            endpoints=["/ws/data", "/ws/sentiment"],
        ),
        "whale_tracking": describe(
            name="Whale Tracking",
            description="Large transaction monitoring and whale wallet tracking",
            update_interval="15 seconds",
            endpoints=["/ws/data", "/ws/whale_tracking"],
        ),
        "rpc_nodes": describe(
            name="RPC Nodes",
            description="Blockchain RPC node status and events",
            update_interval="20 seconds",
            endpoints=["/ws/data"],
        ),
        "onchain": describe(
            name="On-Chain Analytics",
            description="On-chain metrics and smart contract events",
            update_interval="30 seconds",
            endpoints=["/ws/data"],
        ),
    }

    monitoring = {
        "health_checker": describe(
            name="Health Monitoring",
            description="Provider health checks and system status",
            update_interval="30 seconds",
            endpoints=["/ws/monitoring", "/ws/health"],
        ),
        "pool_manager": describe(
            name="Pool Management",
            description="Source pool status and failover events",
            update_interval="20 seconds",
            endpoints=["/ws/monitoring", "/ws/pool_status"],
        ),
        "scheduler": describe(
            name="Task Scheduler",
            description="Scheduled task execution and status",
            update_interval="15 seconds",
            endpoints=["/ws/monitoring", "/ws/scheduler_status"],
        ),
    }

    integration = {
        "huggingface": describe(
            name="HuggingFace AI",
            description="AI model registry and sentiment analysis",
            update_interval="60 seconds",
            endpoints=["/ws/integration", "/ws/huggingface", "/ws/ai"],
        ),
        "persistence": describe(
            name="Data Persistence",
            description="Data storage, exports, and backups",
            update_interval="30 seconds",
            endpoints=["/ws/integration", "/ws/persistence"],
        ),
    }

    # The "all" pseudo-service carries no update interval in the catalog.
    system = {
        "all": describe(
            name="All Services",
            description="Subscribe to all available services",
            endpoints=["/ws/all"],
        ),
    }

    master_endpoints = {
        "/ws": "Default endpoint with subscription management",
        "/ws/master": "Master endpoint with all service access",
        "/ws/all": "Auto-subscribe to all services",
    }

    return {
        "status": "success",
        "data": {
            "services": {
                "data_collection": data_collection,
                "monitoring": monitoring,
                "integration": integration,
                "system": system,
            },
            "master_endpoints": master_endpoints,
        },
        "timestamp": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
|
|
@router.get("/ws/endpoints")
async def get_websocket_endpoints():
    """
    Get list of all WebSocket endpoints

    Returns a static mapping of WebSocket paths to short descriptions,
    grouped into master, data collection, monitoring and integration
    endpoints, plus a UTC ISO-8601 timestamp.
    """
    master = {
        "/ws": "Default WebSocket endpoint",
        "/ws/master": "Master endpoint with all services",
        "/ws/all": "Auto-subscribe to all services",
    }
    data_collection = {
        "/ws/data": "Unified data collection endpoint",
        "/ws/market_data": "Market data only",
        "/ws/whale_tracking": "Whale tracking only",
        "/ws/news": "News only",
        "/ws/sentiment": "Sentiment analysis only",
    }
    monitoring = {
        "/ws/monitoring": "Unified monitoring endpoint",
        "/ws/health": "Health monitoring only",
        "/ws/pool_status": "Pool manager only",
        "/ws/scheduler_status": "Scheduler only",
    }
    integration = {
        "/ws/integration": "Unified integration endpoint",
        "/ws/huggingface": "HuggingFace services only",
        "/ws/ai": "AI/ML services (alias for HuggingFace)",
        "/ws/persistence": "Persistence services only",
    }

    payload = {
        "master_endpoints": master,
        "data_collection_endpoints": data_collection,
        "monitoring_endpoints": monitoring,
        "integration_endpoints": integration,
    }

    return {
        "status": "success",
        "data": payload,
        "timestamp": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def start_all_websocket_streams():
    """
    Start all WebSocket streaming tasks

    This should be called on application startup to initialize all
    background streaming services.

    The data collection, monitoring and integration starters are awaited
    concurrently. A failure in one starter does not prevent the others from
    running and is not re-raised (startup stays best-effort), but every
    failure is logged with its traceback, and the final log line reports
    whether startup completed cleanly.
    """
    logger.info("Starting all WebSocket streaming services")

    # Label -> starter coroutine function; the labels are only used in logs.
    starters = {
        "data collection": start_data_collection_streams,
        "monitoring": start_monitoring_streams,
        "integration": start_integration_streams,
    }

    # return_exceptions=True keeps one failing starter from cancelling the
    # rest; the exceptions come back as result values and are inspected below.
    outcomes = await asyncio.gather(
        *(start() for start in starters.values()),
        return_exceptions=True,
    )

    had_failure = False
    for label, outcome in zip(starters, outcomes):
        if isinstance(outcome, BaseException):
            had_failure = True
            logger.error(
                "Failed to start %s WebSocket streams: %s",
                label,
                outcome,
                exc_info=outcome,
            )

    if had_failure:
        logger.warning("WebSocket streaming services started with errors")
    else:
        logger.info("All WebSocket streaming services started")
|
|
|