|
|
""" |
|
|
Intelligent Source Pool Manager |
|
|
Manages source pools, rotation, and automatic failover |
|
|
""" |
|
|
|
|
|
import json |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Optional, List, Dict, Any |
|
|
from threading import Lock |
|
|
from sqlalchemy.orm import Session |
|
|
|
|
|
from database.models import ( |
|
|
SourcePool, PoolMember, RotationHistory, RotationState, |
|
|
Provider, RateLimitUsage |
|
|
) |
|
|
from monitoring.rate_limiter import rate_limiter |
|
|
from utils.logger import setup_logger |
|
|
|
|
|
logger = setup_logger("source_pool_manager") |
|
|
|
|
|
|
|
|
class SourcePoolManager: |
|
|
""" |
|
|
Manages source pools and intelligent rotation |
|
|
""" |
|
|
|
|
|
def __init__(self, db_session: Session): |
|
|
""" |
|
|
Initialize source pool manager |
|
|
|
|
|
Args: |
|
|
db_session: Database session |
|
|
""" |
|
|
self.db = db_session |
|
|
self.lock = Lock() |
|
|
logger.info("Source Pool Manager initialized") |
|
|
|
|
|
def create_pool( |
|
|
self, |
|
|
name: str, |
|
|
category: str, |
|
|
description: Optional[str] = None, |
|
|
rotation_strategy: str = "round_robin" |
|
|
) -> SourcePool: |
|
|
""" |
|
|
Create a new source pool |
|
|
|
|
|
Args: |
|
|
name: Pool name |
|
|
category: Pool category |
|
|
description: Pool description |
|
|
rotation_strategy: Rotation strategy (round_robin, least_used, priority) |
|
|
|
|
|
Returns: |
|
|
Created SourcePool |
|
|
""" |
|
|
with self.lock: |
|
|
pool = SourcePool( |
|
|
name=name, |
|
|
category=category, |
|
|
description=description, |
|
|
rotation_strategy=rotation_strategy, |
|
|
enabled=True |
|
|
) |
|
|
self.db.add(pool) |
|
|
self.db.commit() |
|
|
self.db.refresh(pool) |
|
|
|
|
|
|
|
|
state = RotationState( |
|
|
pool_id=pool.id, |
|
|
current_provider_id=None, |
|
|
rotation_count=0 |
|
|
) |
|
|
self.db.add(state) |
|
|
self.db.commit() |
|
|
|
|
|
logger.info(f"Created source pool: {name} (strategy: {rotation_strategy})") |
|
|
return pool |
|
|
|
|
|
def add_to_pool( |
|
|
self, |
|
|
pool_id: int, |
|
|
provider_id: int, |
|
|
priority: int = 1, |
|
|
weight: int = 1 |
|
|
) -> PoolMember: |
|
|
""" |
|
|
Add a provider to a pool |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
provider_id: Provider ID |
|
|
priority: Provider priority (higher = better) |
|
|
weight: Provider weight for weighted rotation |
|
|
|
|
|
Returns: |
|
|
Created PoolMember |
|
|
""" |
|
|
with self.lock: |
|
|
member = PoolMember( |
|
|
pool_id=pool_id, |
|
|
provider_id=provider_id, |
|
|
priority=priority, |
|
|
weight=weight, |
|
|
enabled=True, |
|
|
use_count=0, |
|
|
success_count=0, |
|
|
failure_count=0 |
|
|
) |
|
|
self.db.add(member) |
|
|
self.db.commit() |
|
|
self.db.refresh(member) |
|
|
|
|
|
logger.info(f"Added provider {provider_id} to pool {pool_id}") |
|
|
return member |
|
|
|
|
|
def get_next_provider( |
|
|
self, |
|
|
pool_id: int, |
|
|
exclude_rate_limited: bool = True |
|
|
) -> Optional[Provider]: |
|
|
""" |
|
|
Get next provider from pool based on rotation strategy |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
exclude_rate_limited: Exclude rate-limited providers |
|
|
|
|
|
Returns: |
|
|
Next Provider or None if none available |
|
|
""" |
|
|
with self.lock: |
|
|
|
|
|
pool = self.db.query(SourcePool).filter_by(id=pool_id).first() |
|
|
if not pool or not pool.enabled: |
|
|
logger.warning(f"Pool {pool_id} not found or disabled") |
|
|
return None |
|
|
|
|
|
|
|
|
members = ( |
|
|
self.db.query(PoolMember) |
|
|
.filter_by(pool_id=pool_id, enabled=True) |
|
|
.join(Provider) |
|
|
.filter(Provider.id == PoolMember.provider_id) |
|
|
.all() |
|
|
) |
|
|
|
|
|
if not members: |
|
|
logger.warning(f"No enabled members in pool {pool_id}") |
|
|
return None |
|
|
|
|
|
|
|
|
if exclude_rate_limited: |
|
|
available_members = [] |
|
|
for member in members: |
|
|
provider = self.db.query(Provider).get(member.provider_id) |
|
|
can_use, _ = rate_limiter.can_make_request(provider.name) |
|
|
if can_use: |
|
|
available_members.append(member) |
|
|
|
|
|
if not available_members: |
|
|
logger.warning(f"All providers in pool {pool_id} are rate-limited") |
|
|
|
|
|
available_members = members |
|
|
else: |
|
|
available_members = members |
|
|
|
|
|
|
|
|
selected_member = self._select_by_strategy( |
|
|
pool.rotation_strategy, |
|
|
available_members |
|
|
) |
|
|
|
|
|
if not selected_member: |
|
|
return None |
|
|
|
|
|
|
|
|
state = self.db.query(RotationState).filter_by(pool_id=pool_id).first() |
|
|
if not state: |
|
|
state = RotationState(pool_id=pool_id) |
|
|
self.db.add(state) |
|
|
|
|
|
|
|
|
old_provider_id = state.current_provider_id |
|
|
if old_provider_id != selected_member.provider_id: |
|
|
self._record_rotation( |
|
|
pool_id=pool_id, |
|
|
from_provider_id=old_provider_id, |
|
|
to_provider_id=selected_member.provider_id, |
|
|
reason="rotation" |
|
|
) |
|
|
|
|
|
|
|
|
state.current_provider_id = selected_member.provider_id |
|
|
state.last_rotation = datetime.utcnow() |
|
|
state.rotation_count += 1 |
|
|
|
|
|
|
|
|
selected_member.last_used = datetime.utcnow() |
|
|
selected_member.use_count += 1 |
|
|
|
|
|
self.db.commit() |
|
|
|
|
|
provider = self.db.query(Provider).get(selected_member.provider_id) |
|
|
logger.info( |
|
|
f"Selected provider {provider.name} from pool {pool.name} " |
|
|
f"(strategy: {pool.rotation_strategy})" |
|
|
) |
|
|
return provider |
|
|
|
|
|
def _select_by_strategy( |
|
|
self, |
|
|
strategy: str, |
|
|
members: List[PoolMember] |
|
|
) -> Optional[PoolMember]: |
|
|
""" |
|
|
Select a pool member based on rotation strategy |
|
|
|
|
|
Args: |
|
|
strategy: Rotation strategy |
|
|
members: Available pool members |
|
|
|
|
|
Returns: |
|
|
Selected PoolMember |
|
|
""" |
|
|
if not members: |
|
|
return None |
|
|
|
|
|
if strategy == "priority": |
|
|
|
|
|
return max(members, key=lambda m: m.priority) |
|
|
|
|
|
elif strategy == "least_used": |
|
|
|
|
|
return min(members, key=lambda m: m.use_count) |
|
|
|
|
|
elif strategy == "weighted": |
|
|
|
|
|
|
|
|
return max(members, key=lambda m: m.weight * (1.0 / (m.use_count + 1))) |
|
|
|
|
|
else: |
|
|
|
|
|
never_used = [m for m in members if m.last_used is None] |
|
|
if never_used: |
|
|
return never_used[0] |
|
|
return min(members, key=lambda m: m.last_used) |
|
|
|
|
|
def _record_rotation( |
|
|
self, |
|
|
pool_id: int, |
|
|
from_provider_id: Optional[int], |
|
|
to_provider_id: int, |
|
|
reason: str, |
|
|
notes: Optional[str] = None |
|
|
): |
|
|
""" |
|
|
Record a rotation event |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
from_provider_id: Previous provider ID |
|
|
to_provider_id: New provider ID |
|
|
reason: Rotation reason |
|
|
notes: Additional notes |
|
|
""" |
|
|
rotation = RotationHistory( |
|
|
pool_id=pool_id, |
|
|
from_provider_id=from_provider_id, |
|
|
to_provider_id=to_provider_id, |
|
|
rotation_reason=reason, |
|
|
success=True, |
|
|
notes=notes |
|
|
) |
|
|
self.db.add(rotation) |
|
|
self.db.commit() |
|
|
|
|
|
def failover( |
|
|
self, |
|
|
pool_id: int, |
|
|
failed_provider_id: int, |
|
|
reason: str = "failure" |
|
|
) -> Optional[Provider]: |
|
|
""" |
|
|
Perform failover from a failed provider |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
failed_provider_id: Failed provider ID |
|
|
reason: Failure reason |
|
|
|
|
|
Returns: |
|
|
Next available provider |
|
|
""" |
|
|
with self.lock: |
|
|
logger.warning( |
|
|
f"Failover triggered for provider {failed_provider_id} " |
|
|
f"in pool {pool_id}. Reason: {reason}" |
|
|
) |
|
|
|
|
|
|
|
|
member = ( |
|
|
self.db.query(PoolMember) |
|
|
.filter_by(pool_id=pool_id, provider_id=failed_provider_id) |
|
|
.first() |
|
|
) |
|
|
if member: |
|
|
member.failure_count += 1 |
|
|
self.db.commit() |
|
|
|
|
|
|
|
|
pool = self.db.query(SourcePool).filter_by(id=pool_id).first() |
|
|
if not pool: |
|
|
return None |
|
|
|
|
|
members = ( |
|
|
self.db.query(PoolMember) |
|
|
.filter_by(pool_id=pool_id, enabled=True) |
|
|
.filter(PoolMember.provider_id != failed_provider_id) |
|
|
.all() |
|
|
) |
|
|
|
|
|
if not members: |
|
|
logger.error(f"No alternative providers available in pool {pool_id}") |
|
|
return None |
|
|
|
|
|
|
|
|
selected_member = self._select_by_strategy( |
|
|
pool.rotation_strategy, |
|
|
members |
|
|
) |
|
|
|
|
|
if not selected_member: |
|
|
return None |
|
|
|
|
|
|
|
|
self._record_rotation( |
|
|
pool_id=pool_id, |
|
|
from_provider_id=failed_provider_id, |
|
|
to_provider_id=selected_member.provider_id, |
|
|
reason=reason, |
|
|
notes=f"Automatic failover from provider {failed_provider_id}" |
|
|
) |
|
|
|
|
|
|
|
|
state = self.db.query(RotationState).filter_by(pool_id=pool_id).first() |
|
|
if state: |
|
|
state.current_provider_id = selected_member.provider_id |
|
|
state.last_rotation = datetime.utcnow() |
|
|
state.rotation_count += 1 |
|
|
|
|
|
|
|
|
selected_member.last_used = datetime.utcnow() |
|
|
selected_member.use_count += 1 |
|
|
|
|
|
self.db.commit() |
|
|
|
|
|
provider = self.db.query(Provider).get(selected_member.provider_id) |
|
|
logger.info(f"Failover successful: switched to provider {provider.name}") |
|
|
return provider |
|
|
|
|
|
def record_success(self, pool_id: int, provider_id: int): |
|
|
""" |
|
|
Record successful use of a provider |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
provider_id: Provider ID |
|
|
""" |
|
|
with self.lock: |
|
|
member = ( |
|
|
self.db.query(PoolMember) |
|
|
.filter_by(pool_id=pool_id, provider_id=provider_id) |
|
|
.first() |
|
|
) |
|
|
if member: |
|
|
member.success_count += 1 |
|
|
self.db.commit() |
|
|
|
|
|
def record_failure(self, pool_id: int, provider_id: int): |
|
|
""" |
|
|
Record failed use of a provider |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
provider_id: Provider ID |
|
|
""" |
|
|
with self.lock: |
|
|
member = ( |
|
|
self.db.query(PoolMember) |
|
|
.filter_by(pool_id=pool_id, provider_id=provider_id) |
|
|
.first() |
|
|
) |
|
|
if member: |
|
|
member.failure_count += 1 |
|
|
self.db.commit() |
|
|
|
|
|
def get_pool_status(self, pool_id: int) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Get comprehensive pool status |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
|
|
|
Returns: |
|
|
Pool status dictionary |
|
|
""" |
|
|
with self.lock: |
|
|
pool = self.db.query(SourcePool).filter_by(id=pool_id).first() |
|
|
if not pool: |
|
|
return None |
|
|
|
|
|
|
|
|
state = self.db.query(RotationState).filter_by(pool_id=pool_id).first() |
|
|
|
|
|
|
|
|
current_provider = None |
|
|
if state and state.current_provider_id: |
|
|
provider = self.db.query(Provider).get(state.current_provider_id) |
|
|
if provider: |
|
|
current_provider = { |
|
|
"id": provider.id, |
|
|
"name": provider.name, |
|
|
"status": "active" |
|
|
} |
|
|
|
|
|
|
|
|
members = [] |
|
|
pool_members = self.db.query(PoolMember).filter_by(pool_id=pool_id).all() |
|
|
|
|
|
for member in pool_members: |
|
|
provider = self.db.query(Provider).get(member.provider_id) |
|
|
if not provider: |
|
|
continue |
|
|
|
|
|
|
|
|
rate_status = rate_limiter.get_status(provider.name) |
|
|
rate_limit_info = None |
|
|
if rate_status: |
|
|
rate_limit_info = { |
|
|
"usage": rate_status['current_usage'], |
|
|
"limit": rate_status['limit_value'], |
|
|
"percentage": rate_status['percentage'], |
|
|
"status": rate_status['status'] |
|
|
} |
|
|
|
|
|
success_rate = 0 |
|
|
if member.use_count > 0: |
|
|
success_rate = (member.success_count / member.use_count) * 100 |
|
|
|
|
|
members.append({ |
|
|
"provider_id": provider.id, |
|
|
"provider_name": provider.name, |
|
|
"priority": member.priority, |
|
|
"weight": member.weight, |
|
|
"enabled": member.enabled, |
|
|
"use_count": member.use_count, |
|
|
"success_count": member.success_count, |
|
|
"failure_count": member.failure_count, |
|
|
"success_rate": round(success_rate, 2), |
|
|
"last_used": member.last_used.isoformat() if member.last_used else None, |
|
|
"rate_limit": rate_limit_info |
|
|
}) |
|
|
|
|
|
|
|
|
recent_rotations = ( |
|
|
self.db.query(RotationHistory) |
|
|
.filter_by(pool_id=pool_id) |
|
|
.order_by(RotationHistory.timestamp.desc()) |
|
|
.limit(10) |
|
|
.all() |
|
|
) |
|
|
|
|
|
rotation_list = [] |
|
|
for rotation in recent_rotations: |
|
|
from_provider = None |
|
|
if rotation.from_provider_id: |
|
|
from_prov = self.db.query(Provider).get(rotation.from_provider_id) |
|
|
from_provider = from_prov.name if from_prov else None |
|
|
|
|
|
to_prov = self.db.query(Provider).get(rotation.to_provider_id) |
|
|
to_provider = to_prov.name if to_prov else None |
|
|
|
|
|
rotation_list.append({ |
|
|
"timestamp": rotation.timestamp.isoformat(), |
|
|
"from_provider": from_provider, |
|
|
"to_provider": to_provider, |
|
|
"reason": rotation.rotation_reason, |
|
|
"success": rotation.success |
|
|
}) |
|
|
|
|
|
return { |
|
|
"pool_id": pool.id, |
|
|
"pool_name": pool.name, |
|
|
"category": pool.category, |
|
|
"description": pool.description, |
|
|
"rotation_strategy": pool.rotation_strategy, |
|
|
"enabled": pool.enabled, |
|
|
"current_provider": current_provider, |
|
|
"total_rotations": state.rotation_count if state else 0, |
|
|
"last_rotation": state.last_rotation.isoformat() if state and state.last_rotation else None, |
|
|
"members": members, |
|
|
"recent_rotations": rotation_list |
|
|
} |
|
|
|
|
|
def get_all_pools_status(self) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Get status of all pools |
|
|
|
|
|
Returns: |
|
|
List of pool status dictionaries |
|
|
""" |
|
|
pools = self.db.query(SourcePool).all() |
|
|
return [ |
|
|
self.get_pool_status(pool.id) |
|
|
for pool in pools |
|
|
if self.get_pool_status(pool.id) |
|
|
] |
|
|
|