"""
Rate-Limited Serper API Wrapper
================================
Throttles Serper API requests to 5 per second (free tier limit)
with caching to reduce duplicate queries.

Usage in your agents:
    from serper_rate_limited import rate_limited_serper_search

    results = await rate_limited_serper_search(query, api_key)
"""

import asyncio
import hashlib
import time
from collections import deque

import aiohttp


class SerperRateLimiter:
    """Rate limiter for Serper API (5 requests per second)."""
    
    def __init__(self, max_requests_per_second=5):
        self.max_requests_per_second = max_requests_per_second
        self.request_times = deque()
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        """Block until a request can be made within the rate limit."""
        async with self.lock:
            now = time.monotonic()  # monotonic clock is immune to wall-clock jumps

            # Drop timestamps that have left the 1-second window
            while self.request_times and now - self.request_times[0] > 1.0:
                self.request_times.popleft()

            # If the window is full, sleep until the oldest slot expires
            if len(self.request_times) >= self.max_requests_per_second:
                sleep_time = 1.0 - (now - self.request_times[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
                    now = time.monotonic()
                    while self.request_times and now - self.request_times[0] > 1.0:
                        self.request_times.popleft()

            # Record this request
            self.request_times.append(time.monotonic())

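# Illustrative sketch (not used elsewhere in this module): six back-to-back
# acquire() calls let the first five through immediately, then make the
# sixth sleep until the oldest timestamp leaves the one-second window.
#
#     limiter = SerperRateLimiter(max_requests_per_second=5)
#     for _ in range(6):
#         await limiter.acquire()  # the sixth call blocks for ~1s
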

# Global rate limiter instance
rate_limiter = SerperRateLimiter(max_requests_per_second=5)

# Simple in-memory cache (TTL: 10 minutes)
_search_cache = {}
_cache_ttl = 600  # 10 minutes


def _cache_key(query: str, num_results: int) -> str:
    """Generate a cache key from the query and requested result count."""
    return hashlib.md5(f"{query.lower()}|{num_results}".encode()).hexdigest()


def _get_cached_result(query: str, num_results: int):
    """Return the cached search result if present and not expired."""
    key = _cache_key(query, num_results)
    if key in _search_cache:
        result, timestamp = _search_cache[key]
        if time.time() - timestamp < _cache_ttl:
            return result
        del _search_cache[key]  # expired: evict
    return None


def _cache_result(query: str, num_results: int, result):
    """Cache a search result under its query/num_results key."""
    key = _cache_key(query, num_results)
    _search_cache[key] = (result, time.time())


async def rate_limited_serper_search(query: str, api_key: str, num_results: int = 5, _retry: bool = True) -> dict | None:
    """
    Make a rate-limited Serper API request with caching.

    Args:
        query: Search query string
        api_key: Serper API key
        num_results: Number of results to return (default: 5)
        _retry: Internal flag; cleared on the single 429 retry

    Returns:
        Search results dict, or None if the request failed
    """
    # Check cache first
    cached = _get_cached_result(query, num_results)
    if cached is not None:
        return cached
    
    # Wait for rate limit clearance
    await rate_limiter.acquire()
    
    # Make API request
    url = "https://google.serper.dev/search"
    headers = {
        "X-API-KEY": api_key,
        "Content-Type": "application/json"
    }
    payload = {
        "q": query,
        "num": num_results
    }
    
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    result = await response.json()
                    _cache_result(query, num_results, result)
                    return result
                elif response.status == 429 and _retry:
                    # Hit the server-side limit anyway: back off and retry once
                    await asyncio.sleep(1.0)
                    return await rate_limited_serper_search(query, api_key, num_results, _retry=False)
                else:
                    print(f"Serper API error {response.status}: {await response.text()}")
                    return None
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        print(f"Serper API request failed: {e}")
        return None


# Synchronous wrapper for compatibility
def rate_limited_serper_search_sync(query: str, api_key: str, num_results: int = 5) -> dict | None:
    """Synchronous version of rate_limited_serper_search."""
    # asyncio.run() creates and closes a fresh event loop; the previous
    # get_event_loop() approach is deprecated and fails when no loop exists.
    # Do not call this from code already running inside an event loop.
    return asyncio.run(rate_limited_serper_search(query, api_key, num_results))
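

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original API surface: fires a
    # few concurrent queries through the shared limiter and cache. Assumes
    # the Serper key is exported as SERPER_API_KEY (hypothetical env var).
    import os

    async def _demo():
        api_key = os.environ["SERPER_API_KEY"]
        queries = ["python asyncio", "aiohttp client timeout", "token bucket rate limit"]
        results = await asyncio.gather(
            *(rate_limited_serper_search(q, api_key) for q in queries)
        )
        for query, result in zip(queries, results):
            # Serper returns web hits under the "organic" key
            hits = len(result.get("organic", [])) if result else 0
            print(f"{query!r}: {hits} organic results")

    asyncio.run(_demo())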