IDAgentsFreshTest / scripts /load_test_ncbi_api.py
IDAgents Developer
Add API load testing suite and rate limiters for workshop readiness
13537fe
raw
history blame
14.2 kB
"""
NCBI PubMed API Load Test - Validate 150 Concurrent Users
==========================================================
Tests if NCBI Entrez API (PubMed) can handle 150 concurrent requests
from your IDWeek Agents workshop.
NCBI API Rate Limits (without API key):
- 3 requests per second
- Blocks if exceeded
NCBI API Rate Limits (with API key):
- 10 requests per second
- More lenient enforcement
Usage:
python scripts/load_test_ncbi_api.py --users 150 --duration 60
"""
import asyncio
import aiohttp
import time
import statistics
import argparse
import os
from dataclasses import dataclass, field
from typing import List, Dict
import random
from xml.etree import ElementTree as ET
@dataclass
class RequestMetrics:
"""Metrics for a single NCBI API request."""
user_id: int
duration_ms: float
status_code: int
success: bool
error: str = ""
results_count: int = 0
@dataclass
class LoadTestResults:
"""Aggregated load test results."""
total_requests: int = 0
successful_requests: int = 0
failed_requests: int = 0
response_times: List[float] = field(default_factory=list)
errors: Dict[str, int] = field(default_factory=dict)
status_codes: Dict[int, int] = field(default_factory=dict)
rate_limit_errors: int = 0
def add_metric(self, metric: RequestMetrics):
self.total_requests += 1
if metric.success:
self.successful_requests += 1
self.response_times.append(metric.duration_ms)
else:
self.failed_requests += 1
self.errors[metric.error] = self.errors.get(metric.error, 0) + 1
if metric.status_code == 429 or "rate" in metric.error.lower():
self.rate_limit_errors += 1
self.status_codes[metric.status_code] = self.status_codes.get(metric.status_code, 0) + 1
def print_summary(self, duration_sec: float, total_users: int):
print("\n" + "=" * 80)
print("NCBI PUBMED API LOAD TEST RESULTS")
print("=" * 80)
print(f"Test Configuration:")
print(f" Duration: {duration_sec:.1f}s")
print(f" Concurrent Users: {total_users}")
print(f" Total Requests: {self.total_requests}")
print(f" Throughput: {self.total_requests/duration_sec:.2f} req/s")
print(f"\nSuccess Metrics:")
success_rate = self.successful_requests/self.total_requests*100 if self.total_requests > 0 else 0
print(f" Successful: {self.successful_requests} ({success_rate:.1f}%)")
print(f" Failed: {self.failed_requests} ({100-success_rate:.1f}%)")
if self.rate_limit_errors > 0:
print(f" ⚠️ Rate Limit Errors: {self.rate_limit_errors} ({self.rate_limit_errors/self.total_requests*100:.1f}%)")
if self.response_times:
print(f"\nResponse Times (ms):")
print(f" p50 (Median): {statistics.median(self.response_times):.0f} ms")
print(f" p75: {statistics.quantiles(self.response_times, n=4)[2]:.0f} ms")
print(f" p95: {statistics.quantiles(self.response_times, n=20)[18]:.0f} ms")
print(f" p99: {statistics.quantiles(self.response_times, n=100)[98]:.0f} ms")
print(f" Max: {max(self.response_times):.0f} ms")
print(f" Min: {min(self.response_times):.0f} ms")
print(f" Average: {sum(self.response_times)/len(self.response_times):.0f} ms")
if self.status_codes:
print(f"\nHTTP Status Codes:")
for code, count in sorted(self.status_codes.items()):
emoji = "✅" if code == 200 else ("⚠️" if code == 429 else "❌")
print(f" {emoji} {code}: {count}")
if self.errors:
print(f"\nErrors:")
for err, count in sorted(self.errors.items(), key=lambda x: -x[1])[:10]:
print(f" {err[:80]}: {count}")
# Performance Assessment
print(f"\n{'=' * 80}")
print("PERFORMANCE ASSESSMENT FOR 150-USER WORKSHOP:")
print(f"{'=' * 80}")
if success_rate >= 95:
print("✅ SUCCESS RATE: EXCELLENT (≥95%)")
print(" → NCBI PubMed API can handle workshop load")
elif success_rate >= 90:
print("⚠️ SUCCESS RATE: ACCEPTABLE (90-95%)")
print(" → May see occasional failures during peak usage")
elif success_rate >= 80:
print("⚠️ SUCCESS RATE: MARGINAL (80-90%)")
print(" → Consider rate limiting or caching")
else:
print("❌ SUCCESS RATE: INSUFFICIENT (<80%)")
print(" → NCBI PubMed API cannot handle 150 concurrent users")
if self.rate_limit_errors > 0:
rate_limit_pct = self.rate_limit_errors/self.total_requests*100
if rate_limit_pct > 10:
print(f"\n⚠️ HIGH RATE LIMITING: {rate_limit_pct:.1f}% of requests")
print(" Recommendations:")
print(" 1. Use NCBI API key (increases limit from 3 to 10 req/s)")
print(" 2. Implement request queuing/throttling")
print(" 3. Cache PubMed results (TTL: 24 hours)")
print(" 4. Add retry logic with exponential backoff")
if self.response_times:
p95 = statistics.quantiles(self.response_times, n=20)[18]
if p95 < 1000:
print("\n✅ RESPONSE TIME: EXCELLENT (p95 < 1s)")
print(" → Fast PubMed lookups for workshop users")
elif p95 < 3000:
print("\n⚠️ RESPONSE TIME: ACCEPTABLE (p95 < 3s)")
print(" → Reasonable search latency")
else:
print("\n❌ RESPONSE TIME: SLOW (p95 > 3s)")
print(" → May impact user experience")
print("=" * 80)
# PubMed search queries simulating real agent usage
PUBMED_QUERIES = [
"MRSA treatment guidelines",
"sepsis management protocol",
"antibiotic resistance mechanisms",
"C difficile infection therapy",
"bloodstream infection empiric therapy",
"pneumonia antibiotic duration",
"urinary tract infection resistance",
"surgical site infection prevention",
"vancomycin dosing guidelines",
"carbapenem resistant enterobacteriaceae",
"infectious diseases clinical trials",
"antimicrobial stewardship interventions",
"hospital acquired pneumonia treatment",
"neutropenic fever management",
"endocarditis antibiotic therapy"
]
async def test_ncbi_api_request(session: aiohttp.ClientSession, user_id: int, query: str, api_key: str = None) -> RequestMetrics:
"""Make a single NCBI Entrez (PubMed) API request."""
# Step 1: Search PubMed for query
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
params = {
"db": "pubmed",
"term": query,
"retmax": 10,
"retmode": "json"
}
if api_key:
params["api_key"] = api_key
start = time.time()
try:
async with session.get(base_url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as response:
duration_ms = (time.time() - start) * 1000
status = response.status
if status == 200:
data = await response.json()
result_count = int(data.get("esearchresult", {}).get("count", 0))
return RequestMetrics(
user_id=user_id,
duration_ms=duration_ms,
status_code=status,
success=True,
results_count=result_count
)
elif status == 429:
return RequestMetrics(
user_id=user_id,
duration_ms=duration_ms,
status_code=status,
success=False,
error="HTTP_429_Rate_Limit"
)
else:
error_text = await response.text()
return RequestMetrics(
user_id=user_id,
duration_ms=duration_ms,
status_code=status,
success=False,
error=f"HTTP_{status}"
)
except asyncio.TimeoutError:
duration_ms = (time.time() - start) * 1000
return RequestMetrics(
user_id=user_id,
duration_ms=duration_ms,
status_code=0,
success=False,
error="TimeoutError"
)
except Exception as e:
duration_ms = (time.time() - start) * 1000
return RequestMetrics(
user_id=user_id,
duration_ms=duration_ms,
status_code=0,
success=False,
error=type(e).__name__
)
async def simulate_user(user_id: int, api_key: str, results: LoadTestResults, duration_sec: int):
"""Simulate a single user making PubMed searches."""
async with aiohttp.ClientSession() as session:
end_time = time.time() + duration_sec
request_count = 0
while time.time() < end_time:
# Pick a random PubMed query
query = random.choice(PUBMED_QUERIES)
# Make request
metric = await test_ncbi_api_request(session, user_id, query, api_key)
results.add_metric(metric)
request_count += 1
# Random delay between requests (5-10 seconds, simulating user reading results)
await asyncio.sleep(random.uniform(5.0, 10.0))
status = "✓" if request_count > 0 else "✗"
print(f"{status} User {user_id:3d} completed {request_count} PubMed searches")
async def run_load_test(num_users: int, duration_sec: int, api_key: str = None):
"""Run the NCBI PubMed API load test."""
print(f"\n{'=' * 80}")
print("NCBI PUBMED API LOAD TEST - 150 USER WORKSHOP VALIDATION")
print(f"{'=' * 80}")
print(f"Concurrent Users: {num_users}")
print(f"Test Duration: {duration_sec} seconds")
print(f"Expected Requests: ~{num_users * (duration_sec / 7)} (avg 1 search per 7s per user)")
if api_key:
print(f"\n✅ Using NCBI API Key (Rate Limit: 10 req/s)")
else:
print(f"\n⚠️ No API Key (Rate Limit: 3 req/s) - Consider using API key for better performance")
print(f"\nNCBI API Rate Limits:")
print(f" • Without API key: 3 requests/second")
print(f" • With API key: 10 requests/second")
print(f" • This test will generate ~{num_users * (duration_sec / 7) / duration_sec:.1f} req/s average")
print(f"{'=' * 80}\n")
results = LoadTestResults()
start_time = time.time()
# Launch all user simulations concurrently
tasks = [
simulate_user(i+1, api_key, results, duration_sec)
for i in range(num_users)
]
await asyncio.gather(*tasks)
actual_duration = time.time() - start_time
results.print_summary(actual_duration, num_users)
# Recommendations
print("\n" + "=" * 80)
print("WORKSHOP RECOMMENDATIONS:")
print("=" * 80)
success_rate = results.successful_requests/results.total_requests*100 if results.total_requests > 0 else 0
if success_rate >= 95 and results.rate_limit_errors == 0:
print("✅ NCBI PubMed API is ready for your 150-user workshop!")
print("\nOptional optimizations:")
print(" • Cache PubMed results for 24 hours to reduce API calls")
print(" • Register for NCBI API key if not already done")
print(" • Monitor usage during workshop")
elif success_rate >= 90:
print("⚠️ NCBI PubMed API can handle workshop but consider optimizations:")
print("\n 1. Get NCBI API key (increases limit from 3 to 10 req/s)")
print(" 2. Cache PubMed results (reduces duplicate queries)")
print(" 3. Add retry logic for failed requests")
print(" 4. Implement rate limiting (max 8 req/s with API key)")
else:
print("❌ NCBI PubMed API may struggle with 150 concurrent users:")
print("\n REQUIRED Actions:")
print(" 1. Get NCBI API key (https://ncbiinsights.ncbi.nlm.nih.gov/2017/11/02/new-api-keys-for-the-e-utilities/)")
print(" 2. Implement request queuing (max 8 req/s)")
print(" 3. Cache all PubMed results (TTL: 24 hours)")
print(" 4. Add retry logic with exponential backoff")
# API key registration
if not api_key:
print("\n" + "=" * 80)
print("💡 HOW TO GET NCBI API KEY:")
print("=" * 80)
print("1. Visit: https://www.ncbi.nlm.nih.gov/account/")
print("2. Sign in or create NCBI account")
print("3. Go to Settings → API Key Management")
print("4. Create new API key")
print("5. Add to environment: export NCBI_API_KEY=your_key_here")
print("=" * 80)
print("\n")
def main():
parser = argparse.ArgumentParser(description="Load test NCBI PubMed API for workshop")
parser.add_argument("--users", type=int, default=150, help="Number of concurrent users (default: 150)")
parser.add_argument("--duration", type=int, default=60, help="Test duration in seconds (default: 60)")
parser.add_argument("--api-key", type=str, help="NCBI API key (or set NCBI_API_KEY env var)")
args = parser.parse_args()
# Get API key
api_key = args.api_key or os.getenv("NCBI_API_KEY")
print("\n🔬 Starting NCBI PubMed API load test...")
print("⏱️ This will take approximately", args.duration, "seconds")
if not api_key:
print("⚠️ No API key detected - running with 3 req/s limit")
print("💡 For better results, get API key: https://www.ncbi.nlm.nih.gov/account/\n")
else:
print("✅ Using API key - 10 req/s limit\n")
asyncio.run(run_load_test(args.users, args.duration, api_key))
if __name__ == "__main__":
main()