
Python — Mastering Multiple-Exception Handling
Master sophisticated patterns for handling multiple exceptions in distributed and complex systems.
# Collect and report multiple errors
class AggregateException(Exception):
    """An exception that bundles several underlying exceptions together."""

    def __init__(self, message, exceptions=None):
        self.message = message
        self.exceptions = exceptions or []
        self.inner_exceptions = []
        super().__init__(message)

    def add(self, exception):
        """Record *exception*; returns self so calls can be chained."""
        self.inner_exceptions.append(exception)
        return self

    def throw_if_any(self):
        """Raise this aggregate when at least one exception was recorded."""
        if self.inner_exceptions:
            self.exceptions = self.inner_exceptions
            raise self

    def __str__(self):
        lines = [f"{self.message} ({len(self.exceptions)} errors)"]
        lines.extend(
            f" {n}. {type(err).__name__}: {err}"
            for n, err in enumerate(self.exceptions, 1)
        )
        return "\n".join(lines) + "\n"
# Usage
def validate_batch_data(items):
    """Validate every item, raising one AggregateException with all failures."""
    errors = AggregateException("Validation failed")
    for index, item in enumerate(items):
        try:
            validate_item(item)
        except ValidationError as err:
            # Tag each failure with the position of the offending item.
            err.item_index = index
            errors.add(err)
    errors.throw_if_any()
def process_multiple_files(filenames):
    """Process each file, collecting per-file failures instead of stopping.

    Returns the list of successful results; raises AggregateException
    describing every failed file when at least one failure occurred.
    """
    results = []
    failures = []
    for filename in filenames:
        try:
            results.append(process_file(filename))
        except FileNotFoundError as e:
            failures.append(('not_found', filename, e))
        except IOError as e:
            failures.append(('io_error', filename, e))
        except ValueError as e:
            failures.append(('parse_error', filename, e))
    if failures:
        exc = AggregateException("File processing failed")
        for error_type, filename, e in failures:
            # BUG FIX: report the actual filename instead of the
            # hard-coded "(unknown)" placeholder the original emitted.
            exc.add(Exception(f"{filename} ({error_type}): {e}"))
        raise exc
    return results

# Complex exception chaining
class ChainedExceptionHandler:
    """Collects related exceptions and raises a final one carrying the chain."""

    def __init__(self):
        self.chain = []         # exceptions in the order they were added
        self.root_cause = None  # first exception ever added

    def add_to_chain(self, exception):
        """Append *exception* to the chain; returns self for chaining."""
        if not self.root_cause:
            self.root_cause = exception
        self.chain.append(exception)
        return self

    def raise_with_chain(self, final_exception):
        """Raise *final_exception*, wrapped so each chained cause is attached.

        BUG FIX: the original used ``x = Exc(...) from exc`` which is a
        SyntaxError -- ``from`` is only valid on a raise statement.  The
        cause is now attached explicitly via ``__cause__``.
        """
        for exc in self.chain:
            if exc is not final_exception:
                wrapped = type(final_exception)(
                    f"{final_exception}: caused by {exc}"
                )
                wrapped.__cause__ = exc
                final_exception = wrapped
        raise final_exception
# Example: multi-layer operation with chaining
def layered_operation():
    """Run a three-layer pipeline, translating each layer's failure type."""
    handler = ChainedExceptionHandler()

    # Layer 1: API call
    try:
        response = call_api()
    except requests.ConnectionError as e:
        handler.add_to_chain(e)
        raise NetworkLayerError("Cannot reach API") from e

    # Layer 2: Parse response
    try:
        payload = parse_response(response)
    except ValueError as e:
        handler.add_to_chain(e)
        raise ParsingError("Invalid response format") from e

    # Layer 3: Process data
    try:
        outcome = process_data(payload)
    except ProcessingError as e:
        handler.add_to_chain(e)
        raise ApplicationError("Processing failed") from e

    return outcome

# Handle exceptions based on criteria
class SelectiveExceptionHandler:
    """Dispatches an exception to the first handler whose predicate matches."""

    def __init__(self):
        self.handlers = []

    def when(self, predicate, handler):
        """Register *handler* for exceptions matching *predicate* (fluent API)."""
        self.handlers.append((predicate, handler))
        return self

    def handle(self, exception):
        """Run the first matching handler; re-raise when nothing matches."""
        for matches, respond in self.handlers:
            if matches(exception):
                return respond(exception)
        # No registered predicate matched: propagate unchanged.
        raise exception
# Usage: register recovery handlers; the first matching predicate wins.
handler = SelectiveExceptionHandler()
handler.when(
    # Transient network failures: retry with backoff.
    lambda e: isinstance(e, ConnectionError),
    lambda e: retry_with_backoff()
).when(
    # Recoverable bad input: substitute a default value.
    lambda e: isinstance(e, ValueError) and 'invalid' in str(e),
    lambda e: use_default_value()
).when(
    # Permission problems are terminal: log and stop.
    lambda e: isinstance(e, PermissionError),
    lambda e: log_and_exit()
)
try:
    operation()
except Exception as e:
    # handle() re-raises any exception no predicate matched.
    result = handler.handle(e)
# Pattern matching for exceptions
def handle_exception_by_pattern(exception):
    """Map an exception instance to the name of its handling strategy."""
    rules = (
        ((ConnectionError, TimeoutError), 'retry'),
        ((ValueError, TypeError), 'invalid_input'),
        ((PermissionError, OSError), 'access_denied'),
        ((KeyError, IndexError), 'missing_data'),
    )
    # First matching rule wins, so subclasses (e.g. ConnectionError,
    # a subclass of OSError) must be listed before their parents.
    return next(
        (action for types, action in rules if isinstance(exception, types)),
        'unknown_error',
    )

# Track error context through nested calls
class ErrorContext:
    """Context manager that tags escaping exceptions with an operation trace.

    Contexts nest via a class-level stack; when an exception escapes, the
    full stack (outermost first, including this context) is attached to
    the exception as ``context_trace``.

    NOTE(review): the shared class-level ``_stack`` makes this
    single-threaded only -- confirm callers never use it across threads.
    """

    _stack = []  # active contexts, outermost first (class-wide)

    def __init__(self, operation_name, **context):
        self.operation_name = operation_name
        self.context = context
        self.start_time = time.time()

    def __enter__(self):
        ErrorContext._stack.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # BUG FIX: the original popped *before* building the trace (so the
        # innermost context was missing) and every outer __exit__ overwrote
        # context_trace with a shorter trace, leaving it empty by the time
        # the exception reached the caller.  Build the trace while this
        # context is still on the stack, and only once (innermost wins).
        try:
            if exc_type and not hasattr(exc_val, 'context_trace'):
                self._handle_error(exc_type, exc_val)
        finally:
            ErrorContext._stack.pop()
        return False  # never suppress the exception

    def _handle_error(self, exc_type, exc_val):
        """Attach the current context trace to the escaping exception."""
        exc_val.context_trace = self._build_context_trace()

    @staticmethod
    def _build_context_trace():
        """Snapshot every active context as a list of plain dicts."""
        now = time.time()
        return [
            {
                'operation': ctx.operation_name,
                'context': ctx.context,
                'duration': now - ctx.start_time,
            }
            for ctx in ErrorContext._stack
        ]
# Usage: nested error contexts -- the escaping exception carries the
# operation trace built by ErrorContext.
try:
    with ErrorContext('api_call', endpoint='users'), \
         ErrorContext('fetch_data'), \
         ErrorContext('parse_json'):
        raise ValueError("Invalid JSON")
except ValueError as e:
    print(f"Error: {e}")
    print(f"Context: {e.context_trace}")

# Sophisticated error recovery
class RecoveryStrategy:
    """Abstract base for error-recovery strategies.

    Subclasses implement ``can_recover`` (is this exception handleable?)
    and ``recover`` (perform the actual recovery).
    """

    def can_recover(self, exception):
        """Return True when this strategy can handle *exception*."""
        raise NotImplementedError

    def recover(self, exception):
        """Attempt to recover from *exception*."""
        raise NotImplementedError
class RetryStrategy(RecoveryStrategy):
    """Retries the failed operation with exponential backoff."""

    def __init__(self, max_retries=3, backoff=2):
        self.max_retries = max_retries  # total number of attempts
        self.backoff = backoff          # base of the exponential delay

    def can_recover(self, exception):
        # Only transient, network-style failures are worth retrying.
        return isinstance(exception, (ConnectionError, TimeoutError))

    def recover(self, exception, operation, *args, **kwargs):
        """Re-run *operation* up to max_retries times; re-raise the final failure."""
        for attempt in range(self.max_retries):
            # BUG FIX: only sleep before a *retry* -- the original slept
            # backoff**0 == 1 second before the very first attempt too.
            if attempt:
                time.sleep(self.backoff ** attempt)
            try:
                return operation(*args, **kwargs)
            except Exception:
                if attempt == self.max_retries - 1:
                    raise
class FallbackStrategy(RecoveryStrategy):
    """Serves data from an alternate source when the primary is unavailable."""

    def __init__(self, fallback_source):
        # Any object exposing get_data().
        self.fallback_source = fallback_source

    def can_recover(self, exception):
        """Only DataUnavailable errors can be served from the fallback."""
        return isinstance(exception, DataUnavailable)

    def recover(self, exception):
        """Fetch the data from the fallback source instead."""
        return self.fallback_source.get_data()
class CircuitBreakerStrategy(RecoveryStrategy):
    """Circuit breaker: stop re-invoking a repeatedly-failing operation.

    State machine: CLOSED (normal) -> OPEN (threshold consecutive failures;
    calls rejected) -> HALF_OPEN (timeout elapsed; one probe allowed) ->
    CLOSED on success / OPEN again on failure.
    """
    def __init__(self, threshold=5, timeout=60):
        self.threshold = threshold  # consecutive failures before opening
        self.timeout = timeout      # seconds to stay OPEN before probing
        # Mutable breaker state; NOTE(review): not thread-safe.
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'
    def can_recover(self, exception):
        """Return False only while OPEN and the cooldown has not elapsed."""
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                # Cooldown over: permit a single probe attempt.
                self.state = 'HALF_OPEN'
                return True
            return False
        return True
    def recover(self, exception, operation, *args, **kwargs):
        """Re-invoke *operation*, updating breaker state from the outcome."""
        if self.state == 'OPEN':
            raise CircuitBreakerOpen(f"Circuit is {self.state}")
        try:
            result = operation(*args, **kwargs)
            # Success closes the circuit and clears the failure streak.
            self.failure_count = 0
            self.state = 'CLOSED'
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.threshold:
                # Too many failures: trip the breaker open.
                self.state = 'OPEN'
            raise
# Usage with multiple strategies
class ResilientOperation:
    """Runs an operation, applying recovery strategies in order.

    For each strategy: invoke the operation; on failure, ask the strategy
    whether it can recover and, if so, let it try.  Strategies whose
    ``recover`` accepts more than the exception (retry / circuit-breaker
    style) are passed the operation and its arguments; single-argument
    ``recover(exception)`` strategies (fallback style) get only the
    exception.

    BUG FIX: the original always called ``strategy.recover(e)`` with one
    argument, which raised TypeError for strategies whose recover() also
    needs the operation, silently skipping them; it also did
    ``raise None`` when the strategy list was empty.  Dispatch now
    inspects recover()'s signature, and an empty strategy list simply
    runs the operation once.
    """

    def __init__(self, operation, strategies):
        self.operation = operation
        self.strategies = strategies

    def execute(self, *args, **kwargs):
        """Run the operation; raise the last error when every strategy fails."""
        import inspect

        if not self.strategies:
            return self.operation(*args, **kwargs)
        last_exception = None
        for strategy in self.strategies:
            try:
                return self.operation(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if not strategy.can_recover(e):
                    raise
                try:
                    # recover(exception) vs recover(exception, operation, ...):
                    # decide by the bound method's parameter count.
                    params = inspect.signature(strategy.recover).parameters
                    if len(params) > 1:
                        return strategy.recover(e, self.operation, *args, **kwargs)
                    return strategy.recover(e)
                except Exception:
                    # This strategy could not recover; try the next one.
                    continue
        raise last_exception
# Setup
# Stack strategies from fastest to slowest recovery path.
strategies = [
    RetryStrategy(max_retries=3),
    CircuitBreakerStrategy(),
    FallbackStrategy(cache)  # NOTE(review): assumes `cache` exposes get_data()
]
operation = ResilientOperation(fetch_from_api, strategies)
try:
    data = operation.execute('https://api.example.com/data')
except Exception as e:
    # Every strategy failed (or refused to recover); surface the final error.
    logger.error(f"All recovery strategies failed: {e}")

# Batch operation with partial failure
def batch_operation_with_recovery(items, processor):
    """Apply *processor* to each item, attempting recovery per failure.

    Returns a dict with 'success', 'recovered' and 'failed' lists of
    (index, value) pairs.  A FatalError is recorded, then re-raised to
    abort the whole batch.
    """
    outcome = {'success': [], 'failed': [], 'recovered': []}
    for index, item in enumerate(items):
        try:
            outcome['success'].append((index, processor(item)))
        except RecoverableError as err:
            # One recovery attempt per recoverable failure.
            try:
                outcome['recovered'].append((index, recover_from_error(err, item)))
            except Exception as recovery_error:
                outcome['failed'].append((index, recovery_error))
        except FatalError as err:
            outcome['failed'].append((index, err))
            # Don't continue for fatal errors
            raise
    return outcome
# Batch operation result handling
def process_api_batch(user_ids):
    """Fetch data for each user, tolerating partial failures.

    Logs a summary, warns with the failed user ids, and raises
    BatchOperationFailed when more than 20% of the batch failed.
    Returns the raw results dict from batch_operation_with_recovery.
    """
    results = batch_operation_with_recovery(
        user_ids,
        lambda uid: fetch_user_data(uid)
    )
    success_count = len(results['success'])
    failed_count = len(results['failed'])
    recovered_count = len(results['recovered'])
    logger.info(
        f"Batch processing: {success_count} succeeded, "
        f"{recovered_count} recovered, {failed_count} failed"
    )
    if failed_count > 0:
        # BUG FIX: 'failed' holds (index, error) pairs -- map each index
        # back to the user id instead of logging bare indices as ids.
        failed_ids = [user_ids[i] for i, _ in results['failed']]
        logger.warning(f"Failed user IDs: {failed_ids}")
    # Guard against an empty batch to avoid ZeroDivisionError.
    if user_ids and failed_count / len(user_ids) > 0.2:  # more than 20% failed
        raise BatchOperationFailed(
            f"Too many failures: {failed_count}/{len(user_ids)}"
        )
    return results
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward