
Python
Master sophisticated resource management patterns for production systems.
# Comprehensive context manager implementation
class AdvancedContextManager:
    """Context manager with deferred (LIFO) cleanup tasks and timing metrics.

    Entering acquires a simulated resource and records the acquisition time.
    Exiting runs every deferred cleanup task in reverse registration order,
    releases the resource, and reports any cleanup failures.
    """

    def __init__(self, name):
        """Store *name* and initialise empty resource, task and metric state."""
        self.name = name
        self.resource = None
        self.cleanup_tasks = []
        self.metrics = {'acquired_at': None, 'released_at': None}

    def __enter__(self):
        """Acquire the resource, record the acquisition time, return self."""
        print(f"Acquiring {self.name}")
        self.metrics['acquired_at'] = time.time()
        self.resource = self._acquire_resource()
        return self

    def _acquire_resource(self):
        """Return a placeholder string standing in for a real resource."""
        # Simulate resource acquisition
        return f"Resource:{self.name}"

    def defer_cleanup(self, cleanup_func):
        """Register a zero-argument cleanup callable (LIFO execution).

        Returns self so registrations can be chained.
        """
        self.cleanup_tasks.append(cleanup_func)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Run deferred cleanups, release the resource, report failures.

        Cleanup errors are collected rather than aborting the remaining
        cleanups. If the ``with`` body raised, the collected errors are
        attached to that exception as ``cleanup_errors``; otherwise a single
        error is re-raised as-is and several are raised as an
        ``ExceptionGroup``.

        Returns:
            False, so an exception from the ``with`` body is never suppressed.
        """
        self.metrics['released_at'] = time.time()
        duration = self.metrics['released_at'] - self.metrics['acquired_at']

        # Take a snapshot and empty the list in place: each task runs exactly
        # once, so re-entering this manager does not repeat old cleanups.
        pending = list(self.cleanup_tasks)
        self.cleanup_tasks.clear()

        # Execute cleanup tasks in reverse order
        exceptions = []
        for cleanup_task in reversed(pending):
            try:
                cleanup_task()
            except Exception as e:
                exceptions.append(e)

        try:
            self._release_resource()
        except Exception as e:
            exceptions.append(e)

        print(f"Released {self.name} (duration: {duration:.2f}s)")

        # Handle exceptions during cleanup
        if exceptions:
            # Compare against None: an exception type may define __bool__ or
            # __len__ and evaluate as falsy.
            if exc_val is not None:
                # Already in exception context
                exc_val.cleanup_errors = exceptions
            elif len(exceptions) == 1:
                raise exceptions[0]
            else:
                raise ExceptionGroup("Cleanup errors", exceptions)

        # Don't suppress original exception
        return False

    def _release_resource(self):
        """Print a closing message if a resource is currently held."""
        if self.resource:
            print(f"Closing {self.resource}")
# Usage with deferred cleanup
with AdvancedContextManager('Database') as ctx:
    # defer_cleanup returns the manager itself, so registrations chain.
    ctx.defer_cleanup(lambda: print("Cleanup 1")).defer_cleanup(
        lambda: print("Cleanup 2")
    )
# Cleanup executes in reverse order: 2, then 1
from contextlib import ExitStack
# Managing multiple resources with ExitStack
class ResourceManager:
    """Group several resources behind one ExitStack for joint cleanup.

    Each helper registers a callback on ``exit_stack``; leaving the ``with``
    block delegates to the stack, which then runs those callbacks.
    """

    def __init__(self):
        """Create the ExitStack that will hold every cleanup callback."""
        self.exit_stack = ExitStack()

    def __enter__(self):
        """Return the manager itself for use inside the ``with`` block."""
        return self

    def __exit__(self, *exc_info):
        """Unwind the stack and pass its suppression decision through."""
        suppressed = self.exit_stack.__exit__(*exc_info)
        return suppressed

    def open_file(self, filename, mode='r'):
        """Open *filename* in *mode* and schedule its ``close`` on the stack."""
        handle = open(filename, mode)
        self.exit_stack.callback(handle.close)
        return handle

    def get_database_connection(self, connection_string):
        """Connect via ``connect_to_db`` and schedule the connection's ``close``."""
        connection = connect_to_db(connection_string)
        self.exit_stack.callback(connection.close)
        return connection

    def register_cleanup(self, cleanup_func, *args, **kwargs):
        """Schedule ``cleanup_func(*args, **kwargs)`` to run when the stack unwinds."""
        self.exit_stack.callback(cleanup_func, *args, **kwargs)
# Usage
with ResourceManager() as rm:
    file1 = rm.open_file('data1.txt')
    file2 = rm.open_file('data2.txt')
    conn = rm.get_database_connection('postgresql://...')

    # NOTE(review): the original indentation was lost; the reads are assumed
    # to belong inside the block, since the files are closed once it exits.
    # All resources closed automatically in reverse order
    data1 = file1.read()
    data2 = file2.read()
    result = conn.execute("SELECT * FROM users")
# ExitStack standalone usage
with ExitStack() as stack:
    # Register the exit callbacks in order; the stack runs them in reverse.
    for cleanup in (cleanup1, cleanup2, cleanup3):
        stack.callback(cleanup)

    # Perform operations
    do_work()
# Cleanup happens in reverse order: 3, 2, 1
# Ensure cleanup even with exceptions
class SafeResourceManager:
"""Resource manager with exception safety"""
def __init__(self, resource):
self.resource = resource
self.cleanup_count = 0
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# Always cleanup, regardless of exception
try:
self._cleanup()
except Exception as cleanup_exc:
if exc_val:
# Chain cleanup exception to original
raise ExceptionGroup(
"Original and cleanup errors",
[exc_val, cleanup_exc]
) from None
else:
# No original exception, just raise cleanup error
raise cleanup_exc
return False # Don't suppress original exception
def _cleanup(self):
"""Cleanup with state tracking"""
self.cleanup_count += 1
if self.cleanup_count > 1:
raise RuntimeError("Cleanup called multiple times")
print(f"Cleaning up {self.resource}")
# Nested cleanup safety
def complex_operation_with_safety():
    """Nest three SafeResourceManager contexts and fail in the innermost one.

    Each ``with`` is wrapped in its own ``try`` so every level gets a chance
    to handle ``SafeResourceException`` after its manager has cleaned up.

    NOTE(review): ``SafeResourceException`` is not defined in the visible
    source, and the ``ValueError`` raised here is not obviously one of them.
    Confirm where it comes from; if it is undefined, evaluating the
    ``except`` clause raises ``NameError``.
    """
    try:
        with SafeResourceManager('Resource1'):
            try:
                with SafeResourceManager('Resource2'):
                    try:
                        with SafeResourceManager('Resource3'):
                            # If exception here, all cleanup happens
                            raise ValueError("Operation failed")
                    except SafeResourceException:
                        pass  # Cleanup still happens
            except SafeResourceException:
                pass
    except SafeResourceException:
        pass
# Track cleanup performance
class MonitoredContextManager:
    """Context manager that records acquisition and cleanup metrics.

    Statistics live in the class-level ``cleanup_stats`` dict, so they are
    shared by every instance.
    """

    # Counters and timings shared by every instance of the class.
    cleanup_stats = {
        'total_acquisitions': 0,
        'successful_cleanups': 0,
        'failed_cleanups': 0,
        'total_resource_time': 0,
        'cleanup_times': []
    }

    def __init__(self, name, resource_factory):
        """Store *name* and the zero-argument *resource_factory* callable."""
        self.name = name
        self.resource_factory = resource_factory
        self.resource = None
        self.acquire_time = None
        self.cleanup_start = None

    def __enter__(self):
        """Count the acquisition, create the resource, and return self."""
        MonitoredContextManager.cleanup_stats['total_acquisitions'] += 1
        self.acquire_time = time.time()
        self.resource = self.resource_factory()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up the resource and record how long everything took.

        A cleanup failure is counted and logged rather than raised.

        Returns:
            False, so an exception from the ``with`` body is never suppressed.
        """
        self.cleanup_start = time.time()
        try:
            self._cleanup_resource()
            MonitoredContextManager.cleanup_stats['successful_cleanups'] += 1
        except Exception as e:
            MonitoredContextManager.cleanup_stats['failed_cleanups'] += 1
            logger.error(f"Cleanup failed for {self.name}: {e}")

        # Record metrics
        cleanup_duration = time.time() - self.cleanup_start
        resource_duration = self.cleanup_start - self.acquire_time
        MonitoredContextManager.cleanup_stats['total_resource_time'] += resource_duration
        MonitoredContextManager.cleanup_stats['cleanup_times'].append(cleanup_duration)
        return False

    def _cleanup_resource(self):
        """Close the held resource, if one was acquired."""
        # Compare against None, not truthiness: a resource that evaluates as
        # falsy (for example an empty container-like object) must still be
        # closed.
        if self.resource is not None:
            self.resource.close()

    @classmethod
    def get_stats(cls):
        """Return the raw counters plus derived per-item averages.

        The denominators are clamped to at least 1, so the averages are 0
        instead of a ZeroDivisionError when nothing has been recorded.
        """
        stats = cls.cleanup_stats
        cleanup_times = stats['cleanup_times']
        return {
            **stats,
            'average_resource_time': (
                stats['total_resource_time'] /
                max(stats['total_acquisitions'], 1)
            ),
            'average_cleanup_time': (
                sum(cleanup_times) /
                max(len(cleanup_times), 1)
            )
        }
# Usage with monitoring
# Acquire and release ten monitored resources, then show the shared stats.
for i in range(10):
    with MonitoredContextManager(f'Resource{i}', get_resource):
        time.sleep(0.1)

print(MonitoredContextManager.get_stats())
# Coordinated shutdown of multiple resources
class GracefulShutdownManager:
    """Coordinate an ordered, time-bounded shutdown of registered resources."""

    def __init__(self, timeout=30):
        """Set the total *timeout* (seconds) allowed for draining pending work."""
        self.timeout = timeout
        self.resources = []
        self.shutdown_started = False

    def register_resource(self, resource, cleanup_func):
        """Register *resource* with the zero-argument callable that cleans it up."""
        self.resources.append((resource, cleanup_func))

    def shutdown(self):
        """Perform the graceful shutdown; later calls are no-ops.

        Runs in three phases:
        1. Call ``stop_accepting_work()`` on each resource that has it.
        2. Call ``wait_for_pending(timeout=...)`` on each resource that has
           it, with all waits sharing one deadline of ``self.timeout``.
        3. Run every cleanup function in reverse registration order, logging
           failures instead of raising them.
        """
        if self.shutdown_started:
            return
        self.shutdown_started = True
        logger.info(f"Starting graceful shutdown with {len(self.resources)} resources")

        # Stop accepting new work
        for resource, _ in self.resources:
            if hasattr(resource, 'stop_accepting_work'):
                resource.stop_accepting_work()

        # Wait for in-flight operations. The deadline uses the monotonic
        # clock so a system clock adjustment cannot shorten or extend it.
        deadline = time.monotonic() + self.timeout
        for resource, _ in self.resources:
            if not hasattr(resource, 'wait_for_pending'):
                continue
            remaining_time = deadline - time.monotonic()
            if remaining_time <= 0:
                # Budget exhausted: log the skip so it is visible.
                logger.warning(f"Shutdown deadline exceeded; not waiting for {resource}")
                continue
            try:
                resource.wait_for_pending(timeout=remaining_time)
            except TimeoutError:
                logger.warning(f"Timeout waiting for {resource}")

        # Cleanup in reverse order
        for resource, cleanup_func in reversed(self.resources):
            try:
                logger.info(f"Cleaning up {resource}")
                cleanup_func()
            except Exception as e:
                logger.error(f"Error cleaning up {resource}: {e}")

        logger.info("Graceful shutdown complete")
# Usage
shutdown_mgr = GracefulShutdownManager(timeout=30)

web_server = start_web_server()
shutdown_mgr.register_resource(web_server, web_server.stop)

db_pool = create_connection_pool()
shutdown_mgr.register_resource(db_pool, db_pool.close)

# Register signal handler for graceful shutdown
import signal
signal.signal(signal.SIGTERM, lambda s, f: shutdown_mgr.shutdown())
signal.signal(signal.SIGINT, lambda s, f: shutdown_mgr.shutdown())

# This import had been fused onto the end of the previous statement, which
# made that line a syntax error; it belongs to the weakref example below.
import weakref
# Use weakref for automatic cleanup when objects are garbage collected
class AutoCleanupResource:
    """Resource that closes itself on finalization if nobody closed it."""

    def __init__(self, name):
        """Record *name* and mark the resource as open."""
        self.name = name
        self.is_open = True

    def close(self):
        """Close the resource; calling it again is a no-op."""
        if self.is_open:
            print(f"Closing {self.name}")
            self.is_open = False

    def __del__(self):
        """Warn and close if the object is finalized while still open."""
        # The finalizer can run on an instance whose __init__ never finished
        # (or never ran), so ``is_open`` may be missing. Treat that as
        # "nothing to close" rather than raising AttributeError.
        if getattr(self, "is_open", False):
            print(f"WARNING: {self.name} not explicitly closed")
            self.close()
# Weakref for cleanup registry
class CleanupRegistry:
    """Class-level registry that tracks resources through weak references."""

    # WeakSet: being tracked here does not keep a resource alive.
    _resources = weakref.WeakSet()

    @classmethod
    def register(cls, resource):
        """Add *resource* to the registry and hand it straight back."""
        cls._resources.add(resource)
        return resource

    @classmethod
    def cleanup_all(cls):
        """Call ``close()`` on every tracked resource that has one.

        A failing ``close()`` is logged and does not stop the remaining
        resources from being closed.
        """
        # Iterate over a snapshot, not the live weak set.
        for resource in tuple(cls._resources):
            if not hasattr(resource, 'close'):
                continue
            try:
                resource.close()
            except Exception as e:
                logger.error(f"Error closing {resource}: {e}")
# Usage
# register() returns its argument, so building the resource first and then
# registering it leaves the same object bound to ``resource``.
resource = AutoCleanupResource("MyResource")
CleanupRegistry.register(resource)
# ... use resource ...
# When resource is no longer referenced, __del__ is called automatically
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward