|
|
""" |
|
|
Rate Limit Tracking Module |
|
|
Manages rate limits per provider with in-memory tracking |
|
|
""" |
|
|
|
|
|
import time |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Dict, Optional, Tuple |
|
|
from threading import Lock |
|
|
from utils.logger import setup_logger |
|
|
|
|
|
logger = setup_logger("rate_limiter") |
|
|
|
|
|
|
|
|
class RateLimiter: |
|
|
""" |
|
|
Rate limiter with per-provider tracking |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
"""Initialize rate limiter""" |
|
|
self.limits: Dict[str, Dict] = {} |
|
|
self.lock = Lock() |
|
|
|
|
|
def configure_limit( |
|
|
self, |
|
|
provider: str, |
|
|
limit_type: str, |
|
|
limit_value: int |
|
|
): |
|
|
""" |
|
|
Configure rate limit for a provider |
|
|
|
|
|
Args: |
|
|
provider: Provider name |
|
|
limit_type: Type of limit (per_minute, per_hour, per_day, per_second) |
|
|
limit_value: Maximum requests allowed |
|
|
""" |
|
|
with self.lock: |
|
|
|
|
|
now = datetime.now() |
|
|
if limit_type == "per_second": |
|
|
reset_time = now + timedelta(seconds=1) |
|
|
elif limit_type == "per_minute": |
|
|
reset_time = now + timedelta(minutes=1) |
|
|
elif limit_type == "per_hour": |
|
|
reset_time = now + timedelta(hours=1) |
|
|
elif limit_type == "per_day": |
|
|
reset_time = now + timedelta(days=1) |
|
|
else: |
|
|
logger.warning(f"Unknown limit type {limit_type} for {provider}") |
|
|
reset_time = now + timedelta(minutes=1) |
|
|
|
|
|
self.limits[provider] = { |
|
|
"limit_type": limit_type, |
|
|
"limit_value": limit_value, |
|
|
"current_usage": 0, |
|
|
"reset_time": reset_time, |
|
|
"last_request_time": None |
|
|
} |
|
|
|
|
|
logger.info(f"Configured rate limit for {provider}: {limit_value} {limit_type}") |
|
|
|
|
|
def can_make_request(self, provider: str) -> Tuple[bool, Optional[str]]: |
|
|
""" |
|
|
Check if request can be made without exceeding rate limit |
|
|
|
|
|
Args: |
|
|
provider: Provider name |
|
|
|
|
|
Returns: |
|
|
Tuple of (can_proceed, reason_if_blocked) |
|
|
""" |
|
|
with self.lock: |
|
|
if provider not in self.limits: |
|
|
|
|
|
return True, None |
|
|
|
|
|
limit_info = self.limits[provider] |
|
|
now = datetime.now() |
|
|
|
|
|
|
|
|
if now >= limit_info["reset_time"]: |
|
|
self._reset_limit(provider) |
|
|
limit_info = self.limits[provider] |
|
|
|
|
|
|
|
|
if limit_info["current_usage"] < limit_info["limit_value"]: |
|
|
return True, None |
|
|
else: |
|
|
seconds_until_reset = (limit_info["reset_time"] - now).total_seconds() |
|
|
return False, f"Rate limit reached. Reset in {int(seconds_until_reset)}s" |
|
|
|
|
|
def record_request(self, provider: str): |
|
|
""" |
|
|
Record a request against the rate limit |
|
|
|
|
|
Args: |
|
|
provider: Provider name |
|
|
""" |
|
|
with self.lock: |
|
|
if provider not in self.limits: |
|
|
logger.warning(f"Recording request for unconfigured provider: {provider}") |
|
|
return |
|
|
|
|
|
limit_info = self.limits[provider] |
|
|
now = datetime.now() |
|
|
|
|
|
|
|
|
if now >= limit_info["reset_time"]: |
|
|
self._reset_limit(provider) |
|
|
limit_info = self.limits[provider] |
|
|
|
|
|
|
|
|
limit_info["current_usage"] += 1 |
|
|
limit_info["last_request_time"] = now |
|
|
|
|
|
|
|
|
percentage = (limit_info["current_usage"] / limit_info["limit_value"]) * 100 |
|
|
if percentage >= 80: |
|
|
logger.warning( |
|
|
f"Rate limit warning for {provider}: {percentage:.1f}% used " |
|
|
f"({limit_info['current_usage']}/{limit_info['limit_value']})" |
|
|
) |
|
|
|
|
|
def _reset_limit(self, provider: str): |
|
|
""" |
|
|
Reset rate limit counter |
|
|
|
|
|
Args: |
|
|
provider: Provider name |
|
|
""" |
|
|
if provider not in self.limits: |
|
|
return |
|
|
|
|
|
limit_info = self.limits[provider] |
|
|
limit_type = limit_info["limit_type"] |
|
|
now = datetime.now() |
|
|
|
|
|
|
|
|
if limit_type == "per_second": |
|
|
reset_time = now + timedelta(seconds=1) |
|
|
elif limit_type == "per_minute": |
|
|
reset_time = now + timedelta(minutes=1) |
|
|
elif limit_type == "per_hour": |
|
|
reset_time = now + timedelta(hours=1) |
|
|
elif limit_type == "per_day": |
|
|
reset_time = now + timedelta(days=1) |
|
|
else: |
|
|
reset_time = now + timedelta(minutes=1) |
|
|
|
|
|
limit_info["current_usage"] = 0 |
|
|
limit_info["reset_time"] = reset_time |
|
|
|
|
|
logger.debug(f"Reset rate limit for {provider}. Next reset: {reset_time}") |
|
|
|
|
|
def get_status(self, provider: str) -> Optional[Dict]: |
|
|
""" |
|
|
Get current rate limit status for provider |
|
|
|
|
|
Args: |
|
|
provider: Provider name |
|
|
|
|
|
Returns: |
|
|
Dict with limit info or None if not configured |
|
|
""" |
|
|
with self.lock: |
|
|
if provider not in self.limits: |
|
|
return None |
|
|
|
|
|
limit_info = self.limits[provider] |
|
|
now = datetime.now() |
|
|
|
|
|
|
|
|
if now >= limit_info["reset_time"]: |
|
|
self._reset_limit(provider) |
|
|
limit_info = self.limits[provider] |
|
|
|
|
|
percentage = (limit_info["current_usage"] / limit_info["limit_value"]) * 100 if limit_info["limit_value"] > 0 else 0 |
|
|
seconds_until_reset = max(0, (limit_info["reset_time"] - now).total_seconds()) |
|
|
|
|
|
status = "ok" |
|
|
if percentage >= 100: |
|
|
status = "blocked" |
|
|
elif percentage >= 80: |
|
|
status = "warning" |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"limit_type": limit_info["limit_type"], |
|
|
"limit_value": limit_info["limit_value"], |
|
|
"current_usage": limit_info["current_usage"], |
|
|
"percentage": round(percentage, 1), |
|
|
"reset_time": limit_info["reset_time"].isoformat(), |
|
|
"reset_in_seconds": int(seconds_until_reset), |
|
|
"status": status, |
|
|
"last_request_time": limit_info["last_request_time"].isoformat() if limit_info["last_request_time"] else None |
|
|
} |
|
|
|
|
|
def get_all_statuses(self) -> Dict[str, Dict]: |
|
|
""" |
|
|
Get rate limit status for all providers |
|
|
|
|
|
Returns: |
|
|
Dict mapping provider names to their rate limit status |
|
|
""" |
|
|
with self.lock: |
|
|
return { |
|
|
provider: self.get_status(provider) |
|
|
for provider in self.limits.keys() |
|
|
} |
|
|
|
|
|
def remove_limit(self, provider: str): |
|
|
""" |
|
|
Remove rate limit configuration for provider |
|
|
|
|
|
Args: |
|
|
provider: Provider name |
|
|
""" |
|
|
with self.lock: |
|
|
if provider in self.limits: |
|
|
del self.limits[provider] |
|
|
logger.info(f"Removed rate limit for {provider}") |
|
|
|
|
|
|
|
|
|
|
|
rate_limiter = RateLimiter() |
|
|
|