Ojasa Mirai

Ojasa Mirai

Python

Loading...

Learning Level

🟢 Beginner · 🔵 Advanced
REST API Basics · HTTP Requests · Status Codes · JSON Serialization · Error Handling · API Authentication · Rate Limiting · Building APIs · Web Scraping Basics
Python/Apis Json/Error Handling Apis

🚨 Advanced Error Handling — Production-Grade Strategies

Production APIs require sophisticated error handling. Circuit breakers, graceful degradation, comprehensive monitoring, and intelligent recovery strategies separate robust systems from fragile ones.


🔌 Circuit Breaker Pattern

Prevent cascading failures by stopping requests to failing services:

import requests
from enum import Enum
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class CircuitState(Enum):
    """States a circuit breaker can be in.

    CLOSED    -- normal operation.
    OPEN      -- failing, requests are rejected.
    HALF_OPEN -- testing whether the service has recovered.
    """

    CLOSED = 'closed'
    OPEN = 'open'
    HALF_OPEN = 'half_open'

class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is OPEN."""


class CircuitBreaker:
    """Circuit breaker for external API calls.

    The breaker starts CLOSED. After ``failure_threshold`` consecutive
    failures it becomes OPEN and rejects calls. Once ``recovery_timeout``
    seconds have passed since the last failure, the next call is let
    through in HALF_OPEN state; two successful trial calls close the
    circuit again, while any failure during the trial re-opens it.

    Args:
        failure_threshold: Consecutive failures that open the circuit.
        recovery_timeout: Seconds to wait in OPEN before a trial call.
        expected_exception: Exception type (or tuple of types) counted as
            a failure. Other exceptions propagate without being counted.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=requests.RequestException):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.success_count = 0

    def call(self, func, *args, **kwargs):
        """Execute ``func(*args, **kwargs)`` with circuit breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: The circuit is OPEN and the recovery
                timeout has not elapsed (subclass of ``Exception``, so
                existing ``except Exception`` handlers keep working).
            Exception: Anything ``func`` raises is re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")
            self.state = CircuitState.HALF_OPEN
            # Every trial phase must earn its successes from scratch.
            self.success_count = 0
            logger.info("Circuit breaker: attempting reset")

        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise

        self._on_success()
        return result

    def _should_attempt_reset(self):
        """Return True when ``recovery_timeout`` has elapsed since the last failure."""
        if self.last_failure_time is None:
            return False
        retry_at = self.last_failure_time + timedelta(seconds=self.recovery_timeout)
        return datetime.now() >= retry_at

    def _on_failure(self):
        """Record a failure and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        # A failure invalidates any successes collected during a trial.
        self.success_count = 0

        trial_failed = self.state == CircuitState.HALF_OPEN
        if trial_failed or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.warning(f"Circuit breaker: OPEN (failures: {self.failure_count})")

    def _on_success(self):
        """Record a success.

        While CLOSED, a success clears the failure streak so that only
        consecutive failures can trip the breaker. While HALF_OPEN, two
        successes confirm recovery and close the circuit.
        """
        if self.state != CircuitState.HALF_OPEN:
            self.failure_count = 0
            return

        self.success_count += 1

        if self.success_count >= 2:  # Confirm recovery
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            self.success_count = 0
            logger.info("Circuit breaker: CLOSED (recovered)")

# Usage
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)


def call_external_api():
    """Fetch the example resource and return its decoded JSON body.

    HTTP error statuses are turned into ``requests.HTTPError`` (a
    ``RequestException``) so that 4xx/5xx responses are counted as
    failures by the circuit breaker instead of being parsed as data.
    """
    response = requests.get('https://api.example.com/data', timeout=5)
    response.raise_for_status()
    return response.json()


try:
    data = breaker.call(call_external_api)
    print(f"Data: {data}")
except Exception as e:
    # Top-level boundary: covers request errors and the breaker's own
    # "circuit is OPEN" rejection.
    print(f"Error: {e}")
    if breaker.state == CircuitState.OPEN:
        print("Service is down, using fallback data")

🔄 Bulkhead Pattern

Isolate failures by separating resources:

from concurrent.futures import ThreadPoolExecutor
import threading

class Bulkhead:
    """Isolate failures using separate thread pools.

    Each bulkhead owns its own ``ThreadPoolExecutor`` and a semaphore that
    caps how many ``execute`` calls may be in flight at once.

    Args:
        name: Label used in error messages.
        max_threads: Worker threads in this bulkhead's pool.
        queue_size: Maximum number of concurrent ``execute`` calls.
        timeout: Seconds to wait for a task's result (``None`` waits
            indefinitely). Defaults to the previously hard-coded 10.
    """

    def __init__(self, name, max_threads=5, queue_size=10, timeout=10):
        self.name = name
        self.timeout = timeout
        self.executor = ThreadPoolExecutor(max_workers=max_threads)
        self.semaphore = threading.Semaphore(queue_size)

    def execute(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` in this bulkhead's pool and return its result.

        Raises:
            Exception: The bulkhead already has ``queue_size`` calls in flight.
            concurrent.futures.TimeoutError: No result within ``timeout`` seconds.
            Exception: Anything ``func`` itself raises.
        """
        if not self.semaphore.acquire(blocking=False):
            raise Exception(f"Bulkhead {self.name} queue full")

        try:
            future = self.executor.submit(func, *args, **kwargs)
            try:
                return future.result(timeout=self.timeout)
            except Exception:
                # Drop the task if it has not started yet; a task that is
                # already running cannot be cancelled and cancel() is a no-op.
                future.cancel()
                raise
        finally:
            self.semaphore.release()

    def shutdown(self, wait=True):
        """Release the pool's worker threads; ``wait`` is passed to the executor."""
        self.executor.shutdown(wait=wait)

# Usage
payment_bulkhead = Bulkhead('payment', max_threads=10)
notification_bulkhead = Bulkhead('notification', max_threads=20)


def process_payment():
    """Placeholder payment task; returns a success status dict."""
    # Payment processing code
    return {'status': 'success'}


def send_notification():
    """Placeholder notification task; returns a sent status dict."""
    # Notification code
    return {'status': 'sent'}


# Failures in payments don't affect notifications: each call is guarded
# separately so an error in one bulkhead cannot skip the other.
try:
    payment_bulkhead.execute(process_payment)
except Exception as e:
    print(f"Bulkhead error: {e}")

try:
    notification_bulkhead.execute(send_notification)
except Exception as e:
    print(f"Bulkhead error: {e}")

📊 Structured Logging and Tracing

import logging
import json
import uuid
from contextlib import contextmanager

class StructuredLogger:
    """Structured (JSON) logging for debugging and monitoring.

    Every record is a JSON object with a ``message`` key, any keyword
    context supplied by the caller, and a ``request_id`` key while a
    ``request_context`` is active.
    """

    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.request_id = None

    @contextmanager
    def request_context(self, request_id=None):
        """Context manager for request tracking.

        Sets ``self.request_id`` (a fresh UUID4 string when none is given)
        for the duration of the block and yields it. On exit the previous
        id is restored, so nested contexts do not wipe the outer one.
        """
        previous_id = self.request_id
        self.request_id = request_id or str(uuid.uuid4())
        try:
            yield self.request_id
        finally:
            self.request_id = previous_id

    def _add_context(self, data):
        """Add the active request id (if any) to ``data`` and return it."""
        if self.request_id:
            data['request_id'] = self.request_id
        return data

    @staticmethod
    def _serialize(data):
        """Render ``data`` as JSON.

        ``default=str`` is deliberate: a log call must not raise just
        because a context value (datetime, exception, ...) is not
        JSON-serializable.
        """
        return json.dumps(data, default=str)

    def info(self, message, **context):
        """Log ``message`` plus keyword context at INFO level."""
        self.logger.info(
            self._serialize(self._add_context({'message': message, **context}))
        )

    def error(self, message, exception=None, **context):
        """Log ``message`` plus keyword context at ERROR level.

        The ``error`` field holds ``str(exception)``, or JSON ``null`` when
        no exception was supplied (rather than the literal string "None").
        """
        data = self._add_context({
            'message': message,
            'error': None if exception is None else str(exception),
            **context
        })
        self.logger.error(self._serialize(data))

# Usage
logger = StructuredLogger('api_client')

with logger.request_context() as request_id:
    try:
        # requests applies no timeout by default; bound the wait explicitly.
        response = requests.get('https://api.example.com/data', timeout=5)
    except Exception as e:
        logger.error('API call failed', exception=e, endpoint='/data')
    else:
        # Kept out of the try body so a logging problem is never
        # misreported as a failed API call.
        logger.info('API call successful', status_code=response.status_code)

🎯 Fallback and Degradation Strategies

import requests

class APIClientWithFallback:
    """API client with multiple fallback strategies.

    Lookup order for a user: primary endpoint, each fallback endpoint in
    order, the local cache, and finally a synthetic "degraded" record.

    Args:
        primary_url: Base URL of the primary service.
        fallback_urls: Optional base URLs tried in order after the primary.
        cache: Optional dict used to remember successful lookups. The
            object passed in is used as-is, even when it is empty, so a
            cache shared between clients keeps working.
    """

    def __init__(self, primary_url, fallback_urls=None, cache=None):
        self.primary_url = primary_url
        self.fallback_urls = fallback_urls if fallback_urls is not None else []
        # ``cache or {}`` would silently replace a caller-supplied empty dict.
        self.cache = cache if cache is not None else {}

    def _fetch_user(self, base_url, user_id):
        """Return the decoded user from ``base_url``, or None if unavailable.

        None is returned for a request error, a non-OK status, a body that
        is not valid JSON, or a JSON ``null`` body.
        """
        try:
            response = requests.get(f'{base_url}/users/{user_id}', timeout=3)
            if response.ok:
                return response.json()
        except (requests.RequestException, ValueError):
            # Deliberate best-effort: the caller moves on to the next
            # strategy. ValueError covers JSON decode errors that are not
            # RequestException subclasses.
            return None
        return None

    def get_user(self, user_id):
        """Get user with fallback strategies.

        Returns:
            A ``(data, source)`` tuple where ``source`` is one of
            ``'primary'``, ``'fallback'``, ``'cache'`` or ``'degraded'``.
        """
        endpoints = [(self.primary_url, 'primary')]
        endpoints.extend((url, 'fallback') for url in self.fallback_urls)

        for base_url, source in endpoints:
            data = self._fetch_user(base_url, user_id)
            if data is not None:
                self.cache[user_id] = data  # Cache for future
                return data, source

        # Use cached data if available
        if user_id in self.cache:
            return self.cache[user_id], 'cache'

        # Return degraded response
        return {
            'id': user_id,
            'name': 'Unknown',
            'email': None,
            'status': 'degraded'
        }, 'degraded'

# Usage: one primary endpoint backed by two fallback endpoints.
client = APIClientWithFallback(
    'https://api.primary.com',
    ['https://api.backup1.com', 'https://api.backup2.com'],
)

# get_user returns the payload together with the strategy that produced it.
data, source = client.get_user(123)
print(f"User data from {source}: {data}")

📈 Error Rate Monitoring

from collections import deque
from datetime import datetime, timedelta
import threading

class ErrorRateMonitor:
    """Monitor error rates over a sliding window of recent requests.

    Args:
        window_size: Number of most recent requests kept.
        error_threshold: Error rate above which the service is considered
            unhealthy (strictly greater than).
    """

    def __init__(self, window_size=100, error_threshold=0.5):
        self.requests = deque(maxlen=window_size)
        self.error_threshold = error_threshold
        self.lock = threading.Lock()

    def record_request(self, success):
        """Record request success/failure together with the current time."""
        with self.lock:
            self.requests.append({'success': success, 'time': datetime.now()})

    def _counts(self):
        """Return ``(total, errors)`` for the current window, read under the lock."""
        with self.lock:
            total = len(self.requests)
            errors = sum(1 for r in self.requests if not r['success'])
        return total, errors

    def get_error_rate(self):
        """Return the error rate of the window, or 0 when it is empty."""
        total, errors = self._counts()
        if not total:
            return 0
        return errors / total

    def is_unhealthy(self):
        """Return True when the error rate exceeds ``error_threshold``."""
        return self.get_error_rate() > self.error_threshold

    def get_metrics(self):
        """Return a metrics dict, or an empty dict when nothing was recorded.

        All values come from a single snapshot of the window. The lock is
        a plain (non-reentrant) ``threading.Lock``, so this method must not
        call ``is_unhealthy`` or ``get_error_rate`` while holding it; doing
        so would deadlock.
        """
        total, errors = self._counts()
        if not total:
            return {}

        error_rate = errors / total
        return {
            'total_requests': total,
            'errors': errors,
            'error_rate': error_rate,
            'is_unhealthy': error_rate > self.error_threshold
        }

# Usage
monitor = ErrorRateMonitor(window_size=100, error_threshold=0.3)

for i in range(100):
    try:
        response = requests.get('https://api.example.com/data', timeout=5)
        monitor.record_request(response.ok)
    except requests.RequestException:
        # Only request failures count as errors; a bare ``except`` would
        # also swallow KeyboardInterrupt and SystemExit.
        monitor.record_request(False)

metrics = monitor.get_metrics()
print(f"Error rate: {metrics['error_rate']:.1%}")

if monitor.is_unhealthy():
    print("⚠️ Service unhealthy - activating fallback")

🔍 Distributed Tracing

import requests
import uuid
import json
from datetime import datetime

class TracedAPIClient:
    """API client with distributed tracing.

    Each instance owns one trace id (a UUID4 string) and collects the
    finished spans of every request made through it.
    """

    def __init__(self, name):
        self.name = name
        self.trace_id = str(uuid.uuid4())
        self.spans = []

    def start_span(self, span_name):
        """Start a new span and return it as a dict (not yet recorded)."""
        return {
            'name': span_name,
            'start_time': datetime.now().isoformat(),
            'trace_id': self.trace_id
        }

    @staticmethod
    def _elapsed_ms(start_iso, end_time):
        """Return milliseconds from ``start_iso`` to ``end_time``, or None if unparsable."""
        try:
            started = datetime.fromisoformat(start_iso)
            return (end_time - started).total_seconds() * 1000
        except (TypeError, ValueError):
            return None

    def end_span(self, span, duration=None, **metadata):
        """End a span and append it to ``self.spans``.

        Args:
            span: Dict produced by ``start_span``.
            duration: Duration in milliseconds. When omitted (for example
                on the failure path) it is derived from the span's own
                ``start_time`` so failed requests still carry timing.
            **metadata: Stored under the span's ``metadata`` key.
        """
        finished_at = datetime.now()
        if duration is None:
            duration = self._elapsed_ms(span.get('start_time'), finished_at)

        span['end_time'] = finished_at.isoformat()
        span['duration_ms'] = duration
        span['metadata'] = metadata
        self.spans.append(span)

    def get(self, url, **kwargs):
        """GET ``url`` with tracing; ``kwargs`` are passed to ``requests.get``.

        A span is recorded whether the request succeeds or raises; any
        exception is re-raised after the span is closed.
        """
        span = self.start_span('GET')

        try:
            response = requests.get(url, **kwargs)
            self.end_span(
                span,
                duration=response.elapsed.total_seconds() * 1000,
                status_code=response.status_code
            )
            return response

        except Exception as e:
            self.end_span(span, error=str(e))
            raise

    def get_trace(self):
        """Return the trace id, service name and all recorded spans."""
        return {
            'trace_id': self.trace_id,
            'service': self.name,
            'spans': self.spans
        }

# Usage
tracer = TracedAPIClient('user_service')

try:
    # Extra keyword arguments are forwarded to requests.get; bound the wait.
    tracer.get('https://api.example.com/users/1', timeout=5)
    tracer.get('https://api.example.com/users/2', timeout=5)
except requests.RequestException as e:
    # The failed request is already recorded as a span; report it instead
    # of silently swallowing every exception, then still export the trace.
    print(f"Request failed: {e}")

# Export trace for analysis
trace = tracer.get_trace()
print(json.dumps(trace, indent=2, default=str))

✅ Key Takeaways

| Pattern | Purpose | Implementation |
| --- | --- | --- |
| Circuit Breaker | Stop cascading failures | Track failures, open on threshold |
| Bulkhead | Isolate failures | Separate thread pools |
| Fallback | Provide alternative | Multiple endpoints, cache, degraded mode |
| Structured Logging | Debug production | JSON logs with context |
| Monitoring | Detect issues early | Track error rates, metrics |
| Tracing | Track requests | Trace ID, spans, timing |
| Graceful Degradation | Keep service alive | Return partial/cached data |

🔗 What's Next?

Learn advanced authentication and security patterns.

Next: Advanced API Authentication →


Ready for more? Try the advanced challenges.


Resources

Python Docs

Ojasa Mirai

Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep • Build Real • Verify Skills • Launch Forward

Courses

PythonFastapiReactJSCloud

© 2026 Ojasa Mirai. All rights reserved.

Twitter · GitHub · LinkedIn