Ojasa Mirai

Ojasa Mirai

Python

Loading...

Learning Level

๐ŸŸข Beginner๐Ÿ”ต Advanced
Exceptions OverviewException TypesTry-Except BlocksRaising ExceptionsCustom ExceptionsMultiple ExceptionsFinally & CleanupDebugging TechniquesLogging Best Practices
Python/Error Handling/Finally And Cleanup

๐Ÿงน Finally & Cleanup โ€” Advanced Resource Management

Master sophisticated resource management patterns for production systems.


๐ŸŽฏ Advanced Context Manager Protocol

# Comprehensive context manager implementation
class AdvancedContextManager:
    """Context manager with advanced features"""

    def __init__(self, name):
        self.name = name
        self.resource = None
        self.cleanup_tasks = []
        self.metrics = {'acquired_at': None, 'released_at': None}

    def __enter__(self):
        print(f"Acquiring {self.name}")
        self.metrics['acquired_at'] = time.time()
        self.resource = self._acquire_resource()
        return self

    def _acquire_resource(self):
        # Simulate resource acquisition
        return f"Resource:{self.name}"

    def defer_cleanup(self, cleanup_func):
        """Register cleanup function (LIFO execution)"""
        self.cleanup_tasks.append(cleanup_func)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.metrics['released_at'] = time.time()
        duration = self.metrics['released_at'] - self.metrics['acquired_at']

        # Execute cleanup tasks in reverse order
        exceptions = []
        for cleanup_task in reversed(self.cleanup_tasks):
            try:
                cleanup_task()
            except Exception as e:
                exceptions.append(e)

        try:
            self._release_resource()
        except Exception as e:
            exceptions.append(e)

        print(f"Released {self.name} (duration: {duration:.2f}s)")

        # Handle exceptions during cleanup
        if exceptions:
            if exc_val:
                # Already in exception context
                exc_val.cleanup_errors = exceptions
            else:
                # Cleanup errors without original exception
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    raise ExceptionGroup("Cleanup errors", exceptions)

        # Don't suppress original exception
        return False

    def _release_resource(self):
        if self.resource:
            print(f"Closing {self.resource}")

# Usage with deferred cleanup
with AdvancedContextManager('Database') as ctx:
    ctx.defer_cleanup(lambda: print("Cleanup 1"))
    ctx.defer_cleanup(lambda: print("Cleanup 2"))
    # Cleanup executes in reverse order: 2, then 1

๐Ÿ”„ ExitStack for Complex Cleanup

from contextlib import ExitStack

# Managing multiple resources with ExitStack
class ResourceManager:
    """Manage multiple resources with proper cleanup"""

    def __init__(self):
        self.exit_stack = ExitStack()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return self.exit_stack.__exit__(*exc_info)

    def open_file(self, filename, mode='r'):
        """Open file and register for cleanup"""
        file = open(filename, mode)
        self.exit_stack.callback(file.close)
        return file

    def get_database_connection(self, connection_string):
        """Get DB connection and register for cleanup"""
        conn = connect_to_db(connection_string)
        self.exit_stack.callback(conn.close)
        return conn

    def register_cleanup(self, cleanup_func, *args, **kwargs):
        """Register arbitrary cleanup function"""
        self.exit_stack.callback(cleanup_func, *args, **kwargs)

# Usage
with ResourceManager() as rm:
    file1 = rm.open_file('data1.txt')
    file2 = rm.open_file('data2.txt')
    conn = rm.get_database_connection('postgresql://...')

    # All resources closed automatically in reverse order
    data1 = file1.read()
    data2 = file2.read()
    result = conn.execute("SELECT * FROM users")

# ExitStack standalone usage
with ExitStack() as stack:
    # Register multiple exit callbacks
    stack.callback(cleanup1)
    stack.callback(cleanup2)
    stack.callback(cleanup3)

    # Perform operations
    do_work()
    # Cleanup happens in reverse order: 3, 2, 1

๐Ÿ›ก๏ธ Cleanup with Exception Safety

# Ensure cleanup even with exceptions
class SafeResourceManager:
    """Resource manager with exception safety"""

    def __init__(self, resource):
        self.resource = resource
        self.cleanup_count = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always cleanup, regardless of exception
        try:
            self._cleanup()
        except Exception as cleanup_exc:
            if exc_val:
                # Chain cleanup exception to original
                raise ExceptionGroup(
                    "Original and cleanup errors",
                    [exc_val, cleanup_exc]
                ) from None
            else:
                # No original exception, just raise cleanup error
                raise cleanup_exc

        return False  # Don't suppress original exception

    def _cleanup(self):
        """Cleanup with state tracking"""
        self.cleanup_count += 1
        if self.cleanup_count > 1:
            raise RuntimeError("Cleanup called multiple times")

        print(f"Cleaning up {self.resource}")

# Nested cleanup safety
def complex_operation_with_safety():
    """Multiple nested resources with safety"""
    try:
        with SafeResourceManager('Resource1') as rm1:
            try:
                with SafeResourceManager('Resource2') as rm2:
                    try:
                        with SafeResourceManager('Resource3') as rm3:
                            # If exception here, all cleanup happens
                            raise ValueError("Operation failed")
                    except SafeResourceException:
                        pass  # Cleanup still happens
            except SafeResourceException:
                pass
    except SafeResourceException:
        pass

๐Ÿ” Cleanup Monitoring and Metrics

# Track cleanup performance
class MonitoredContextManager:
    """Context manager that tracks metrics"""

    cleanup_stats = {
        'total_acquisitions': 0,
        'successful_cleanups': 0,
        'failed_cleanups': 0,
        'total_resource_time': 0,
        'cleanup_times': []
    }

    def __init__(self, name, resource_factory):
        self.name = name
        self.resource_factory = resource_factory
        self.resource = None
        self.acquire_time = None
        self.cleanup_start = None

    def __enter__(self):
        MonitoredContextManager.cleanup_stats['total_acquisitions'] += 1
        self.acquire_time = time.time()
        self.resource = self.resource_factory()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cleanup_start = time.time()

        try:
            self._cleanup_resource()
            MonitoredContextManager.cleanup_stats['successful_cleanups'] += 1
        except Exception as e:
            MonitoredContextManager.cleanup_stats['failed_cleanups'] += 1
            logger.error(f"Cleanup failed for {self.name}: {e}")

        # Record metrics
        cleanup_duration = time.time() - self.cleanup_start
        resource_duration = self.cleanup_start - self.acquire_time

        MonitoredContextManager.cleanup_stats['total_resource_time'] += resource_duration
        MonitoredContextManager.cleanup_stats['cleanup_times'].append(cleanup_duration)

        return False

    def _cleanup_resource(self):
        if self.resource:
            self.resource.close()

    @classmethod
    def get_stats(cls):
        """Get cleanup statistics"""
        return {
            **cls.cleanup_stats,
            'average_resource_time': (
                cls.cleanup_stats['total_resource_time'] /
                max(cls.cleanup_stats['total_acquisitions'], 1)
            ),
            'average_cleanup_time': (
                sum(cls.cleanup_stats['cleanup_times']) /
                max(len(cls.cleanup_stats['cleanup_times']), 1)
            )
        }

# Usage with monitoring
for i in range(10):
    with MonitoredContextManager(f'Resource{i}', get_resource):
        time.sleep(0.1)

print(MonitoredContextManager.get_stats())

๐ŸŽฏ Graceful Shutdown Patterns

# Coordinated shutdown of multiple resources
class GracefulShutdownManager:
    """Manages coordinated shutdown"""

    def __init__(self, timeout=30):
        self.timeout = timeout
        self.resources = []
        self.shutdown_started = False

    def register_resource(self, resource, cleanup_func):
        """Register resource for coordinated cleanup"""
        self.resources.append((resource, cleanup_func))

    def shutdown(self):
        """Perform graceful shutdown"""
        if self.shutdown_started:
            return

        self.shutdown_started = True
        logger.info(f"Starting graceful shutdown with {len(self.resources)} resources")

        # Stop accepting new work
        for resource, _ in self.resources:
            if hasattr(resource, 'stop_accepting_work'):
                resource.stop_accepting_work()

        # Wait for in-flight operations
        deadline = time.time() + self.timeout
        for resource, _ in self.resources:
            if hasattr(resource, 'wait_for_pending'):
                remaining_time = deadline - time.time()
                if remaining_time > 0:
                    try:
                        resource.wait_for_pending(timeout=remaining_time)
                    except TimeoutError:
                        logger.warning(f"Timeout waiting for {resource}")

        # Cleanup in reverse order
        for resource, cleanup_func in reversed(self.resources):
            try:
                logger.info(f"Cleaning up {resource}")
                cleanup_func()
            except Exception as e:
                logger.error(f"Error cleaning up {resource}: {e}")

        logger.info("Graceful shutdown complete")

# Usage
shutdown_mgr = GracefulShutdownManager(timeout=30)

web_server = start_web_server()
shutdown_mgr.register_resource(web_server, web_server.stop)

db_pool = create_connection_pool()
shutdown_mgr.register_resource(db_pool, db_pool.close)

# Register signal handler for graceful shutdown
import signal
signal.signal(signal.SIGTERM, lambda s, f: shutdown_mgr.shutdown())
signal.signal(signal.SIGINT, lambda s, f: shutdown_mgr.shutdown())

๐Ÿ”— Cleanup with Weakref for Memory Management

import weakref

# Use weakref for automatic cleanup when objects are garbage collected
class AutoCleanupResource:
    """Resource that cleans up automatically"""

    def __init__(self, name):
        self.name = name
        self.is_open = True

    def close(self):
        if self.is_open:
            print(f"Closing {self.name}")
            self.is_open = False

    def __del__(self):
        """Called when object is garbage collected"""
        if self.is_open:
            print(f"WARNING: {self.name} not explicitly closed")
            self.close()

# Weakref for cleanup registry
class CleanupRegistry:
    """Track resources for cleanup"""

    _resources = weakref.WeakSet()

    @classmethod
    def register(cls, resource):
        """Register resource for tracking"""
        cls._resources.add(resource)
        return resource

    @classmethod
    def cleanup_all(cls):
        """Cleanup all registered resources"""
        for resource in list(cls._resources):
            if hasattr(resource, 'close'):
                try:
                    resource.close()
                except Exception as e:
                    logger.error(f"Error closing {resource}: {e}")

# Usage
resource = CleanupRegistry.register(AutoCleanupResource("MyResource"))
# ... use resource ...
# When resource is no longer referenced, __del__ is called automatically

๐ŸŽฏ Key Takeaways

  • โœ… Use ExitStack for managing multiple resources
  • โœ… Implement deferred cleanup with LIFO ordering
  • โœ… Handle exceptions during cleanup properly
  • โœ… Chain cleanup exceptions to original exceptions
  • โœ… Monitor cleanup performance and metrics
  • โœ… Implement graceful shutdown patterns
  • โœ… Use weakref for automatic resource tracking


Resources

Python Docs

Ojasa Mirai

Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep โ€ข Build Real โ€ข Verify Skills โ€ข Launch Forward

Courses

PythonFastapiReactJSCloud

ยฉ 2026 Ojasa Mirai. All rights reserved.

TwitterGitHubLinkedIn