""" Rate Limit Tracking Module Manages rate limits per provider with in-memory tracking """ import time from datetime import datetime, timedelta from typing import Dict, Optional, Tuple from threading import Lock from utils.logger import setup_logger logger = setup_logger("rate_limiter") class RateLimiter: """ Rate limiter with per-provider tracking """ def __init__(self): """Initialize rate limiter""" self.limits: Dict[str, Dict] = {} self.lock = Lock() def configure_limit( self, provider: str, limit_type: str, limit_value: int ): """ Configure rate limit for a provider Args: provider: Provider name limit_type: Type of limit (per_minute, per_hour, per_day, per_second) limit_value: Maximum requests allowed """ with self.lock: # Calculate reset time based on limit type now = datetime.now() if limit_type == "per_second": reset_time = now + timedelta(seconds=1) elif limit_type == "per_minute": reset_time = now + timedelta(minutes=1) elif limit_type == "per_hour": reset_time = now + timedelta(hours=1) elif limit_type == "per_day": reset_time = now + timedelta(days=1) else: logger.warning(f"Unknown limit type {limit_type} for {provider}") reset_time = now + timedelta(minutes=1) self.limits[provider] = { "limit_type": limit_type, "limit_value": limit_value, "current_usage": 0, "reset_time": reset_time, "last_request_time": None } logger.info(f"Configured rate limit for {provider}: {limit_value} {limit_type}") def can_make_request(self, provider: str) -> Tuple[bool, Optional[str]]: """ Check if request can be made without exceeding rate limit Args: provider: Provider name Returns: Tuple of (can_proceed, reason_if_blocked) """ with self.lock: if provider not in self.limits: # No limit configured, allow request return True, None limit_info = self.limits[provider] now = datetime.now() # Check if we need to reset the counter if now >= limit_info["reset_time"]: self._reset_limit(provider) limit_info = self.limits[provider] # Check if under limit if limit_info["current_usage"] < limit_info["limit_value"]: return True, None else: seconds_until_reset = (limit_info["reset_time"] - now).total_seconds() return False, f"Rate limit reached. Reset in {int(seconds_until_reset)}s" def record_request(self, provider: str): """ Record a request against the rate limit Args: provider: Provider name """ with self.lock: if provider not in self.limits: logger.warning(f"Recording request for unconfigured provider: {provider}") return limit_info = self.limits[provider] now = datetime.now() # Check if we need to reset first if now >= limit_info["reset_time"]: self._reset_limit(provider) limit_info = self.limits[provider] # Increment usage limit_info["current_usage"] += 1 limit_info["last_request_time"] = now # Log warning if approaching limit percentage = (limit_info["current_usage"] / limit_info["limit_value"]) * 100 if percentage >= 80: logger.warning( f"Rate limit warning for {provider}: {percentage:.1f}% used " f"({limit_info['current_usage']}/{limit_info['limit_value']})" ) def _reset_limit(self, provider: str): """ Reset rate limit counter Args: provider: Provider name """ if provider not in self.limits: return limit_info = self.limits[provider] limit_type = limit_info["limit_type"] now = datetime.now() # Calculate new reset time if limit_type == "per_second": reset_time = now + timedelta(seconds=1) elif limit_type == "per_minute": reset_time = now + timedelta(minutes=1) elif limit_type == "per_hour": reset_time = now + timedelta(hours=1) elif limit_type == "per_day": reset_time = now + timedelta(days=1) else: reset_time = now + timedelta(minutes=1) limit_info["current_usage"] = 0 limit_info["reset_time"] = reset_time logger.debug(f"Reset rate limit for {provider}. Next reset: {reset_time}") def get_status(self, provider: str) -> Optional[Dict]: """ Get current rate limit status for provider Args: provider: Provider name Returns: Dict with limit info or None if not configured """ with self.lock: if provider not in self.limits: return None limit_info = self.limits[provider] now = datetime.now() # Check if needs reset if now >= limit_info["reset_time"]: self._reset_limit(provider) limit_info = self.limits[provider] percentage = (limit_info["current_usage"] / limit_info["limit_value"]) * 100 if limit_info["limit_value"] > 0 else 0 seconds_until_reset = max(0, (limit_info["reset_time"] - now).total_seconds()) status = "ok" if percentage >= 100: status = "blocked" elif percentage >= 80: status = "warning" return { "provider": provider, "limit_type": limit_info["limit_type"], "limit_value": limit_info["limit_value"], "current_usage": limit_info["current_usage"], "percentage": round(percentage, 1), "reset_time": limit_info["reset_time"].isoformat(), "reset_in_seconds": int(seconds_until_reset), "status": status, "last_request_time": limit_info["last_request_time"].isoformat() if limit_info["last_request_time"] else None } def get_all_statuses(self) -> Dict[str, Dict]: """ Get rate limit status for all providers Returns: Dict mapping provider names to their rate limit status """ with self.lock: return { provider: self.get_status(provider) for provider in self.limits.keys() } def remove_limit(self, provider: str): """ Remove rate limit configuration for provider Args: provider: Provider name """ with self.lock: if provider in self.limits: del self.limits[provider] logger.info(f"Removed rate limit for {provider}") # Global rate limiter instance rate_limiter = RateLimiter()