
# Advanced Python Exception Handling

Master sophisticated exception-handling patterns for production systems.
We begin by understanding and implementing the context manager protocol:
# The context manager protocol
class ManagedResource:
    """Demonstrates the context manager protocol (__enter__/__exit__).

    A ValueError raised inside the ``with`` body is swallowed; every other
    exception propagates to the caller.
    """

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        """Called when entering 'with' block"""
        print(f"Acquiring {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Called when exiting 'with' block"""
        print(f"Releasing {self.name}")
        # A truthy return value tells the interpreter to suppress the exception.
        suppress = exc_type is ValueError
        if suppress:
            print(f"Suppressing ValueError: {exc_val}")
        return suppress
# Usage: the ValueError raised inside the block never reaches our handler,
# because ManagedResource.__exit__ suppresses it.
try:
    with ManagedResource("Database") as res:
        print("Using resource")
        raise ValueError("Oops")
except ValueError:
    print("This won't print")
print("Program continues")
# Using contextlib
import time
from contextlib import contextmanager
@contextmanager
def timed_operation(name):
    """Context manager (decorator style) that reports wall time for a block.

    Parameters:
        name: label printed in the start/completion messages.

    Yields:
        None; the timed code runs in the ``with`` body. The completion
        message is printed in ``finally``, so it appears even on error.
    """
    import time
    # Fix: use perf_counter, not time.time(). perf_counter is monotonic,
    # so elapsed time cannot go negative when the system clock is adjusted.
    start = time.perf_counter()
    print(f"Starting {name}")
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"Completed {name} in {elapsed:.2f}s")
# Demo: time a simulated database query.
with timed_operation("Database Query"):
    time.sleep(1)  # stand-in for real work
import functools
import logging
# Decorator for automatic error handling
def handle_errors(logger=None):
    """Decorator factory: log any exception (with traceback) and re-raise.

    Args:
        logger: optional logging.Logger; when falsy, errors propagate
            without being logged.
    """
    def _decorate(func):
        @functools.wraps(func)
        def _wrapped(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if logger:
                    # exc_info=True attaches the full traceback to the record.
                    logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
                raise  # never swallow -- callers still see the original error
        return _wrapped
    return _decorate
# Demo: the decorator logs the ValueError and then re-raises it.
@handle_errors(logger=logging.getLogger(__name__))
def risky_operation():
    """Always fails; exists to demonstrate handle_errors."""
    raise ValueError("Oops")
# Decorator for retry logic
def retry(max_attempts=3, backoff=2):
    """Decorator for automatic retry with exponential backoff.

    Args:
        max_attempts: total number of tries before giving up.
        backoff: base of the exponential wait (backoff ** attempt seconds).
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            final = max_attempts - 1
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Out of attempts: re-raise the original exception.
                    if attempt == final:
                        raise
                    wait_time = backoff ** attempt
                    print(f"Attempt {attempt + 1} failed. Retrying in {wait_time}s")
                    time.sleep(wait_time)
        return wrapper
    return decorator
@retry(max_attempts=3)
def unstable_api_call():
    """Simulated flaky endpoint: fails roughly 70% of calls."""
    import random
    # Random failure so the retry decorator has something to do.
    if random.random() < 0.7:
        raise ConnectionError("API unavailable")
    return "Success"
# Decorator for timeout handling
import signal
# NOTE(review): this shadows the builtin TimeoutError (an OSError subclass
# since Python 3.3). Code catching the builtin will NOT catch this class —
# confirm the shadowing is intentional.
class TimeoutError(Exception):
    """Raised by the timeout() decorator when a function exceeds its budget."""
    pass
def timeout(seconds):
    """Decorator that aborts the wrapped call after `seconds` seconds.

    Uses SIGALRM, so it works only on Unix and only in the main thread.

    Args:
        seconds: whole-second timeout passed to signal.alarm().

    Raises:
        TimeoutError: when the wrapped function does not finish in time.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            def timeout_handler(signum, frame):
                raise TimeoutError(f"Function {func.__name__} timed out")
            # Fix: remember the previous SIGALRM handler. The original left
            # timeout_handler installed after returning, clobbering any other
            # user of SIGALRM and breaking nested @timeout calls.
            old_handler = signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)  # cancel the pending alarm
                signal.signal(signal.SIGALRM, old_handler)  # restore
        return wrapper
    return decorator
# Usage: the task sleeps longer than its budget, so the alarm fires.
@timeout(5)
def long_running_task():
    time.sleep(10)

try:
    long_running_task()
except TimeoutError as e:
    print(f"Task timed out: {e}")
# Pattern 1: Fallback chain
def get_config_value(key):
    """Try multiple sources for configuration, first hit wins.

    NOTE(review): relies on module-level ``os``, ``config_file``,
    ``defaults`` and ``ConfigError`` defined elsewhere — confirm in scope.
    """
    sources = (
        lambda: os.environ[key],       # environment first
        lambda: config_file.get(key),  # then config file
        lambda: defaults.get(key),     # finally built-in defaults
    )
    for source in sources:
        try:
            return source()
        except (KeyError, FileNotFoundError):
            continue  # this source has no answer; try the next one
    raise ConfigError(f"Cannot find configuration for {key}")
# Pattern 2: Partial failure tolerance
def batch_process_with_tolerance(items, error_threshold=0.1):
    """Process a batch, failing only when too many items error out.

    Args:
        items: iterable of work items handed to process_item().
        error_threshold: maximum tolerated failure fraction (0.0-1.0).

    Returns:
        (successes, failures): successes is a list of results, failures a
        list of (item, exception) pairs.

    Raises:
        BatchProcessError: when the failure rate exceeds error_threshold.
    """
    items = list(items)  # accept any iterable; len() needed below
    # Fix: the original divided by len(items) unconditionally and crashed
    # with ZeroDivisionError on an empty batch.
    if not items:
        return [], []
    successes = []
    failures = []
    for item in items:
        try:
            successes.append(process_item(item))
        except Exception as e:
            failures.append((item, e))
    failure_rate = len(failures) / len(items)
    if failure_rate > error_threshold:
        raise BatchProcessError(f"Too many failures: {failure_rate:.1%}")
    return successes, failures
# Pattern 3: Graceful degradation
class DataService:
    """Fetch users with graceful degradation: primary DB, then cache,
    then a fallback DB; raise DataUnavailable only when all three fail."""

    def __init__(self, primary_db, cache, fallback_db):
        self.primary = primary_db
        self.cache = cache
        self.fallback = fallback_db

    def get_user(self, user_id):
        """Try sources in order of preference"""
        # Each source gets its own try/except so its failure type is only
        # looked up if that source actually raises.
        try:
            return self.primary.get(user_id)
        except PrimaryDBError:
            pass  # degrade to the cache
        try:
            return self.cache.get(user_id)
        except CacheError:
            pass  # degrade to the fallback store
        try:
            return self.fallback.get(user_id)
        except FallbackDBError:
            pass  # nothing left to try
        raise DataUnavailable(f"Cannot retrieve user {user_id}")
# Pattern 4: Compensating transactions
def transfer_funds(from_account, to_account, amount):
    """Transfer with automatic rollback on failure"""
    # Ordering is critical: the debit must succeed before the credit is
    # attempted, and a failed credit must undo the debit (saga-style
    # compensation). Note the outer try also encloses the inner one, so a
    # DebitError raised anywhere inside becomes a TransferError.
    try:
        # Step 1: Debit from_account
        from_account.debit(amount)
        try:
            # Step 2: Credit to_account
            to_account.credit(amount)
        except CreditError as e:
            # Compensating transaction: reverse debit
            from_account.credit(amount)
            # `from e` keeps the causal chain for debugging.
            raise CompensationError(f"Credit failed, reverting debit: {e}") from e
    except DebitError as e:
        raise TransferError(f"Cannot debit from account: {e}") from e
# Composite exceptions containing multiple errors
class CompositeError(Exception):
    """Exception containing multiple sub-errors"""

    def __init__(self, message, errors=None):
        super().__init__(message)
        self.message = message
        # Fresh list when none supplied, so instances never share state.
        self.errors = errors or []

    def add_error(self, error):
        # Fluent: returns self so calls can be chained.
        self.errors.append(error)
        return self

    def __str__(self):
        error_msgs = '\n '.join(str(e) for e in self.errors)
        return f"{self.message}\n {error_msgs}"
def validate_user_data(data):
    """Validate user data and collect ALL errors instead of failing fast.

    Args:
        data: mapping expected to contain 'email' and 'age'.

    Returns:
        True when the data is valid.

    Raises:
        CompositeError: carrying one ValidationError per problem found.
    """
    errors = []
    if not data.get('email'):
        errors.append(ValidationError('email', 'required'))
    elif '@' not in data['email']:
        errors.append(ValidationError('email', 'invalid format'))
    # Fix: the original used `not data.get('age')`, which mislabeled a
    # legitimate age of 0 as "required". Test for absence explicitly.
    if data.get('age') is None:
        errors.append(ValidationError('age', 'required'))
    elif not isinstance(data['age'], int) or isinstance(data['age'], bool):
        # bool is a subclass of int, but True/False are not valid ages.
        errors.append(ValidationError('age', 'must be integer'))
    if errors:
        raise CompositeError("Validation failed", errors)
    return True
# Usage: both fields are bad, so both errors are reported at once.
try:
    validate_user_data({'email': 'invalid', 'age': 'not a number'})
except CompositeError as e:
    print(e)
    for err in e.errors:
        print(f" - {err}")
# Complex chaining scenarios
class LayeredError(Exception):
    """Exception carrying the layer it was raised in plus the wrapped error."""

    def __init__(self, layer, message, original_error=None):
        super().__init__(message)
        self.layer = layer              # which architectural layer raised it
        self.message = message
        self.original_error = original_error  # wrapped lower-level error
def api_layer(endpoint):
    """Lowest layer: perform the HTTP request, wrapping transport errors."""
    try:
        make_http_request(endpoint)
    except requests.RequestException as e:
        # `from e` preserves the network-level traceback as __cause__.
        raise LayeredError('api', f'Request failed to {endpoint}', e) from e
def service_layer(endpoint):
    """Middle layer: translate API-layer failures into service failures."""
    try:
        api_layer(endpoint)
    except LayeredError as e:
        raise LayeredError('service', f'Service unavailable: {e.message}', e) from e
def application_layer(endpoint):
    """Top layer: surface one application-level failure to the caller."""
    try:
        service_layer(endpoint)
    except LayeredError as e:
        raise LayeredError('app', f'Operation failed', e) from e
# Usage with traceback: inspect both our wrapper attribute and the
# implicit exception-chaining attribute (__cause__ set by `raise ... from`).
try:
    application_layer('https://api.example.com/data')
except LayeredError as e:
    print(f"Layer: {e.layer}")
    print(f"Message: {e.message}")
    if e.original_error:
        print(f"Original: {e.original_error}")
    if e.__cause__:
        print(f"Cause: {e.__cause__}")
import timeit
# EAFP vs LBYL performance comparison
def test_eafp():
    """Easier to Ask for Forgiveness than Permission"""
    # Just attempt the conversion; recover only if it actually fails.
    try:
        return int("123")
    except ValueError:
        return 0
def test_lbyl():
    """Look Before You Leap"""
    candidate = "123"
    # Check first, convert second -- two passes over the string.
    return int(candidate) if candidate.isdigit() else 0
# EAFP is faster when success is likely: the except path is never taken,
# and the no-exception path of try is essentially free (zero-cost
# exception handling since Python 3.11).
print(f"EAFP: {timeit.timeit(test_eafp, number=100000)}")
print(f"LBYL: {timeit.timeit(test_lbyl, number=100000)}")
# Example: dictionary access
def eafp_dict(d, key):
    """Return d[key], or None when the key is absent (EAFP style)."""
    try:
        return d[key]
    except KeyError:
        return None
def lbyl_dict(d, key):
    """Return d[key], or None when the key is absent (LBYL style)."""
    # Membership test plus subscript: two hash lookups on the hit path.
    return d[key] if key in d else None
# EAFP wins here: a hit needs a single dict lookup, LBYL needs two.
sample = {f'key_{i}': i for i in range(1000)}
print(f"EAFP dict: {timeit.timeit(lambda: eafp_dict(sample, 'key_500'), number=10000)}")
print(f"LBYL dict: {timeit.timeit(lambda: lbyl_dict(sample, 'key_500'), number=10000)}")
## Resources

Ojasa Mirai — Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep • Build Real • Verify Skills • Launch Forward