Ojasa Mirai

Ojasa Mirai

Python

Loading...

Learning Level

🟢 Beginner🔵 Advanced
REST API Basics · HTTP Requests · Status Codes · JSON Serialization · Error Handling · API Authentication · Rate Limiting · Building APIs · Web Scraping Basics
Python/Apis Json/Rate Limiting

⚙️ Advanced Rate Limiting — Sophisticated Throttling Strategies

Production APIs require sophisticated rate limiting. Master token bucket, sliding window, and distributed rate limiting across multiple servers using Redis.


🪣 Advanced Token Bucket with Leaky Bucket

import time
import math
from typing import Tuple

class LeakyBucketRateLimiter:
    """Token-refill rate limiter with burst support.

    Despite the name, this is the classic *token bucket* algorithm:
    tokens accrue continuously at `rate` per second up to `capacity`,
    and each request spends tokens from the bucket.
    """

    def __init__(self, rate: float, capacity: float):
        """
        rate: tokens added per second (sustained request rate)
        capacity: max tokens in bucket (maximum burst size)
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full so an initial burst is allowed
        self.last_update = time.time()

    def allow_request(self, tokens_needed: float = 1) -> Tuple[bool, float]:
        """Check whether a request costing `tokens_needed` is allowed.

        Returns (allowed, wait_seconds): wait_seconds is 0 when allowed,
        the time until enough tokens accrue when denied, and
        float('inf') when the request can never succeed.
        """
        # A request larger than the whole bucket can never be satisfied:
        # tokens are capped at `capacity`, so no finite wait would help.
        if tokens_needed > self.capacity:
            return False, float('inf')

        now = time.time()
        elapsed = now - self.last_update

        # Refill tokens for the elapsed time, capped at capacity
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.rate
        )

        self.last_update = now

        if self.tokens >= tokens_needed:
            self.tokens -= tokens_needed
            return True, 0

        # Time until the deficit refills at `rate` tokens per second
        wait_time = (tokens_needed - self.tokens) / self.rate
        return False, wait_time

# Usage: 10 tokens/s sustained, with a burst allowance of 100.
limiter = LeakyBucketRateLimiter(rate=10, capacity=100)

for attempt in range(50):
    ok, delay = limiter.allow_request()
    if not ok:
        print(f"Request {attempt}: wait {delay:.2f}s")
        time.sleep(delay)
    else:
        print(f"Request {attempt}: allowed")

🪟 Sliding Window Rate Limiting

from collections import deque
from datetime import datetime, timedelta, timezone

class SlidingWindowRateLimiter:
    """Sliding window rate limiting - more accurate than fixed windows.

    Keeps one timestamp per accepted request and counts only those that
    fall inside the trailing one-minute window.
    """

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.window = timedelta(minutes=1)
        # Timestamps of accepted requests, oldest first (deque: O(1) popleft)
        self.requests = deque()

    @staticmethod
    def _now() -> datetime:
        """Current time as a timezone-aware UTC datetime.

        datetime.utcnow() is deprecated (Python 3.12+) and returns a
        naive datetime; aware UTC timestamps are used consistently here.
        """
        return datetime.now(timezone.utc)

    def allow_request(self) -> bool:
        """Record and allow the request if the window is not yet full."""

        now = self._now()

        # Evict timestamps that have slid out of the window
        while self.requests and self.requests[0] < now - self.window:
            self.requests.popleft()

        # Check if under limit
        if len(self.requests) < self.requests_per_minute:
            self.requests.append(now)
            return True

        return False

    def get_remaining(self) -> int:
        """Get remaining requests in window.

        Note: stale entries are only evicted by allow_request(), so this
        may undercount until the next allow_request() call.
        """
        return max(0, self.requests_per_minute - len(self.requests))

    def get_reset_time(self) -> float:
        """Get seconds until the oldest request leaves the window (0 if empty)."""
        if not self.requests:
            return 0

        oldest = self.requests[0]
        reset_time = (oldest + self.window) - self._now()
        return max(0, reset_time.total_seconds())

# Usage: a 10-requests-per-minute window; requests 10-14 get denied.
limiter = SlidingWindowRateLimiter(requests_per_minute=10)

for attempt in range(15):
    if not limiter.allow_request():
        print(f"Request {attempt}: denied, reset in {limiter.get_reset_time():.1f}s")
    else:
        print(f"Request {attempt}: allowed")

🔴 Token Bucket with Distributed Support (Redis)

import redis
import time
from typing import Tuple

class DistributedRateLimiter:
    """Token-bucket rate limiter using Redis for distributed systems.

    Bucket state lives in a Redis hash and the check-and-decrement runs
    as a single Lua script, so multiple processes/servers share one
    bucket atomically.
    """

    # Runs atomically on the Redis server.  Returns {allowed, wait}.
    # `wait` is encoded as a *string* because Redis converts Lua numbers
    # in script replies to integers, which would truncate fractional
    # wait times (e.g. 0.7s would come back as 0).
    LUA_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local rate = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local tokens_needed = tonumber(ARGV[4])

    -- HMGET with explicit field names: HGETALL's reply ordering is not
    -- a documented contract, so never index that reply positionally.
    local state = redis.call('HMGET', key, 'tokens', 'last_update')
    local tokens = tonumber(state[1])
    local last_update = tonumber(state[2])

    if tokens == nil or last_update == nil then
        tokens = capacity
        last_update = now
    end

    local elapsed = now - last_update
    tokens = math.min(capacity, tokens + elapsed * rate)

    if tokens >= tokens_needed then
        tokens = tokens - tokens_needed
        redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
        redis.call('EXPIRE', key, 3600)
        return {1, '0'}
    else
        local wait_time = (tokens_needed - tokens) / rate
        return {0, tostring(wait_time)}
    end
    """

    def __init__(self, redis_url='redis://localhost:6379', rate=10, capacity=100):
        """
        redis_url: connection URL for the shared Redis instance
        rate: tokens per second
        capacity: max tokens in bucket (burst size)
        """
        self.redis = redis.from_url(redis_url)
        self.rate = rate  # tokens per second
        self.capacity = capacity

    def allow_request(self, key: str, tokens_needed: int = 1) -> Tuple[bool, float]:
        """Check if request allowed using Redis.

        key: bucket identity (e.g. 'user:123:rate_limit'); every caller
        using the same key shares the same bucket.
        Returns (allowed, wait_seconds).
        """
        now = time.time()

        result = self.redis.eval(
            self.LUA_SCRIPT, 1, key,
            self.capacity, self.rate, now, tokens_needed
        )

        # result[1] is the stringified wait time (bytes from redis-py)
        return result[0] == 1, float(result[1])

# Usage
limiter = DistributedRateLimiter(rate=10, capacity=100)

# Works across multiple processes/servers
user_key = 'user:123:rate_limit'

for attempt in range(15):
    ok, delay = limiter.allow_request(user_key)
    if not ok:
        print(f"Request {attempt}: wait {delay:.2f}s")
    else:
        print(f"Request {attempt}: allowed")

🎯 Adaptive Rate Limiting

from enum import Enum

class LoadLevel(Enum):
    # NOTE(review): this enum is not referenced anywhere in the visible
    # code — AdaptiveRateLimiter computes a continuous 0-1 load instead.
    # The values look like rate-adjustment multipliers, but LOW (0.3)
    # being smaller than HIGH (0.7) is suspicious if so; confirm the
    # intended semantics before wiring it in.
    LOW = 0.3
    NORMAL = 1.0
    HIGH = 0.7
    CRITICAL = 0.5

class AdaptiveRateLimiter:
    """Adjust rate limits based on server load.

    The base rate (requests/second) is scaled down linearly as a
    composite CPU/memory/queue load metric approaches 1.0.
    """

    def __init__(self, base_rps=100):  # requests per second
        self.base_rps = base_rps
        self.current_load = 0.5  # composite load on a 0-1 scale
        self.limiter = None      # lazily-created token bucket

    def update_load(self, cpu_percent: float, memory_percent: float, queue_depth: int):
        """Update server load metrics.

        Weights: CPU 40%, memory 40%, queue depth 20% (queue capped at
        100 so a runaway backlog cannot dominate the metric).
        """
        composite = (
            (cpu_percent / 100 * 0.4) +
            (memory_percent / 100 * 0.4) +
            (min(queue_depth, 100) / 100 * 0.2)
        )

        # Clamp to the documented 0-1 scale: CPU readings above 100%
        # (e.g. multi-core percentages) must not push the load past 1.
        self.current_load = min(1.0, max(0.0, composite))

    def get_current_limit(self) -> float:
        """Get current rate limit (rps) based on load."""

        # Reduce rate limit as load increases
        adjustment_factor = 1 - self.current_load

        return self.base_rps * adjustment_factor

    def allow_request(self) -> bool:
        """Check if request allowed under current conditions."""

        current_limit = self.get_current_limit()

        # Fully loaded: deny outright rather than building a zero-rate
        # bucket, whose wait-time math would divide by zero.
        if current_limit <= 0:
            return False

        if self.limiter is None:
            self.limiter = LeakyBucketRateLimiter(
                rate=current_limit,
                capacity=current_limit * 2
            )
        elif self.limiter.rate != current_limit:
            # Retune the existing bucket in place.  Recreating it on
            # every load change would reset the token count to full
            # capacity, silently granting a fresh burst each time the
            # limit moves.
            self.limiter.rate = current_limit
            self.limiter.capacity = current_limit * 2
            self.limiter.tokens = min(self.limiter.tokens, self.limiter.capacity)

        allowed, _ = self.limiter.allow_request()
        return allowed

# Usage
limiter = AdaptiveRateLimiter(base_rps=100)

# Monitor server and update limits.
# NOTE(review): demo loop — runs forever by design; interrupt to stop.
while True:
    # Hard-coded sample metrics; a real service would read these from
    # its monitoring (CPU/memory) and its request queue.
    cpu = 45  # percent
    memory = 60  # percent
    queue = 10  # requests in queue

    limiter.update_load(cpu, memory, queue)

    if limiter.allow_request():
        print(f"Request allowed (limit: {limiter.get_current_limit():.0f} rps)")
    else:
        print(f"Request denied (limit: {limiter.get_current_limit():.0f} rps)")

    time.sleep(0.1)

🎭 User-Tier Based Rate Limiting

from enum import Enum

class UserTier(Enum):
    # Value is the tier's request quota.  NOTE(review): TieredRateLimiter
    # divides this by 60 to get a per-minute budget, which implies these
    # are requests *per hour* — confirm, as the unit is not stated here.
    FREE = 100
    BASIC = 1000
    PREMIUM = 10000
    ENTERPRISE = 100000

class TieredRateLimiter:
    """Different rate limits per user tier."""

    def __init__(self):
        # user_id -> (tier, limiter).  The tier is stored so a user who
        # upgrades or downgrades gets a limiter with the new budget,
        # instead of being stuck with whatever was cached first.
        self.limiters = {}

    def get_limit(self, user_tier: UserTier) -> int:
        """Get rate limit for tier (the enum value)."""
        return user_tier.value

    def allow_request(self, user_id: int, user_tier: UserTier) -> bool:
        """Check if request allowed for this user at this tier."""

        cached = self.limiters.get(user_id)

        if cached is None or cached[0] is not user_tier:
            limit = self.get_limit(user_tier)
            # Convert to per-minute; floor at 1 so tiers with a quota
            # under 60 still get at least one request per minute.
            limiter = SlidingWindowRateLimiter(
                requests_per_minute=max(1, limit // 60)
            )
            cached = (user_tier, limiter)
            self.limiters[user_id] = cached

        return cached[1].allow_request()

    def get_remaining(self, user_id: int) -> int:
        """Get remaining requests in the window for user (0 if unknown)."""
        if user_id in self.limiters:
            return self.limiters[user_id][1].get_remaining()
        return 0

# Flask integration
from flask import Flask, request, jsonify

app = Flask(__name__)
tier_limiter = TieredRateLimiter()

@app.before_request
def check_tier_limit():
    """Reject the request with HTTP 429 when the user's tier quota is spent.

    Runs before every route; returning a non-None value here
    short-circuits the request and Flask skips the view function.

    NOTE(review): get_user_id_from_request / get_user_tier_from_db are
    not defined in this file — they must be provided elsewhere or this
    handler raises NameError at request time.
    """

    user_id = get_user_id_from_request()
    user_tier = get_user_tier_from_db(user_id)

    if not tier_limiter.allow_request(user_id, user_tier):
        remaining = tier_limiter.get_remaining(user_id)
        # 429 Too Many Requests
        return jsonify({
            'error': 'Rate limit exceeded',
            'tier': user_tier.name,
            'remaining': remaining
        }), 429

📊 Rate Limit Metrics and Analytics

from collections import defaultdict
from datetime import datetime, timedelta

class RateLimitAnalytics:
    """Track rate-limit violations and request patterns per user."""

    def __init__(self):
        # user_id -> list of {'time', 'reason'} violation records
        self.violations = defaultdict(list)
        # user_id -> list of {'time', 'allowed'} request records
        self.limit_history = defaultdict(list)

    def record_violation(self, user_id: int, reason: str):
        """Append a violation record for the user."""
        entry = {'time': datetime.utcnow(), 'reason': reason}
        self.violations[user_id].append(entry)

    def record_request(self, user_id: int, allowed: bool):
        """Append a request-attempt record (allowed or denied)."""
        entry = {'time': datetime.utcnow(), 'allowed': allowed}
        self.limit_history[user_id].append(entry)

    def get_abuse_patterns(self, hours: int = 24) -> dict:
        """Return users with more than 10 violations inside `hours`."""
        cutoff = datetime.utcnow() - timedelta(hours=hours)

        flagged = {}
        for user_id, events in self.violations.items():
            recent_count = sum(1 for v in events if v['time'] > cutoff)
            if recent_count > 10:  # More than 10 violations per day
                flagged[user_id] = {
                    'violations': recent_count,
                    'rate': recent_count / hours,
                }
        return flagged

    def get_user_stats(self, user_id: int, hours: int = 24) -> dict:
        """Summarize a user's recent requests; {} when there are none."""
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        recent = [r for r in self.limit_history[user_id] if r['time'] > cutoff]

        if not recent:
            return {}

        total = len(recent)
        allowed = sum(1 for r in recent if r['allowed'])
        return {
            'total_requests': total,
            'allowed': allowed,
            'denied': total - allowed,
            'allow_rate': allowed / total if total > 0 else 0,
            'violations': len(self.violations.get(user_id, [])),
        }

# Usage
analytics = RateLimitAnalytics()

# Record one violation plus an allowed and a denied request for user 123.
analytics.record_violation(123, 'Exceeded rate limit')
for outcome in (True, False):
    analytics.record_request(123, outcome)

# Analyze the last 24 hours of activity.
abuse = analytics.get_abuse_patterns(hours=24)
stats = analytics.get_user_stats(123, hours=24)

print(f"Abuse patterns: {abuse}")
print(f"User stats: {stats}")

✅ Key Takeaways

| Algorithm | Accuracy | Complexity | Use Case |
|---|---|---|---|
| Token Bucket | Medium | Low | Simple APIs |
| Leaky Bucket | High | Medium | Smooth throttling |
| Sliding Window | Very High | Medium | Precise limits |
| Fixed Window | Low | Very Low | Simple cases |
| Distributed (Redis) | High | High | Multi-server |
| Adaptive | Medium | High | Load-based |
| Tier-based | High | Medium | SaaS apps |

🔗 What's Next?

Learn advanced API building techniques.

Next: Advanced API Building →


Ready for more? Try the advanced challenges.


Resources

Python Docs

Ojasa Mirai

Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep • Build Real • Verify Skills • Launch Forward

Courses

Python · FastAPI · ReactJS · Cloud

© 2026 Ojasa Mirai. All rights reserved.

TwitterGitHubLinkedIn