
Production APIs require sophisticated rate limiting. This guide covers the token bucket, sliding window, and distributed rate limiting across multiple servers using Redis — all in Python.
import time
import math
from typing import Tuple
class LeakyBucketRateLimiter:
    """Continuously-refilling bucket limiter with burst support.

    Tokens accrue at `rate` per second up to `capacity`; each request
    spends tokens. (Despite the class name, this is the token-bucket
    formulation: the bucket starts full, enabling an initial burst.)
    """

    def __init__(self, rate: float, capacity: float):
        """
        rate: tokens per second
        capacity: max tokens in bucket
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full so an initial burst is allowed
        self.last_update = time.time()

    def _refill(self) -> None:
        """Credit tokens for the wall-clock time elapsed since last update."""
        now = time.time()
        gained = (now - self.last_update) * self.rate
        self.tokens = min(self.capacity, self.tokens + gained)
        self.last_update = now

    def allow_request(self, tokens_needed: float = 1) -> Tuple[bool, float]:
        """Return (allowed, wait_seconds); wait_seconds is 0 when allowed."""
        self._refill()
        if self.tokens < tokens_needed:
            # Not enough budget: report how long until the deficit refills.
            deficit = tokens_needed - self.tokens
            return False, deficit / self.rate
        self.tokens -= tokens_needed
        return True, 0
# Usage: 10 req/s steady rate with a burst allowance of 100
limiter = LeakyBucketRateLimiter(rate=10, capacity=100)
for req_no in range(50):
    ok, delay = limiter.allow_request()
    if ok:
        print(f"Request {req_no}: allowed")
    else:
        print(f"Request {req_no}: wait {delay:.2f}s")
        time.sleep(delay)

from collections import deque
from datetime import datetime, timedelta
class SlidingWindowRateLimiter:
    """Sliding window rate limiting - more accurate than fixed windows.

    Keeps one timestamp per accepted request and counts only timestamps
    inside the trailing one-minute window.
    """

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.window = timedelta(minutes=1)
        self.requests = deque()  # timestamps of accepted requests, oldest first

    def _purge(self, now: datetime) -> None:
        """Drop timestamps that have fallen outside the window."""
        cutoff = now - self.window
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    def allow_request(self) -> bool:
        """Check if request is allowed; records it when accepted."""
        # NOTE: utcnow() is naive-UTC; all internal comparisons use it consistently.
        now = datetime.utcnow()
        self._purge(now)
        if len(self.requests) < self.requests_per_minute:
            self.requests.append(now)
            return True
        return False

    def get_remaining(self) -> int:
        """Get remaining requests in window.

        Bug fix: expired timestamps are purged first; previously the count
        included stale entries and understated the remaining budget.
        """
        self._purge(datetime.utcnow())
        return max(0, self.requests_per_minute - len(self.requests))

    def get_reset_time(self) -> float:
        """Get seconds until the oldest in-window request expires (0 if none).

        Bug fix: purges expired entries first so the reported reset time is
        based on a request that is actually still inside the window.
        """
        self._purge(datetime.utcnow())
        if not self.requests:
            return 0
        reset_time = (self.requests[0] + self.window) - datetime.utcnow()
        return max(0, reset_time.total_seconds())
# Usage: 10 requests per rolling minute
limiter = SlidingWindowRateLimiter(requests_per_minute=10)
for attempt in range(15):
    if limiter.allow_request():
        print(f"Request {attempt}: allowed")
    else:
        reset_in = limiter.get_reset_time()
        print(f"Request {attempt}: denied, reset in {reset_in:.1f}s")

import redis
import time
from typing import Tuple
class DistributedRateLimiter:
    """Token-bucket rate limiter backed by Redis for distributed systems.

    Bucket state lives in a Redis hash per key; the refill-check-decrement
    sequence runs inside a Lua script so it is atomic across processes
    and servers.
    """

    # Hoisted to class level so the script text is not rebuilt per request.
    # Bug fix: Redis converts Lua numbers in replies to INTEGERS (the
    # decimal part is truncated), so the original fractional wait_time was
    # silently returned as 0. The wait time is now returned as a string and
    # parsed back to float on the Python side.
    # NOTE(review): the script relies on HGETALL preserving the
    # tokens/last_update insertion order (bucket[2]/bucket[4]), as the
    # original did — holds for small listpack-encoded hashes.
    _BUCKET_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local tokens_needed = tonumber(ARGV[4])

    local bucket = redis.call('HGETALL', key)
    local tokens = capacity
    local last_update = now
    if #bucket > 0 then
        tokens = tonumber(bucket[2])
        last_update = tonumber(bucket[4])
    end

    local elapsed = now - last_update
    tokens = math.min(capacity, tokens + elapsed * rate)

    if tokens >= tokens_needed then
        tokens = tokens - tokens_needed
        redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
        redis.call('EXPIRE', key, 3600)
        return {1, '0'}
    else
        local wait_time = (tokens_needed - tokens) / rate
        return {0, tostring(wait_time)}
    end
    """

    def __init__(self, redis_url='redis://localhost:6379', rate=10, capacity=100):
        self.redis = redis.from_url(redis_url)
        self.rate = rate  # tokens per second
        self.capacity = capacity

    def allow_request(self, key: str, tokens_needed: int = 1) -> Tuple[bool, float]:
        """Atomically check/consume tokens for `key` via Redis.

        Returns (allowed, wait_seconds); wait_seconds is 0 when allowed.
        """
        now = time.time()
        result = self.redis.eval(
            self._BUCKET_SCRIPT, 1, key,
            self.capacity, self.rate, now, tokens_needed
        )
        # result[1] is a string reply so the fractional wait survives
        # Redis's integer-only numeric conversion.
        return result[0] == 1, float(result[1])
# Usage
limiter = DistributedRateLimiter(rate=10, capacity=100)

# The same Redis-backed key is shared across multiple processes/servers
user_key = 'user:123:rate_limit'
for attempt in range(15):
    ok, delay = limiter.allow_request(user_key)
    if ok:
        print(f"Request {attempt}: allowed")
    else:
        print(f"Request {attempt}: wait {delay:.2f}s")

from enum import Enum
class LoadLevel(Enum):
    # Named server-load levels; the values look like rate multipliers
    # (fraction of the base rate to allow at each level).
    # NOTE(review): this enum is not referenced anywhere in the visible
    # code, and the ordering is odd (LOW=0.3 but NORMAL=1.0) — confirm the
    # intended semantics before relying on these values.
    LOW = 0.3
    NORMAL = 1.0
    HIGH = 0.7
    CRITICAL = 0.5
class AdaptiveRateLimiter:
    """Adjust rate limits based on server load.

    The effective rate shrinks linearly as the composite load (CPU, memory,
    queue depth) approaches 1.0; requests are then gated through a token
    bucket running at that dynamic rate.
    """

    # Floor for the effective rate. Without it, full load (1.0) yields a
    # zero rate, and the bucket's wait-time computation divides by rate.
    _MIN_RPS = 1e-6

    def __init__(self, base_rps=100):  # requests per second
        self.base_rps = base_rps
        self.current_load = 0.5  # 0-1 scale
        self.limiter = None  # lazily-created token bucket

    def update_load(self, cpu_percent: float, memory_percent: float, queue_depth: int):
        """Update server load metrics (weighted 40% CPU, 40% memory, 20% queue)."""
        load = (
            (cpu_percent / 100 * 0.4) +
            (memory_percent / 100 * 0.4) +
            (min(queue_depth, 100) / 100 * 0.2)
        )
        # Clamp to the documented 0-1 scale (inputs over 100% would exceed it).
        self.current_load = min(1.0, max(0.0, load))

    def get_current_limit(self) -> float:
        """Get current rate limit based on load (never below _MIN_RPS)."""
        # Reduce rate limit as load increases
        adjustment_factor = 1 - self.current_load
        return max(self._MIN_RPS, self.base_rps * adjustment_factor)

    def allow_request(self) -> bool:
        """Check if request allowed under current conditions."""
        current_limit = self.get_current_limit()
        if self.limiter is None:
            self.limiter = LeakyBucketRateLimiter(
                rate=current_limit,
                capacity=current_limit * 2
            )
        elif self.limiter.rate != current_limit:
            # Bug fix: retune the existing bucket instead of recreating it.
            # Recreating reset tokens to full capacity on every load change,
            # wiping out the record of recently consumed requests.
            self.limiter.rate = current_limit
            self.limiter.capacity = current_limit * 2
            self.limiter.tokens = min(self.limiter.tokens, self.limiter.capacity)
        allowed, _ = self.limiter.allow_request()
        return allowed
# Usage
limiter = AdaptiveRateLimiter(base_rps=100)

# Monitor server and update limits forever
while True:
    # Current metrics (hard-coded placeholders in this demo)
    cpu = 45  # percent
    memory = 60  # percent
    queue = 10  # requests in queue
    limiter.update_load(cpu, memory, queue)
    verdict = "allowed" if limiter.allow_request() else "denied"
    print(f"Request {verdict} (limit: {limiter.get_current_limit():.0f} rps)")
    time.sleep(0.1)

from enum import Enum
class UserTier(Enum):
    # The enum value doubles as the tier's raw rate limit.
    # NOTE(review): TieredRateLimiter divides this value by 60 to derive a
    # per-minute budget, which suggests these are requests-per-hour figures
    # — TODO confirm the intended unit.
    FREE = 100
    BASIC = 1000
    PREMIUM = 10000
    ENTERPRISE = 100000
class TieredRateLimiter:
    """Different rate limits per user tier."""

    def __init__(self):
        # user_id -> per-user sliding-window limiter, created lazily
        self.limiters = {}

    def get_limit(self, user_tier: UserTier) -> int:
        """Get rate limit for tier (the enum value itself)."""
        return user_tier.value

    def allow_request(self, user_id: int, user_tier: UserTier) -> bool:
        """Check if request allowed for this user under their tier's limit."""
        limiter = self.limiters.get(user_id)
        if limiter is None:
            # First sight of this user: size a limiter from their tier.
            # NOTE(review): the tier is only consulted here, so a later tier
            # change for an existing user has no effect — confirm intended.
            per_minute = self.get_limit(user_tier) // 60  # Convert to per-minute
            limiter = SlidingWindowRateLimiter(requests_per_minute=per_minute)
            self.limiters[user_id] = limiter
        return limiter.allow_request()

    def get_remaining(self, user_id: int) -> int:
        """Get remaining requests for user (0 for users never seen)."""
        limiter = self.limiters.get(user_id)
        return limiter.get_remaining() if limiter is not None else 0
# Flask integration
from flask import Flask, request, jsonify

app = Flask(__name__)
tier_limiter = TieredRateLimiter()


@app.before_request
def check_tier_limit():
    """Reject the request with HTTP 429 when the user's tier budget is spent.

    Returning None lets Flask continue normal request dispatch.
    """
    # NOTE(review): get_user_id_from_request / get_user_tier_from_db are not
    # defined in this file — they must be provided elsewhere.
    user_id = get_user_id_from_request()
    user_tier = get_user_tier_from_db(user_id)
    if not tier_limiter.allow_request(user_id, user_tier):
        remaining = tier_limiter.get_remaining(user_id)
        payload = {
            'error': 'Rate limit exceeded',
            'tier': user_tier.name,
            'remaining': remaining
        }
        return jsonify(payload), 429

from collections import defaultdict
from datetime import datetime, timedelta
class RateLimitAnalytics:
    """Track rate-limit violations and request patterns per user."""

    def __init__(self):
        # user_id -> list of {'time', 'reason'} violation records
        self.violations = defaultdict(list)
        # user_id -> list of {'time', 'allowed'} request records
        self.limit_history = defaultdict(list)

    def record_violation(self, user_id: int, reason: str):
        """Record one rate-limit violation for `user_id`."""
        entry = {'time': datetime.utcnow(), 'reason': reason}
        self.violations[user_id].append(entry)

    def record_request(self, user_id: int, allowed: bool):
        """Record one request attempt and whether it was allowed."""
        entry = {'time': datetime.utcnow(), 'allowed': allowed}
        self.limit_history[user_id].append(entry)

    def get_abuse_patterns(self, hours: int = 24) -> dict:
        """Return users with more than 10 violations inside the window."""
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        flagged = {}
        for user_id, events in self.violations.items():
            recent_count = sum(1 for e in events if e['time'] > cutoff)
            if recent_count > 10:  # abuse threshold: >10 violations per window
                flagged[user_id] = {
                    'violations': recent_count,
                    'rate': recent_count / hours
                }
        return flagged

    def get_user_stats(self, user_id: int, hours: int = 24) -> dict:
        """Return request/violation stats for one user over the window.

        Returns {} when the user has no requests inside the window.
        """
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        window = [r for r in self.limit_history[user_id] if r['time'] > cutoff]
        if not window:
            return {}
        total = len(window)
        allowed = sum(1 for r in window if r['allowed'])
        return {
            'total_requests': total,
            'allowed': allowed,
            'denied': total - allowed,
            'allow_rate': allowed / total if total > 0 else 0,
            'violations': len(self.violations.get(user_id, []))
        }
# Usage
analytics = RateLimitAnalytics()

# Track violations and request outcomes
analytics.record_violation(123, 'Exceeded rate limit')
analytics.record_request(123, True)
analytics.record_request(123, False)

# Analyze patterns
abuse = analytics.get_abuse_patterns(hours=24)
stats = analytics.get_user_stats(123, hours=24)
print(f"Abuse patterns: {abuse}")
print(f"User stats: {stats}")

| Algorithm | Accuracy | Complexity | Use Case |
|---|---|---|---|
| Token Bucket | Medium | Low | Simple APIs |
| Leaky Bucket | High | Medium | Smooth throttling |
| Sliding Window | Very High | Medium | Precise limits |
| Fixed Window | Low | Very Low | Simple cases |
| Distributed (Redis) | High | High | Multi-server |
| Adaptive | Medium | High | Load-based |
| Tier-based | High | Medium | SaaS apps |
Learn advanced API building techniques.
Ready for more? Try the advanced challenges.
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward