IDAgentsFreshTest / scripts /test_rate_limiters.py
IDAgents Developer
Add rate limiter testing script and documentation
346e39a
raw
history blame
15.6 kB
"""
Test Rate Limiters on HF Space
================================
Tests both Serper and NCBI rate limiters with concurrent requests
to verify they work correctly and prevent HTTP 429 errors.
Usage:
python scripts/test_rate_limiters.py
"""
import asyncio
import time
import os
from datetime import datetime
# Import the rate-limited wrappers
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from core.utils.serper_rate_limited import rate_limited_serper_search
from core.utils.ncbi_rate_limited import rate_limited_pubmed_search
# Test queries
# Representative infectious-disease topics; each list is cycled through when
# more concurrent requests than queries are issued (see the *_concurrent tests).

# Queries sent to the Serper (Google search) wrapper.
SERPER_TEST_QUERIES = [
    "antibiotic resistance mechanisms",
    "COVID-19 treatment guidelines",
    "hospital acquired infections prevention",
    "sepsis diagnosis criteria",
    "antimicrobial stewardship programs"
]

# Queries sent to the NCBI/PubMed esearch wrapper.
NCBI_TEST_QUERIES = [
    "antibiotic resistance",
    "hospital infection control",
    "sepsis management",
    "antimicrobial stewardship",
    "infectious disease epidemiology"
]
class RateLimiterTester:
    """Test rate limiters with concurrent requests.

    Each individual request is recorded as a dict in ``serper_results`` /
    ``ncbi_results`` with the keys ``request_id``, ``query``, ``status``
    ("success" | "no_results" | "error"), ``elapsed`` and, depending on
    status, ``num_results`` or ``error``.
    """

    def __init__(self):
        # Per-request outcome records, appended by the *_single methods.
        self.serper_results = []
        self.ncbi_results = []
        # API keys come from the environment; a missing key causes the
        # corresponding tests to be skipped rather than fail.
        self.serper_api_key = os.getenv("SERPER_API_KEY")
        self.ncbi_api_key = os.getenv("NCBI_API_KEY")

    async def test_serper_single(self, query: str, request_id: int):
        """Issue one rate-limited Serper search and record the outcome.

        Appends an outcome dict to ``self.serper_results`` and prints a
        one-line status for the request. Never raises: any exception from
        the wrapper is captured as an "error" record.
        """
        start_time = time.time()
        try:
            result = await rate_limited_serper_search(query, self.serper_api_key, num_results=3)
            elapsed = time.time() - start_time
            # A usable Serper payload carries its hits under "organic".
            if result and "organic" in result:
                num_results = len(result.get("organic", []))
                self.serper_results.append({
                    "request_id": request_id,
                    "query": query,
                    "status": "success",
                    "elapsed": elapsed,
                    "num_results": num_results
                })
                print(f" βœ… Serper #{request_id}: {query[:40]}... ({elapsed:.2f}s, {num_results} results)")
            else:
                self.serper_results.append({
                    "request_id": request_id,
                    "query": query,
                    "status": "no_results",
                    "elapsed": elapsed
                })
                print(f" ⚠️ Serper #{request_id}: No results ({elapsed:.2f}s)")
        except Exception as e:
            elapsed = time.time() - start_time
            self.serper_results.append({
                "request_id": request_id,
                "query": query,
                "status": "error",
                "elapsed": elapsed,
                "error": str(e)
            })
            print(f" ❌ Serper #{request_id}: Error - {e} ({elapsed:.2f}s)")

    async def test_ncbi_single(self, query: str, request_id: int):
        """Issue one rate-limited PubMed search and record the outcome.

        Appends an outcome dict to ``self.ncbi_results`` and prints a
        one-line status for the request. Never raises: any exception from
        the wrapper is captured as an "error" record.
        """
        start_time = time.time()
        try:
            result = await rate_limited_pubmed_search(query, self.ncbi_api_key, max_results=5)
            elapsed = time.time() - start_time
            # A usable esearch payload nests the PMIDs under
            # esearchresult.idlist.
            if result and "esearchresult" in result:
                idlist = result["esearchresult"].get("idlist", [])
                num_results = len(idlist)
                self.ncbi_results.append({
                    "request_id": request_id,
                    "query": query,
                    "status": "success",
                    "elapsed": elapsed,
                    "num_results": num_results
                })
                print(f" βœ… NCBI #{request_id}: {query[:40]}... ({elapsed:.2f}s, {num_results} articles)")
            else:
                self.ncbi_results.append({
                    "request_id": request_id,
                    "query": query,
                    "status": "no_results",
                    "elapsed": elapsed
                })
                print(f" ⚠️ NCBI #{request_id}: No results ({elapsed:.2f}s)")
        except Exception as e:
            elapsed = time.time() - start_time
            self.ncbi_results.append({
                "request_id": request_id,
                "query": query,
                "status": "error",
                "elapsed": elapsed,
                "error": str(e)
            })
            print(f" ❌ NCBI #{request_id}: Error - {e} ({elapsed:.2f}s)")

    def _report_results(self, api_label: str, results: list, num_requests: int, total_time: float):
        """Print the result-analysis section shared by both concurrent tests.

        Args:
            api_label: Display name for the API ("Serper API" / "NCBI API").
            results: Outcome dicts produced by THIS batch only.
            num_requests: Number of requests issued in the batch.
            total_time: Wall-clock seconds for the whole batch.
        """
        print(f"\n{'='*70}")
        print(f"πŸ“Š {api_label} Test Results")
        print(f"{'='*70}")
        success = [r for r in results if r["status"] == "success"]
        no_results = [r for r in results if r["status"] == "no_results"]
        errors = [r for r in results if r["status"] == "error"]
        # Guard the divisions: an empty batch or an instantaneous (fully
        # cached) run must not raise ZeroDivisionError.
        denom = max(num_requests, 1)
        duration = max(total_time, 1e-9)
        print(f"Total Requests: {num_requests}")
        print(f"Successful: {len(success)} ({len(success)/denom*100:.1f}%)")
        print(f"No Results: {len(no_results)} ({len(no_results)/denom*100:.1f}%)")
        print(f"Errors: {len(errors)} ({len(errors)/denom*100:.1f}%)")
        print(f"Total Time: {total_time:.2f}s")
        print(f"Avg Throughput: {num_requests/duration:.2f} req/s")
        if success:
            avg_time = sum(r["elapsed"] for r in success) / len(success)
            min_time = min(r["elapsed"] for r in success)
            max_time = max(r["elapsed"] for r in success)
            print(f"\nResponse Times (successful):")
            print(f" Average: {avg_time:.2f}s")
            print(f" Min: {min_time:.2f}s")
            print(f" Max: {max_time:.2f}s")
        if errors:
            print(f"\n⚠️ Errors found:")
            for err in errors[:5]:  # Show first 5 errors
                print(f" - Request #{err['request_id']}: {err.get('error', 'Unknown')}")
        # An HTTP 429 means the upstream quota was hit, i.e. the local
        # rate limiter failed to throttle the burst.
        http_429_errors = [e for e in errors if "429" in str(e.get("error", ""))]
        if http_429_errors:
            print(f"\n❌ CRITICAL: {len(http_429_errors)} HTTP 429 (Rate Limit) errors detected!")
            print(f" Rate limiter may not be working correctly.")
        else:
            print(f"\nβœ… SUCCESS: No HTTP 429 errors - Rate limiter working!")

    async def test_serper_concurrent(self, num_requests: int = 10):
        """Fire *num_requests* Serper searches concurrently, then report.

        Skips (with a message) when SERPER_API_KEY is not set.
        """
        print(f"\n{'='*70}")
        print(f"πŸ” Testing Serper API Rate Limiter ({num_requests} concurrent requests)")
        print(f"{'='*70}")
        if not self.serper_api_key:
            print("❌ ERROR: SERPER_API_KEY not found in environment")
            return
        print(f"βœ… Serper API Key found: {self.serper_api_key[:10]}...")
        print(f"⏳ Starting {num_requests} concurrent requests...\n")
        # Remember where this batch starts so the report covers only the
        # records it produced — repeated calls would otherwise inflate the
        # percentages (they divide by this batch's num_requests).
        batch_start = len(self.serper_results)
        start_time = time.time()
        # Create tasks - use queries cyclically
        tasks = [
            self.test_serper_single(SERPER_TEST_QUERIES[i % len(SERPER_TEST_QUERIES)], i + 1)
            for i in range(num_requests)
        ]
        # Execute all tasks concurrently; the rate limiter inside the
        # wrapper is what serializes/throttles the actual HTTP calls.
        await asyncio.gather(*tasks)
        total_time = time.time() - start_time
        self._report_results("Serper API", self.serper_results[batch_start:], num_requests, total_time)

    async def test_ncbi_concurrent(self, num_requests: int = 10):
        """Fire *num_requests* PubMed searches concurrently, then report.

        Skips (with a message) when NCBI_API_KEY is not set — expected when
        running locally without credentials.
        """
        print(f"\n{'='*70}")
        print(f"πŸ”¬ Testing NCBI API Rate Limiter ({num_requests} concurrent requests)")
        print(f"{'='*70}")
        if self.ncbi_api_key:
            print(f"βœ… NCBI API Key found: {self.ncbi_api_key[:10]}...")
            print(f" Rate limit: 10 req/s (using 8 req/s throttle)")
        else:
            print(f"⚠️ No NCBI API Key found - SKIPPING NCBI TESTS")
            print(f" This is expected if running locally without API key")
            print(f" NCBI tests will run on HF Space where key is configured")
            return
        print(f"⏳ Starting {num_requests} concurrent requests...\n")
        # Analyze only this batch's records (see test_serper_concurrent).
        batch_start = len(self.ncbi_results)
        start_time = time.time()
        # Create tasks - use queries cyclically
        tasks = [
            self.test_ncbi_single(NCBI_TEST_QUERIES[i % len(NCBI_TEST_QUERIES)], i + 1)
            for i in range(num_requests)
        ]
        # Execute all tasks concurrently
        await asyncio.gather(*tasks)
        total_time = time.time() - start_time
        self._report_results("NCBI API", self.ncbi_results[batch_start:], num_requests, total_time)

    async def test_cache_effectiveness(self):
        """Run identical queries twice and compare timings to detect caching.

        A repeat that completes in under 30% of the first request's time is
        treated as a cache hit. Assumes the rate-limited wrappers cache by
        query — TODO confirm against core.utils implementations.
        """
        print(f"\n{'='*70}")
        print(f"πŸ’Ύ Testing Cache Effectiveness")
        print(f"{'='*70}")
        if not self.serper_api_key:
            print("⚠️ Serper API key not found - skipping cache test")
            return
        test_query = "antibiotic resistance mechanisms"
        # First request (should hit API)
        print(f"\n1️⃣ First request (should hit API):")
        start1 = time.time()
        await rate_limited_serper_search(test_query, self.serper_api_key, num_results=3)
        time1 = time.time() - start1
        print(f" Time: {time1:.3f}s")
        # Brief pause so the second request isn't throttled by the limiter.
        await asyncio.sleep(0.5)
        # Second request (should hit cache)
        print(f"\n2️⃣ Second request (should hit cache):")
        start2 = time.time()
        await rate_limited_serper_search(test_query, self.serper_api_key, num_results=3)
        time2 = time.time() - start2
        print(f" Time: {time2:.3f}s")
        # Analysis
        print(f"\nπŸ“Š Cache Analysis:")
        if time2 < time1 * 0.3:  # Second request should be <30% of first
            # max() guards against a cached response faster than the clock
            # resolution (time2 == 0.0 would otherwise divide by zero).
            print(f" βœ… Cache HIT detected! (2nd request {time2/max(time1, 1e-9)*100:.1f}% of 1st)")
            print(f" Speedup: {time1/max(time2, 1e-9):.1f}x faster")
        else:
            print(f" ⚠️ Cache may not be working (2nd: {time2:.3f}s vs 1st: {time1:.3f}s)")
        # NCBI cache test (only if API key available)
        if self.ncbi_api_key:
            print(f"\n3️⃣ Testing NCBI cache:")
            ncbi_query = "sepsis diagnosis"
            start3 = time.time()
            await rate_limited_pubmed_search(ncbi_query, self.ncbi_api_key, max_results=5)
            time3 = time.time() - start3
            print(f" First request: {time3:.3f}s")
            await asyncio.sleep(0.5)
            start4 = time.time()
            await rate_limited_pubmed_search(ncbi_query, self.ncbi_api_key, max_results=5)
            time4 = time.time() - start4
            print(f" Second request: {time4:.3f}s")
            if time4 < time3 * 0.3:
                print(f" βœ… Cache HIT detected! (2nd request {time4/max(time3, 1e-9)*100:.1f}% of 1st)")
                print(f" Speedup: {time3/max(time4, 1e-9):.1f}x faster")
            else:
                print(f" ⚠️ Cache may not be working (2nd: {time4:.3f}s vs 1st: {time3:.3f}s)")
        else:
            print(f"\n3️⃣ NCBI cache test skipped (no API key)")

    async def run_all_tests(self):
        """Run the full suite: concurrency tests, cache test, final summary."""
        print(f"\n{'='*70}")
        print(f"πŸš€ IDWeek Agents - Rate Limiter Test Suite")
        print(f"{'='*70}")
        print(f"Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        # SPACE_ID is set by Hugging Face Spaces, so its presence implies prod.
        print(f"Environment: {'Production' if 'SPACE_ID' in os.environ else 'Local'}")
        # Test 1: Serper concurrent requests
        await self.test_serper_concurrent(num_requests=15)
        # Pause between tests to let any limiter windows reset.
        await asyncio.sleep(2)
        # Test 2: NCBI concurrent requests
        await self.test_ncbi_concurrent(num_requests=15)
        await asyncio.sleep(2)
        # Test 3: Cache effectiveness
        await self.test_cache_effectiveness()
        # Final summary
        print(f"\n{'='*70}")
        print(f"βœ… All Tests Complete!")
        print(f"{'='*70}")
        # Overall analysis (guarded: a skipped test leaves its list empty).
        serper_success_rate = len([r for r in self.serper_results if r["status"] == "success"]) / len(self.serper_results) * 100 if self.serper_results else 0
        ncbi_success_rate = len([r for r in self.ncbi_results if r["status"] == "success"]) / len(self.ncbi_results) * 100 if self.ncbi_results else 0
        print(f"\nπŸ“Š Overall Success Rates:")
        print(f" Serper API: {serper_success_rate:.1f}%")
        print(f" NCBI API: {ncbi_success_rate:.1f}%")
        # Check for HTTP 429 errors across all recorded requests.
        serper_429 = len([r for r in self.serper_results if r["status"] == "error" and "429" in str(r.get("error", ""))])
        ncbi_429 = len([r for r in self.ncbi_results if r["status"] == "error" and "429" in str(r.get("error", ""))])
        if serper_429 == 0 and ncbi_429 == 0:
            print(f"\nπŸŽ‰ SUCCESS: No HTTP 429 errors detected!")
            print(f" βœ… Rate limiters are working correctly")
            print(f" βœ… Ready for 150-user workshop")
        else:
            print(f"\n⚠️ WARNING: HTTP 429 errors detected:")
            if serper_429 > 0:
                print(f" - Serper API: {serper_429} rate limit errors")
            if ncbi_429 > 0:
                print(f" - NCBI API: {ncbi_429} rate limit errors")
            print(f" ⚠️ Rate limiters may need adjustment")
async def main():
    """Entry point: build a tester instance and drive the whole suite."""
    await RateLimiterTester().run_all_tests()
if __name__ == "__main__":
    print("Starting rate limiter tests...")
    # asyncio.run creates a fresh event loop, runs main() to completion,
    # and closes the loop.
    asyncio.run(main())