"""
NCBI PubMed API Load Test - Validate 150 Concurrent Users
==========================================================

Tests if NCBI Entrez API (PubMed) can handle 150 concurrent requests
from your IDWeek Agents workshop.

NCBI API Rate Limits (without API key):
- 3 requests per second
- Blocks if exceeded

NCBI API Rate Limits (with API key):
- 10 requests per second
- More lenient enforcement

Usage:
    python scripts/load_test_ncbi_api.py --users 150 --duration 60
"""

import argparse
import asyncio
import os
import random
import statistics
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from xml.etree import ElementTree as ET

import aiohttp


def _quantile(samples: List[float], n: int, index: int) -> float:
    """Return cut point ``index`` of ``statistics.quantiles(samples, n=n)``.

    ``statistics.quantiles`` raises ``StatisticsError`` when given fewer than
    two data points, so a single sample is reported as-is. ``samples`` must
    not be empty.
    """
    if len(samples) < 2:
        return samples[0]
    return statistics.quantiles(samples, n=n)[index]


@dataclass
class RequestMetrics:
    """Metrics for a single NCBI API request."""

    user_id: int
    duration_ms: float
    status_code: int  # 0 when no HTTP response was received (timeout/exception)
    success: bool
    error: str = ""
    results_count: int = 0


@dataclass
class LoadTestResults:
    """Aggregated load test results."""

    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    # Latencies (ms) of successful requests only.
    response_times: List[float] = field(default_factory=list)
    # Error label -> number of occurrences.
    errors: Dict[str, int] = field(default_factory=dict)
    # HTTP status code -> number of occurrences.
    status_codes: Dict[int, int] = field(default_factory=dict)
    rate_limit_errors: int = 0

    def add_metric(self, metric: RequestMetrics) -> None:
        """Fold one request's metrics into the aggregate counters."""
        self.total_requests += 1
        if metric.success:
            self.successful_requests += 1
            self.response_times.append(metric.duration_ms)
        else:
            self.failed_requests += 1
            self.errors[metric.error] = self.errors.get(metric.error, 0) + 1
            if metric.status_code == 429 or "rate" in metric.error.lower():
                self.rate_limit_errors += 1
        self.status_codes[metric.status_code] = (
            self.status_codes.get(metric.status_code, 0) + 1
        )

    def print_summary(self, duration_sec: float, total_users: int) -> None:
        """Print the full report: configuration, success, latency, assessment.

        Args:
            duration_sec: Wall-clock length of the run, used for throughput.
            total_users: Number of simulated users, echoed in the report.
        """
        print("\n" + "=" * 80)
        print("NCBI PUBMED API LOAD TEST RESULTS")
        print("=" * 80)

        # Guard the divisions so an empty or zero-length run still reports.
        throughput = self.total_requests / duration_sec if duration_sec > 0 else 0.0
        if self.total_requests > 0:
            success_rate = self.successful_requests / self.total_requests * 100
            failure_rate = 100 - success_rate
            rate_limit_pct = self.rate_limit_errors / self.total_requests * 100
        else:
            success_rate = failure_rate = rate_limit_pct = 0.0

        print("Test Configuration:")
        print(f" Duration: {duration_sec:.1f}s")
        print(f" Concurrent Users: {total_users}")
        print(f" Total Requests: {self.total_requests}")
        print(f" Throughput: {throughput:.2f} req/s")

        print("\nSuccess Metrics:")
        print(f" Successful: {self.successful_requests} ({success_rate:.1f}%)")
        print(f" Failed: {self.failed_requests} ({failure_rate:.1f}%)")
        if self.rate_limit_errors > 0:
            print(
                f" ⚠️ Rate Limit Errors: {self.rate_limit_errors} "
                f"({rate_limit_pct:.1f}%)"
            )

        p95 = None
        if self.response_times:
            times = self.response_times
            p95 = _quantile(times, 20, 18)
            print("\nResponse Times (ms):")
            print(f" p50 (Median): {statistics.median(times):.0f} ms")
            print(f" p75: {_quantile(times, 4, 2):.0f} ms")
            print(f" p95: {p95:.0f} ms")
            print(f" p99: {_quantile(times, 100, 98):.0f} ms")
            print(f" Max: {max(times):.0f} ms")
            print(f" Min: {min(times):.0f} ms")
            print(f" Average: {sum(times) / len(times):.0f} ms")

        if self.status_codes:
            print("\nHTTP Status Codes:")
            for code, count in sorted(self.status_codes.items()):
                emoji = "✅" if code == 200 else ("⚠️" if code == 429 else "❌")
                print(f" {emoji} {code}: {count}")

        if self.errors:
            print("\nErrors:")
            # Ten most frequent error labels, most frequent first.
            top_errors = sorted(self.errors.items(), key=lambda x: -x[1])[:10]
            for err, count in top_errors:
                print(f" {err[:80]}: {count}")

        # Performance Assessment
        print(f"\n{'=' * 80}")
        print("PERFORMANCE ASSESSMENT FOR 150-USER WORKSHOP:")
        print(f"{'=' * 80}")

        if success_rate >= 95:
            print("✅ SUCCESS RATE: EXCELLENT (≥95%)")
            print(" → NCBI PubMed API can handle workshop load")
        elif success_rate >= 90:
            print("⚠️ SUCCESS RATE: ACCEPTABLE (90-95%)")
            print(" → May see occasional failures during peak usage")
        elif success_rate >= 80:
            print("⚠️ SUCCESS RATE: MARGINAL (80-90%)")
            print(" → Consider rate limiting or caching")
        else:
            print("❌ SUCCESS RATE: INSUFFICIENT (<80%)")
            print(" → NCBI PubMed API cannot handle 150 concurrent users")

        if self.rate_limit_errors > 0 and rate_limit_pct > 10:
            print(f"\n⚠️ HIGH RATE LIMITING: {rate_limit_pct:.1f}% of requests")
            print(" Recommendations:")
            print(" 1. Use NCBI API key (increases limit from 3 to 10 req/s)")
            print(" 2. Implement request queuing/throttling")
            print(" 3. Cache PubMed results (TTL: 24 hours)")
            print(" 4. Add retry logic with exponential backoff")

        if p95 is not None:
            if p95 < 1000:
                print("\n✅ RESPONSE TIME: EXCELLENT (p95 < 1s)")
                print(" → Fast PubMed lookups for workshop users")
            elif p95 < 3000:
                print("\n⚠️ RESPONSE TIME: ACCEPTABLE (p95 < 3s)")
                print(" → Reasonable search latency")
            else:
                print("\n❌ RESPONSE TIME: SLOW (p95 > 3s)")
                print(" → May impact user experience")

        print("=" * 80)


# PubMed search queries simulating real agent usage
PUBMED_QUERIES = [
    "MRSA treatment guidelines",
    "sepsis management protocol",
    "antibiotic resistance mechanisms",
    "C difficile infection therapy",
    "bloodstream infection empiric therapy",
    "pneumonia antibiotic duration",
    "urinary tract infection resistance",
    "surgical site infection prevention",
    "vancomycin dosing guidelines",
    "carbapenem resistant enterobacteriaceae",
    "infectious diseases clinical trials",
    "antimicrobial stewardship interventions",
    "hospital acquired pneumonia treatment",
    "neutropenic fever management",
    "endocarditis antibiotic therapy",
]


async def test_ncbi_api_request(
    session: aiohttp.ClientSession,
    user_id: int,
    query: str,
    api_key: Optional[str] = None,
) -> RequestMetrics:
    """Make a single NCBI Entrez (PubMed) API request.

    Sends one ``esearch`` query and never raises: timeouts and other
    exceptions are reported as failed metrics with ``status_code=0``.

    Args:
        session: Open aiohttp session used to send the request.
        user_id: Identifier of the simulated user, copied into the result.
        query: PubMed search term.
        api_key: Optional NCBI API key, sent as the ``api_key`` parameter.

    Returns:
        A ``RequestMetrics`` describing the outcome. For HTTP responses the
        duration is measured up to receipt of the response status.
    """
    # Step 1: Search PubMed for query
    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
    params = {
        "db": "pubmed",
        "term": query,
        "retmax": 10,
        "retmode": "json",
    }
    if api_key:
        params["api_key"] = api_key

    # perf_counter is monotonic, so latency is immune to wall-clock changes.
    started = time.perf_counter()

    def elapsed_ms() -> float:
        return (time.perf_counter() - started) * 1000

    try:
        async with session.get(
            base_url,
            params=params,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as response:
            duration_ms = elapsed_ms()
            status = response.status

            if status == 200:
                data = await response.json()
                result_count = int(data.get("esearchresult", {}).get("count", 0))
                return RequestMetrics(
                    user_id=user_id,
                    duration_ms=duration_ms,
                    status_code=status,
                    success=True,
                    results_count=result_count,
                )

            if status == 429:
                error = "HTTP_429_Rate_Limit"
            else:
                # Consume the body; only the status code is recorded.
                await response.read()
                error = f"HTTP_{status}"
            return RequestMetrics(
                user_id=user_id,
                duration_ms=duration_ms,
                status_code=status,
                success=False,
                error=error,
            )
    except asyncio.TimeoutError:
        error = "TimeoutError"
    except Exception as exc:
        # Boundary of the load test: classify any failure by exception type
        # instead of aborting the whole run.
        error = type(exc).__name__

    return RequestMetrics(
        user_id=user_id,
        duration_ms=elapsed_ms(),
        status_code=0,
        success=False,
        error=error,
    )


async def simulate_user(
    user_id: int,
    api_key: Optional[str],
    results: LoadTestResults,
    duration_sec: int,
) -> None:
    """Simulate a single user making PubMed searches.

    The user issues random queries from ``PUBMED_QUERIES`` with a random
    5-10 second pause between them until ``duration_sec`` has elapsed, and
    records every outcome in ``results``.
    """
    async with aiohttp.ClientSession() as session:
        deadline = time.monotonic() + duration_sec
        request_count = 0

        while time.monotonic() < deadline:
            # Pick a random PubMed query
            query = random.choice(PUBMED_QUERIES)

            # Make request
            metric = await test_ncbi_api_request(session, user_id, query, api_key)
            results.add_metric(metric)
            request_count += 1

            # Random delay between requests (5-10 seconds, simulating user
            # reading results)
            delay = random.uniform(5.0, 10.0)
            if delay >= deadline - time.monotonic():
                # The next search would start after the deadline, so stop now
                # instead of sleeping past the requested test duration.
                break
            await asyncio.sleep(delay)

    status = "✓" if request_count > 0 else "✗"
    print(f"{status} User {user_id:3d} completed {request_count} PubMed searches")


async def run_load_test(
    num_users: int,
    duration_sec: int,
    api_key: Optional[str] = None,
) -> None:
    """Run the NCBI PubMed API load test.

    Launches ``num_users`` simulated users concurrently for ``duration_sec``
    seconds, then prints the aggregated summary and workshop recommendations.
    """
    # Estimate used in the banner: one search per user roughly every 7 s.
    expected_rate = num_users / 7
    expected_requests = expected_rate * duration_sec

    print(f"\n{'=' * 80}")
    print("NCBI PUBMED API LOAD TEST - 150 USER WORKSHOP VALIDATION")
    print(f"{'=' * 80}")
    print(f"Concurrent Users: {num_users}")
    print(f"Test Duration: {duration_sec} seconds")
    print(
        f"Expected Requests: ~{expected_requests:.0f} "
        "(avg 1 search per 7s per user)"
    )

    if api_key:
        print("\n✅ Using NCBI API Key (Rate Limit: 10 req/s)")
    else:
        print(
            "\n⚠️ No API Key (Rate Limit: 3 req/s) - "
            "Consider using API key for better performance"
        )

    print("\nNCBI API Rate Limits:")
    print(" • Without API key: 3 requests/second")
    print(" • With API key: 10 requests/second")
    print(f" • This test will generate ~{expected_rate:.1f} req/s average")
    print(f"{'=' * 80}\n")

    results = LoadTestResults()
    start_time = time.perf_counter()

    # Launch all user simulations concurrently
    tasks = [
        simulate_user(i + 1, api_key, results, duration_sec)
        for i in range(num_users)
    ]
    await asyncio.gather(*tasks)

    actual_duration = time.perf_counter() - start_time
    results.print_summary(actual_duration, num_users)

    # Recommendations
    print("\n" + "=" * 80)
    print("WORKSHOP RECOMMENDATIONS:")
    print("=" * 80)

    if results.total_requests > 0:
        success_rate = results.successful_requests / results.total_requests * 100
    else:
        success_rate = 0

    if success_rate >= 95 and results.rate_limit_errors == 0:
        print("✅ NCBI PubMed API is ready for your 150-user workshop!")
        print("\nOptional optimizations:")
        print(" • Cache PubMed results for 24 hours to reduce API calls")
        print(" • Register for NCBI API key if not already done")
        print(" • Monitor usage during workshop")
    elif success_rate >= 90:
        print("⚠️ NCBI PubMed API can handle workshop but consider optimizations:")
        print("\n 1. Get NCBI API key (increases limit from 3 to 10 req/s)")
        print(" 2. Cache PubMed results (reduces duplicate queries)")
        print(" 3. Add retry logic for failed requests")
        print(" 4. Implement rate limiting (max 8 req/s with API key)")
    else:
        print("❌ NCBI PubMed API may struggle with 150 concurrent users:")
        print("\n REQUIRED Actions:")
        print(" 1. Get NCBI API key (https://ncbiinsights.ncbi.nlm.nih.gov/2017/11/02/new-api-keys-for-the-e-utilities/)")
        print(" 2. Implement request queuing (max 8 req/s)")
        print(" 3. Cache all PubMed results (TTL: 24 hours)")
        print(" 4. Add retry logic with exponential backoff")

    # API key registration
    if not api_key:
        print("\n" + "=" * 80)
        print("💡 HOW TO GET NCBI API KEY:")
        print("=" * 80)
        print("1. Visit: https://www.ncbi.nlm.nih.gov/account/")
        print("2. Sign in or create NCBI account")
        print("3. Go to Settings → API Key Management")
        print("4. Create new API key")
        print("5. Add to environment: export NCBI_API_KEY=your_key_here")
        print("=" * 80)

    print("\n")


def main() -> None:
    """Parse command-line arguments and run the load test."""
    parser = argparse.ArgumentParser(
        description="Load test NCBI PubMed API for workshop"
    )
    parser.add_argument(
        "--users",
        type=int,
        default=150,
        help="Number of concurrent users (default: 150)",
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=60,
        help="Test duration in seconds (default: 60)",
    )
    parser.add_argument(
        "--api-key",
        type=str,
        help="NCBI API key (or set NCBI_API_KEY env var)",
    )

    args = parser.parse_args()

    # Reject values that would make the run meaningless.
    if args.users < 1:
        parser.error("--users must be at least 1")
    if args.duration < 1:
        parser.error("--duration must be at least 1")

    # Get API key
    api_key = args.api_key or os.getenv("NCBI_API_KEY")

    print("\n🔬 Starting NCBI PubMed API load test...")
    print("⏱️ This will take approximately", args.duration, "seconds")

    if not api_key:
        print("⚠️ No API key detected - running with 3 req/s limit")
        print("💡 For better results, get API key: https://www.ncbi.nlm.nih.gov/account/\n")
    else:
        print("✅ Using API key - 10 req/s limit\n")

    asyncio.run(run_load_test(args.users, args.duration, api_key))


if __name__ == "__main__":
    main()