
APIs limit the number of requests you can make in a given time period. This protects servers from overload and ensures fair usage. Understanding rate limiting and implementing proper throttling makes your Python applications more reliable and respectful of shared resources.
Rate limiting controls how many requests you can make to an API within a specific time period. When you exceed the limit, the API returns a 429 (Too Many Requests) response and temporarily blocks further requests.
import requests
# Making requests too fast
for i in range(1000):
response = requests.get('https://api.example.com/data')
if response.status_code == 429:
print("Rate limited! Too many requests")
break
print(f"Request {i+1}: {response.status_code}")APIs tell you about rate limits through response headers:
import requests

response = requests.get('https://api.github.com/repos/octocat/Hello-World')

# Print the standard rate-limit headers the API sends back.
for header in ('X-RateLimit-Limit',      # total limit per window
               'X-RateLimit-Remaining',  # requests left
               'X-RateLimit-Reset',      # epoch time the window resets
               'Retry-After'):           # how long to wait when throttled
    print(response.headers.get(header))

# Example output:
# X-RateLimit-Limit: 60
# X-RateLimit-Remaining: 45
# X-RateLimit-Reset: 1677043200

import requests
from datetime import datetime

response = requests.get('https://api.github.com/user')

# Pull the quota numbers out of the headers (defaulting to 0 when absent).
limit = int(response.headers.get('X-RateLimit-Limit', 0))
remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
reset_time = datetime.fromtimestamp(
    int(response.headers.get('X-RateLimit-Reset', 0))
)

print(f"Limit: {limit}")
print(f"Remaining: {remaining}")
print(f"Reset at: {reset_time}")
print(f"Usage: {limit - remaining}/{limit}")

# Warn once fewer than 10% of the allowed requests remain.
if remaining < limit * 0.1:
    print("Warning: approaching rate limit")

import requests
import time

def check_rate_limit(response):
    """Return how many seconds to wait before retrying *response*'s request.

    Returns 0 when the response is not rate limited (status != 429).
    For a 429 response the wait time comes from, in order of preference:
    the ``Retry-After`` header, the ``X-RateLimit-Reset`` header, or a
    default of 60 seconds.  The result is never negative, so it can be
    passed straight to ``time.sleep()`` (which raises on negative values).
    """
    if response.status_code != 429:
        return 0

    # Method 1: the server says directly how long to wait.
    retry_after = response.headers.get('Retry-After')
    if retry_after:
        try:
            # Retry-After may also be an HTTP-date; fall through if it
            # is not a plain number of seconds.
            wait_seconds = max(0, int(retry_after))
        except ValueError:
            wait_seconds = None
        if wait_seconds is not None:
            print(f"Rate limited. Wait {wait_seconds} seconds")
            return wait_seconds

    # Method 2: derive the wait from the window-reset timestamp.
    reset_timestamp = response.headers.get('X-RateLimit-Reset')
    if reset_timestamp:
        # Clamp at 0: a reset time already in the past means retry now.
        wait_seconds = max(0, int(reset_timestamp) - int(time.time()))
        print(f"Rate limited. Wait {wait_seconds} seconds")
        return wait_seconds

    # No hints from the server: fall back to a conservative default.
    return 60
# Using the function
response = requests.get('https://api.example.com/data')
pause = check_rate_limit(response)
if pause > 0:
    print(f"Sleeping for {pause} seconds...")
    time.sleep(pause)

import requests
import time

def fetch_with_retry(url, max_retries=3):
    """GET *url*, retrying up to *max_retries* times when rate limited.

    On a 429 response the wait time comes from the ``Retry-After`` header,
    falling back to exponential backoff (2**attempt seconds) when the
    header is missing or not a plain number.  Returns the ``Response`` on
    success or non-429 error, or ``None`` if every attempt was limited.
    """
    for attempt in range(max_retries):
        response = requests.get(url, timeout=5)
        if response.status_code != 429:
            # Success or a non-rate-limit error: hand it to the caller.
            return response

        # No retry left after the final attempt -- don't sleep for nothing.
        if attempt == max_retries - 1:
            break

        try:
            wait_seconds = int(response.headers.get('Retry-After', ''))
        except ValueError:
            # Header absent or an HTTP-date: use exponential backoff.
            wait_seconds = 2 ** attempt
        print(f"Rate limited (attempt {attempt + 1}). Waiting {wait_seconds}s...")
        time.sleep(wait_seconds)

    print(f"Failed after {max_retries} attempts")
    return None

# Using the function
response = fetch_with_retry('https://api.example.com/data')
if response and response.ok:
    print(response.json())

import requests
import time
def fetch_with_exponential_backoff(url, max_retries=4, base_wait=1):
"""Fetch with exponential backoff: 1s, 2s, 4s, 8s"""
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=5)
if response.status_code == 429:
# Exponential backoff: 2^attempt seconds
wait_time = base_wait * (2 ** attempt)
print(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
continue
return response
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
wait_time = base_wait * (2 ** attempt)
print(f"Error: {e}. Retrying in {wait_time}s...")
time.sleep(wait_time)
else:
raise
return None
# Using the function
response = fetch_with_exponential_backoff('https://api.example.com/data')Instead of reacting to 429 errors, prevent them by monitoring limits:
import requests
import time
class RateLimitedAPIClient:
    """API client that respects rate limits proactively.

    Spaces successive requests at least ``60 / requests_per_minute``
    seconds apart so the client never exceeds the server's quota.
    """

    def __init__(self, base_url, requests_per_minute=60):
        self.base_url = base_url
        self.requests_per_minute = requests_per_minute
        self.last_request_time = 0                      # epoch seconds of last call
        self.min_interval = 60 / requests_per_minute    # seconds between calls

    def get(self, endpoint):
        """GET ``base_url/endpoint``; return parsed JSON, or None on error."""
        # Sleep just long enough to honour the minimum spacing.
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            wait_time = self.min_interval - elapsed
            print(f"Rate limiting: waiting {wait_time:.2f}s...")
            time.sleep(wait_time)

        response = requests.get(f'{self.base_url}/{endpoint}', timeout=5)
        self.last_request_time = time.time()

        # Report the server's view of our remaining quota when provided.
        remaining = response.headers.get('X-RateLimit-Remaining')
        if remaining is not None:
            print(f"Remaining requests: {remaining}")
        return response.json() if response.ok else None

    def get_multiple(self, endpoints):
        """Fetch several endpoints sequentially, throttled by ``get()``."""
        results = []
        for endpoint in endpoints:
            data = self.get(endpoint)
            # `is not None` (not truthiness): a valid-but-falsy JSON payload
            # such as {}, [] or 0 must not be silently dropped.
            if data is not None:
                results.append(data)
        return results
# Using the client
client = RateLimitedAPIClient(
'https://api.example.com',
requests_per_minute=30
)
# These requests will be automatically throttled
data1 = client.get('users/1')
data2 = client.get('users/2')
data3 = client.get('users/3')A popular technique for rate limiting:
import time
from collections import deque
class TokenBucketRateLimiter:
    """Rate limiter using the token bucket algorithm.

    The bucket holds at most *capacity* tokens and gains *refill_rate*
    tokens per second.  Each request spends tokens; when the bucket is
    empty, requests are denied (or waited on) until it refills.
    """

    def __init__(self, capacity, refill_rate):
        """
        capacity: max tokens in bucket
        refill_rate: tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity          # start with a full bucket
        self.last_refill = time.time()

    def refill(self):
        """Top up the bucket for the time elapsed since the last refill."""
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def allow_request(self, tokens_needed=1):
        """Spend *tokens_needed* tokens if available; return True on success."""
        self.refill()
        if self.tokens >= tokens_needed:
            self.tokens -= tokens_needed
            return True
        return False

    def wait_for_request(self, tokens_needed=1):
        """Block until *tokens_needed* tokens are available, then spend them.

        Raises:
            ValueError: when *tokens_needed* exceeds the bucket capacity --
                such a request can never be satisfied, and the original
                fixed-interval polling loop would spin forever.
        """
        if tokens_needed > self.capacity:
            raise ValueError(
                f"tokens_needed ({tokens_needed}) exceeds bucket capacity "
                f"({self.capacity})"
            )
        while not self.allow_request(tokens_needed):
            if self.refill_rate > 0:
                # Sleep exactly as long as the deficit takes to refill,
                # instead of polling on a fixed 0.1s interval.
                deficit = tokens_needed - self.tokens
                time.sleep(max(deficit / self.refill_rate, 0.001))
            else:
                time.sleep(0.1)  # no refill configured: fall back to polling
# Using token bucket limiter
limiter = TokenBucketRateLimiter(
    capacity=100,    # Max 100 requests
    refill_rate=10,  # Add 10 requests per second
)

for n in range(150):
    if limiter.allow_request():
        print(f"Request {n + 1}: allowed")
    else:
        print(f"Request {n + 1}: rate limited, waiting...")
        limiter.wait_for_request()

import requests
import time
import logging
from typing import Optional, Dict
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
class SmartAPIClient:
    """API client with intelligent rate limit handling.

    Tracks the server's ``X-RateLimit-*`` headers, sleeps proactively when
    the quota is exhausted, and retries a bounded number of times on 429.
    """

    # How many times get() re-attempts a request that came back 429.
    # (The original retried via unbounded recursion, which could recurse
    # forever against a persistently limited endpoint.)
    MAX_RATE_LIMIT_RETRIES = 3

    def __init__(self, base_url, api_key=None):
        self.base_url = base_url
        self.api_key = api_key
        self.rate_limit_remaining = None  # requests left, per last response
        self.rate_limit_reset = None      # epoch seconds the window resets

    def _update_rate_limit_info(self, response):
        """Record rate-limit headers; sleep if the quota is exhausted."""
        remaining = response.headers.get('X-RateLimit-Remaining')
        reset = response.headers.get('X-RateLimit-Reset')
        if remaining is not None:
            self.rate_limit_remaining = int(remaining)
        if reset is not None:
            self.rate_limit_reset = int(reset)
        # Only act on headers present in *this* response; stale values from
        # an earlier call must not trigger a sleep, and a missing reset
        # timestamp must not cause a None - int TypeError.
        if (remaining is not None and self.rate_limit_remaining == 0
                and self.rate_limit_reset is not None):
            wait_time = self.rate_limit_reset - int(time.time())
            if wait_time > 0:
                logger.warning(f"Rate limit exhausted. Waiting {wait_time}s...")
                time.sleep(wait_time)

    def _get_headers(self):
        """Build request headers, adding Bearer auth when a key is set."""
        headers = {'User-Agent': 'SmartAPIClient/1.0'}
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        return headers

    def get(self, endpoint, **kwargs) -> Optional[Dict]:
        """GET ``base_url/endpoint``; return parsed JSON or None on failure.

        A 429 response is retried after the ``Retry-After`` delay, at most
        ``MAX_RATE_LIMIT_RETRIES`` times; ``None`` is returned if the
        endpoint is still limited after that.
        """
        for _ in range(self.MAX_RATE_LIMIT_RETRIES + 1):
            try:
                response = requests.get(
                    f'{self.base_url}/{endpoint}',
                    headers=self._get_headers(),
                    timeout=5,
                    **kwargs
                )
                # Update rate limit info
                self._update_rate_limit_info(response)

                # Handle rate limit: wait as instructed, then loop to retry.
                if response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited. Waiting {retry_after}s...")
                    time.sleep(retry_after)
                    continue

                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                logger.error(f"Request error: {e}")
                return None
        return None

    def batch_get(self, endpoints, delay=0.5):
        """Get multiple endpoints with *delay* seconds between requests."""
        results = []
        for index, endpoint in enumerate(endpoints):
            data = self.get(endpoint)
            # `is not None`: valid-but-falsy payloads ({}, [], 0) are kept.
            if data is not None:
                results.append(data)
            # Delay between requests, not after the last one.  Comparing by
            # index (rather than value) keeps the delay even when the list
            # contains duplicate endpoints.
            if index < len(endpoints) - 1:
                time.sleep(delay)
        return results

    def get_rate_limit_status(self):
        """Return the most recently observed rate-limit counters."""
        return {
            'remaining': self.rate_limit_remaining,
            'reset_at': self.rate_limit_reset
        }
# Using the smart client
client = SmartAPIClient('https://api.example.com', api_key='YOUR_KEY')
# Single request
user = client.get('users/1')
# Batch requests with throttling
endpoints = ['users/1', 'users/2', 'users/3', 'users/4', 'users/5']
users = client.batch_get(endpoints, delay=1)
# Check rate limit status
status = client.get_rate_limit_status()
print(f"Rate limit status: {status}")| Concept | Remember |
|---|---|
| Status 429 | Too Many Requests - you've hit the limit |
| Rate Limit Headers | API tells you about limits in response headers |
| Retry-After | Header indicating how long to wait before retrying |
| Exponential Backoff | Wait 1s, 2s, 4s, 8s before retrying |
| Token Bucket | Algorithm that refills tokens over time |
| Proactive Limiting | Monitor limits and throttle before hitting them |
| X-RateLimit-Remaining | Header showing requests you have left |
| Batch Processing | Add delays between requests to stay under limit |
Next, learn how to build your own APIs to provide data to others.
Ready to practice? Try the challenges or explore the resources below.
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward