
Python
Loop control goes beyond simple break and continue statements. Advanced control flow includes exception handling within loops, context managers, and sophisticated patterns for robust, maintainable code.
# Basic try-except in loops
data = [1, 2, 'three', 4, None, 6]
results = []
for item in data:
try:
result = 10 / int(item)
results.append(result)
except ValueError:
print(f"Skipping non-integer: {item}")
except ZeroDivisionError:
print(f"Division by zero at {item}")
except Exception as e:
print(f"Unexpected error: {e}")
# Using else clause (executes if no exception)
for item in data:
try:
result = 10 / int(item)
except (ValueError, ZeroDivisionError):
print(f"Error processing {item}")
else:
print(f"Successfully processed {item}: {result}")
# Using finally clause (always executes)
import tempfile
import os
files_created = []
for i in range(3):
temp_file = None
try:
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.write(f"Data {i}".encode())
files_created.append(temp_file.name)
except IOError as e:
print(f"Error writing file: {e}")
finally:
if temp_file:
temp_file.close()
print(f"Created {len(files_created)} files")import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def process_items(items, error_threshold=0.5):
"""
Process items with error tracking and logging.
Raises RuntimeError if error rate exceeds threshold.
"""
results = []
errors = []
successful = 0
for index, item in enumerate(items):
try:
# Simulate processing
if item < 0:
raise ValueError(f"Negative value: {item}")
if item == 0:
raise ZeroDivisionError("Zero encountered")
result = 100 / item
results.append(result)
successful += 1
except ValueError as e:
logger.warning(f"Item {index}: {e}")
errors.append((index, str(e)))
except ZeroDivisionError as e:
logger.error(f"Item {index}: {e}")
errors.append((index, str(e)))
except Exception as e:
logger.exception(f"Unexpected error at item {index}")
errors.append((index, str(e)))
# Check error rate
error_rate = len(errors) / len(items) if items else 0
if error_rate > error_threshold:
logger.critical(f"Error rate {error_rate:.1%} exceeds threshold {error_threshold:.1%}")
raise RuntimeError(f"Too many errors: {len(errors)}/{len(items)}")
logger.info(f"Processed {successful} items successfully, {len(errors)} errors")
return results, errors
# Usage
try:
results, errors = process_items([1, 2, -1, 0, 4], error_threshold=0.4)
except RuntimeError as e:
print(f"Processing failed: {e}")class ProcessingError(Exception):
"""Base exception for processing errors."""
def __init__(self, message, item_index, item_value):
super().__init__(message)
self.item_index = item_index
self.item_value = item_value
class ItemSkippedError(ProcessingError):
"""Item was skipped due to invalid format."""
pass
class ItemProcessedError(ProcessingError):
"""Item caused an error during processing."""
pass
def process_with_custom_exceptions(items):
"""
Process items with custom exception hierarchy.
"""
results = []
skip_count = 0
error_count = 0
for index, item in enumerate(items):
try:
# Validation
if not isinstance(item, (int, float)):
raise ItemSkippedError(
f"Invalid type: {type(item).__name__}",
index, item
)
if item < 0:
raise ItemProcessedError(
f"Negative value not allowed",
index, item
)
# Processing
result = item ** 2
results.append(result)
except ItemSkippedError as e:
print(f"Skipped item {e.item_index}: {e}")
skip_count += 1
except ItemProcessedError as e:
print(f"Error at item {e.item_index}: {e}")
error_count += 1
return results, skip_count, error_count
# Usage
results, skipped, errors = process_with_custom_exceptions([1, '2', -3, 4, 5.5])
print(f"Results: {results}, Skipped: {skipped}, Errors: {errors}")import json
from contextlib import contextmanager
# Basic context manager usage
files = ['data1.json', 'data2.json', 'data3.json']
for filename in files:
try:
with open(filename, 'r') as f:
data = json.load(f)
process_data(data)
except FileNotFoundError:
print(f"File not found: {filename}")
except json.JSONDecodeError:
print(f"Invalid JSON in {filename}")
except Exception as e:
print(f"Error processing {filename}: {e}")
def process_data(data):
"""Placeholder for data processing."""
pass
# Multiple context managers
import csv
import sqlite3
def process_csv_to_database(csv_file, db_file):
with open(csv_file, 'r') as csvfile, \
sqlite3.connect(db_file) as conn:
reader = csv.DictReader(csvfile)
cursor = conn.cursor()
for row in reader:
try:
cursor.execute(
"INSERT INTO data (name, value) VALUES (?, ?)",
(row['name'], row['value'])
)
except sqlite3.IntegrityError:
print(f"Duplicate entry: {row['name']}")
except Exception as e:
print(f"Error inserting {row}: {e}")
conn.rollback()
raise
conn.commit()from contextlib import contextmanager
import time
@contextmanager
def timer(label="Operation"):
"""Context manager for timing operations."""
start = time.time()
try:
print(f"{label} started...")
yield
finally:
elapsed = time.time() - start
print(f"{label} took {elapsed:.3f} seconds")
# Using timer in loops
for i in range(3):
with timer(f"Iteration {i}"):
time.sleep(0.1)
# Do some work
pass
@contextmanager
def error_handler(error_list, item_id):
"""Context manager for collecting errors."""
try:
yield
except Exception as e:
error_list.append({'item_id': item_id, 'error': str(e)})
print(f"Error in item {item_id}: {e}")
# Using error handler
errors = []
for i in range(5):
with error_handler(errors, i):
if i == 2:
raise ValueError("Test error")
print(f"Processing item {i}")
print(f"Total errors: {len(errors)}")class ResourcePool:
"""Context manager for managing a pool of resources."""
def __init__(self, resources, max_concurrent=2):
self.resources = resources
self.available = list(resources)
self.max_concurrent = max_concurrent
self.in_use = 0
def __enter__(self):
if not self.available:
raise RuntimeError("No available resources")
resource = self.available.pop()
self.in_use += 1
return resource
def __exit__(self, exc_type, exc_val, exc_tb):
self.in_use -= 1
@contextmanager
def acquire(self):
"""Context manager for acquiring a resource."""
if self.in_use >= self.max_concurrent:
raise RuntimeError(f"Max concurrent ({self.max_concurrent}) reached")
with self:
yield
# Using resource pool in loops
pool = ResourcePool(['resource1', 'resource2', 'resource3'], max_concurrent=2)
for i in range(5):
try:
with pool.acquire() as resource:
print(f"Task {i} using {resource}")
time.sleep(0.1)
except RuntimeError as e:
print(f"Task {i} failed: {e}")class Sentinel:
"""Marker object to signal end of iteration."""
def __init__(self, value):
self.value = value
def __eq__(self, other):
return isinstance(other, Sentinel) and self.value == other.value
def __repr__(self):
return f"Sentinel({self.value})"
def read_data_with_sentinel(source, sentinel=None):
"""
Read data until sentinel is encountered.
Common pattern in C and low-level programming.
"""
if sentinel is None:
sentinel = Sentinel("END")
data = []
for item in source:
if item == sentinel:
break
data.append(item)
return data
# Usage
input_data = [1, 2, 3, Sentinel("END"), 4, 5]
result = read_data_with_sentinel(input_data)
print(result) # [1, 2, 3]def loop_until(predicate, generator, max_iterations=None):
"""
Loop until predicate becomes True.
Useful for algorithms with complex termination conditions.
"""
iteration = 0
for value in generator:
if max_iterations and iteration >= max_iterations:
break
if predicate(value):
return value, iteration
iteration += 1
return None, iteration
# Usage: Find first number greater than 100
from itertools import count
result, iterations = loop_until(
lambda x: x > 100,
(x**2 for x in count(1)),
max_iterations=50
)
if result:
print(f"Found {result} after {iterations} iterations")
def loop_while(predicate, generator, max_iterations=None):
"""
Loop while predicate is True.
Complementary to loop_until.
"""
iteration = 0
for value in generator:
if max_iterations and iteration >= max_iterations:
break
if not predicate(value):
return iteration
iteration += 1
return iteration
# Usage: Count while less than 100
count_result = loop_while(
lambda x: x < 100,
(x**2 for x in count(1))
)
print(f"Stopped after {count_result} iterations")def safe_early_return(cleanup_fn):
"""Decorator to ensure cleanup on early return."""
def decorator(fn):
def wrapper(*args, **kwargs):
cleanup_registered = []
try:
# Inject cleanup registration
def register_cleanup(fn):
cleanup_registered.append(fn)
result = fn(*args, cleanup=register_cleanup, **kwargs)
return result
finally:
# Execute cleanup in reverse order
for cleanup in reversed(cleanup_registered):
try:
cleanup()
except Exception as e:
print(f"Cleanup error: {e}")
return wrapper
return decorator
@safe_early_return(None)
def process_with_cleanup(items, cleanup=None):
"""Process items with guaranteed cleanup."""
file = open('temp.txt', 'w')
cleanup(file.close)
resources = []
cleanup(lambda: print("Closing resources"))
for item in items:
if item == 'stop':
return results # Cleanup still runs!
results.append(process(item))
return results
def process(item):
return item.upper()from enum import Enum
class LoopState(Enum):
RUNNING = "running"
PAUSED = "paused"
STOPPED = "stopped"
ERROR = "error"
class LoopStateMachine:
"""Loop with state management."""
def __init__(self):
self.state = LoopState.RUNNING
self.iteration = 0
self.error_message = None
def process_iteration(self, item):
"""Process single iteration based on state."""
if self.state == LoopState.STOPPED:
return False
if self.state == LoopState.ERROR:
return False
if self.state == LoopState.PAUSED:
return True # Continue without processing
if self.state == LoopState.RUNNING:
try:
self._do_work(item)
self.iteration += 1
return True
except Exception as e:
self.state = LoopState.ERROR
self.error_message = str(e)
return False
return False
def _do_work(self, item):
"""Perform actual work."""
print(f"Processing {item}")
# Actual processing logic
pass
def pause(self):
"""Pause the loop."""
if self.state == LoopState.RUNNING:
self.state = LoopState.PAUSED
def resume(self):
"""Resume the loop."""
if self.state == LoopState.PAUSED:
self.state = LoopState.RUNNING
def stop(self):
"""Stop the loop."""
self.state = LoopState.STOPPED
# Usage
machine = LoopStateMachine()
items = list(range(10))
for item in items:
if machine.state == LoopState.STOPPED:
break
if machine.state == LoopState.ERROR:
print(f"Error occurred: {machine.error_message}")
break
machine.process_iteration(item)
# Pause every 3 iterations
if machine.iteration % 3 == 0:
machine.pause()
print("Paused")
machine.resume()
print("Resumed")
print(f"Final state: {machine.state}")
print(f"Completed iterations: {machine.iteration}")class LoopContext:
"""Encapsulates loop execution context."""
def __init__(self, items):
self.items = items
self.index = 0
self.current_item = None
self.results = []
self.errors = []
self.completed = False
def __iter__(self):
return self
def __next__(self):
if self.index >= len(self.items):
self.completed = True
raise StopIteration
self.current_item = self.items[self.index]
self.index += 1
return self.current_item
def record_result(self, result):
"""Record successful result."""
self.results.append({
'item': self.current_item,
'result': result,
'index': self.index - 1
})
def record_error(self, error):
"""Record error."""
self.errors.append({
'item': self.current_item,
'error': error,
'index': self.index - 1
})
def get_summary(self):
"""Get execution summary."""
return {
'total': len(self.items),
'completed': len(self.results),
'errors': len(self.errors),
'success_rate': len(self.results) / len(self.items) if self.items else 0
}
# Usage
ctx = LoopContext([1, 2, 3, 4, 5])
for item in ctx:
try:
result = item ** 2
ctx.record_result(result)
except Exception as e:
ctx.record_error(str(e))
print(ctx.get_summary())
# Output: {'total': 5, 'completed': 5, 'errors': 0, 'success_rate': 1.0}import signal
import sys
class InterruptibleLoop:
"""Loop that handles interrupts gracefully."""
def __init__(self, items):
self.items = items
self.interrupted = False
signal.signal(signal.SIGINT, self._handle_interrupt)
def _handle_interrupt(self, signum, frame):
"""Handle SIGINT (Ctrl+C)."""
print("\nInterrupt received, cleaning up...")
self.interrupted = True
def run(self):
"""Run loop with interrupt handling."""
processed = 0
for item in self.items:
if self.interrupted:
print("Loop interrupted by user")
break
try:
self.process_item(item)
processed += 1
except Exception as e:
print(f"Error processing {item}: {e}")
print(f"Processed {processed}/{len(self.items)} items")
return processed
def process_item(self, item):
"""Process single item."""
print(f"Processing {item}...")
time.sleep(0.5) # Simulate work| Pattern | Use Case | Complexity | Safety |
|---|---|---|---|
| Try-Except | Error handling | Low | Essential |
| Finally Clause | Resource cleanup | Low | Essential |
| Context Manager | Resource management | Medium | High |
| Custom Exceptions | Domain-specific errors | Medium | High |
| State Machine | Complex control flow | High | Very High |
| Sentinel Values | Loop termination | Low | Medium |
| Predicate Functions | Conditional loops | Low | Good |
Explore these advanced topics:
Master loop control and write bulletproof Python code!
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward