Ojasa Mirai

Ojasa Mirai

Python

Loading...

Learning Level

🟢 Beginner · 🔵 Advanced
Exceptions Overview · Exception Types · Try-Except Blocks · Raising Exceptions · Custom Exceptions · Multiple Exceptions · Finally & Cleanup · Debugging Techniques · Logging Best Practices
Python/Error Handling/Multiple Exceptions

🔀 Multiple Exceptions — Advanced Patterns for Complex Systems

Master sophisticated patterns for handling multiple exceptions in distributed and complex systems.


🧬 Exception Aggregation and Batching

# Collect and report multiple errors
class AggregateException(Exception):
    """Exception that bundles several underlying exceptions into one report.

    Errors may be supplied up front through ``exceptions`` or collected one at
    a time with :meth:`add`.  ``exceptions`` and ``inner_exceptions`` refer to
    the same list, so the rendered message always reflects every collected
    error regardless of whether the aggregate is raised via
    :meth:`throw_if_any` or with a plain ``raise``.

    Args:
        message: Summary line shown at the top of the report.
        exceptions: Optional iterable of already-collected exceptions.
    """

    def __init__(self, message, exceptions=None):
        super().__init__(message)
        self.message = message
        # Copy so that add() never mutates a list owned by the caller.
        self.exceptions = list(exceptions) if exceptions else []
        # Alias kept for callers that read ``inner_exceptions`` directly.
        self.inner_exceptions = self.exceptions

    def add(self, exception):
        """Append ``exception`` to the aggregate and return ``self`` for chaining."""
        self.exceptions.append(exception)
        return self

    def throw_if_any(self):
        """Raise this aggregate if at least one exception has been collected.

        Raises:
            AggregateException: ``self``, when the collection is non-empty.
        """
        if self.exceptions:
            raise self

    def __str__(self):
        """Render the summary line followed by one numbered line per error."""
        parts = [f"{self.message} ({len(self.exceptions)} errors)\n"]
        parts.extend(
            f"  {i}. {type(exc).__name__}: {exc}\n"
            for i, exc in enumerate(self.exceptions, 1)
        )
        return "".join(parts)

# Usage
def validate_batch_data(items):
    """Run ``validate_item`` over every item and report all failures together.

    Each ``ValidationError`` is tagged with the position of the offending item
    (``item_index``) and added to a single ``AggregateException``; validation
    continues with the remaining items instead of stopping at the first error.

    Args:
        items: Iterable of items to validate.

    Raises:
        AggregateException: via ``throw_if_any()`` once the whole batch has
            been checked, if any item failed.
    """
    collected = AggregateException("Validation failed")

    for position, candidate in enumerate(items):
        try:
            validate_item(candidate)
        except ValidationError as problem:
            # Remember which item failed so the caller can locate it later.
            problem.item_index = position
            collected.add(problem)

    collected.throw_if_any()

def process_multiple_files(filenames):
    """Process every file, deferring failures until the whole batch has run.

    Args:
        filenames: Iterable of file names passed one by one to ``process_file``.

    Returns:
        List of ``process_file`` results, in input order, when no file failed.

    Raises:
        AggregateException: if at least one file raised ``FileNotFoundError``,
            ``IOError`` or ``ValueError``.  Each collected entry is an
            ``Exception`` whose message names the file and failure category
            and whose ``__cause__`` is the original error.
    """
    results = []
    failures = []

    for filename in filenames:
        try:
            result = process_file(filename)
        except FileNotFoundError as e:
            # Must precede IOError: FileNotFoundError is a subclass of it.
            failures.append(('not_found', filename, e))
        except IOError as e:
            failures.append(('io_error', filename, e))
        except ValueError as e:
            failures.append(('parse_error', filename, e))
        else:
            results.append(result)

    if failures:
        exc = AggregateException("File processing failed")
        for error_type, filename, e in failures:
            wrapped = Exception(f"{filename} ({error_type}): {e}")
            # Keep the original error (type and traceback) reachable.
            wrapped.__cause__ = e
            exc.add(wrapped)
        # Raise through the aggregate's own entry point rather than a bare
        # ``raise exc`` so the collected errors are included in its report.
        exc.throw_if_any()

    return results

🔄 Exception Chaining Strategies

# Complex exception chaining
class ChainedExceptionHandler:
    """Collects related exceptions and raises a final one linked to them.

    Attributes:
        chain: Exceptions in the order they were added.
        root_cause: The first exception ever added, or ``None``.
    """

    def __init__(self):
        self.chain = []
        self.root_cause = None

    def add_to_chain(self, exception):
        """Record ``exception`` and return ``self`` so calls can be chained."""
        # Identity check: an exception object may define __bool__/__len__.
        if self.root_cause is None:
            self.root_cause = exception
        self.chain.append(exception)
        return self

    def raise_with_chain(self, final_exception):
        """Raise ``final_exception`` with the recorded chain as its causes.

        The recorded exceptions are linked oldest-to-newest through
        ``__cause__`` (an existing ``__cause__`` on a recorded exception is
        left untouched), and ``final_exception`` is raised ``from`` the most
        recent one.  ``final_exception`` itself is raised unchanged, so its
        type, arguments and identity are preserved.

        Args:
            final_exception: The exception instance to raise.

        Raises:
            BaseException: always ``final_exception``.
        """
        previous = None
        seen = set()
        for exc in self.chain:
            # Skip the final exception and duplicates so that the cause
            # links can never form a cycle.
            if exc is final_exception or id(exc) in seen:
                continue
            seen.add(id(exc))
            if previous is not None and exc.__cause__ is None:
                exc.__cause__ = previous
            previous = exc

        if previous is None:
            raise final_exception
        raise final_exception from previous

# Example: multi-layer operation with chaining
def layered_operation():
    """Run the API -> parse -> process pipeline, translating errors per layer.

    A failure in a layer is recorded on a local ``ChainedExceptionHandler``
    and re-raised as that layer's own error type, chained to the original
    with ``raise ... from``.

    Returns:
        Whatever ``process_data`` returns for the parsed API response.

    Raises:
        NetworkLayerError: ``call_api`` raised ``requests.ConnectionError``.
        ParsingError: ``parse_response`` raised ``ValueError``.
        ApplicationError: ``process_data`` raised ``ProcessingError``.
    """
    tracker = ChainedExceptionHandler()

    # Layer 1: talk to the remote API.
    try:
        raw_response = call_api()
    except requests.ConnectionError as network_problem:
        tracker.add_to_chain(network_problem)
        raise NetworkLayerError("Cannot reach API") from network_problem

    # Layer 2: turn the raw response into structured data.
    try:
        parsed = parse_response(raw_response)
    except ValueError as format_problem:
        tracker.add_to_chain(format_problem)
        raise ParsingError("Invalid response format") from format_problem

    # Layer 3: apply the business logic.
    try:
        return process_data(parsed)
    except ProcessingError as processing_problem:
        tracker.add_to_chain(processing_problem)
        raise ApplicationError("Processing failed") from processing_problem

🛡️ Selective Exception Handling

# Handle exceptions based on criteria
class SelectiveExceptionHandler:
    """Dispatches an exception to the first handler whose predicate accepts it.

    Rules are kept in registration order in ``handlers`` as
    ``(predicate, handler)`` pairs.
    """

    def __init__(self):
        self.handlers = []

    def when(self, predicate, handler):
        """Register ``handler`` for exceptions accepted by ``predicate``.

        Returns ``self`` so registrations can be chained.
        """
        rule = (predicate, handler)
        self.handlers.append(rule)
        return self

    def handle(self, exception):
        """Return the result of the first matching handler.

        Predicates are evaluated lazily in registration order; the exception
        is re-raised when none of them accepts it.
        """
        matching_actions = (
            action for accepts, action in self.handlers if accepts(exception)
        )
        for action in matching_actions:
            # Only the first match is ever invoked.
            return action(exception)
        raise exception

# Usage
# Register the recovery rules; they are tried in the order given here.
handler = SelectiveExceptionHandler()
handler.when(
    lambda e: isinstance(e, ConnectionError),
    lambda e: retry_with_backoff()
).when(
    lambda e: isinstance(e, ValueError) and 'invalid' in str(e),
    lambda e: use_default_value()
).when(
    lambda e: isinstance(e, PermissionError),
    lambda e: log_and_exit()
)

try:
    # Keep the return value on success too; otherwise ``result`` would only
    # exist when the operation failed and a handler produced a replacement.
    result = operation()
except Exception as e:
    # Exceptions no rule accepts are re-raised by handle().
    result = handler.handle(e)

# Pattern matching for exceptions
def handle_exception_by_pattern(exception):
    """Classify ``exception`` into a coarse handling category.

    The rules are checked top to bottom and the first group the exception is
    an instance of wins, so order matters where the groups overlap through
    subclassing.

    Returns:
        One of ``'retry'``, ``'invalid_input'``, ``'access_denied'``,
        ``'missing_data'``, or ``'unknown_error'`` when nothing matches.
    """
    rules = (
        ((ConnectionError, TimeoutError), 'retry'),
        ((ValueError, TypeError), 'invalid_input'),
        ((PermissionError, OSError), 'access_denied'),
        ((KeyError, IndexError), 'missing_data'),
    )
    return next(
        (label for kinds, label in rules if isinstance(exception, kinds)),
        'unknown_error',
    )

🔍 Nested Error Contexts

# Track error context through nested calls
class ErrorContext:
    """Context manager that records which nested operations were active
    when an exception was raised.

    Active contexts are kept on a class-level stack.  When an exception
    leaves a ``with ErrorContext(...)`` block, a snapshot of the stack is
    attached to the exception as ``context_trace`` (outermost first).  The
    exception is never suppressed.

    Args:
        operation_name: Label for the operation being guarded.
        **context: Arbitrary key/value details stored with the operation.
    """

    # One list shared by every instance; NOTE: concurrent threads or tasks
    # would interleave their entries on it.
    _stack = []

    def __init__(self, operation_name, **context):
        self.operation_name = operation_name
        self.context = context
        self.start_time = time.time()

    def __enter__(self):
        ErrorContext._stack.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is not None:
                # Snapshot while this context is still on the stack so the
                # innermost operation is part of the trace.
                self._handle_error(exc_type, exc_val)
        finally:
            ErrorContext._stack.pop()
        return False  # never swallow the exception

    def _handle_error(self, exc_type, exc_val):
        """Attach the context trace to ``exc_val`` unless it already has one.

        The innermost context sees the exception first and has the complete
        stack; outer contexts must not overwrite that with their shorter
        view while the exception propagates outwards.
        """
        if not hasattr(exc_val, 'context_trace'):
            exc_val.context_trace = self._build_context_trace()

    @staticmethod
    def _build_context_trace():
        """Return one dict per active context, outermost first.

        Each dict has the keys ``'operation'``, ``'context'`` and
        ``'duration'`` (seconds since that context was created).
        """
        now = time.time()
        return [
            {
                'operation': ctx.operation_name,
                'context': ctx.context,
                'duration': now - ctx.start_time,
            }
            for ctx in ErrorContext._stack
        ]

# Usage
# Three nested operations; the innermost one fails.  A single ``with``
# listing several managers enters them left to right, exactly like nesting.
try:
    with ErrorContext('api_call', endpoint='users'), \
            ErrorContext('fetch_data'), \
            ErrorContext('parse_json'):
        raise ValueError("Invalid JSON")
except ValueError as err:
    # ``context_trace`` is attached by ErrorContext while the error unwinds.
    print(f"Error: {err}")
    print(f"Context: {err.context_trace}")

📊 Error Recovery Strategies

# Sophisticated error recovery
class RecoveryStrategy:
    """Interface for pluggable error-recovery strategies.

    Subclasses override both methods.  ``recover`` accepts the failed
    operation and its arguments as optional extras so that strategies which
    re-run the operation and strategies which ignore it share one call shape.
    """

    def can_recover(self, exception):
        """Return whether this strategy is able to handle ``exception``."""
        raise NotImplementedError

    def recover(self, exception, operation=None, *args, **kwargs):
        """Attempt recovery and return the replacement result.

        Args:
            exception: The error that triggered recovery.
            operation: Optional callable that originally failed.
            *args: Positional arguments for ``operation``.
            **kwargs: Keyword arguments for ``operation``.
        """
        raise NotImplementedError

class RetryStrategy(RecoveryStrategy):
    """Recover by re-running the operation with exponential backoff.

    Args:
        max_retries: Maximum number of attempts made by ``recover``.
        backoff: Base of the delay; attempt ``k`` (0-based) is preceded by a
            sleep of ``backoff ** k`` seconds.
    """

    def __init__(self, max_retries=3, backoff=2):
        self.max_retries = max_retries
        self.backoff = backoff

    def can_recover(self, exception):
        """Return True for connection and timeout errors."""
        return isinstance(exception, (ConnectionError, TimeoutError))

    def recover(self, exception, operation, *args, **kwargs):
        """Call ``operation(*args, **kwargs)`` until it succeeds.

        Args:
            exception: The error that triggered the retry (may be ``None``).
            operation: Callable to re-run.
            *args: Positional arguments for ``operation``.
            **kwargs: Keyword arguments for ``operation``.

        Returns:
            The first successful result of ``operation``.

        Raises:
            Exception: whatever the final attempt raised, or ``exception``
                itself when ``max_retries`` allows no attempt at all.
        """
        final_attempt = self.max_retries - 1
        for attempt in range(self.max_retries):
            # Sleep outside the try: a bad delay is a configuration error,
            # not a failed attempt to be retried.
            time.sleep(self.backoff ** attempt)
            try:
                return operation(*args, **kwargs)
            except Exception:
                if attempt == final_attempt:
                    raise

        # Reached only when max_retries <= 0: surface the original failure
        # instead of silently returning None.
        if exception is not None:
            raise exception
        return None

class FallbackStrategy(RecoveryStrategy):
    """Recover by reading the data from an alternative source.

    Args:
        fallback_source: Object exposing ``get_data()``.
    """

    def __init__(self, fallback_source):
        self.fallback_source = fallback_source

    def can_recover(self, exception):
        """Return True when ``exception`` is a ``DataUnavailable`` error."""
        return isinstance(exception, DataUnavailable)

    def recover(self, exception, operation=None, *args, **kwargs):
        """Return ``fallback_source.get_data()``.

        ``operation`` and its arguments are accepted only so this strategy
        can be called the same way as strategies that re-run the operation;
        they are ignored.
        """
        return self.fallback_source.get_data()

class CircuitBreakerStrategy(RecoveryStrategy):
    """Stops calling an operation after repeated failures.

    ``state`` is ``'CLOSED'`` initially, becomes ``'OPEN'`` once
    ``failure_count`` reaches ``threshold``, and moves to ``'HALF_OPEN'``
    when ``can_recover`` is consulted more than ``timeout`` seconds after the
    last failure.
    """

    def __init__(self, threshold=5, timeout=60):
        self.threshold = threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'

    def can_recover(self, exception):
        """Report whether a call may be attempted right now.

        Has a side effect: an open circuit whose cool-down has elapsed is
        switched to ``'HALF_OPEN'``.
        """
        if self.state != 'OPEN':
            return True

        cooled_down = time.time() - self.last_failure_time > self.timeout
        if cooled_down:
            self.state = 'HALF_OPEN'
        return cooled_down

    def recover(self, exception, operation, *args, **kwargs):
        """Run ``operation`` through the breaker and return its result.

        Success resets the failure count and closes the circuit; a failure
        is counted, timestamped and re-raised, opening the circuit once the
        threshold is reached.

        Raises:
            CircuitBreakerOpen: if the circuit is open when called.
        """
        if self.state == 'OPEN':
            raise CircuitBreakerOpen(f"Circuit is {self.state}")

        try:
            outcome = operation(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.threshold:
                self.state = 'OPEN'
            raise
        else:
            self.failure_count = 0
            self.state = 'CLOSED'
            return outcome

# Usage with multiple strategies
class ResilientOperation:
    """Runs an operation and, on failure, walks a list of recovery strategies.

    Args:
        operation: Callable to execute.
        strategies: Ordered iterable of objects providing
            ``can_recover(exception)`` and ``recover(...)``.
    """

    def __init__(self, operation, strategies):
        self.operation = operation
        self.strategies = strategies

    def execute(self, *args, **kwargs):
        """Call the operation, falling back to the strategies if it fails.

        The operation is attempted once.  If it raises, each strategy is
        consulted in order: strategies that decline the current error are
        skipped, and a strategy whose recovery itself fails hands its error
        on to the next strategy.

        Returns:
            The operation's result, or the first successful recovery result.

        Raises:
            Exception: the most recent failure when nothing recovered; with
                no strategies this is the operation's own exception.
        """
        try:
            return self.operation(*args, **kwargs)
        except Exception as exc:
            last_exception = exc

        for strategy in self.strategies:
            if not strategy.can_recover(last_exception):
                continue
            try:
                return self._recover(strategy, last_exception, args, kwargs)
            except Exception as recovery_error:
                last_exception = recovery_error

        raise last_exception

    def _recover(self, strategy, exception, args, kwargs):
        """Invoke ``strategy.recover`` with the arguments its signature takes."""
        if self._takes_operation(strategy.recover):
            return strategy.recover(exception, self.operation, *args, **kwargs)
        return strategy.recover(exception)

    @staticmethod
    def _takes_operation(recover):
        """Return True if ``recover`` accepts an operation after the exception."""
        import inspect

        try:
            params = list(inspect.signature(recover).parameters.values())
        except (TypeError, ValueError):
            # Signature unavailable: assume the full calling convention.
            return True

        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        positional = [p for p in params if p.kind in positional_kinds]
        has_var_positional = any(
            p.kind is inspect.Parameter.VAR_POSITIONAL for p in params
        )
        return len(positional) >= 2 or has_var_positional

# Setup
# Recovery strategies, listed in the order they are handed to the wrapper.
retry_strategy = RetryStrategy(max_retries=3)
breaker_strategy = CircuitBreakerStrategy()
fallback_strategy = FallbackStrategy(cache)
strategies = [retry_strategy, breaker_strategy, fallback_strategy]

# Wrap the raw API call so failures go through the strategies above.
operation = ResilientOperation(fetch_from_api, strategies)
try:
    data = operation.execute('https://api.example.com/data')
except Exception as failure:
    # Nothing recovered: log the error and carry on without ``data``.
    logger.error(f"All recovery strategies failed: {failure}")

🎯 Exception Aggregation for Batch Operations

# Batch operation with partial failure
def batch_operation_with_recovery(items, processor):
    """Apply ``processor`` to each item, sorting outcomes into three buckets.

    Args:
        items: Iterable of inputs.
        processor: Callable invoked once per item.

    Returns:
        Dict with the keys ``'success'``, ``'failed'`` and ``'recovered'``.
        Each value is a list of ``(index, value)`` tuples, where ``value`` is
        the processor result, the error, or the recovered result.

    Raises:
        FatalError: re-raised immediately, aborting the rest of the batch.
    """
    outcome = {bucket: [] for bucket in ('success', 'failed', 'recovered')}

    for position, entry in enumerate(items):
        try:
            value = processor(entry)
        except RecoverableError as primary_error:
            # One recovery attempt; if that fails too, the item counts as failed.
            try:
                salvaged = recover_from_error(primary_error, entry)
            except Exception as recovery_error:
                outcome['failed'].append((position, recovery_error))
            else:
                outcome['recovered'].append((position, salvaged))
        except FatalError as fatal:
            outcome['failed'].append((position, fatal))
            # Fatal errors stop the whole batch.
            raise
        else:
            outcome['success'].append((position, value))

    return outcome

# Batch operation result handling
def process_api_batch(user_ids):
    """Fetch data for every user ID, tolerating a limited share of failures.

    Args:
        user_ids: Iterable of user IDs passed to ``fetch_user_data``.

    Returns:
        The result dict produced by ``batch_operation_with_recovery``.

    Raises:
        BatchOperationFailed: if more than 20% of the IDs failed.
    """
    # Materialise once so positions reported by the batch helper can be
    # mapped back to the IDs, whatever kind of iterable was passed in.
    user_ids = list(user_ids)
    results = batch_operation_with_recovery(user_ids, fetch_user_data)

    success_count = len(results['success'])
    failed_count = len(results['failed'])
    recovered_count = len(results['recovered'])

    logger.info(
        f"Batch processing: {success_count} succeeded, "
        f"{recovered_count} recovered, {failed_count} failed"
    )

    if failed_count > 0:
        # 'failed' entries are (position, error) pairs; translate each
        # position into the actual user ID before logging.
        failed_ids = [user_ids[position] for position, _ in results['failed']]
        logger.warning(f"Failed user IDs: {failed_ids}")

        # failed_count > 0 guarantees user_ids is non-empty here.
        if failed_count / len(user_ids) > 0.2:  # More than 20% failed
            raise BatchOperationFailed(
                f"Too many failures: {failed_count}/{len(user_ids)}"
            )

    return results

🎯 Key Takeaways

  • ✅ Aggregate exceptions for batch operations
  • ✅ Chain exceptions to preserve context
  • ✅ Use selective handling based on predicates
  • ✅ Track error context through nested calls
  • ✅ Implement recovery strategies
  • ✅ Handle partial failures gracefully
  • ✅ Log comprehensive error information


Resources

Python Docs

Ojasa Mirai

Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep • Build Real • Verify Skills • Launch Forward

Courses

Python · FastAPI · ReactJS · Cloud

© 2026 Ojasa Mirai. All rights reserved.

Twitter · GitHub · LinkedIn