"""
Unified Async API Client - Replace mixed sync/async HTTP calls

Implements retry logic, error handling, and logging consistently
"""

import asyncio
import logging
import traceback
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List

import aiohttp

import config

# Module-level logger, named after this module, shared by the client class
# and the convenience functions below.
logger = logging.getLogger(__name__)


class AsyncAPIClient:
    """
    Unified async HTTP client with retry logic and error handling.

    Replaces mixed requests/aiohttp calls throughout the codebase.

    The underlying ``aiohttp.ClientSession`` exists only while the client is
    entered as an async context manager::

        async with AsyncAPIClient() as client:
            data = await client.get(url)

    Request failures are logged and reported by returning ``None``; the only
    exception raised by ``get``/``post`` themselves is ``RuntimeError`` when
    no session is active.
    """

    # HTTP statuses for which the identical request is never retried.
    _NON_RETRYABLE_STATUSES = frozenset({400, 401, 403, 404})

    def __init__(
        self,
        timeout: int = config.REQUEST_TIMEOUT,
        max_retries: int = config.MAX_RETRIES,
        retry_delay: float = 2.0
    ):
        """
        Initialize async API client.

        Args:
            timeout: Total request timeout in seconds
            max_retries: Maximum number of attempts per request (values
                below 1 are treated as a single attempt)
            retry_delay: Base delay in seconds between attempts; the delay
                doubles after every failed attempt (exponential backoff)
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Async context manager entry: open the HTTP session."""
        self._session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close and forget the HTTP session."""
        # Drop the reference before closing so that any later get()/post()
        # hits the explicit "must be used as async context manager" guard
        # instead of failing obscurely on a closed session.
        session, self._session = self._session, None
        if session:
            await session.close()

    async def _request(
        self,
        method: str,
        url: str,
        **request_kwargs: Any
    ) -> Optional[Dict[str, Any]]:
        """
        Send one HTTP request with the retry policy shared by get() and post().

        HTTP error statuses (except 400/401/403/404), connection errors and
        timeouts are retried with exponential backoff.  Any other exception
        is logged with its traceback and ends the call immediately, because
        repeating the same call is not expected to help.

        Args:
            method: HTTP method name, e.g. "GET" or "POST"
            url: Request URL
            **request_kwargs: Passed through to ``ClientSession.request``

        Returns:
            Decoded JSON response or None on failure

        Raises:
            RuntimeError: If no session is open
        """
        session = self._session
        if not session:
            raise RuntimeError("Client must be used as async context manager")

        # Always make at least one attempt, even if max_retries is 0 or less.
        attempts = max(1, self.max_retries)

        for attempt in range(attempts):
            try:
                logger.debug(f"{method} {url} (attempt {attempt + 1}/{attempts})")

                async with session.request(method, url, **request_kwargs) as response:
                    response.raise_for_status()
                    payload = await response.json()
                    logger.debug(f"{method} {url} successful")
                    return payload

            except aiohttp.ClientResponseError as e:
                logger.warning(f"HTTP {e.status} error on {url}: {e.message}")
                if e.status in self._NON_RETRYABLE_STATUSES:
                    return None

            except aiohttp.ClientConnectionError as e:
                logger.warning(f"Connection error on {url}: {e}")

            except asyncio.TimeoutError:
                logger.warning(f"Timeout on {url} (attempt {attempt + 1})")

            except Exception as e:
                logger.error(f"Unexpected error on {url}: {e}\n{traceback.format_exc()}")
                return None

            # Only retryable failures reach this point.
            if attempt >= attempts - 1:
                return None
            await asyncio.sleep(self.retry_delay * (2 ** attempt))

        return None

    async def get(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Make async GET request with retry logic.

        Args:
            url: Request URL
            params: Query parameters
            headers: HTTP headers

        Returns:
            JSON response as dictionary or None on failure

        Raises:
            RuntimeError: If the client is not inside ``async with``
        """
        return await self._request("GET", url, params=params, headers=headers)

    async def post(
        self,
        url: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Make async POST request with retry logic.

        Follows the same retry policy as get(): only HTTP errors, connection
        errors and timeouts are retried.

        Args:
            url: Request URL
            data: Form data
            json: JSON payload
            headers: HTTP headers

        Returns:
            JSON response as dictionary or None on failure

        Raises:
            RuntimeError: If the client is not inside ``async with``
        """
        return await self._request(
            "POST", url, data=data, json=json, headers=headers
        )

    async def gather_requests(
        self,
        urls: List[str],
        params_list: Optional[List[Optional[Dict[str, Any]]]] = None
    ) -> List[Optional[Dict[str, Any]]]:
        """
        Make multiple async GET requests in parallel.

        Args:
            urls: List of URLs to fetch
            params_list: Optional list of params for each URL, matched to
                ``urls`` by position.  If it is shorter than ``urls`` the
                remaining URLs are fetched without params; surplus entries
                are ignored.

        Returns:
            List of responses in the same order as ``urls``, one entry per
            URL (None for failed requests)
        """
        supplied = list(params_list) if params_list is not None else []
        if params_list is not None and len(supplied) != len(urls):
            logger.warning(
                "gather_requests got %d URLs but %d params entries; "
                "missing entries default to None and extras are ignored",
                len(urls),
                len(supplied),
            )

        # Pad/trim so that every URL is requested exactly once; a plain zip()
        # would silently drop URLs when params_list is too short.
        missing = max(0, len(urls) - len(supplied))
        per_url_params = supplied[:len(urls)] + [None] * missing

        tasks = [
            self.get(url, params=params)
            for url, params in zip(urls, per_url_params)
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # BaseException (not Exception) so that a cancelled request, reported
        # as CancelledError, is also mapped to None instead of leaking into
        # the result list.
        return [
            None if isinstance(result, BaseException) else result
            for result in results
        ]


async def safe_api_call(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
    timeout: int = config.REQUEST_TIMEOUT
) -> Optional[Dict[str, Any]]:
    """
    Convenience function for single async API call.

    Opens a short-lived AsyncAPIClient for this one GET request and closes
    it again before returning.

    Args:
        url: Request URL
        params: Query parameters
        headers: HTTP headers
        timeout: Request timeout in seconds

    Returns:
        JSON response or None on failure
    """
    async with AsyncAPIClient(timeout=timeout) as client:
        return await client.get(url, params=params, headers=headers)


async def parallel_api_calls(
    urls: List[str],
    params_list: Optional[List[Optional[Dict[str, Any]]]] = None,
    timeout: int = config.REQUEST_TIMEOUT
) -> List[Optional[Dict[str, Any]]]:
    """
    Convenience function for parallel async API calls.

    Opens a short-lived AsyncAPIClient, issues all GET requests through its
    gather_requests() method, and closes the client before returning.

    Args:
        urls: List of URLs
        params_list: Optional params for each URL
        timeout: Request timeout in seconds

    Returns:
        List of responses (None for failures)
    """
    async with AsyncAPIClient(timeout=timeout) as client:
        return await client.gather_requests(urls, params_list)