IDAgentsFreshTest/core/utils/serper_rate_limited.py
"""
Rate-Limited Serper API Wrapper
================================
Throttles Serper API requests to 5 per second (free tier limit)
with caching to reduce duplicate queries.
Usage in your agents:
from serper_rate_limited import rate_limited_serper_search
results = await rate_limited_serper_search(query)
"""
import asyncio
import hashlib
import time
from collections import deque

import aiohttp

class SerperRateLimiter:
    """Rate limiter for the Serper API (default: 5 requests per second)."""

    def __init__(self, max_requests_per_second=5):
        self.max_requests_per_second = max_requests_per_second
        self.request_times = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a request can be made within the rate limit."""
        async with self.lock:
            now = time.time()

            # Drop requests that have left the one-second sliding window
            while self.request_times and now - self.request_times[0] > 1.0:
                self.request_times.popleft()

            # If we've hit the limit, sleep until the oldest request ages out
            if len(self.request_times) >= self.max_requests_per_second:
                sleep_time = 1.0 - (now - self.request_times[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)

                # Prune the window again after waiting
                now = time.time()
                while self.request_times and now - self.request_times[0] > 1.0:
                    self.request_times.popleft()

            # Record this request
            self.request_times.append(time.time())

# Global rate limiter instance
rate_limiter = SerperRateLimiter(max_requests_per_second=5)

# Simple in-memory cache (TTL: 10 minutes)
_search_cache = {}
_cache_ttl = 600  # 10 minutes

def _cache_key(query: str) -> str:
    """Generate a cache key from the query string."""
    return hashlib.md5(query.lower().encode()).hexdigest()


def _get_cached_result(query: str):
    """Return a cached search result if present and not expired."""
    key = _cache_key(query)
    if key in _search_cache:
        result, timestamp = _search_cache[key]
        if time.time() - timestamp < _cache_ttl:
            return result
        else:
            del _search_cache[key]
    return None


def _cache_result(query: str, result):
    """Cache a search result with the current timestamp."""
    key = _cache_key(query)
    _search_cache[key] = (result, time.time())

async def rate_limited_serper_search(query: str, api_key: str, num_results: int = 5, _retry: bool = True) -> dict:
    """
    Make a rate-limited Serper API request with caching.

    Args:
        query: Search query string
        api_key: Serper API key
        num_results: Number of results to return (default: 5)

    Returns:
        Search results dict, or None if the request failed
    """
    # Check the cache first
    cached = _get_cached_result(query)
    if cached:
        return cached

    # Wait for rate limit clearance
    await rate_limiter.acquire()

    # Make the API request
    url = "https://google.serper.dev/search"
    headers = {
        "X-API-KEY": api_key,
        "Content-Type": "application/json"
    }
    payload = {
        "q": query,
        "num": num_results
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, headers=headers,
                                    timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    result = await response.json()
                    _cache_result(query, result)
                    return result
                elif response.status == 429 and _retry:
                    # Upstream rate limit hit anyway: back off briefly and retry once
                    await asyncio.sleep(1.0)
                    return await rate_limited_serper_search(query, api_key, num_results, _retry=False)
                else:
                    print(f"Serper API error {response.status}: {await response.text()}")
                    return None
    except Exception as e:
        print(f"Serper API exception: {e}")
        return None

# Synchronous wrapper for compatibility
def rate_limited_serper_search_sync(query: str, api_key: str, num_results: int = 5) -> dict:
    """Synchronous version of rate_limited_serper_search."""
    # asyncio.run() creates and closes its own event loop; call this from
    # synchronous code only, never from inside an already-running loop.
    return asyncio.run(rate_limited_serper_search(query, api_key, num_results))
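
# --- Example usage: a minimal sketch, not part of the workshop agents ---
# Assumes a SERPER_API_KEY environment variable is set and aiohttp is installed;
# the query strings below are placeholders. Firing several searches concurrently
# exercises the 5 req/s throttle, and repeating a query is served from the cache.
if __name__ == "__main__":
    import os

    async def _demo():
        api_key = os.environ["SERPER_API_KEY"]
        queries = [f"example query {i}" for i in range(8)]

        start = time.time()
        results = await asyncio.gather(
            *(rate_limited_serper_search(q, api_key) for q in queries)
        )
        ok = sum(r is not None for r in results)
        print(f"{ok}/{len(queries)} searches succeeded in {time.time() - start:.2f}s")

        # A repeated query returns the cached dict without another API call
        cached = await rate_limited_serper_search(queries[0], api_key)
        print("Served from cache:", cached is results[0])

    asyncio.run(_demo())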