
Professional Python applications require optimized HTTP handling. Connection pooling, streaming, compression, and intelligent retry strategies dramatically improve performance and reliability at scale.
Reusing connections is crucial for performance:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session_with_retries(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 503, 504)
):
    """Build a requests.Session with pooled connections and automatic retries.

    Args:
        retries: total retry attempts per request.
        backoff_factor: base factor for the backoff between attempts.
        status_forcelist: HTTP status codes that trigger a retry.

    Returns:
        A configured requests.Session.
    """
    pooled_session = requests.Session()

    # Retry policy applied to every request made through this session.
    retry_policy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=['GET', 'POST', 'PUT', 'DELETE']
    )

    # Single adapter carries both the retry policy and the connection pool.
    pooled_adapter = HTTPAdapter(
        max_retries=retry_policy,
        pool_connections=100,  # Connection pool size
        pool_maxsize=100
    )
    for scheme in ('http://', 'https://'):
        pooled_session.mount(scheme, pooled_adapter)

    return pooled_session
# Usage
session = create_session_with_retries()
# All requests use connection pooling and automatic retries
for i in range(10):
response = session.get('https://api.example.com/data')
print(f"Request {i+1}: {response.status_code}")
# Connections are reused across requestsFor large files or streaming data:
import requests
# ❌ Bad - loads entire response into memory
response = requests.get('https://example.com/large-file.zip')
data = response.content  # Entire file in memory!

# ✅ Good - stream the response
response = requests.get(
    'https://example.com/large-file.zip',
    stream=True
)

# Consume the body incrementally instead of all at once.
total_size = 0
for chunk in response.iter_content(chunk_size=8192):
    if chunk:
        total_size += len(chunk)
        # Process chunk (write to file, process, etc.)
print(f"Received {total_size} bytes")
# Download large file efficiently
with open('downloaded.zip', 'wb') as f:
    response = requests.get('https://example.com/file.zip', stream=True)
    # Write each chunk as it arrives; memory use stays bounded by chunk_size.
    for chunk in response.iter_content(chunk_size=8192):
        if chunk:
            f.write(chunk)

import requests
import json
# Server sends newline-delimited JSON
response = requests.get('https://api.example.com/stream', stream=True)
for line in response.iter_lines():
if line:
data = json.loads(line)
print(f"Received: {data}")Automatically handle and optimize compression:
import requests
import gzip
import io
def decompress_if_needed(response):
    """Return the response body, gunzipping it when it is gzip-encoded.

    Most HTTP libraries decompress transparently; this helper covers the
    case where the raw compressed bytes are still in ``response.content``.
    """
    encoding = response.headers.get('content-encoding')
    if encoding == 'gzip':
        return gzip.decompress(response.content)
    return response.content
# Most libraries handle transparently
response = requests.get('https://api.example.com/data')
# requests automatically decompresses

# Advertise which compressed encodings this client accepts.
headers = {
    'Accept-Encoding': 'gzip, deflate, br'
}
response = requests.get('https://api.example.com/data', headers=headers)

# Check what encoding was used
print(response.headers.get('content-encoding'))

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
import logging
logger = logging.getLogger(__name__)
class AdaptiveRetryStrategy:
    """Intelligent retry with exponential backoff and jitter."""

    def __init__(self, base_delay=1, max_delay=60, jitter=True):
        self.base_delay = base_delay  # first-attempt delay, in seconds
        self.max_delay = max_delay    # backoff ceiling, in seconds
        self.jitter = jitter          # randomize delays to avoid thundering herd

    def get_wait_time(self, attempt, last_response=None):
        """Return how many seconds to wait before retry number ``attempt``."""
        # A server-supplied Retry-After header always wins.
        # NOTE: a requests.Response is falsy on error statuses, so this
        # branch only fires for truthy response objects, as in the original.
        if last_response and 'Retry-After' in last_response.headers:
            return int(last_response.headers['Retry-After'])
        # Exponential backoff, capped at max_delay.
        delay = min(self.base_delay * (2 ** attempt), self.max_delay)
        if self.jitter:
            # Spread retries +/-50% so many clients don't retry in lockstep.
            import random
            delay *= random.uniform(0.5, 1.5)
        return delay

    def retry_request(self, url, method='GET', max_retries=3, **kwargs):
        """Issue the request, retrying server errors and transport failures."""
        for attempt in range(max_retries):
            try:
                if method == 'GET':
                    response = requests.get(url, **kwargs)
                elif method == 'POST':
                    response = requests.post(url, **kwargs)
                else:
                    response = requests.request(method, url, **kwargs)
            except requests.exceptions.RequestException as e:
                # Transport-level failure: back off and retry, or re-raise
                # once attempts are exhausted.
                if attempt >= max_retries - 1:
                    raise
                wait_time = self.get_wait_time(attempt)
                logger.warning(f"Request error: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue

            if response.ok:
                return response

            # Retry 5xx while attempts remain; anything else (client errors,
            # or a 5xx on the final attempt) is returned to the caller as-is.
            if response.status_code >= 500 and attempt < max_retries - 1:
                wait_time = self.get_wait_time(attempt, response)
                logger.warning(f"Server error {response.status_code}. Waiting {wait_time}s...")
                time.sleep(wait_time)
                continue
            return response
        return None
# Usage
retry_strategy = AdaptiveRetryStrategy(base_delay=1, max_delay=32)
response = retry_strategy.retry_request(
    'https://unstable-api.example.com/data',
    max_retries=5
)

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch_multiple_urls(urls, max_workers=5, timeout=10):
    """Fetch multiple URLs concurrently"""
    results = {}
    session = requests.Session()

    def fetch(url):
        # Worker: returns (url, parsed_json, error_message-or-None).
        try:
            response = session.get(url, timeout=timeout)
            return url, response.json(), None
        except Exception as exc:
            return url, None, str(exc)

    # Threads suit this I/O-bound workload: waits overlap across workers.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {executor.submit(fetch, url): url for url in urls}
        for future in as_completed(pending):
            url, payload, error = future.result()
            if error:
                print(f"❌ {url}: {error}")
                results[url] = None
            else:
                print(f"✓ {url}: Success")
                results[url] = payload
    return results
# Usage
urls = [
    'https://api.github.com/users/octocat',
    'https://api.github.com/users/gvanrossum',
    'https://api.github.com/users/brettcannon'
]
start = time.time()
results = fetch_multiple_urls(urls, max_workers=5)
elapsed = time.time() - start
print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")

import httpx
import asyncio
async def fetch_multiple_urls_async(urls):
    """Fetch multiple URLs with async"""
    async with httpx.AsyncClient() as client:
        # Start every request at once, then wait for the whole batch.
        responses = await asyncio.gather(*(client.get(url) for url in urls))
        return [response.json() for response in responses]
# Usage (Python 3.7+)
urls = [
    'https://api.github.com/users/octocat',
    'https://api.github.com/users/gvanrossum'
]
results = asyncio.run(fetch_multiple_urls_async(urls))

import requests
import time
from threading import Lock
class OAuth2Session:
    """OAuth 2.0 client-credentials session with automatic token refresh.

    One bearer token is shared by all requests; a lock makes refresh safe
    when several threads hit an expired token simultaneously.
    """

    def __init__(self, client_id, client_secret, token_url):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.token = None     # current access token (None until first refresh)
        self.expires_at = 0   # epoch seconds after which the token is stale
        self.lock = Lock()

    def refresh_token(self):
        """Fetch a new access token if the current one has expired.

        Raises:
            requests.HTTPError: if the token endpoint returns an error status.
        """
        # Cheap check outside the lock: the common case (fresh token)
        # returns without any lock contention.
        if time.time() < self.expires_at:
            return
        with self.lock:
            # Re-check inside the lock (double-checked locking) - another
            # thread may have refreshed while we were waiting.
            if time.time() < self.expires_at:
                return
            data = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'grant_type': 'client_credentials'
            }
            response = requests.post(self.token_url, data=data)
            # Fail loudly on a bad token response instead of surfacing a
            # confusing KeyError('access_token') below.
            response.raise_for_status()
            token_data = response.json()
            self.token = token_data['access_token']
            # Renew 60s early so in-flight requests don't race token expiry.
            self.expires_at = time.time() + token_data.get('expires_in', 3600) - 60

    def _auth_headers(self, kwargs):
        """Pop caller-supplied headers from kwargs and add the bearer token."""
        headers = kwargs.pop('headers', {})
        headers['Authorization'] = f'Bearer {self.token}'
        return headers

    def get(self, url, **kwargs):
        """Make authenticated GET request"""
        self.refresh_token()
        return requests.get(url, headers=self._auth_headers(kwargs), **kwargs)

    def post(self, url, **kwargs):
        """Make authenticated POST request"""
        self.refresh_token()
        return requests.post(url, headers=self._auth_headers(kwargs), **kwargs)
# Usage
oauth = OAuth2Session(
    client_id='YOUR_CLIENT_ID',
    client_secret='YOUR_CLIENT_SECRET',
    token_url='https://auth.example.com/token'
)
response = oauth.get('https://api.example.com/data')

import requests
import logging
from requests.adapters import HTTPAdapter
import http.client as http_client
# Enable HTTP debug logging
# Prints the raw request/response lines from the underlying http.client layer.
http_client.HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# Log HTTP details
# NOTE(review): 'requests.packages.urllib3' is the legacy logger name; newer
# requests versions may log under 'urllib3' instead - confirm for your version.
requests_log = logging.getLogger('requests.packages.urllib3')
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
# Custom request logger
class RequestLogger:
    """Emits debug-level log lines for outgoing requests and their responses."""

    def __init__(self):
        self.logger = logging.getLogger('api_client')

    def log_request(self, method, url, headers=None, data=None):
        # Always log the request line; headers and body only when supplied.
        self.logger.debug(f"{method} {url}")
        if headers:
            self.logger.debug(f"Headers: {headers}")
        if data:
            self.logger.debug(f"Body: {data}")

    def log_response(self, response):
        self.logger.debug(f"Status: {response.status_code}")
        self.logger.debug(f"Headers: {dict(response.headers)}")
        # Only the first 500 chars so huge payloads don't flood the log.
        self.logger.debug(f"Body: {response.text[:500]}")
# Use logger
logger = RequestLogger()
response = requests.get('https://api.example.com/data')
logger.log_response(response)

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
class HighPerformanceAPIClient:
    """API client combining pooling, retries, compression, and timeouts."""

    def __init__(self, base_url, pool_size=100, timeout=30):
        self.base_url = base_url
        self.session = self._create_optimized_session(pool_size, timeout)
        self.timeout = timeout

    def _create_optimized_session(self, pool_size, timeout):
        """Build the shared session with every optimization applied."""
        session = requests.Session()
        # 1. Retry transient server errors automatically.
        retry = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=(500, 502, 503, 504)
        )
        # 2. Connection pooling via a single adapter for both schemes.
        adapter = HTTPAdapter(
            max_retries=retry,
            pool_connections=pool_size,
            pool_maxsize=pool_size
        )
        for prefix in ('http://', 'https://'):
            session.mount(prefix, adapter)
        # 3. Default headers: identify the client, accept compressed bodies.
        session.headers.update({
            'User-Agent': 'HighPerformanceClient/1.0',
            'Accept-Encoding': 'gzip, deflate'
        })
        return session

    def get(self, endpoint, stream=False, **kwargs):
        """GET base_url/endpoint; returns the Response, or None on any failure."""
        url = f"{self.base_url}/{endpoint}"
        try:
            response = self.session.get(
                url,
                timeout=self.timeout,
                stream=stream,
                **kwargs
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
            return None
        return response

    def batch_get(self, endpoints, stream=False):
        """Fetch several endpoints sequentially, collecting parsed JSON bodies."""
        bodies = []
        for endpoint in endpoints:
            response = self.get(endpoint, stream=stream)
            if response:
                bodies.append(response.json())
        return bodies
# Usage
client = HighPerformanceAPIClient('https://api.example.com')
data = client.get('users/1')| Optimization | Benefit | Implementation |
|---|---|---|
| Connection Pooling | Reuse connections | HTTPAdapter with pool_size |
| Streaming | Lower memory usage | stream=True, iter_content() |
| Compression | Reduce bandwidth | Accept-Encoding header |
| Retries | Reliability | Retry strategy with backoff |
| Concurrency | Speed | ThreadPoolExecutor or async |
| Token Refresh | Auto-renewal | Check expiry, refresh before use |
| Timeouts | Prevent hanging | Set timeout parameter |
| Logging | Debugging | Debug logging and custom logs |
Next, explore advanced HTTP status codes and error-handling scenarios.
Ready for more? Try the advanced challenges.
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward