| """ | |
| Test Rate Limiters on HF Space | |
| ================================ | |
| Tests both Serper and NCBI rate limiters with concurrent requests | |
| to verify they work correctly and prevent HTTP 429 errors. | |
| Usage: | |
| python scripts/test_rate_limiters.py | |
| """ | |
import asyncio
import os
import sys
import time
from datetime import datetime

# Make the project root importable so the rate-limited wrappers resolve
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from core.utils.serper_rate_limited import rate_limited_serper_search
from core.utils.ncbi_rate_limited import rate_limited_pubmed_search
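
# The wrappers above are assumed to follow the contract exercised below; their
# implementations live in core/utils and are not shown in this script:
#   rate_limited_serper_search(query, api_key, num_results=...) -> dict with an "organic" result list
#   rate_limited_pubmed_search(query, api_key, max_results=...) -> dict with esearchresult["idlist"]
# Each call is expected to be throttled internally (NCBI at ~8 req/s when a key
# is present) and to serve repeated identical queries from a cache.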

# Test queries (cycled through when more requests than queries are issued)
SERPER_TEST_QUERIES = [
    "antibiotic resistance mechanisms",
    "COVID-19 treatment guidelines",
    "hospital acquired infections prevention",
    "sepsis diagnosis criteria",
    "antimicrobial stewardship programs",
]

NCBI_TEST_QUERIES = [
    "antibiotic resistance",
    "hospital infection control",
    "sepsis management",
    "antimicrobial stewardship",
    "infectious disease epidemiology",
]


class RateLimiterTester:
    """Test the rate limiters with concurrent requests."""

    def __init__(self):
        self.serper_results = []
        self.ncbi_results = []
        self.serper_api_key = os.getenv("SERPER_API_KEY")
        self.ncbi_api_key = os.getenv("NCBI_API_KEY")

    async def test_serper_single(self, query: str, request_id: int):
        """Test a single Serper API request."""
        start_time = time.time()
        try:
            result = await rate_limited_serper_search(query, self.serper_api_key, num_results=3)
            elapsed = time.time() - start_time

            if result and "organic" in result:
                num_results = len(result.get("organic", []))
                self.serper_results.append({
                    "request_id": request_id,
                    "query": query,
                    "status": "success",
                    "elapsed": elapsed,
                    "num_results": num_results,
                })
                print(f"  ✅ Serper #{request_id}: {query[:40]}... ({elapsed:.2f}s, {num_results} results)")
            else:
                self.serper_results.append({
                    "request_id": request_id,
                    "query": query,
                    "status": "no_results",
                    "elapsed": elapsed,
                })
                print(f"  ⚠️ Serper #{request_id}: No results ({elapsed:.2f}s)")
        except Exception as e:
            elapsed = time.time() - start_time
            self.serper_results.append({
                "request_id": request_id,
                "query": query,
                "status": "error",
                "elapsed": elapsed,
                "error": str(e),
            })
            print(f"  ❌ Serper #{request_id}: Error - {e} ({elapsed:.2f}s)")

    async def test_ncbi_single(self, query: str, request_id: int):
        """Test a single NCBI API request."""
        start_time = time.time()
        try:
            result = await rate_limited_pubmed_search(query, self.ncbi_api_key, max_results=5)
            elapsed = time.time() - start_time

            if result and "esearchresult" in result:
                idlist = result["esearchresult"].get("idlist", [])
                num_results = len(idlist)
                self.ncbi_results.append({
                    "request_id": request_id,
                    "query": query,
                    "status": "success",
                    "elapsed": elapsed,
                    "num_results": num_results,
                })
                print(f"  ✅ NCBI #{request_id}: {query[:40]}... ({elapsed:.2f}s, {num_results} articles)")
            else:
                self.ncbi_results.append({
                    "request_id": request_id,
                    "query": query,
                    "status": "no_results",
                    "elapsed": elapsed,
                })
                print(f"  ⚠️ NCBI #{request_id}: No results ({elapsed:.2f}s)")
        except Exception as e:
            elapsed = time.time() - start_time
            self.ncbi_results.append({
                "request_id": request_id,
                "query": query,
                "status": "error",
                "elapsed": elapsed,
                "error": str(e),
            })
            print(f"  ❌ NCBI #{request_id}: Error - {e} ({elapsed:.2f}s)")

    async def test_serper_concurrent(self, num_requests: int = 10):
        """Test the Serper API with concurrent requests."""
        print(f"\n{'='*70}")
        print(f"🔍 Testing Serper API Rate Limiter ({num_requests} concurrent requests)")
        print(f"{'='*70}")

        if not self.serper_api_key:
            print("❌ ERROR: SERPER_API_KEY not found in environment")
            return

        print(f"✅ Serper API key found: {self.serper_api_key[:10]}...")
        print(f"⏳ Starting {num_requests} concurrent requests...\n")

        start_time = time.time()

        # Create tasks, cycling through the test queries
        tasks = []
        for i in range(num_requests):
            query = SERPER_TEST_QUERIES[i % len(SERPER_TEST_QUERIES)]
            tasks.append(self.test_serper_single(query, i + 1))

        # Execute all tasks concurrently
        await asyncio.gather(*tasks)
        total_time = time.time() - start_time

        # Analyze results
        print(f"\n{'='*70}")
        print(f"📊 Serper API Test Results")
        print(f"{'='*70}")

        success = [r for r in self.serper_results if r["status"] == "success"]
        no_results = [r for r in self.serper_results if r["status"] == "no_results"]
        errors = [r for r in self.serper_results if r["status"] == "error"]

        print(f"Total Requests:  {num_requests}")
        print(f"Successful:      {len(success)} ({len(success)/num_requests*100:.1f}%)")
        print(f"No Results:      {len(no_results)} ({len(no_results)/num_requests*100:.1f}%)")
        print(f"Errors:          {len(errors)} ({len(errors)/num_requests*100:.1f}%)")
        print(f"Total Time:      {total_time:.2f}s")
        print(f"Avg Throughput:  {num_requests/total_time:.2f} req/s")

        if success:
            avg_time = sum(r["elapsed"] for r in success) / len(success)
            min_time = min(r["elapsed"] for r in success)
            max_time = max(r["elapsed"] for r in success)
            print(f"\nResponse Times (successful):")
            print(f"  Average: {avg_time:.2f}s")
            print(f"  Min:     {min_time:.2f}s")
            print(f"  Max:     {max_time:.2f}s")

        if errors:
            print(f"\n⚠️ Errors found:")
            for err in errors[:5]:  # Show first 5 errors
                print(f"  - Request #{err['request_id']}: {err.get('error', 'Unknown')}")

        # Check for HTTP 429 errors
        http_429_errors = [e for e in errors if "429" in str(e.get("error", ""))]
        if http_429_errors:
            print(f"\n❌ CRITICAL: {len(http_429_errors)} HTTP 429 (Rate Limit) errors detected!")
            print(f"   The rate limiter may not be working correctly.")
        else:
            print(f"\n✅ SUCCESS: No HTTP 429 errors - rate limiter working!")

    async def test_ncbi_concurrent(self, num_requests: int = 10):
        """Test the NCBI API with concurrent requests."""
        print(f"\n{'='*70}")
        print(f"🔬 Testing NCBI API Rate Limiter ({num_requests} concurrent requests)")
        print(f"{'='*70}")

        if self.ncbi_api_key:
            print(f"✅ NCBI API key found: {self.ncbi_api_key[:10]}...")
            print(f"   Rate limit: 10 req/s (using an 8 req/s throttle)")
        else:
            print(f"⚠️ No NCBI API key found - SKIPPING NCBI TESTS")
            print(f"   This is expected when running locally without an API key;")
            print(f"   the NCBI tests run on the HF Space where the key is configured.")
            return

        print(f"⏳ Starting {num_requests} concurrent requests...\n")

        start_time = time.time()

        # Create tasks, cycling through the test queries
        tasks = []
        for i in range(num_requests):
            query = NCBI_TEST_QUERIES[i % len(NCBI_TEST_QUERIES)]
            tasks.append(self.test_ncbi_single(query, i + 1))

        # Execute all tasks concurrently
        await asyncio.gather(*tasks)
        total_time = time.time() - start_time

        # Analyze results
        print(f"\n{'='*70}")
        print(f"📊 NCBI API Test Results")
        print(f"{'='*70}")

        success = [r for r in self.ncbi_results if r["status"] == "success"]
        no_results = [r for r in self.ncbi_results if r["status"] == "no_results"]
        errors = [r for r in self.ncbi_results if r["status"] == "error"]

        print(f"Total Requests:  {num_requests}")
        print(f"Successful:      {len(success)} ({len(success)/num_requests*100:.1f}%)")
        print(f"No Results:      {len(no_results)} ({len(no_results)/num_requests*100:.1f}%)")
        print(f"Errors:          {len(errors)} ({len(errors)/num_requests*100:.1f}%)")
        print(f"Total Time:      {total_time:.2f}s")
        print(f"Avg Throughput:  {num_requests/total_time:.2f} req/s")

        if success:
            avg_time = sum(r["elapsed"] for r in success) / len(success)
            min_time = min(r["elapsed"] for r in success)
            max_time = max(r["elapsed"] for r in success)
            print(f"\nResponse Times (successful):")
            print(f"  Average: {avg_time:.2f}s")
            print(f"  Min:     {min_time:.2f}s")
            print(f"  Max:     {max_time:.2f}s")

        if errors:
            print(f"\n⚠️ Errors found:")
            for err in errors[:5]:  # Show first 5 errors
                print(f"  - Request #{err['request_id']}: {err.get('error', 'Unknown')}")

        # Check for HTTP 429 errors
        http_429_errors = [e for e in errors if "429" in str(e.get("error", ""))]
        if http_429_errors:
            print(f"\n❌ CRITICAL: {len(http_429_errors)} HTTP 429 (Rate Limit) errors detected!")
            print(f"   The rate limiter may not be working correctly.")
        else:
            print(f"\n✅ SUCCESS: No HTTP 429 errors - rate limiter working!")

    async def test_cache_effectiveness(self):
        """Test the cache by running the same queries twice."""
        print(f"\n{'='*70}")
        print(f"💾 Testing Cache Effectiveness")
        print(f"{'='*70}")

        if not self.serper_api_key:
            print("⚠️ Serper API key not found - skipping cache test")
            return

        test_query = "antibiotic resistance mechanisms"

        # First request (should hit the API)
        print(f"\n1️⃣ First request (should hit API):")
        start1 = time.time()
        result1 = await rate_limited_serper_search(test_query, self.serper_api_key, num_results=3)
        time1 = time.time() - start1
        print(f"   Time: {time1:.3f}s")

        # Wait a moment
        await asyncio.sleep(0.5)

        # Second request (should hit the cache)
        print(f"\n2️⃣ Second request (should hit cache):")
        start2 = time.time()
        result2 = await rate_limited_serper_search(test_query, self.serper_api_key, num_results=3)
        time2 = time.time() - start2
        print(f"   Time: {time2:.3f}s")

        # Analysis
        print(f"\n📊 Cache Analysis:")
        if time2 < time1 * 0.3:  # A cache hit should take <30% of the first request's time
            print(f"   ✅ Cache HIT detected! (2nd request {time2/time1*100:.1f}% of 1st)")
            print(f"   Speedup: {time1/time2:.1f}x faster")
        else:
            print(f"   ⚠️ Cache may not be working (2nd: {time2:.3f}s vs 1st: {time1:.3f}s)")

        # NCBI cache test (only if an API key is available)
        if self.ncbi_api_key:
            print(f"\n3️⃣ Testing NCBI cache:")
            ncbi_query = "sepsis diagnosis"

            start3 = time.time()
            result3 = await rate_limited_pubmed_search(ncbi_query, self.ncbi_api_key, max_results=5)
            time3 = time.time() - start3
            print(f"   First request: {time3:.3f}s")

            await asyncio.sleep(0.5)

            start4 = time.time()
            result4 = await rate_limited_pubmed_search(ncbi_query, self.ncbi_api_key, max_results=5)
            time4 = time.time() - start4
            print(f"   Second request: {time4:.3f}s")

            if time4 < time3 * 0.3:
                print(f"   ✅ Cache HIT detected! (2nd request {time4/time3*100:.1f}% of 1st)")
                print(f"   Speedup: {time3/time4:.1f}x faster")
            else:
                print(f"   ⚠️ Cache may not be working (2nd: {time4:.3f}s vs 1st: {time3:.3f}s)")
        else:
            print(f"\n3️⃣ NCBI cache test skipped (no API key)")

    async def run_all_tests(self):
        """Run all rate limiter tests."""
        print(f"\n{'='*70}")
        print(f"🚀 IDWeek Agents - Rate Limiter Test Suite")
        print(f"{'='*70}")
        print(f"Start Time:  {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Environment: {'Production' if 'SPACE_ID' in os.environ else 'Local'}")

        # Test 1: Serper concurrent requests
        await self.test_serper_concurrent(num_requests=15)

        # Wait between tests
        await asyncio.sleep(2)

        # Test 2: NCBI concurrent requests
        await self.test_ncbi_concurrent(num_requests=15)

        # Wait between tests
        await asyncio.sleep(2)

        # Test 3: Cache effectiveness
        await self.test_cache_effectiveness()

        # Final summary
        print(f"\n{'='*70}")
        print(f"✅ All Tests Complete!")
        print(f"{'='*70}")

        # Overall analysis
        serper_success_rate = (
            len([r for r in self.serper_results if r["status"] == "success"]) / len(self.serper_results) * 100
            if self.serper_results else 0
        )
        ncbi_success_rate = (
            len([r for r in self.ncbi_results if r["status"] == "success"]) / len(self.ncbi_results) * 100
            if self.ncbi_results else 0
        )

        print(f"\n📈 Overall Success Rates:")
        print(f"   Serper API: {serper_success_rate:.1f}%")
        print(f"   NCBI API:   {ncbi_success_rate:.1f}%")

        # Check for HTTP 429 errors
        serper_429 = len([r for r in self.serper_results if r["status"] == "error" and "429" in str(r.get("error", ""))])
        ncbi_429 = len([r for r in self.ncbi_results if r["status"] == "error" and "429" in str(r.get("error", ""))])

        if serper_429 == 0 and ncbi_429 == 0:
            print(f"\n🎉 SUCCESS: No HTTP 429 errors detected!")
            print(f"   ✅ Rate limiters are working correctly")
            print(f"   ✅ Ready for the 150-user workshop")
        else:
            print(f"\n⚠️ WARNING: HTTP 429 errors detected:")
            if serper_429 > 0:
                print(f"   - Serper API: {serper_429} rate limit errors")
            if ncbi_429 > 0:
                print(f"   - NCBI API: {ncbi_429} rate limit errors")
            print(f"   ⚠️ Rate limiters may need adjustment")


async def main():
    """Main test execution."""
    tester = RateLimiterTester()
    await tester.run_all_tests()


if __name__ == "__main__":
    print("Starting rate limiter tests...")
    asyncio.run(main())
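
# A minimal sketch of invoking one sub-test directly (assumes the same
# environment variables as the full suite; RateLimiterTester is defined above):
#
#   tester = RateLimiterTester()
#   asyncio.run(tester.test_serper_concurrent(num_requests=5))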