WebSocket API Documentation
Comprehensive guide to accessing all services via WebSocket connections.
Table of Contents
- Overview
- Quick Start
- Master Endpoints
- Data Collection Services
- Monitoring Services
- Integration Services
- Message Protocol
- Code Examples
- Available Services
Overview
The Crypto API Monitoring System provides comprehensive WebSocket APIs for real-time streaming of all services. All WebSocket endpoints support:
- Subscription-based routing: Subscribe only to services you need
- Real-time updates: Live data streaming at service-specific intervals
- Bi-directional communication: Send commands and receive responses
- Connection management: Automatic reconnection and heartbeat
- Multiple connection patterns: Master endpoint, service-specific endpoints, or auto-subscribe
Quick Start
Basic Connection
// Connect to the master endpoint
const ws = new WebSocket('ws://localhost:7860/ws/master');
ws.onopen = () => {
console.log('Connected!');
// Subscribe to market data
ws.send(JSON.stringify({
action: 'subscribe',
service: 'market_data'
}));
};
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
console.log('Received:', message);
};
Python Example
import asyncio
import websockets
import json
async def connect():
uri = "ws://localhost:7860/ws/master"
async with websockets.connect(uri) as websocket:
# Subscribe to whale tracking
await websocket.send(json.dumps({
"action": "subscribe",
"service": "whale_tracking"
}))
# Receive messages
async for message in websocket:
data = json.loads(message)
print(f"Received: {data}")
asyncio.run(connect())
Master Endpoints
/ws - Default WebSocket Endpoint
The default endpoint with subscription management capabilities.
Connection URL: ws://localhost:7860/ws
Features:
- Access to all services
- Manual subscription management
- Connection status tracking
/ws/master - Master WebSocket Endpoint
Full-featured endpoint with comprehensive service access.
Connection URL: ws://localhost:7860/ws/master
Features:
- Complete service catalog on connection
- Detailed usage instructions
- Real-time statistics
Initial Message:
{
"service": "system",
"type": "welcome",
"data": {
"message": "Connected to master WebSocket endpoint",
"available_services": {
"data_collection": [...],
"monitoring": [...],
"integration": [...]
},
"usage": {
"subscribe": {"action": "subscribe", "service": "service_name"}
}
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
/ws/all - Auto-Subscribe to All Services
Automatically subscribes to all available services upon connection.
Connection URL: ws://localhost:7860/ws/all
Features:
- Instant access to all service updates
- No manual subscription needed
- Comprehensive data streaming
Use Case: Monitoring dashboards that need all data
Data Collection Services
/ws/data - Unified Data Collection Endpoint
Unified endpoint for all data collection services with manual subscription.
Connection URL: ws://localhost:7860/ws/data
Available Services:
market_data- Real-time cryptocurrency prices and volumesexplorers- Blockchain explorer datanews- Cryptocurrency news aggregationsentiment- Market sentiment analysiswhale_tracking- Large transaction monitoringrpc_nodes- RPC node status and blockchain eventsonchain- On-chain analytics and metrics
/ws/market_data - Market Data Only
Dedicated endpoint for market data (auto-subscribed).
Connection URL: ws://localhost:7860/ws/market_data
Update Interval: 5 seconds
Message Format:
{
"service": "market_data",
"type": "update",
"data": {
"prices": {
"bitcoin": 45000.00,
"ethereum": 3200.00
},
"volumes": {
"bitcoin": 25000000000,
"ethereum": 15000000000
},
"market_caps": {...},
"price_changes": {...},
"source": "coingecko",
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
/ws/whale_tracking - Whale Tracking Only
Dedicated endpoint for whale transaction monitoring (auto-subscribed).
Connection URL: ws://localhost:7860/ws/whale_tracking
Update Interval: 15 seconds
Message Format:
{
"service": "whale_tracking",
"type": "update",
"data": {
"large_transactions": [
{
"hash": "0x...",
"value": 1000000000,
"from": "0x...",
"to": "0x...",
"timestamp": "2025-11-11T10:29:45.000Z"
}
],
"whale_wallets": [...],
"total_volume": 5000000000,
"alert_threshold": 1000000,
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
/ws/news - News Only
Dedicated endpoint for cryptocurrency news (auto-subscribed).
Connection URL: ws://localhost:7860/ws/news
Update Interval: 60 seconds
Message Format:
{
"service": "news",
"type": "update",
"data": {
"articles": [
{
"title": "Bitcoin reaches new high",
"source": "CoinDesk",
"url": "https://...",
"published_at": "2025-11-11T10:25:00.000Z"
}
],
"sources": ["CoinDesk", "CoinTelegraph"],
"categories": ["Market", "Technology"],
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
/ws/sentiment - Sentiment Analysis Only
Dedicated endpoint for market sentiment (auto-subscribed).
Connection URL: ws://localhost:7860/ws/sentiment
Update Interval: 30 seconds
Message Format:
{
"service": "sentiment",
"type": "update",
"data": {
"overall_sentiment": "bullish",
"sentiment_score": 0.75,
"social_volume": 125000,
"trending_topics": ["Bitcoin", "Ethereum"],
"sentiment_by_source": {
"twitter": 0.80,
"reddit": 0.70
},
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
Monitoring Services
/ws/monitoring - Unified Monitoring Endpoint
Unified endpoint for all monitoring services with manual subscription.
Connection URL: ws://localhost:7860/ws/monitoring
Available Services:
health_checker- Provider health monitoringpool_manager- Source pool management and failoverscheduler- Task scheduler status
/ws/health - Health Monitoring Only
Dedicated endpoint for health checks (auto-subscribed).
Connection URL: ws://localhost:7860/ws/health
Update Interval: 30 seconds
Message Format:
{
"service": "health_checker",
"type": "update",
"data": {
"overall_health": "healthy",
"healthy_count": 45,
"unhealthy_count": 2,
"total_providers": 47,
"providers": {
"coingecko": {
"status": "healthy",
"response_time_ms": 150,
"last_check": "2025-11-11T10:30:00.000Z"
}
},
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
/ws/pool_status - Pool Manager Only
Dedicated endpoint for source pool management (auto-subscribed).
Connection URL: ws://localhost:7860/ws/pool_status
Update Interval: 20 seconds
Message Format:
{
"service": "pool_manager",
"type": "update",
"data": {
"pools": {
"market_data": {
"active_source": "coingecko",
"available_sources": ["coingecko", "coinmarketcap"],
"health": "healthy"
}
},
"active_sources": ["coingecko", "etherscan"],
"inactive_sources": ["blockchair"],
"failover_count": 2,
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
/ws/scheduler_status - Scheduler Only
Dedicated endpoint for task scheduler (auto-subscribed).
Connection URL: ws://localhost:7860/ws/scheduler_status
Update Interval: 15 seconds
Message Format:
{
"service": "scheduler",
"type": "update",
"data": {
"running": true,
"total_jobs": 10,
"active_jobs": 3,
"jobs": [
{
"id": "market_data_collection",
"next_run": "2025-11-11T10:31:00.000Z",
"status": "running"
}
],
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
Integration Services
/ws/integration - Unified Integration Endpoint
Unified endpoint for all integration services with manual subscription.
Connection URL: ws://localhost:7860/ws/integration
Available Services:
huggingface- HuggingFace AI/ML servicespersistence- Data persistence and export services
/ws/huggingface - HuggingFace Services Only
Dedicated endpoint for HuggingFace AI services (auto-subscribed).
Connection URL: ws://localhost:7860/ws/huggingface
Aliases: /ws/ai
Update Interval: 60 seconds
Message Format:
{
"service": "huggingface",
"type": "update",
"data": {
"total_models": 25,
"total_datasets": 10,
"available_models": ["sentiment-model-1", "sentiment-model-2"],
"available_datasets": ["crypto-tweets", "reddit-posts"],
"last_refresh": "2025-11-11T10:00:00.000Z",
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
/ws/persistence - Persistence Services Only
Dedicated endpoint for data persistence (auto-subscribed).
Connection URL: ws://localhost:7860/ws/persistence
Update Interval: 30 seconds
Message Format:
{
"service": "persistence",
"type": "update",
"data": {
"storage_location": "/data/crypto-monitoring",
"total_records": 1500000,
"storage_size": "2.5 GB",
"last_save": "2025-11-11T10:29:55.000Z",
"active_writers": 3,
"timestamp": "2025-11-11T10:30:00.000Z"
},
"timestamp": "2025-11-11T10:30:00.000Z"
}
Message Protocol
Client to Server Messages
Subscribe to a Service
{
"action": "subscribe",
"service": "market_data"
}
Available Services: market_data, explorers, news, sentiment, whale_tracking, rpc_nodes, onchain, health_checker, pool_manager, scheduler, huggingface, persistence, system, all
Unsubscribe from a Service
{
"action": "unsubscribe",
"service": "market_data"
}
Get Connection Status
{
"action": "get_status"
}
Response:
{
"service": "system",
"type": "status",
"data": {
"client_id": "client_1_1731324000",
"connected_at": "2025-11-11T10:30:00.000Z",
"last_activity": "2025-11-11T10:30:05.000Z",
"subscriptions": ["market_data", "whale_tracking"],
"total_clients": 5
},
"timestamp": "2025-11-11T10:30:05.000Z"
}
Ping/Pong
{
"action": "ping",
"data": {"custom": "data"}
}
Response:
{
"service": "system",
"type": "pong",
"data": {"custom": "data"},
"timestamp": "2025-11-11T10:30:05.000Z"
}
Server to Client Messages
All server messages follow this format:
{
"service": "service_name",
"type": "message_type",
"data": { },
"timestamp": "2025-11-11T10:30:00.000Z"
}
Message Types:
connection_established- Initial connection confirmationwelcome- Welcome message with service informationupdate- Service data updatesubscription_confirmed- Subscription confirmationunsubscription_confirmed- Unsubscription confirmationstatus- Connection status responsepong- Ping responseerror- Error message
Code Examples
JavaScript/TypeScript Client
class CryptoWebSocketClient {
constructor(baseUrl = 'ws://localhost:7860') {
this.baseUrl = baseUrl;
this.ws = null;
this.subscriptions = new Set();
}
connect(endpoint = '/ws/master') {
this.ws = new WebSocket(`${this.baseUrl}${endpoint}`);
this.ws.onopen = () => {
console.log('Connected to', endpoint);
this.onConnected();
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
this.ws.onclose = () => {
console.log('Disconnected');
this.onDisconnected();
};
}
subscribe(service) {
this.send({
action: 'subscribe',
service: service
});
this.subscriptions.add(service);
}
unsubscribe(service) {
this.send({
action: 'unsubscribe',
service: service
});
this.subscriptions.delete(service);
}
getStatus() {
this.send({ action: 'get_status' });
}
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
handleMessage(message) {
console.log('Received:', message);
switch (message.type) {
case 'connection_established':
console.log('Client ID:', message.data.client_id);
break;
case 'update':
this.onUpdate(message.service, message.data);
break;
case 'error':
console.error('Server error:', message.data.message);
break;
}
}
onConnected() {
// Override in subclass
}
onDisconnected() {
// Override in subclass
}
onUpdate(service, data) {
// Override in subclass
console.log(`Update from ${service}:`, data);
}
}
// Usage
const client = new CryptoWebSocketClient();
client.connect('/ws/master');
client.onConnected = () => {
client.subscribe('market_data');
client.subscribe('whale_tracking');
};
client.onUpdate = (service, data) => {
if (service === 'market_data') {
console.log('Prices:', data.prices);
} else if (service === 'whale_tracking') {
console.log('Whale transactions:', data.large_transactions);
}
};
Python Client
import asyncio
import websockets
import json
from typing import Callable, Dict, Any
class CryptoWebSocketClient:
def __init__(self, base_url: str = "ws://localhost:7860"):
self.base_url = base_url
self.ws = None
self.subscriptions = set()
self.message_handlers = {}
async def connect(self, endpoint: str = "/ws/master"):
uri = f"{self.base_url}{endpoint}"
async with websockets.connect(uri) as websocket:
self.ws = websocket
print(f"Connected to {endpoint}")
# Handle incoming messages
async for message in websocket:
data = json.loads(message)
await self.handle_message(data)
async def subscribe(self, service: str):
await self.send({
"action": "subscribe",
"service": service
})
self.subscriptions.add(service)
async def unsubscribe(self, service: str):
await self.send({
"action": "unsubscribe",
"service": service
})
self.subscriptions.discard(service)
async def get_status(self):
await self.send({"action": "get_status"})
async def send(self, data: Dict[str, Any]):
if self.ws:
await self.ws.send(json.dumps(data))
async def handle_message(self, message: Dict[str, Any]):
msg_type = message.get("type")
service = message.get("service")
if msg_type == "connection_established":
print(f"Client ID: {message['data']['client_id']}")
await self.on_connected()
elif msg_type == "update":
await self.on_update(service, message["data"])
elif msg_type == "error":
print(f"Error: {message['data']['message']}")
async def on_connected(self):
# Override in subclass
pass
async def on_update(self, service: str, data: Dict[str, Any]):
# Override in subclass or register handlers
if service in self.message_handlers:
await self.message_handlers[service](data)
else:
print(f"Update from {service}: {data}")
def register_handler(self, service: str, handler: Callable):
self.message_handlers[service] = handler
# Usage
async def main():
client = CryptoWebSocketClient()
# Register handlers
async def handle_market_data(data):
print(f"Prices: {data.get('prices')}")
async def handle_whale_tracking(data):
print(f"Large transactions: {data.get('large_transactions')}")
client.register_handler('market_data', handle_market_data)
client.register_handler('whale_tracking', handle_whale_tracking)
# Connect and subscribe
async def on_connected():
await client.subscribe('market_data')
await client.subscribe('whale_tracking')
client.on_connected = on_connected
await client.connect('/ws/master')
asyncio.run(main())
React Hook Example
import { useEffect, useState, useCallback } from 'react';
interface WebSocketMessage {
service: string;
type: string;
data: any;
timestamp: string;
}
export function useWebSocket(endpoint: string = '/ws/master') {
const [ws, setWs] = useState<WebSocket | null>(null);
const [connected, setConnected] = useState(false);
const [messages, setMessages] = useState<WebSocketMessage[]>([]);
useEffect(() => {
const websocket = new WebSocket(`ws://localhost:7860${endpoint}`);
websocket.onopen = () => {
console.log('WebSocket connected');
setConnected(true);
};
websocket.onmessage = (event) => {
const message: WebSocketMessage = JSON.parse(event.data);
setMessages(prev => [...prev, message]);
};
websocket.onclose = () => {
console.log('WebSocket disconnected');
setConnected(false);
};
setWs(websocket);
return () => {
websocket.close();
};
}, [endpoint]);
const subscribe = useCallback((service: string) => {
if (ws && connected) {
ws.send(JSON.stringify({
action: 'subscribe',
service: service
}));
}
}, [ws, connected]);
const unsubscribe = useCallback((service: string) => {
if (ws && connected) {
ws.send(JSON.stringify({
action: 'unsubscribe',
service: service
}));
}
}, [ws, connected]);
return { connected, messages, subscribe, unsubscribe };
}
// Usage in component
function MarketDataComponent() {
const { connected, messages, subscribe } = useWebSocket('/ws/master');
useEffect(() => {
if (connected) {
subscribe('market_data');
}
}, [connected, subscribe]);
const marketDataMessages = messages.filter(m => m.service === 'market_data');
return (
<div>
<h2>Market Data</h2>
<p>Status: {connected ? 'Connected' : 'Disconnected'}</p>
{marketDataMessages.map((msg, idx) => (
<div key={idx}>
<p>Prices: {JSON.stringify(msg.data.prices)}</p>
</div>
))}
</div>
);
}
Available Services
Data Collection Services
| Service | Description | Update Interval | Endpoint |
|---|---|---|---|
market_data |
Real-time cryptocurrency prices, volumes, and market caps | 5 seconds | /ws/market_data |
explorers |
Blockchain explorer data and network statistics | 10 seconds | /ws/data |
news |
Cryptocurrency news aggregation from multiple sources | 60 seconds | /ws/news |
sentiment |
Market sentiment analysis and social media trends | 30 seconds | /ws/sentiment |
whale_tracking |
Large transaction monitoring and whale wallet tracking | 15 seconds | /ws/whale_tracking |
rpc_nodes |
RPC node status and blockchain events | 20 seconds | /ws/data |
onchain |
On-chain analytics and smart contract events | 30 seconds | /ws/data |
Monitoring Services
| Service | Description | Update Interval | Endpoint |
|---|---|---|---|
health_checker |
Provider health monitoring and status checks | 30 seconds | /ws/health |
pool_manager |
Source pool management and automatic failover | 20 seconds | /ws/pool_status |
scheduler |
Task scheduler status and job execution tracking | 15 seconds | /ws/scheduler_status |
Integration Services
| Service | Description | Update Interval | Endpoint |
|---|---|---|---|
huggingface |
HuggingFace AI model registry and sentiment analysis | 60 seconds | /ws/huggingface |
persistence |
Data persistence, exports, and backup operations | 30 seconds | /ws/persistence |
System Services
| Service | Description | Endpoint |
|---|---|---|
system |
System messages and connection management | All endpoints |
all |
Subscribe to all services at once | /ws/all |
REST API Endpoints
Get WebSocket Statistics
GET /ws/stats
Returns information about active connections and subscriptions.
Response:
{
"status": "success",
"data": {
"total_connections": 5,
"clients": [
{
"client_id": "client_1_1731324000",
"connected_at": "2025-11-11T10:30:00.000Z",
"last_activity": "2025-11-11T10:35:00.000Z",
"subscriptions": ["market_data", "whale_tracking"]
}
],
"subscription_counts": {
"market_data": 3,
"whale_tracking": 2,
"news": 1
}
},
"timestamp": "2025-11-11T10:35:00.000Z"
}
Get Available Services
GET /ws/services
Returns a comprehensive list of all available services with descriptions.
Get WebSocket Endpoints
GET /ws/endpoints
Returns a list of all WebSocket connection URLs.
Error Handling
Connection Errors
If a connection fails or is lost, implement exponential backoff:
class ReconnectingWebSocket {
constructor(url) {
this.url = url;
this.reconnectDelay = 1000;
this.maxReconnectDelay = 30000;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onclose = () => {
console.log(`Reconnecting in ${this.reconnectDelay}ms...`);
setTimeout(() => {
this.reconnectDelay = Math.min(
this.reconnectDelay * 2,
this.maxReconnectDelay
);
this.connect();
}, this.reconnectDelay);
};
this.ws.onopen = () => {
console.log('Connected');
this.reconnectDelay = 1000; // Reset delay on successful connection
};
}
}
Message Errors
Handle error messages from the server:
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === 'error') {
console.error('Server error:', message.data.message);
// Handle specific errors
if (message.data.message.includes('Invalid service')) {
console.log('Available services:', message.data.available_services);
}
}
};
Best Practices
- Subscribe Only to What You Need: Minimize bandwidth by subscribing only to required services
- Implement Reconnection Logic: Handle network interruptions gracefully
- Use Heartbeats: Implement ping/pong to detect connection issues early
- Handle Backpressure: Process messages efficiently to avoid queue buildup
- Clean Up Subscriptions: Unsubscribe when components unmount or services are no longer needed
- Use Service-Specific Endpoints: For single-service needs, use dedicated endpoints to reduce initial setup
- Monitor Connection Status: Track connection state and subscriptions in your application
- Implement Error Boundaries: Gracefully handle and display connection/data errors
Support
For issues or questions:
- GitHub Issues: https://github.com/nimazasinich/crypto-dt-source/issues
- API Documentation: http://localhost:7860/docs
Version
API Version: 2.0.0 Last Updated: 2025-11-11