
Production APIs require sophisticated error handling. Circuit breakers, graceful degradation, comprehensive monitoring, and intelligent recovery strategies separate robust systems from fragile ones. The examples below are written in Python.

Prevent cascading failures by stopping requests to failing services:
import requests
from enum import Enum
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)


class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""

    CLOSED = 'closed'        # Normal operation
    OPEN = 'open'            # Failing, reject requests
    HALF_OPEN = 'half_open'  # Testing if recovered


class CircuitBreaker:
    """Circuit breaker for external API calls.

    After `failure_threshold` consecutive failures the breaker OPENs and
    rejects calls. Once `recovery_timeout` seconds have elapsed it moves to
    HALF_OPEN and lets probe calls through; two consecutive probe successes
    CLOSE it again.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60,
                 expected_exception=None):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        # Default resolved lazily so supplying a custom exception type does
        # not require `requests` to be importable/evaluated at class-def time.
        self.expected_exception = (
            expected_exception if expected_exception is not None
            else requests.RequestException
        )
        self.state = CircuitState.CLOSED
        self.failure_count = 0       # consecutive failures
        self.last_failure_time = None
        self.success_count = 0       # consecutive HALF_OPEN probe successes

    def call(self, func, *args, **kwargs):
        """Execute `func(*args, **kwargs)` with circuit breaker protection.

        Raises:
            Exception: when the breaker is OPEN and the recovery timeout
                has not elapsed (the wrapped function is NOT invoked).
            expected_exception: re-raised from `func` after being recorded.
        """
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                logger.info("Circuit breaker: attempting reset")
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self):
        """Check if enough time passed since the last failure to retry."""
        return (
            self.last_failure_time and
            datetime.now() >= self.last_failure_time + timedelta(seconds=self.recovery_timeout)
        )

    def _on_failure(self):
        """Record a failure; open the circuit when the threshold is hit."""
        self.failure_count += 1
        # BUGFIX: a failure invalidates any recovery progress; previously
        # probe successes leaked across OPEN/HALF_OPEN cycles, letting a
        # single success after a failed probe close the circuit.
        self.success_count = 0
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.warning(f"Circuit breaker: OPEN (failures: {self.failure_count})")

    def _on_success(self):
        """Record a success; close the circuit after two probe successes."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= 2:  # Confirm recovery
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
                logger.info("Circuit breaker: CLOSED (recovered)")
        else:
            # BUGFIX: a success while CLOSED resets the consecutive-failure
            # count; previously sporadic failures accumulated forever and
            # eventually tripped the breaker on a healthy service.
            self.failure_count = 0
# Usage
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
def call_external_api():
return requests.get('https://api.example.com/data', timeout=5).json()
try:
data = breaker.call(call_external_api)
print(f"Data: {data}")
except Exception as e:
print(f"Error: {e}")
if breaker.state == CircuitState.OPEN:
print("Service is down, using fallback data")Isolate failures by separating resources:
from concurrent.futures import ThreadPoolExecutor
import threading
class Bulkhead:
    """Resource-isolation bulkhead.

    Each instance owns a dedicated thread pool plus a semaphore bounding
    in-flight work, so a slow or failing subsystem cannot starve others.
    """

    def __init__(self, name, max_threads=5, queue_size=10):
        self.name = name
        self.executor = ThreadPoolExecutor(max_workers=max_threads)
        # Bounds concurrent submissions; acquired non-blocking in execute().
        self.semaphore = threading.Semaphore(queue_size)

    def execute(self, func, *args, **kwargs):
        """Run `func` inside this bulkhead, waiting at most 10s for a result.

        Raises Exception when the bulkhead is saturated; exceptions raised
        by `func` propagate via Future.result().
        """
        acquired = self.semaphore.acquire(blocking=False)
        if not acquired:
            raise Exception(f"Bulkhead {self.name} queue full")
        try:
            pending = self.executor.submit(func, *args, **kwargs)
            return pending.result(timeout=10)
        finally:
            self.semaphore.release()
# Usage
payment_bulkhead = Bulkhead('payment', max_threads=10)
notification_bulkhead = Bulkhead('notification', max_threads=20)

def process_payment():
    # Payment processing code
    return {'status': 'success'}

def send_notification():
    # Notification code
    return {'status': 'sent'}

# Failures in payments don't affect notifications
try:
    payment_bulkhead.execute(process_payment)
    notification_bulkhead.execute(send_notification)
except Exception as e:
    print(f"Bulkhead error: {e}")

import logging
import json
import uuid
from contextlib import contextmanager
class StructuredLogger:
    """Structured (JSON) logging with per-request correlation IDs.

    Every record is serialized as a JSON object so log aggregators can
    index fields. NOTE(review): request_id is stored on the instance, so
    sharing one instance across concurrent requests would cross-tag logs —
    use one instance per request/thread.
    """

    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.request_id = None  # set only inside request_context()

    @contextmanager
    def request_context(self, request_id=None):
        """Context manager for request tracking; yields the request id
        (generated as a UUID4 when not supplied) and clears it on exit."""
        self.request_id = request_id or str(uuid.uuid4())
        try:
            yield self.request_id
        finally:
            self.request_id = None

    def _add_context(self, data):
        """Add the active request id (if any) to a log payload."""
        if self.request_id:
            data['request_id'] = self.request_id
        return data

    def info(self, message, **context):
        """Log an info-level JSON record with arbitrary context fields."""
        self.logger.info(
            json.dumps(self._add_context({'message': message, **context}))
        )

    def error(self, message, exception=None, **context):
        """Log an error-level JSON record, attaching the exception text."""
        data = {'message': message, **context}
        # BUGFIX: only attach 'error' when an exception was supplied;
        # str(None) previously produced the misleading value 'None'.
        if exception is not None:
            data['error'] = str(exception)
        self.logger.error(json.dumps(self._add_context(data)))
# Usage
logger = StructuredLogger('api_client')
with logger.request_context() as request_id:
    try:
        response = requests.get('https://api.example.com/data')
    except Exception as e:
        logger.error('API call failed', exception=e, endpoint='/data')
    else:
        logger.info('API call successful', status_code=response.status_code)

import requests
class APIClientWithFallback:
    """API client that degrades gracefully.

    Lookup order: primary endpoint, then each fallback endpoint, then the
    in-memory cache, then a stub 'degraded' record. Successful fetches are
    cached for later degradation.
    """

    def __init__(self, primary_url, fallback_urls=None, cache=None):
        self.primary_url = primary_url
        self.fallback_urls = fallback_urls or []
        self.cache = cache or {}

    def _fetch(self, base_url, user_id):
        """Try one endpoint. Returns (ok, payload); ok is False on any
        transport error or non-2xx status."""
        try:
            response = requests.get(f'{base_url}/users/{user_id}', timeout=3)
        except requests.RequestException:
            return False, None
        if not response.ok:
            return False, None
        return True, response.json()

    def get_user(self, user_id):
        """Get user data plus its source: 'primary', 'fallback',
        'cache' or 'degraded'."""
        ok, payload = self._fetch(self.primary_url, user_id)
        if ok:
            self.cache[user_id] = payload  # Cache for future
            return payload, 'primary'

        for backup in self.fallback_urls:
            ok, payload = self._fetch(backup, user_id)
            if ok:
                self.cache[user_id] = payload
                return payload, 'fallback'

        if user_id in self.cache:
            return self.cache[user_id], 'cache'

        # Last resort: a minimal placeholder so callers keep working.
        return {
            'id': user_id,
            'name': 'Unknown',
            'email': None,
            'status': 'degraded'
        }, 'degraded'
# Usage
client = APIClientWithFallback(
    primary_url='https://api.primary.com',
    fallback_urls=[
        'https://api.backup1.com',
        'https://api.backup2.com'
    ]
)

data, source = client.get_user(123)
print(f"User data from {source}: {data}")

from collections import deque
from datetime import datetime, timedelta
import threading
class ErrorRateMonitor:
    """Monitor error rates over a sliding window and flag unhealthy service.

    Thread-safe: the request window is guarded by a NON-reentrant
    threading.Lock, so code running while holding the lock must never call
    another public method that re-acquires it.
    """

    def __init__(self, window_size=100, error_threshold=0.5):
        # Sliding window of the most recent `window_size` outcomes.
        self.requests = deque(maxlen=window_size)
        self.error_threshold = error_threshold
        self.lock = threading.Lock()

    def record_request(self, success):
        """Record one request outcome (success=True/False)."""
        with self.lock:
            self.requests.append({'success': success, 'time': datetime.now()})

    def _error_rate_locked(self):
        """Compute the error rate; caller MUST hold self.lock."""
        if not self.requests:
            return 0
        errors = sum(1 for r in self.requests if not r['success'])
        return errors / len(self.requests)

    def get_error_rate(self):
        """Calculate the current error rate over the window (0 when empty)."""
        with self.lock:
            return self._error_rate_locked()

    def is_unhealthy(self):
        """Check if the error rate exceeds the configured threshold."""
        return self.get_error_rate() > self.error_threshold

    def get_metrics(self):
        """Get detailed metrics; empty dict when nothing was recorded."""
        with self.lock:
            if not self.requests:
                return {}
            errors = sum(1 for r in self.requests if not r['success'])
            rate = errors / len(self.requests)
            return {
                'total_requests': len(self.requests),
                'errors': errors,
                'error_rate': rate,
                # BUGFIX: computed inline. The original called
                # self.is_unhealthy() here, which re-acquired the
                # non-reentrant lock already held above -> deadlock.
                'is_unhealthy': rate > self.error_threshold,
            }
# Usage
monitor = ErrorRateMonitor(window_size=100, error_threshold=0.3)

for _ in range(100):
    try:
        response = requests.get('https://api.example.com/data', timeout=5)
        monitor.record_request(response.ok)
    except requests.RequestException:
        # BUGFIX: narrowed from a bare `except:` which also swallowed
        # KeyboardInterrupt/SystemExit.
        monitor.record_request(False)

metrics = monitor.get_metrics()
print(f"Error rate: {metrics['error_rate']:.1%}")
if monitor.is_unhealthy():
    print("⚠️ Service unhealthy - activating fallback")

import requests
import uuid
import json
from datetime import datetime
class TracedAPIClient:
    """HTTP client that records one span per request for distributed tracing.

    Every instance carries a single trace id (UUID4); spans accumulate in
    self.spans and can be exported via get_trace().
    """

    def __init__(self, name):
        self.name = name
        self.trace_id = str(uuid.uuid4())  # one trace per client instance
        self.spans = []

    def start_span(self, span_name):
        """Open a span stamped with the trace id and an ISO start time."""
        return {
            'name': span_name,
            'start_time': datetime.now().isoformat(),
            'trace_id': self.trace_id,
        }

    def end_span(self, span, duration=None, **metadata):
        """Close `span`, attach timing plus metadata, and record it."""
        span.update(
            end_time=datetime.now().isoformat(),
            duration_ms=duration,
            metadata=metadata,
        )
        self.spans.append(span)

    def get(self, url, **kwargs):
        """Issue a traced GET; failures are recorded then re-raised."""
        span = self.start_span('GET')
        try:
            response = requests.get(url, **kwargs)
            self.end_span(
                span,
                duration=response.elapsed.total_seconds() * 1000,
                status_code=response.status_code,
            )
            return response
        except Exception as e:
            self.end_span(span, error=str(e))
            raise

    def get_trace(self):
        """Return the full trace: id, service name and recorded spans."""
        return {
            'trace_id': self.trace_id,
            'service': self.name,
            'spans': self.spans,
        }
# Usage
tracer = TracedAPIClient('user_service')
try:
tracer.get('https://api.example.com/users/1')
tracer.get('https://api.example.com/users/2')
except:
pass
# Export trace for analysis
trace = tracer.get_trace()
print(json.dumps(trace, indent=2, default=str))| Pattern | Purpose | Implementation |
|---|---|---|
| Circuit Breaker | Stop cascading failures | Track failures, open on threshold |
| Bulkhead | Isolate failures | Separate thread pools |
| Fallback | Provide alternative | Multiple endpoints, cache, degraded mode |
| Structured Logging | Debug production | JSON logs with context |
| Monitoring | Detect issues early | Track error rates, metrics |
| Tracing | Track requests | Trace ID, spans, timing |
| Graceful Degradation | Keep service alive | Return partial/cached data |
Learn advanced authentication and security patterns.
Next: Advanced API Authentication →
Ready for more? Try the advanced challenge exercises.
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward