|
|
""" |
|
|
WebSocket Data Broadcaster |
|
|
Broadcasts real-time cryptocurrency data from database to connected clients |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import logging |
|
|
from datetime import datetime |
|
|
from typing import Dict, Any |
|
|
|
|
|
from database.db_manager import db_manager |
|
|
from backend.services.ws_service_manager import ws_manager, ServiceType |
|
|
from utils.logger import setup_logger |
|
|
|
|
|
logger = setup_logger("ws_data_broadcaster") |
|
|
|
|
|
|
|
|
class DataBroadcaster: |
|
|
""" |
|
|
Broadcasts cryptocurrency data updates to WebSocket clients |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
"""Initialize the broadcaster""" |
|
|
self.last_broadcast = {} |
|
|
self.broadcast_interval = 5 |
|
|
self.is_running = False |
|
|
logger.info("DataBroadcaster initialized") |
|
|
|
|
|
async def start_broadcasting(self): |
|
|
"""Start all broadcast tasks""" |
|
|
logger.info("Starting WebSocket data broadcaster...") |
|
|
|
|
|
self.is_running = True |
|
|
|
|
|
tasks = [ |
|
|
self.broadcast_market_data(), |
|
|
self.broadcast_news(), |
|
|
self.broadcast_sentiment(), |
|
|
self.broadcast_whales(), |
|
|
self.broadcast_gas_prices() |
|
|
] |
|
|
|
|
|
try: |
|
|
await asyncio.gather(*tasks, return_exceptions=True) |
|
|
except Exception as e: |
|
|
logger.error(f"Error in broadcasting tasks: {e}", exc_info=True) |
|
|
finally: |
|
|
self.is_running = False |
|
|
|
|
|
async def stop_broadcasting(self): |
|
|
"""Stop broadcasting""" |
|
|
logger.info("Stopping WebSocket data broadcaster...") |
|
|
self.is_running = False |
|
|
|
|
|
async def broadcast_market_data(self): |
|
|
"""Broadcast market price updates""" |
|
|
logger.info("Starting market data broadcast...") |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
prices = db_manager.get_latest_prices(limit=50) |
|
|
|
|
|
if prices: |
|
|
|
|
|
data = { |
|
|
"type": "market_data", |
|
|
"data": { |
|
|
"prices": {p.symbol: p.price_usd for p in prices}, |
|
|
"volumes": {p.symbol: p.volume_24h for p in prices if p.volume_24h}, |
|
|
"market_caps": {p.symbol: p.market_cap for p in prices if p.market_cap}, |
|
|
"price_changes": {p.symbol: p.price_change_24h for p in prices if p.price_change_24h} |
|
|
}, |
|
|
"count": len(prices), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.MARKET_DATA, data) |
|
|
logger.debug(f"Broadcasted {len(prices)} price updates") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting market data: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(self.broadcast_interval) |
|
|
|
|
|
async def broadcast_news(self): |
|
|
"""Broadcast news updates""" |
|
|
logger.info("Starting news broadcast...") |
|
|
last_news_id = 0 |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
news = db_manager.get_latest_news(limit=10) |
|
|
|
|
|
if news and (not last_news_id or news[0].id != last_news_id): |
|
|
|
|
|
last_news_id = news[0].id |
|
|
|
|
|
data = { |
|
|
"type": "news", |
|
|
"data": { |
|
|
"articles": [ |
|
|
{ |
|
|
"id": article.id, |
|
|
"title": article.title, |
|
|
"source": article.source, |
|
|
"url": article.url, |
|
|
"published_at": article.published_at.isoformat(), |
|
|
"sentiment": article.sentiment |
|
|
} |
|
|
for article in news[:5] |
|
|
] |
|
|
}, |
|
|
"count": len(news[:5]), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.NEWS, data) |
|
|
logger.info(f"Broadcasted {len(news[:5])} news articles") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting news: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(30) |
|
|
|
|
|
async def broadcast_sentiment(self): |
|
|
"""Broadcast sentiment updates""" |
|
|
logger.info("Starting sentiment broadcast...") |
|
|
last_sentiment_value = None |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
sentiment = db_manager.get_latest_sentiment() |
|
|
|
|
|
if sentiment and sentiment.value != last_sentiment_value: |
|
|
last_sentiment_value = sentiment.value |
|
|
|
|
|
data = { |
|
|
"type": "sentiment", |
|
|
"data": { |
|
|
"fear_greed_index": sentiment.value, |
|
|
"classification": sentiment.classification, |
|
|
"metric_name": sentiment.metric_name, |
|
|
"source": sentiment.source, |
|
|
"timestamp": sentiment.timestamp.isoformat() |
|
|
}, |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, data) |
|
|
logger.info(f"Broadcasted sentiment: {sentiment.value} ({sentiment.classification})") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting sentiment: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(60) |
|
|
|
|
|
async def broadcast_whales(self): |
|
|
"""Broadcast whale transaction updates""" |
|
|
logger.info("Starting whale transaction broadcast...") |
|
|
last_whale_id = 0 |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
whales = db_manager.get_whale_transactions(limit=5) |
|
|
|
|
|
if whales and (not last_whale_id or whales[0].id != last_whale_id): |
|
|
last_whale_id = whales[0].id |
|
|
|
|
|
data = { |
|
|
"type": "whale_transaction", |
|
|
"data": { |
|
|
"transactions": [ |
|
|
{ |
|
|
"id": tx.id, |
|
|
"blockchain": tx.blockchain, |
|
|
"amount_usd": tx.amount_usd, |
|
|
"from_address": tx.from_address[:20] + "...", |
|
|
"to_address": tx.to_address[:20] + "...", |
|
|
"timestamp": tx.timestamp.isoformat() |
|
|
} |
|
|
for tx in whales |
|
|
] |
|
|
}, |
|
|
"count": len(whales), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.WHALE_TRACKING, data) |
|
|
logger.info(f"Broadcasted {len(whales)} whale transactions") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting whales: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(15) |
|
|
|
|
|
async def broadcast_gas_prices(self): |
|
|
"""Broadcast gas price updates""" |
|
|
logger.info("Starting gas price broadcast...") |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
gas_prices = db_manager.get_latest_gas_prices() |
|
|
|
|
|
if gas_prices: |
|
|
data = { |
|
|
"type": "gas_prices", |
|
|
"data": gas_prices, |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, data) |
|
|
logger.debug("Broadcasted gas prices") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting gas prices: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(30) |
|
|
|
|
|
|
|
|
|
|
|
broadcaster = DataBroadcaster() |
|
|
|