"""
Rate-Limited Serper API Wrapper
================================
Throttles Serper API requests to 5 per second (free tier limit)
with caching to reduce duplicate queries.
Usage in your agents:
from serper_rate_limited import rate_limited_serper_search
results = await rate_limited_serper_search(query)
"""
import asyncio
import hashlib
import time
from collections import deque

import aiohttp
class SerperRateLimiter:
    """Rate limiter for the Serper API (5 requests per second)."""

    def __init__(self, max_requests_per_second=5):
        self.max_requests_per_second = max_requests_per_second
        self.request_times = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until we can make a request within the rate limit."""
        async with self.lock:
            now = time.time()
            # Drop request timestamps older than 1 second
            while self.request_times and now - self.request_times[0] > 1.0:
                self.request_times.popleft()
            # If we've hit the limit, wait until the oldest request ages out
            if len(self.request_times) >= self.max_requests_per_second:
                sleep_time = 1.0 - (now - self.request_times[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
                # Drop timestamps that aged out while we slept
                now = time.time()
                while self.request_times and now - self.request_times[0] > 1.0:
                    self.request_times.popleft()
            # Record this request
            self.request_times.append(time.time())

# Global rate limiter instance
rate_limiter = SerperRateLimiter(max_requests_per_second=5)

# Simple in-memory cache (TTL: 10 minutes)
_search_cache = {}
_cache_ttl = 600  # seconds (10 minutes)

def _cache_key(query: str) -> str:
    """Generate a cache key from the query."""
    return hashlib.md5(query.lower().encode()).hexdigest()


def _get_cached_result(query: str):
    """Return a cached search result if present and not expired."""
    key = _cache_key(query)
    if key in _search_cache:
        result, timestamp = _search_cache[key]
        if time.time() - timestamp < _cache_ttl:
            return result
        else:
            del _search_cache[key]
    return None


def _cache_result(query: str, result):
    """Cache a search result with the current timestamp."""
    key = _cache_key(query)
    _search_cache[key] = (result, time.time())

async def rate_limited_serper_search(query: str, api_key: str, num_results: int = 5,
                                      _retried: bool = False) -> dict:
    """
    Make a rate-limited Serper API request with caching.

    Args:
        query: Search query string
        api_key: Serper API key
        num_results: Number of results to return (default: 5)

    Returns:
        Search results dict, or None on failure
    """
    # Check cache first
    cached = _get_cached_result(query)
    if cached is not None:
        return cached

    # Wait for rate limit clearance
    await rate_limiter.acquire()

    # Make API request
    url = "https://google.serper.dev/search"
    headers = {
        "X-API-KEY": api_key,
        "Content-Type": "application/json"
    }
    payload = {
        "q": query,
        "num": num_results
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, headers=headers,
                                    timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    result = await response.json()
                    _cache_result(query, result)
                    return result
                elif response.status == 429 and not _retried:
                    # Still hit the rate limit: wait and retry once
                    await asyncio.sleep(1.0)
                    return await rate_limited_serper_search(query, api_key, num_results,
                                                            _retried=True)
                else:
                    print(f"Serper API error {response.status}: {await response.text()}")
                    return None
    except Exception as e:
        print(f"Serper API exception: {e}")
        return None

# Synchronous wrapper for compatibility
def rate_limited_serper_search_sync(query: str, api_key: str, num_results: int = 5) -> dict:
    """Synchronous version of rate_limited_serper_search."""
    return asyncio.run(rate_limited_serper_search(query, api_key, num_results))
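

# --- Usage sketch (illustrative addition, not part of the original module) ---
# A minimal self-test showing how the wrapper throttles concurrent queries.
# Assumes a SERPER_API_KEY environment variable; the queries below are placeholders.
if __name__ == "__main__":
    import os

    async def _demo():
        api_key = os.environ.get("SERPER_API_KEY", "")
        queries = ["python asyncio", "aiohttp timeout", "serper api"]
        # Fire the requests concurrently; the shared rate limiter spaces them out
        # so no more than 5 hit the API in any one-second window.
        results = await asyncio.gather(
            *(rate_limited_serper_search(q, api_key) for q in queries)
        )
        for q, r in zip(queries, results):
            count = len(r.get("organic", [])) if r else 0
            print(f"{q!r}: {count} organic results")

    asyncio.run(_demo())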