Ojasa Mirai

Ojasa Mirai

Python

Loading...

Learning Level

🟢 Beginner🔵 Advanced
REST API BasicsHTTP RequestsStatus CodesJSON SerializationError HandlingAPI AuthenticationRate LimitingBuilding APIsWeb Scraping Basics
Python/Apis Json/Http Requests

⚡ Advanced HTTP Requests — Performance & Optimization

Professional applications require optimized HTTP handling. Connection pooling, streaming, compression, and intelligent retry strategies dramatically improve performance and reliability at scale.


🔌 Connection Pooling with HTTPAdapter

Reusing connections is crucial for performance:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session_with_retries(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 503, 504)
):
    """Build a requests.Session that retries transient failures and
    reuses pooled connections.

    Args:
        retries: total retry attempts per request.
        backoff_factor: base for exponential sleep between attempts.
        status_forcelist: HTTP status codes that trigger a retry.

    Returns:
        A configured requests.Session ready for repeated use.
    """
    session = requests.Session()

    adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            allowed_methods=['GET', 'POST', 'PUT', 'DELETE'],
        ),
        # Generous pool so concurrent callers rarely open fresh sockets.
        pool_connections=100,
        pool_maxsize=100,
    )

    # The same adapter serves both plain and TLS traffic.
    for scheme in ('http://', 'https://'):
        session.mount(scheme, adapter)

    return session

# Usage
# Build one shared session so every request below reuses pooled
# connections and gets automatic retries on 5xx responses.
session = create_session_with_retries()

# All requests use connection pooling and automatic retries
for i in range(10):
    response = session.get('https://api.example.com/data')
    print(f"Request {i+1}: {response.status_code}")

# Connections are reused across requests

📦 Streaming Large Responses

For large files or streaming data:

import requests

# ❌ Bad - loads entire response into memory
response = requests.get('https://example.com/large-file.zip')
data = response.content  # Entire file in memory!

# ✅ Good - stream the response
# stream=True defers the body download until it is iterated below.
response = requests.get(
    'https://example.com/large-file.zip',
    stream=True
)

# Process in chunks
total_size = 0
for chunk in response.iter_content(chunk_size=8192):
    if chunk:  # skip keep-alive chunks, which arrive as empty bytes
        total_size += len(chunk)
        # Process chunk (write to file, process, etc.)

print(f"Received {total_size} bytes")

# Download large file efficiently
# Writing chunk-by-chunk keeps memory flat regardless of file size.
with open('downloaded.zip', 'wb') as f:
    response = requests.get('https://example.com/file.zip', stream=True)
    for chunk in response.iter_content(chunk_size=8192):
        if chunk:
            f.write(chunk)

Stream JSON Lines

import requests
import json

# Server sends newline-delimited JSON
# iter_lines() yields one record at a time, so arbitrarily long
# streams are processed with constant memory.
response = requests.get('https://api.example.com/stream', stream=True)

for line in response.iter_lines():
    if line:  # iter_lines can yield empty keep-alive lines
        data = json.loads(line)
        print(f"Received: {data}")

🗜️ Compression Handling

Automatically handle and optimize compression:

import requests
import gzip
import io

def decompress_if_needed(response):
    """Return the response body, gunzipping it when the server
    declared a gzip Content-Encoding."""
    encoding = response.headers.get('content-encoding')
    if encoding != 'gzip':
        return response.content
    return gzip.decompress(response.content)

# Most libraries handle transparently
response = requests.get('https://api.example.com/data')
# requests automatically decompresses

# Accept compressed responses
# Explicitly advertising encodings lets the server pick the best one.
# NOTE(review): decoding 'br' requires the brotli package to be
# installed alongside requests — confirm before relying on it.
headers = {
    'Accept-Encoding': 'gzip, deflate, br'
}
response = requests.get('https://api.example.com/data', headers=headers)

# Check what encoding was used
print(response.headers.get('content-encoding'))

🔄 Advanced Retry Strategies

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
import logging

logger = logging.getLogger(__name__)

class AdaptiveRetryStrategy:
    """Intelligent retry with exponential backoff and jitter.

    Wait times honor a server-supplied Retry-After header when one is
    present, otherwise grow exponentially (capped at max_delay) with
    optional random jitter to avoid thundering-herd retries.
    """

    def __init__(self, base_delay=1, max_delay=60, jitter=True):
        self.base_delay = base_delay  # first-retry wait, in seconds
        self.max_delay = max_delay    # upper bound on any single wait
        self.jitter = jitter          # randomize waits when True

    def get_wait_time(self, attempt, last_response=None):
        """Return seconds to sleep before retry number `attempt` (0-based).

        Prefers the Retry-After header of `last_response` when it
        carries an integer; otherwise uses capped exponential backoff.
        """
        if last_response is not None:
            retry_after = last_response.headers.get('Retry-After')
            if retry_after is not None:
                try:
                    return int(retry_after)
                except ValueError:
                    # Retry-After may also be an HTTP-date (RFC 9110);
                    # fall through to backoff instead of crashing.
                    pass

        # Exponential backoff, capped so waits never exceed max_delay.
        delay = min(self.base_delay * (2 ** attempt), self.max_delay)

        # Jitter spreads simultaneous clients over a +/-50% window
        # to prevent a thundering herd of synchronized retries.
        if self.jitter:
            import random
            delay *= random.uniform(0.5, 1.5)

        return delay

    def retry_request(self, url, method='GET', max_retries=3, **kwargs):
        """Issue `method` against `url` with adaptive retries.

        Retries server errors (>= 500) and transport failures up to
        `max_retries` attempts. Returns the final Response (success or
        non-retryable client error); re-raises the last transport error
        when retries are exhausted.
        """
        for attempt in range(max_retries):
            try:
                if method == 'GET':
                    response = requests.get(url, **kwargs)
                elif method == 'POST':
                    response = requests.post(url, **kwargs)
                else:
                    response = requests.request(method, url, **kwargs)

                # Success: hand the response straight back.
                if response.ok:
                    return response

                # Retryable server error - wait, then try again.
                if response.status_code >= 500:
                    if attempt < max_retries - 1:
                        wait_time = self.get_wait_time(attempt, response)
                        logger.warning(f"Server error {response.status_code}. Waiting {wait_time}s...")
                        time.sleep(wait_time)
                        continue

                # Client error (or final 5xx attempt) - don't retry.
                return response

            except requests.exceptions.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = self.get_wait_time(attempt)
                    logger.warning(f"Request error: {e}. Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    raise

        return None

# Usage
# max_delay=32 caps the exponential backoff; max_retries=5 allows
# up to five attempts against the flaky endpoint.
retry_strategy = AdaptiveRetryStrategy(base_delay=1, max_delay=32)
response = retry_strategy.retry_request(
    'https://unstable-api.example.com/data',
    max_retries=5
)

🚀 Concurrent Requests

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_multiple_urls(urls, max_workers=5, timeout=10):
    """Fetch several URLs in parallel worker threads.

    Returns a dict mapping each URL to its parsed JSON body, or None
    when the request failed (each outcome is printed as it completes).
    """
    session = requests.Session()
    results = {}

    def fetch(target):
        # Return a (url, payload, error) triple so the collector below
        # can report per-URL outcomes without raising.
        try:
            resp = session.get(target, timeout=timeout)
            return target, resp.json(), None
        except Exception as exc:
            return target, None, str(exc)

    # Threads suit this workload: the GIL is released during network I/O.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(fetch, u) for u in urls]

        for done in as_completed(pending):
            target, payload, error = done.result()

            if error:
                print(f"❌ {target}: {error}")
                results[target] = None
            else:
                print(f"✓ {target}: Success")
                results[target] = payload

    return results

# Usage
urls = [
    'https://api.github.com/users/octocat',
    'https://api.github.com/users/gvanrossum',
    'https://api.github.com/users/brettcannon'
]

# Time the batch to show the win over fetching sequentially.
start = time.time()
results = fetch_multiple_urls(urls, max_workers=5)
elapsed = time.time() - start

print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")

Async with HTTPX (Modern Alternative)

import httpx
import asyncio

async def fetch_multiple_urls_async(urls):
    """Fetch every URL concurrently and return their parsed JSON bodies."""

    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(*(client.get(u) for u in urls))

    # Bodies are fully read by the time gather returns, so .json()
    # is safe even after the client context has closed.
    return [resp.json() for resp in responses]

# Usage (Python 3.7+)
urls = [
    'https://api.github.com/users/octocat',
    'https://api.github.com/users/gvanrossum'
]

# asyncio.run creates the event loop, runs the coroutine, and closes it.
results = asyncio.run(fetch_multiple_urls_async(urls))

🔐 Advanced Authentication

OAuth 2.0 with Token Refresh

import requests
import time
from threading import Lock

class OAuth2Session:
    """OAuth 2.0 client-credentials session with automatic token refresh.

    Thread-safe: a lock guards the refresh so only one thread fetches a
    new token; all other requests take a lock-free fast path while the
    cached token is still valid.
    """

    def __init__(self, client_id, client_secret, token_url):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.token = None       # current bearer token (None until first refresh)
        self.expires_at = 0     # epoch seconds; 0 forces refresh on first use
        self.lock = Lock()

    def refresh_token(self):
        """Fetch a new token if the current one is missing or near expiry."""
        # Fast path: skip the lock entirely while the token is valid.
        # (The original took the lock on every call, serializing all
        # requests; this is the missing half of double-checked locking.)
        if time.time() < self.expires_at:
            return

        with self.lock:
            # Check again inside lock: another thread may have refreshed
            # while we were waiting.
            if time.time() < self.expires_at:
                return

            data = {
                'client_id': self.client_id,
                'client_secret': self.client_secret,
                'grant_type': 'client_credentials'
            }

            response = requests.post(self.token_url, data=data)
            token_data = response.json()

            self.token = token_data['access_token']
            # Expire 60s early so in-flight requests don't race expiry.
            self.expires_at = time.time() + token_data.get('expires_in', 3600) - 60

    def _auth_headers(self, headers):
        """Return a copy of `headers` with a fresh Bearer token added.

        Copies rather than mutates, so a caller-supplied dict is never
        modified in place (the original clobbered the caller's dict).
        """
        self.refresh_token()
        headers = dict(headers or {})
        headers['Authorization'] = f'Bearer {self.token}'
        return headers

    def get(self, url, **kwargs):
        """Make authenticated GET request"""
        headers = self._auth_headers(kwargs.pop('headers', {}))
        return requests.get(url, headers=headers, **kwargs)

    def post(self, url, **kwargs):
        """Make authenticated POST request"""
        headers = self._auth_headers(kwargs.pop('headers', {}))
        return requests.post(url, headers=headers, **kwargs)

# Usage
# Replace the placeholders with real credentials; the session fetches
# and caches a bearer token automatically on the first request.
oauth = OAuth2Session(
    client_id='YOUR_CLIENT_ID',
    client_secret='YOUR_CLIENT_SECRET',
    token_url='https://auth.example.com/token'
)

response = oauth.get('https://api.example.com/data')

📊 Request/Response Logging and Debugging

import requests
import logging
from requests.adapters import HTTPAdapter
import http.client as http_client

# Enable HTTP debug logging
# debuglevel=1 makes http.client print raw request/response lines.
http_client.HTTPConnection.debuglevel = 1

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

# Log HTTP details
# NOTE(review): modern requests vendors urllib3 under the plain
# 'urllib3' logger name; 'requests.packages.urllib3' is the legacy
# alias — confirm which one your requests version uses.
requests_log = logging.getLogger('requests.packages.urllib3')
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True

# Custom request logger
class RequestLogger:
    def __init__(self):
        self.logger = logging.getLogger('api_client')

    def log_request(self, method, url, headers=None, data=None):
        self.logger.debug(f"{method} {url}")
        if headers:
            self.logger.debug(f"Headers: {headers}")
        if data:
            self.logger.debug(f"Body: {data}")

    def log_response(self, response):
        self.logger.debug(f"Status: {response.status_code}")
        self.logger.debug(f"Headers: {dict(response.headers)}")
        self.logger.debug(f"Body: {response.text[:500]}")

# Use logger
# Log the full exchange after the request completes.
logger = RequestLogger()
response = requests.get('https://api.example.com/data')
logger.log_response(response)

🎯 Performance Optimization Checklist

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time

class HighPerformanceAPIClient:
    """API client combining retries, connection pooling, and
    compression headers in a single reusable session."""

    def __init__(self, base_url, pool_size=100, timeout=30):
        self.base_url = base_url
        self.session = self._create_optimized_session(pool_size, timeout)
        self.timeout = timeout

    def _create_optimized_session(self, pool_size, timeout):
        """Create session with all optimizations"""
        session = requests.Session()

        # 1. Retry strategy: transient 5xx errors with exponential backoff.
        # 2. Connection pooling: repeated calls reuse sockets.
        adapter = HTTPAdapter(
            max_retries=Retry(
                total=3,
                backoff_factor=0.5,
                status_forcelist=(500, 502, 503, 504),
            ),
            pool_connections=pool_size,
            pool_maxsize=pool_size,
        )
        for scheme in ('http://', 'https://'):
            session.mount(scheme, adapter)

        # 3. Headers for efficiency: advertise compression support once.
        session.headers.update({
            'User-Agent': 'HighPerformanceClient/1.0',
            'Accept-Encoding': 'gzip, deflate'
        })

        return session

    def get(self, endpoint, stream=False, **kwargs):
        """GET base_url/endpoint; returns the Response, or None on any
        request failure (the error is printed)."""
        target = f"{self.base_url}/{endpoint}"

        try:
            response = self.session.get(
                target,
                timeout=self.timeout,
                stream=stream,
                **kwargs
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
            return None
        return response

    def batch_get(self, endpoints, stream=False):
        """Fetch each endpoint in turn, returning parsed JSON bodies
        for the requests that succeeded."""
        responses = (self.get(ep, stream=stream) for ep in endpoints)
        return [resp.json() for resp in responses if resp]

# Usage
# Share one client instance; its pooled session makes every
# subsequent call cheaper than a fresh connection.
client = HighPerformanceAPIClient('https://api.example.com')
data = client.get('users/1')

✅ Key Takeaways

| Optimization | Benefit | Implementation |
|---|---|---|
| Connection Pooling | Reuse connections | HTTPAdapter with pool_size |
| Streaming | Lower memory usage | stream=True, iter_content() |
| Compression | Reduce bandwidth | Accept-Encoding header |
| Retries | Reliability | Retry strategy with backoff |
| Concurrency | Speed | ThreadPoolExecutor or async |
| Token Refresh | Auto-renewal | Check expiry, refresh before use |
| Timeouts | Prevent hanging | Set timeout parameter |
| Logging | Debugging | Debug logging and custom logs |

🔗 What's Next?

Explore advanced HTTP status codes and error scenarios.

Next: Advanced Status Codes →


Ready for more? Try the advanced challenges.


Resources

Python Docs

Ojasa Mirai

Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep • Build Real • Verify Skills • Launch Forward

Courses

PythonFastapiReactJSCloud

© 2026 Ojasa Mirai. All rights reserved.

TwitterGitHubLinkedIn