|
|
""" |
|
|
API Endpoints for Source Pool Management |
|
|
Provides endpoints for managing source pools, rotation, and monitoring |
|
|
""" |
|
|
|
|
|
from datetime import datetime |
|
|
from typing import Optional, List |
|
|
from fastapi import APIRouter, HTTPException, Body |
|
|
from pydantic import BaseModel, Field |
|
|
|
|
|
from database.db_manager import db_manager |
|
|
from monitoring.source_pool_manager import SourcePoolManager |
|
|
from utils.logger import setup_logger |
|
|
|
|
|
logger = setup_logger("pool_api") |
|
|
|
|
|
|
|
|
router = APIRouter(prefix="/api/pools", tags=["source_pools"]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CreatePoolRequest(BaseModel): |
|
|
"""Request model for creating a pool""" |
|
|
name: str = Field(..., description="Pool name") |
|
|
category: str = Field(..., description="Pool category") |
|
|
description: Optional[str] = Field(None, description="Pool description") |
|
|
rotation_strategy: str = Field("round_robin", description="Rotation strategy") |
|
|
|
|
|
|
|
|
class AddMemberRequest(BaseModel): |
|
|
"""Request model for adding a member to a pool""" |
|
|
provider_id: int = Field(..., description="Provider ID") |
|
|
priority: int = Field(1, description="Provider priority") |
|
|
weight: int = Field(1, description="Provider weight") |
|
|
|
|
|
|
|
|
class UpdatePoolRequest(BaseModel): |
|
|
"""Request model for updating a pool""" |
|
|
rotation_strategy: Optional[str] = Field(None, description="Rotation strategy") |
|
|
enabled: Optional[bool] = Field(None, description="Pool enabled status") |
|
|
description: Optional[str] = Field(None, description="Pool description") |
|
|
|
|
|
|
|
|
class UpdateMemberRequest(BaseModel): |
|
|
"""Request model for updating a pool member""" |
|
|
priority: Optional[int] = Field(None, description="Provider priority") |
|
|
weight: Optional[int] = Field(None, description="Provider weight") |
|
|
enabled: Optional[bool] = Field(None, description="Member enabled status") |
|
|
|
|
|
|
|
|
class TriggerRotationRequest(BaseModel): |
|
|
"""Request model for triggering manual rotation""" |
|
|
reason: str = Field("manual", description="Rotation reason") |
|
|
|
|
|
|
|
|
class FailoverRequest(BaseModel): |
|
|
"""Request model for triggering failover""" |
|
|
failed_provider_id: int = Field(..., description="Failed provider ID") |
|
|
reason: str = Field("manual_failover", description="Failover reason") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("") |
|
|
async def list_pools(): |
|
|
""" |
|
|
Get list of all source pools with their status |
|
|
|
|
|
Returns: |
|
|
List of source pools with status information |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
pool_manager = SourcePoolManager(session) |
|
|
|
|
|
pools_status = pool_manager.get_all_pools_status() |
|
|
|
|
|
session.close() |
|
|
|
|
|
return { |
|
|
"pools": pools_status, |
|
|
"total": len(pools_status), |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error listing pools: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to list pools: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("") |
|
|
async def create_pool(request: CreatePoolRequest): |
|
|
""" |
|
|
Create a new source pool |
|
|
|
|
|
Args: |
|
|
request: Pool creation request |
|
|
|
|
|
Returns: |
|
|
Created pool information |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
pool_manager = SourcePoolManager(session) |
|
|
|
|
|
pool = pool_manager.create_pool( |
|
|
name=request.name, |
|
|
category=request.category, |
|
|
description=request.description, |
|
|
rotation_strategy=request.rotation_strategy |
|
|
) |
|
|
|
|
|
session.close() |
|
|
|
|
|
return { |
|
|
"pool_id": pool.id, |
|
|
"name": pool.name, |
|
|
"category": pool.category, |
|
|
"rotation_strategy": pool.rotation_strategy, |
|
|
"created_at": pool.created_at.isoformat(), |
|
|
"message": f"Pool '{pool.name}' created successfully" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error creating pool: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to create pool: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/{pool_id}") |
|
|
async def get_pool_status(pool_id: int): |
|
|
""" |
|
|
Get detailed status of a specific pool |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
|
|
|
Returns: |
|
|
Detailed pool status |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
pool_manager = SourcePoolManager(session) |
|
|
|
|
|
pool_status = pool_manager.get_pool_status(pool_id) |
|
|
|
|
|
session.close() |
|
|
|
|
|
if not pool_status: |
|
|
raise HTTPException(status_code=404, detail=f"Pool {pool_id} not found") |
|
|
|
|
|
return pool_status |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.error(f"Error getting pool status: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to get pool status: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.put("/{pool_id}") |
|
|
async def update_pool(pool_id: int, request: UpdatePoolRequest): |
|
|
""" |
|
|
Update pool configuration |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
request: Update request |
|
|
|
|
|
Returns: |
|
|
Updated pool information |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
|
|
|
|
|
|
from database.models import SourcePool |
|
|
pool = session.query(SourcePool).filter_by(id=pool_id).first() |
|
|
|
|
|
if not pool: |
|
|
session.close() |
|
|
raise HTTPException(status_code=404, detail=f"Pool {pool_id} not found") |
|
|
|
|
|
|
|
|
if request.rotation_strategy is not None: |
|
|
pool.rotation_strategy = request.rotation_strategy |
|
|
if request.enabled is not None: |
|
|
pool.enabled = request.enabled |
|
|
if request.description is not None: |
|
|
pool.description = request.description |
|
|
|
|
|
pool.updated_at = datetime.utcnow() |
|
|
|
|
|
session.commit() |
|
|
session.refresh(pool) |
|
|
|
|
|
result = { |
|
|
"pool_id": pool.id, |
|
|
"name": pool.name, |
|
|
"rotation_strategy": pool.rotation_strategy, |
|
|
"enabled": pool.enabled, |
|
|
"updated_at": pool.updated_at.isoformat(), |
|
|
"message": f"Pool '{pool.name}' updated successfully" |
|
|
} |
|
|
|
|
|
session.close() |
|
|
|
|
|
return result |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.error(f"Error updating pool: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to update pool: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.delete("/{pool_id}") |
|
|
async def delete_pool(pool_id: int): |
|
|
""" |
|
|
Delete a source pool |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
|
|
|
Returns: |
|
|
Deletion confirmation |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
|
|
|
from database.models import SourcePool |
|
|
pool = session.query(SourcePool).filter_by(id=pool_id).first() |
|
|
|
|
|
if not pool: |
|
|
session.close() |
|
|
raise HTTPException(status_code=404, detail=f"Pool {pool_id} not found") |
|
|
|
|
|
pool_name = pool.name |
|
|
session.delete(pool) |
|
|
session.commit() |
|
|
session.close() |
|
|
|
|
|
return { |
|
|
"message": f"Pool '{pool_name}' deleted successfully", |
|
|
"pool_id": pool_id |
|
|
} |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.error(f"Error deleting pool: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to delete pool: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/{pool_id}/members") |
|
|
async def add_pool_member(pool_id: int, request: AddMemberRequest): |
|
|
""" |
|
|
Add a provider to a pool |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
request: Add member request |
|
|
|
|
|
Returns: |
|
|
Created member information |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
pool_manager = SourcePoolManager(session) |
|
|
|
|
|
member = pool_manager.add_to_pool( |
|
|
pool_id=pool_id, |
|
|
provider_id=request.provider_id, |
|
|
priority=request.priority, |
|
|
weight=request.weight |
|
|
) |
|
|
|
|
|
|
|
|
from database.models import Provider |
|
|
provider = session.query(Provider).get(request.provider_id) |
|
|
|
|
|
session.close() |
|
|
|
|
|
return { |
|
|
"member_id": member.id, |
|
|
"pool_id": pool_id, |
|
|
"provider_id": request.provider_id, |
|
|
"provider_name": provider.name if provider else None, |
|
|
"priority": member.priority, |
|
|
"weight": member.weight, |
|
|
"message": f"Provider added to pool successfully" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error adding pool member: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to add pool member: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.put("/{pool_id}/members/{provider_id}") |
|
|
async def update_pool_member( |
|
|
pool_id: int, |
|
|
provider_id: int, |
|
|
request: UpdateMemberRequest |
|
|
): |
|
|
""" |
|
|
Update a pool member configuration |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
provider_id: Provider ID |
|
|
request: Update request |
|
|
|
|
|
Returns: |
|
|
Updated member information |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
|
|
|
from database.models import PoolMember |
|
|
member = ( |
|
|
session.query(PoolMember) |
|
|
.filter_by(pool_id=pool_id, provider_id=provider_id) |
|
|
.first() |
|
|
) |
|
|
|
|
|
if not member: |
|
|
session.close() |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail=f"Member not found in pool {pool_id}" |
|
|
) |
|
|
|
|
|
|
|
|
if request.priority is not None: |
|
|
member.priority = request.priority |
|
|
if request.weight is not None: |
|
|
member.weight = request.weight |
|
|
if request.enabled is not None: |
|
|
member.enabled = request.enabled |
|
|
|
|
|
session.commit() |
|
|
session.refresh(member) |
|
|
|
|
|
result = { |
|
|
"pool_id": pool_id, |
|
|
"provider_id": provider_id, |
|
|
"priority": member.priority, |
|
|
"weight": member.weight, |
|
|
"enabled": member.enabled, |
|
|
"message": "Pool member updated successfully" |
|
|
} |
|
|
|
|
|
session.close() |
|
|
|
|
|
return result |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.error(f"Error updating pool member: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to update pool member: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.delete("/{pool_id}/members/{provider_id}") |
|
|
async def remove_pool_member(pool_id: int, provider_id: int): |
|
|
""" |
|
|
Remove a provider from a pool |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
provider_id: Provider ID |
|
|
|
|
|
Returns: |
|
|
Deletion confirmation |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
|
|
|
from database.models import PoolMember |
|
|
member = ( |
|
|
session.query(PoolMember) |
|
|
.filter_by(pool_id=pool_id, provider_id=provider_id) |
|
|
.first() |
|
|
) |
|
|
|
|
|
if not member: |
|
|
session.close() |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail=f"Member not found in pool {pool_id}" |
|
|
) |
|
|
|
|
|
session.delete(member) |
|
|
session.commit() |
|
|
session.close() |
|
|
|
|
|
return { |
|
|
"message": "Provider removed from pool successfully", |
|
|
"pool_id": pool_id, |
|
|
"provider_id": provider_id |
|
|
} |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.error(f"Error removing pool member: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to remove pool member: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/{pool_id}/rotate") |
|
|
async def trigger_rotation(pool_id: int, request: TriggerRotationRequest): |
|
|
""" |
|
|
Trigger manual rotation to next provider in pool |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
request: Rotation request |
|
|
|
|
|
Returns: |
|
|
New provider information |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
pool_manager = SourcePoolManager(session) |
|
|
|
|
|
provider = pool_manager.get_next_provider(pool_id) |
|
|
|
|
|
session.close() |
|
|
|
|
|
if not provider: |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail=f"No available providers in pool {pool_id}" |
|
|
) |
|
|
|
|
|
return { |
|
|
"pool_id": pool_id, |
|
|
"provider_id": provider.id, |
|
|
"provider_name": provider.name, |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"message": f"Rotated to provider '{provider.name}'" |
|
|
} |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.error(f"Error triggering rotation: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to trigger rotation: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/{pool_id}/failover") |
|
|
async def trigger_failover(pool_id: int, request: FailoverRequest): |
|
|
""" |
|
|
Trigger failover from a failed provider |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
request: Failover request |
|
|
|
|
|
Returns: |
|
|
New provider information |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
pool_manager = SourcePoolManager(session) |
|
|
|
|
|
provider = pool_manager.failover( |
|
|
pool_id=pool_id, |
|
|
failed_provider_id=request.failed_provider_id, |
|
|
reason=request.reason |
|
|
) |
|
|
|
|
|
session.close() |
|
|
|
|
|
if not provider: |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail=f"No alternative providers available in pool {pool_id}" |
|
|
) |
|
|
|
|
|
return { |
|
|
"pool_id": pool_id, |
|
|
"failed_provider_id": request.failed_provider_id, |
|
|
"new_provider_id": provider.id, |
|
|
"new_provider_name": provider.name, |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"message": f"Failover successful: switched to '{provider.name}'" |
|
|
} |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
logger.error(f"Error triggering failover: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to trigger failover: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/{pool_id}/history") |
|
|
async def get_rotation_history(pool_id: int, limit: int = 50): |
|
|
""" |
|
|
Get rotation history for a pool |
|
|
|
|
|
Args: |
|
|
pool_id: Pool ID |
|
|
limit: Maximum number of records to return |
|
|
|
|
|
Returns: |
|
|
List of rotation history records |
|
|
""" |
|
|
try: |
|
|
session = db_manager.get_session() |
|
|
|
|
|
from database.models import RotationHistory, Provider |
|
|
history = ( |
|
|
session.query(RotationHistory) |
|
|
.filter_by(pool_id=pool_id) |
|
|
.order_by(RotationHistory.timestamp.desc()) |
|
|
.limit(limit) |
|
|
.all() |
|
|
) |
|
|
|
|
|
history_list = [] |
|
|
for record in history: |
|
|
from_provider = None |
|
|
if record.from_provider_id: |
|
|
from_prov = session.query(Provider).get(record.from_provider_id) |
|
|
from_provider = from_prov.name if from_prov else None |
|
|
|
|
|
to_prov = session.query(Provider).get(record.to_provider_id) |
|
|
to_provider = to_prov.name if to_prov else None |
|
|
|
|
|
history_list.append({ |
|
|
"id": record.id, |
|
|
"timestamp": record.timestamp.isoformat(), |
|
|
"from_provider": from_provider, |
|
|
"to_provider": to_provider, |
|
|
"reason": record.rotation_reason, |
|
|
"success": record.success, |
|
|
"notes": record.notes |
|
|
}) |
|
|
|
|
|
session.close() |
|
|
|
|
|
return { |
|
|
"pool_id": pool_id, |
|
|
"history": history_list, |
|
|
"total": len(history_list) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error getting rotation history: {e}", exc_info=True) |
|
|
raise HTTPException(status_code=500, detail=f"Failed to get rotation history: {str(e)}") |
|
|
|
|
|
|
|
|
logger.info("Pool API endpoints module loaded successfully") |
|
|
|