|
|
"""
|
|
|
Connection Manager - مدیریت اتصالات WebSocket و Session
|
|
|
"""
|
|
|
import asyncio
|
|
|
import json
|
|
|
import uuid
|
|
|
from typing import Dict, Set, Optional, Any
|
|
|
from datetime import datetime
|
|
|
from dataclasses import dataclass, asdict
|
|
|
from fastapi import WebSocket
|
|
|
import logging
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
class ClientSession:
|
|
|
"""اطلاعات Session کلاینت"""
|
|
|
session_id: str
|
|
|
client_type: str
|
|
|
connected_at: datetime
|
|
|
last_activity: datetime
|
|
|
ip_address: Optional[str] = None
|
|
|
user_agent: Optional[str] = None
|
|
|
metadata: Dict[str, Any] = None
|
|
|
|
|
|
def to_dict(self):
|
|
|
return {
|
|
|
'session_id': self.session_id,
|
|
|
'client_type': self.client_type,
|
|
|
'connected_at': self.connected_at.isoformat(),
|
|
|
'last_activity': self.last_activity.isoformat(),
|
|
|
'ip_address': self.ip_address,
|
|
|
'user_agent': self.user_agent,
|
|
|
'metadata': self.metadata or {}
|
|
|
}
|
|
|
|
|
|
|
|
|
class ConnectionManager:
|
|
|
"""مدیر اتصالات WebSocket و Session"""
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
|
self.active_connections: Dict[str, WebSocket] = {}
|
|
|
|
|
|
|
|
|
self.sessions: Dict[str, ClientSession] = {}
|
|
|
|
|
|
|
|
|
self.subscriptions: Dict[str, Set[str]] = {
|
|
|
'market': set(),
|
|
|
'prices': set(),
|
|
|
'news': set(),
|
|
|
'alerts': set(),
|
|
|
'all': set()
|
|
|
}
|
|
|
|
|
|
|
|
|
self.total_connections = 0
|
|
|
self.total_messages_sent = 0
|
|
|
self.total_messages_received = 0
|
|
|
|
|
|
async def connect(
|
|
|
self,
|
|
|
websocket: WebSocket,
|
|
|
client_type: str = 'browser',
|
|
|
metadata: Optional[Dict] = None
|
|
|
) -> str:
|
|
|
"""
|
|
|
اتصال کلاینت جدید
|
|
|
|
|
|
Returns:
|
|
|
session_id
|
|
|
"""
|
|
|
await websocket.accept()
|
|
|
|
|
|
session_id = str(uuid.uuid4())
|
|
|
|
|
|
|
|
|
self.active_connections[session_id] = websocket
|
|
|
|
|
|
|
|
|
session = ClientSession(
|
|
|
session_id=session_id,
|
|
|
client_type=client_type,
|
|
|
connected_at=datetime.now(),
|
|
|
last_activity=datetime.now(),
|
|
|
metadata=metadata or {}
|
|
|
)
|
|
|
self.sessions[session_id] = session
|
|
|
|
|
|
|
|
|
self.subscriptions['all'].add(session_id)
|
|
|
|
|
|
self.total_connections += 1
|
|
|
|
|
|
logger.info(f"Client connected: {session_id} ({client_type})")
|
|
|
|
|
|
|
|
|
await self.broadcast_stats()
|
|
|
|
|
|
return session_id
|
|
|
|
|
|
def disconnect(self, session_id: str):
|
|
|
"""قطع اتصال کلاینت"""
|
|
|
|
|
|
if session_id in self.active_connections:
|
|
|
del self.active_connections[session_id]
|
|
|
|
|
|
|
|
|
for group in self.subscriptions.values():
|
|
|
group.discard(session_id)
|
|
|
|
|
|
|
|
|
if session_id in self.sessions:
|
|
|
del self.sessions[session_id]
|
|
|
|
|
|
logger.info(f"Client disconnected: {session_id}")
|
|
|
|
|
|
|
|
|
asyncio.create_task(self.broadcast_stats())
|
|
|
|
|
|
async def send_personal_message(
|
|
|
self,
|
|
|
message: Dict[str, Any],
|
|
|
session_id: str
|
|
|
):
|
|
|
"""ارسال پیام به یک کلاینت خاص"""
|
|
|
if session_id in self.active_connections:
|
|
|
try:
|
|
|
websocket = self.active_connections[session_id]
|
|
|
await websocket.send_json(message)
|
|
|
|
|
|
|
|
|
if session_id in self.sessions:
|
|
|
self.sessions[session_id].last_activity = datetime.now()
|
|
|
|
|
|
self.total_messages_sent += 1
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error sending message to {session_id}: {e}")
|
|
|
self.disconnect(session_id)
|
|
|
|
|
|
async def broadcast(
|
|
|
self,
|
|
|
message: Dict[str, Any],
|
|
|
group: str = 'all'
|
|
|
):
|
|
|
"""ارسال پیام به گروهی از کلاینتها"""
|
|
|
if group not in self.subscriptions:
|
|
|
group = 'all'
|
|
|
|
|
|
session_ids = self.subscriptions[group].copy()
|
|
|
|
|
|
disconnected = []
|
|
|
for session_id in session_ids:
|
|
|
if session_id in self.active_connections:
|
|
|
try:
|
|
|
websocket = self.active_connections[session_id]
|
|
|
await websocket.send_json(message)
|
|
|
self.total_messages_sent += 1
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error broadcasting to {session_id}: {e}")
|
|
|
disconnected.append(session_id)
|
|
|
|
|
|
|
|
|
for session_id in disconnected:
|
|
|
self.disconnect(session_id)
|
|
|
|
|
|
async def broadcast_stats(self):
|
|
|
"""ارسال آمار کلی به همه کلاینتها"""
|
|
|
stats = self.get_stats()
|
|
|
await self.broadcast({
|
|
|
'type': 'stats_update',
|
|
|
'data': stats,
|
|
|
'timestamp': datetime.now().isoformat()
|
|
|
})
|
|
|
|
|
|
def subscribe(self, session_id: str, group: str):
|
|
|
"""اضافه کردن به گروه subscription"""
|
|
|
if group in self.subscriptions:
|
|
|
self.subscriptions[group].add(session_id)
|
|
|
logger.info(f"Session {session_id} subscribed to {group}")
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
def unsubscribe(self, session_id: str, group: str):
|
|
|
"""حذف از گروه subscription"""
|
|
|
if group in self.subscriptions:
|
|
|
self.subscriptions[group].discard(session_id)
|
|
|
logger.info(f"Session {session_id} unsubscribed from {group}")
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
|
"""دریافت آمار اتصالات"""
|
|
|
|
|
|
client_types = {}
|
|
|
for session in self.sessions.values():
|
|
|
client_type = session.client_type
|
|
|
client_types[client_type] = client_types.get(client_type, 0) + 1
|
|
|
|
|
|
|
|
|
subscription_stats = {
|
|
|
group: len(members)
|
|
|
for group, members in self.subscriptions.items()
|
|
|
}
|
|
|
|
|
|
return {
|
|
|
'active_connections': len(self.active_connections),
|
|
|
'total_sessions': len(self.sessions),
|
|
|
'total_connections_ever': self.total_connections,
|
|
|
'messages_sent': self.total_messages_sent,
|
|
|
'messages_received': self.total_messages_received,
|
|
|
'client_types': client_types,
|
|
|
'subscriptions': subscription_stats,
|
|
|
'timestamp': datetime.now().isoformat()
|
|
|
}
|
|
|
|
|
|
def get_sessions(self) -> Dict[str, Dict[str, Any]]:
|
|
|
"""دریافت لیست sessionهای فعال"""
|
|
|
return {
|
|
|
sid: session.to_dict()
|
|
|
for sid, session in self.sessions.items()
|
|
|
}
|
|
|
|
|
|
async def send_market_update(self, data: Dict[str, Any]):
|
|
|
"""ارسال بهروزرسانی بازار"""
|
|
|
await self.broadcast({
|
|
|
'type': 'market_update',
|
|
|
'data': data,
|
|
|
'timestamp': datetime.now().isoformat()
|
|
|
}, group='market')
|
|
|
|
|
|
async def send_price_update(self, symbol: str, price: float, change: float):
|
|
|
"""ارسال بهروزرسانی قیمت"""
|
|
|
await self.broadcast({
|
|
|
'type': 'price_update',
|
|
|
'data': {
|
|
|
'symbol': symbol,
|
|
|
'price': price,
|
|
|
'change_24h': change
|
|
|
},
|
|
|
'timestamp': datetime.now().isoformat()
|
|
|
}, group='prices')
|
|
|
|
|
|
async def send_alert(self, alert_type: str, message: str, severity: str = 'info'):
|
|
|
"""ارسال هشدار"""
|
|
|
await self.broadcast({
|
|
|
'type': 'alert',
|
|
|
'data': {
|
|
|
'alert_type': alert_type,
|
|
|
'message': message,
|
|
|
'severity': severity
|
|
|
},
|
|
|
'timestamp': datetime.now().isoformat()
|
|
|
}, group='alerts')
|
|
|
|
|
|
async def heartbeat(self):
|
|
|
"""ارسال heartbeat برای check کردن اتصالات"""
|
|
|
await self.broadcast({
|
|
|
'type': 'heartbeat',
|
|
|
'timestamp': datetime.now().isoformat()
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
connection_manager = ConnectionManager()
|
|
|
|
|
|
|
|
|
def get_connection_manager() -> ConnectionManager:
|
|
|
"""دریافت instance مدیر اتصالات"""
|
|
|
return connection_manager
|
|
|
|
|
|
|