Spaces:
Sleeping
Sleeping
| """ | |
| Rate-Limited Serper API Wrapper | |
| ================================ | |
| Throttles Serper API requests to 5 per second (free tier limit) | |
| with caching to reduce duplicate queries. | |
| Usage in your agents: | |
| from serper_rate_limited import rate_limited_serper_search | |
| results = await rate_limited_serper_search(query, api_key) | |
| """ | |
| import asyncio | |
| import aiohttp | |
| import time | |
| from functools import lru_cache | |
| from collections import deque | |
| import hashlib | |
class SerperRateLimiter:
    """Sliding-window rate limiter for the Serper API.

    At most ``max_requests_per_second`` calls to :meth:`acquire` are granted
    within any one-second window; further callers are suspended until the
    oldest granted request leaves the window.

    Attributes:
        max_requests_per_second: Number of requests allowed per one-second window.
        request_times: ``time.monotonic()`` stamps of the granted requests
            still inside the window, oldest first.
        lock: Serializes access to ``request_times`` between coroutines.
    """

    def __init__(self, max_requests_per_second=5):
        """Create a limiter.

        Args:
            max_requests_per_second: Requests allowed per one-second window.

        Raises:
            ValueError: If ``max_requests_per_second`` is not positive (a
                non-positive limit could never grant a request).
        """
        if max_requests_per_second <= 0:
            raise ValueError("max_requests_per_second must be positive")
        self.max_requests_per_second = max_requests_per_second
        self.request_times = deque()
        self.lock = asyncio.Lock()

    def _evict_expired(self, now):
        """Drop stamps that have left the one-second window ending at *now*."""
        while self.request_times and now - self.request_times[0] >= 1.0:
            self.request_times.popleft()

    async def acquire(self):
        """Wait until a request may be made within the rate limit.

        The lock is held while sleeping on purpose: waiters are released one
        at a time, in order, as slots become free.
        """
        async with self.lock:
            while True:
                # Monotonic clock: wall-clock adjustments must not stall or
                # bypass the limiter.
                now = time.monotonic()
                self._evict_expired(now)
                if len(self.request_times) < self.max_requests_per_second:
                    break
                # Sleep until the oldest request ages out, then re-check:
                # timers may fire marginally early, so one sleep is not a
                # guarantee that a slot is actually free.
                await asyncio.sleep(1.0 - (now - self.request_times[0]))
            # Record this request
            self.request_times.append(time.monotonic())
# Global rate limiter instance, used by rate_limited_serper_search so that
# all searches issued through this module share one 5 requests/second budget.
rate_limiter = SerperRateLimiter(max_requests_per_second=5)

# Simple in-memory cache (TTL: 10 minutes).
# Maps the key produced by _cache_key() to a (result, stored_at) tuple, where
# stored_at is the time.time() value taken when the entry was written.
# Entries are only removed when an expired key is looked up again.
_search_cache = {}
_cache_ttl = 600  # seconds an entry stays valid (10 minutes)
def _cache_key(query: str) -> str:
    """Return the cache key for *query*.

    The query is lower-cased first, so lookups are case-insensitive; the key
    is the hexadecimal MD5 digest of the encoded text.
    """
    normalized = query.lower()
    digest = hashlib.md5(normalized.encode())
    return digest.hexdigest()
def _get_cached_result(query: str):
    """Return the cached result for *query*, or ``None`` on a miss.

    An entry counts as a hit only while it is younger than ``_cache_ttl``
    seconds; an expired entry is deleted from the cache as a side effect.
    """
    digest = _cache_key(query)
    try:
        payload, stored_at = _search_cache[digest]
    except KeyError:
        # Nothing stored for this query.
        return None

    if time.time() - stored_at >= _cache_ttl:
        # Stale: drop it so it is not consulted again.
        del _search_cache[digest]
        return None
    return payload
def _cache_result(query: str, result):
    """Store *result* for *query*, stamped with the current time."""
    digest = _cache_key(query)
    entry = (result, time.time())
    _search_cache[digest] = entry
async def rate_limited_serper_search(query: str, api_key: str, num_results: int = 5) -> dict:
    """
    Make a rate-limited Serper API request with caching.

    A cached response is returned when one exists for the same query and
    result count. Otherwise the call waits for rate-limit clearance and
    POSTs to the Serper search endpoint. An HTTP 429 reply is retried once
    after a one-second pause.

    Args:
        query: Search query string
        api_key: Serper API key
        num_results: Number of results to return (default: 5)

    Returns:
        Search results dict, or None if the request failed (non-200 status,
        repeated 429, or an exception while talking to the API; the error is
        printed).
    """
    # Fold the result count into the text handed to the cache helpers, so a
    # response fetched for one `num_results` is never served for another.
    # NUL cannot be confused with ordinary query text.
    cache_query = f"{query}\x00num={num_results}"

    url = "https://google.serper.dev/search"
    headers = {
        "X-API-KEY": api_key,
        "Content-Type": "application/json"
    }
    payload = {
        "q": query,
        "num": num_results
    }

    retries_left = 1  # a 429 is retried once, never indefinitely
    while True:
        # Check cache first (again on retry: a concurrent call may have
        # filled it in the meantime). `is not None` keeps an empty cached
        # response from being treated as a miss.
        cached = _get_cached_result(cache_query)
        if cached is not None:
            return cached

        # Wait for rate limit clearance
        await rate_limiter.acquire()

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    if response.status == 200:
                        result = await response.json()
                        _cache_result(cache_query, result)
                        return result
                    if response.status != 429 or retries_left == 0:
                        print(f"Serper API error {response.status}: {await response.text()}")
                        return None
        except Exception as e:
            # Deliberate best-effort: any transport/decoding failure is
            # reported and turned into a None result.
            print(f"Serper API exception: {e}")
            return None

        # Still hit the rate limit: back off with the connection already
        # released, then try once more.
        retries_left -= 1
        await asyncio.sleep(1.0)
# Synchronous wrapper for compatibility
def rate_limited_serper_search_sync(query: str, api_key: str, num_results: int = 5) -> dict:
    """Synchronous version of rate_limited_serper_search.

    Runs the coroutine to completion on a fresh event loop, so it works from
    any thread that is not already running one.

    Args:
        query: Search query string
        api_key: Serper API key
        num_results: Number of results to return (default: 5)

    Returns:
        Whatever rate_limited_serper_search returns for the same arguments.

    Raises:
        RuntimeError: If called from inside a running event loop; await
            rate_limited_serper_search() there instead.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so we may drive one ourselves.
        pass
    else:
        # Fail before the coroutine object is created, so no
        # "never awaited" warning is left behind.
        raise RuntimeError(
            "rate_limited_serper_search_sync() cannot be called from a running "
            "event loop; await rate_limited_serper_search() instead"
        )
    return asyncio.run(rate_limited_serper_search(query, api_key, num_results))