
Python
Master production-grade logging systems and integration patterns.
import logging
import json
from datetime import datetime
import socket
class StructuredLogFormatter(logging.Formatter):
"""Formatter for structured (JSON) logs"""
def format(self, record):
"""Format log as JSON"""
log_obj = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
'process': record.process,
'thread': record.thread,
'hostname': socket.gethostname(),
}
# Add exception info if present
if record.exc_info:
log_obj['exception'] = {
'type': record.exc_info[0].__name__,
'message': str(record.exc_info[1]),
'traceback': self.formatException(record.exc_info)
}
# Add custom fields
if hasattr(record, 'context'):
log_obj['context'] = record.context
return json.dumps(log_obj)
# Setup structured logging
def setup_structured_logging(log_file):
"""Setup structured logging to file"""
logger = logging.getLogger('app')
logger.setLevel(logging.DEBUG)
# File handler with JSON formatting
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(StructuredLogFormatter())
logger.addHandler(file_handler)
return logger
# Usage with context
class ContextFilter(logging.Filter):
"""Add context to log records"""
def __init__(self, context):
self.context = context
def filter(self, record):
record.context = self.context
return True
logger = setup_structured_logging('app.log')
# Add context to all logs
user_context = {'user_id': 123, 'session': 'abc123'}
logger.addFilter(ContextFilter(user_context))
logger.info("User logged in", extra={'context': user_context})# Centralized log aggregation
class LogAggregator:
"""Aggregate logs from multiple sources"""
def __init__(self):
self.logs = []
self.filters = []
self.processors = []
def add_filter(self, filter_func):
"""Add filter function"""
self.filters.append(filter_func)
return self
def add_processor(self, processor_func):
"""Add log processor"""
self.processors.append(processor_func)
return self
def log(self, level, message, **context):
"""Process log entry"""
log_entry = {
'timestamp': datetime.now(),
'level': level,
'message': message,
'context': context
}
# Apply filters
for filter_func in self.filters:
if not filter_func(log_entry):
return
# Apply processors
for processor_func in self.processors:
log_entry = processor_func(log_entry)
self.logs.append(log_entry)
def query(self, level=None, min_time=None, max_time=None):
"""Query logs"""
results = self.logs
if level:
results = [l for l in results if l['level'] == level]
if min_time:
results = [l for l in results if l['timestamp'] >= min_time]
if max_time:
results = [l for l in results if l['timestamp'] <= max_time]
return results
# Setup with filtering and processing
aggregator = LogAggregator()
# Filter: Only ERROR and above
aggregator.add_filter(lambda log: log['level'] in ('ERROR', 'CRITICAL'))
# Processor: Redact sensitive data
def redact_sensitive(log_entry):
context = log_entry['context']
if 'password' in context:
context['password'] = '***'
return log_entry
aggregator.add_processor(redact_sensitive)
# Usage
aggregator.log('INFO', 'User login', user_id=123, password='secret')
aggregator.log('ERROR', 'Connection failed', host='localhost')
# Query
errors = aggregator.query(level='ERROR')# Send logs to monitoring/alerting system
import requests
from enum import Enum
class AlertSeverity(Enum):
"""Severity levels for alerting"""
CRITICAL = 1
ERROR = 2
WARNING = 3
INFO = 4
class MonitoringIntegration:
"""Send alerts to external monitoring system"""
def __init__(self, webhook_url, severity_threshold=AlertSeverity.WARNING):
self.webhook_url = webhook_url
self.severity_threshold = severity_threshold
def should_alert(self, severity):
"""Check if should send alert"""
return severity.value <= self.severity_threshold.value
def send_alert(self, message, severity, context=None):
"""Send alert to monitoring system"""
if not self.should_alert(severity):
return
payload = {
'message': message,
'severity': severity.name,
'timestamp': datetime.utcnow().isoformat(),
'context': context or {}
}
try:
response = requests.post(
self.webhook_url,
json=payload,
timeout=5
)
response.raise_for_status()
except Exception as e:
# Don't let monitoring failures affect application
print(f"Failed to send alert: {e}")
class AlertingLogger(logging.Handler):
"""Handler that sends errors to monitoring"""
def __init__(self, monitoring):
super().__init__()
self.monitoring = monitoring
def emit(self, record):
"""Send log record to monitoring"""
severity_map = {
'CRITICAL': AlertSeverity.CRITICAL,
'ERROR': AlertSeverity.ERROR,
'WARNING': AlertSeverity.WARNING,
}
severity = severity_map.get(record.levelname)
if severity:
self.monitoring.send_alert(
message=record.getMessage(),
severity=severity,
context={
'logger': record.name,
'function': record.funcName,
'line': record.lineno
}
)
# Setup monitoring integration
monitoring = MonitoringIntegration('https://alerts.example.com/log')
alerting_handler = AlertingLogger(monitoring)
logger = logging.getLogger('app')
logger.addHandler(alerting_handler)
# Usage
logger.critical("Database connection failed", extra={'context': {'db': 'primary'}})# Maintain context across function calls
import contextvars
# Context variable for request tracking
request_context = contextvars.ContextVar('request_context', default=None)
class RequestContext:
"""Manage request context for logging"""
def __init__(self, request_id, user_id=None):
self.request_id = request_id
self.user_id = user_id
self.start_time = datetime.now()
self.operations = []
def log_operation(self, operation_name, **details):
"""Log operation within request"""
self.operations.append({
'operation': operation_name,
'details': details,
'timestamp': datetime.now()
})
class ContextualLoggingFormatter(logging.Formatter):
"""Add request context to logs"""
def format(self, record):
ctx = request_context.get()
if ctx:
record.request_id = ctx.request_id
record.user_id = ctx.user_id
return super().format(record)
# Usage
def handle_request(request_id, user_id):
"""Handle request with context"""
ctx = RequestContext(request_id, user_id)
token = request_context.set(ctx)
try:
logger = logging.getLogger('app')
logger.info("Request started", extra={
'request_id': request_id,
'user_id': user_id
})
# Operations
ctx.log_operation('database_query', query='SELECT * FROM users')
ctx.log_operation('cache_hit', key='users_list')
logger.info("Request completed")
finally:
request_context.reset(token)
print(f"Operations: {ctx.operations}")
# Simulate concurrent requests
import threading
for i in range(3):
thread = threading.Thread(
target=handle_request,
args=(f'REQ_{i}', f'USER_{i}')
)
thread.start()# Log performance metrics
import functools
import time
class PerformanceLogger:
"""Log performance metrics for functions"""
def __init__(self, logger, threshold_ms=100):
self.logger = logger
self.threshold_ms = threshold_ms
def log_performance(self, func):
"""Decorator to log function performance"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
exception = None
try:
result = func(*args, **kwargs)
return result
except Exception as e:
exception = e
raise
finally:
duration_ms = (time.perf_counter() - start_time) * 1000
# Log if exceeds threshold
if duration_ms > self.threshold_ms:
level = 'WARNING' if duration_ms > self.threshold_ms * 10 else 'INFO'
self.logger.log(
getattr(logging, level),
f"{func.__name__} took {duration_ms:.2f}ms",
extra={
'function': func.__name__,
'duration_ms': duration_ms,
'exception': type(exception).__name__ if exception else None
}
)
return wrapper
# Usage
logger = logging.getLogger('app')
perf_logger = PerformanceLogger(logger, threshold_ms=50)
@perf_logger.log_performance
def slow_operation():
time.sleep(0.2) # 200ms
@perf_logger.log_performance
def fast_operation():
pass # < 50ms
slow_operation() # Logs warning
fast_operation() # Doesn't log# Log securely without exposing secrets
class SecureLogger:
"""Logger that redacts sensitive information"""
SENSITIVE_PATTERNS = {
'password': r'password["\']?\s*:\s*["\']?[^"\'}\s]+',
'token': r'token["\']?\s*:\s*["\']?[^"\'}\s]+',
'api_key': r'api[_-]?key["\']?\s*:\s*["\']?[^"\'}\s]+',
'secret': r'secret["\']?\s*:\s*["\']?[^"\'}\s]+',
}
@staticmethod
def redact(message):
"""Redact sensitive information from message"""
import re
result = message
for key, pattern in SecureLogger.SENSITIVE_PATTERNS.items():
result = re.sub(
pattern,
f'{key}=***REDACTED***',
result,
flags=re.IGNORECASE
)
return result
@staticmethod
def safe_log(logger, level, message, **context):
"""Log with automatic redaction"""
redacted_message = SecureLogger.redact(message)
redacted_context = {
k: SecureLogger.redact(str(v))
for k, v in context.items()
}
logger.log(level, redacted_message, extra={'context': redacted_context})
# Usage
logger = logging.getLogger('app')
SecureLogger.safe_log(
logger,
logging.INFO,
"User login successful",
user_id=123,
token='secret_token_12345',
api_key='key_987654'
)
# Logs: token=***REDACTED***, api_key=***REDACTED***Completed: Advanced Error Handling | Back to Overview
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward