
## Python: Debugging and Profiling Loops

Debugging and profiling loops is crucial for optimizing performance. This section covers profiling tools, memory analysis, and bottleneck-detection techniques.
import cProfile
import pstats
import io
def fibonacci(n):
    """Return the n-th Fibonacci number via naive double recursion.

    Exponential-time on purpose: it serves as a heavy, deterministic
    workload for the profiling demos below.
    """
    if n <= 1:
        return n
    return fibonacci(n - 2) + fibonacci(n - 1)
def profile_function():
    """Profile fibonacci(30) with cProfile and print the top ten stats.

    Returns the fibonacci result so callers can see the profiled work
    actually completed.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    value = fibonacci(30)  # the workload under measurement
    profiler.disable()
    # Report the ten most expensive entries, sorted by cumulative time.
    stats = pstats.Stats(profiler)
    stats.sort_stats('cumulative')
    stats.print_stats(10)
    return value
# Run profiling
# profile_function()
# More detailed analysis
def detailed_profile():
    """Profile fibonacci(28) and capture the report as a string.

    The stats report is directed into an in-memory StringIO buffer
    (instead of stdout) so it can be sliced; only the first 500
    characters are printed.

    Returns:
        The fibonacci(28) value. (Fix: the original computed this
        result and silently discarded it, unlike profile_function.)
    """
    pr = cProfile.Profile()
    pr.enable()
    result = fibonacci(28)
    pr.disable()
    # Redirect the stats report into an in-memory buffer.
    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s)
    ps.sort_stats('cumulative')
    ps.print_stats()
    output = s.getvalue()
    print(output[:500])  # Print first 500 chars
    return result
# detailed_profile()

# Install: pip install line_profiler
def slow_loop():
    """Build all pairwise products of 0..999 and return how many there are.

    Deliberately quadratic (1,000,000 appends) — a bottleneck specimen
    for the line-profiler discussion.
    """
    products = []
    # This nested loop is the intentional hot spot.
    for a in range(1000):
        for b in range(1000):
            products.append(a * b)
    return len(products)
# Usage with line_profiler:
# kernprof -l -v script.py
# Simulated line profiling
def simulate_line_profile():
    """Simulate line-profiler output by timing individual operations.

    Micro-benchmarks a few common per-iteration operations with timeit
    and prints the total time for 100,000 executions of each.

    Fixes vs. original: the unused, syntactically broken `setup` string
    literal is removed, and the setup argument is a plain string rather
    than a pointless f-string with escaped braces.
    """
    import timeit
    # Each snippet runs against a fresh list/dict/string from this setup.
    setup = "results = []; d = {}; s = ''"
    test_cases = {
        'List append': "results.append(42)",
        'List extend': "results.extend([42])",
        'Dictionary update': "d.update({'key': 'value'})",
        'String concatenation': "s += 'test'",
    }
    for operation, code in test_cases.items():
        time_taken = timeit.timeit(code, setup=setup, number=100000)
        print(f"{operation}: {time_taken:.6f}s")

# Install: pip install memory-profiler
def analyze_memory_usage():
    """Trace the memory cost of a large-list allocation with tracemalloc."""
    import tracemalloc

    tracemalloc.start()  # begin tracking allocations
    big = list(range(1000000))  # memory-intensive allocation to observe
    # Current vs. peak traced memory, reported in MB.
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory: {current / 1024 / 1024:.2f} MB")
    print(f"Peak memory: {peak / 1024 / 1024:.2f} MB")
    # Per-source-line breakdown of where memory was allocated.
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')
    print("\nTop 3 memory consumers:")
    for stat in top_stats[:3]:
        print(stat)
    tracemalloc.stop()
# analyze_memory_usage()
def memory_efficient_loop():
    """Contrast peak memory of an eager list vs. a lazy generator."""
    import tracemalloc

    # Eager: materialize a million-element list up front.
    tracemalloc.start()
    eager = list(range(1000000))
    cur_list, peak_list = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    # Lazy: a generator expression allocates almost nothing until consumed.
    tracemalloc.start()
    lazy = (x for x in range(1000000))
    cur_gen, peak_gen = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    print(f"List approach: {peak_list / 1024 / 1024:.2f} MB")
    print(f"Generator approach: {peak_gen / 1024 / 1024:.2f} MB")
    print(f"Memory saved: {(peak_list - peak_gen) / 1024 / 1024:.2f} MB")

import time
import contextlib
@contextlib.contextmanager
def timer(label):
    """Context manager: print the wall-clock time a `with` body took.

    The timing is printed even if the body raises (finally clause).
    """
    began = time.time()
    try:
        yield
    finally:
        duration = time.time() - began
        print(f"{label}: {duration*1000:.3f}ms")
def process_data_with_timing():
    """Run a small filter/transform/sum pipeline, timing each stage.

    Relies on the timer() context manager defined above.
    """
    with timer("Data preparation"):
        values = list(range(10000))
    with timer("First loop (filtering)"):
        evens = [v for v in values if v % 2 == 0]
    with timer("Second loop (transformation)"):
        squares = [v**2 for v in evens]
    with timer("Final aggregation"):
        total = sum(squares)
    return total

# process_data_with_timing()
# More sophisticated timing
class LoopTimer:
    """Accumulate per-section timings across many loop iterations."""

    def __init__(self):
        self.times = {}   # section name -> cumulative seconds
        self.counts = {}  # section name -> number of recordings

    def record(self, section, elapsed):
        """Add `elapsed` seconds to `section`'s running total and count."""
        self.times[section] = self.times.get(section, 0) + elapsed
        self.counts[section] = self.counts.get(section, 0) + 1

    def report(self):
        """Print total, call count, and average per section (alphabetical)."""
        print("\nTiming Report:")
        print("-" * 50)
        for section in sorted(self.times):
            total = self.times[section]
            count = self.counts[section]
            avg = total / count
            print(f"{section:20s}: {total*1000:8.2f}ms ({count:4d} calls, avg {avg*1000:6.3f}ms)")
# Usage
# NOTE: this rebinds the name `timer`, shadowing the context manager above.
timer = LoopTimer()
for i in range(100):
    mark = time.time()
    # Section 1: sum of a small range
    result = sum(range(1000))
    timer.record("Summation", time.time() - mark)
    mark = time.time()
    # Section 2: squares via a list comprehension
    data = [x**2 for x in range(1000)]
    timer.record("List comprehension", time.time() - mark)
# timer.report()

def detect_infinite_loop(max_iterations=1000000):
    """Demonstrate capping a potentially infinite loop.

    Runs a process function that never reaches a fixed point and shows
    how an iteration limit (and, on Unix, an optional SIGALRM timeout,
    left disabled here) can stop it.
    """
    def safe_loop(process_fn, initial, timeout=5):
        """Repeatedly apply process_fn, bailing out at max_iterations."""
        import signal

        def timeout_handler(signum, frame):
            raise TimeoutError("Loop execution exceeded time limit")

        # Wall-clock timeout via SIGALRM (Unix only) — intentionally off.
        # signal.signal(signal.SIGALRM, timeout_handler)
        # signal.alarm(timeout)
        state = initial
        iterations = 0
        try:
            while iterations < max_iterations:
                state = process_fn(state)
                iterations += 1
                if iterations % 100000 == 0:
                    print(f"Iteration {iterations}...")  # progress heartbeat
        except TimeoutError:
            print(f"Loop timed out after {iterations} iterations")
            raise
        if iterations >= max_iterations:
            print(f"Warning: Loop hit iteration limit ({max_iterations})")
        return state, iterations

    # A process that never converges: effectively an infinite loop.
    def process(state):
        return state + 1

    try:
        result, iters = safe_loop(process, 0, timeout=5)
        print(f"Completed {iters} iterations")
    except Exception as e:
        print(f"Error: {e}")

def unfused_loops(data):
    """Sum of squares of positive values, done in three separate passes.

    Deliberately unfused — filter, transform, then reduce — so it can be
    compared against the single-pass fused_loop() below.
    """
    # Pass 1: keep positives.
    positives = []
    for value in data:
        if value > 0:
            positives.append(value)
    # Pass 2: square them.
    squares = []
    for value in positives:
        squares.append(value**2)
    # Pass 3: accumulate.
    total = 0
    for value in squares:
        total += value
    return total
def fused_loop(data):
    """Sum of squares of positive values in a single fused pass.

    Behaviorally identical to unfused_loops(), without intermediates.
    """
    total = 0
    for value in data:
        if value > 0:
            total += value**2
    return total
# Performance comparison: unfused vs. fused passes over the same data.
import timeit

data = list(range(10000))
unfused_time = timeit.timeit(lambda: unfused_loops(data), number=1000)
fused_time = timeit.timeit(lambda: fused_loop(data), number=1000)
print(f"Unfused: {unfused_time:.4f}s")
print(f"Fused: {fused_time:.4f}s")
print(f"Speedup: {unfused_time/fused_time:.2f}x")

def matrix_sum_naive(matrix):
    """Sum every element with a plain row-major index scan.

    Assumes a rectangular matrix (every row as long as row 0).
    """
    total = 0
    for row_idx in range(len(matrix)):
        for col_idx in range(len(matrix[0])):
            total += matrix[row_idx][col_idx]
    return total
def matrix_sum_tiled(matrix, tile_size=32):
    """Sum all elements, visiting the matrix in tile_size x tile_size blocks.

    Tiling finishes each block before moving on, which can improve cache
    locality over the naive full-row scan on large matrices.
    """
    total = 0
    rows = len(matrix)
    cols = len(matrix[0])
    for row0 in range(0, rows, tile_size):
        for col0 in range(0, cols, tile_size):
            # Sum one tile, clamped at the matrix edges.
            for r in range(row0, min(row0 + tile_size, rows)):
                for c in range(col0, min(col0 + tile_size, cols)):
                    total += matrix[r][c]
    return total
# Build a 1000x1000 random test matrix and time both summation strategies.
import random

matrix = [
    [random.randint(1, 100) for _ in range(1000)]
    for _ in range(1000)
]
naive_time = timeit.timeit(lambda: matrix_sum_naive(matrix), number=10)
tiled_time = timeit.timeit(lambda: matrix_sum_tiled(matrix), number=10)
print(f"Naive: {naive_time:.3f}s")
print(f"Tiled: {tiled_time:.3f}s")
print(f"Speedup: {naive_time/tiled_time:.2f}x")

import numpy as np
def python_loop():
    """Evaluate i**2 + 2*i + 1 for i in [0, 1e6) in pure Python.

    Returns the full list of results; the NumPy twin below computes the
    same values vectorized.
    """
    return [n**2 + 2*n + 1 for n in range(1000000)]
def numpy_vectorized():
    """Same polynomial as python_loop(), computed as one NumPy expression."""
    n = np.arange(1000000)
    poly = n**2 + 2*n + 1  # evaluated element-wise in native code
    return poly.tolist()
# Performance comparison: pure-Python loop vs. vectorized NumPy.
python_time = timeit.timeit(python_loop, number=10)
numpy_time = timeit.timeit(numpy_vectorized, number=10)
print(f"Pure Python: {python_time:.4f}s")
print(f"NumPy: {numpy_time:.4f}s")
print(f"Speedup: {python_time/numpy_time:.2f}x")
# More complex example
def process_data_python():
    """Build a 1000x1000 table of i**2 + j**2 with nested Python loops."""
    table = []
    for i in range(1000):
        # One row per i; inner comprehension sweeps j.
        table.append([i**2 + j**2 for j in range(1000)])
    return table
def process_data_numpy():
    """Build the same 1000x1000 i**2 + j**2 table via NumPy broadcasting."""
    col = np.arange(1000).reshape(-1, 1)  # column vector, shape (1000, 1)
    row = np.arange(1000).reshape(1, -1)  # row vector, shape (1, 1000)
    return (col**2 + row**2).tolist()
# Performance comparison: 2D table in pure Python vs. NumPy broadcasting.
py_time = timeit.timeit(process_data_python, number=10)
np_time = timeit.timeit(process_data_numpy, number=10)
print(f"\nMatrix operation:")
print(f"Pure Python: {py_time:.4f}s")
print(f"NumPy: {np_time:.4f}s")
print(f"Speedup: {py_time/np_time:.1f}x")

def safe_division_loop(numbers, divisor):
    """Divide each number by divisor, guarded by pre/post/invariant asserts.

    NOTE(review): assert statements are stripped under `python -O`; these
    checks are a debugging demo, not production input validation.
    """
    results = []
    for num in numbers:
        # Preconditions: numeric input, non-zero divisor.
        assert isinstance(num, (int, float)), f"Invalid number: {num}"
        assert divisor != 0, "Divisor cannot be zero"
        result = num / divisor
        # Postconditions on the computed quotient.
        assert isinstance(result, float), "Result should be float"
        assert not (divisor > 0 and num > 0 and result < 0), "Sign mismatch"
        results.append(result)
    # Loop invariant: exactly one output per input.
    assert len(results) == len(numbers), "Output length mismatch"
    return results
# Usage with assertions enabled (no -O flag).
try:
    result = safe_division_loop([1, 2, 3], 2)
    print(result)
except AssertionError as e:
    print(f"Assertion failed: {e}")

import logging
def process_with_logging(items):
    """Square each integer item, logging progress and skipping bad items.

    Non-int items raise (and log) a TypeError and are skipped; negative
    ints are squared anyway but produce a warning.

    Args:
        items: iterable of candidate values; only ints are processed.

    Returns:
        List of squares for the successfully processed items.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    # Bug fix: attach the console handler only once. The original added a
    # fresh StreamHandler on every call, so repeated calls duplicated
    # every log line.
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    results = []
    for index, item in enumerate(items):
        logger.debug(f"Processing item {index}: {item}")
        try:
            if not isinstance(item, int):
                logger.warning(f"Item {index} is not an integer: {type(item)}")
                raise TypeError(f"Expected int, got {type(item)}")
            if item < 0:
                logger.warning(f"Item {index} is negative: {item}")
            result = item ** 2
            logger.debug(f"Item {index} result: {result}")
            results.append(result)
        except Exception as e:
            # Log with traceback and keep going with the next item.
            logger.error(f"Error processing item {index}: {e}", exc_info=True)
            continue
    logger.info(f"Processing complete: {len(results)} successful, {len(items) - len(results)} failed")
    return results
# process_with_logging([1, 2, -3, 'four', 5])

class LoopInspector:
    """Record per-iteration loop state for later inspection and replay."""

    def __init__(self):
        self.history = []      # one dict of recorded state per iteration
        self.breakpoints = {}  # condition -> action, set by callers

    def record_state(self, iteration, **state):
        """Append a snapshot dict: {'iteration': n, **state}."""
        snapshot = {'iteration': iteration}
        snapshot.update(state)
        self.history.append(snapshot)

    def set_breakpoint(self, condition, action):
        """Register a conditional breakpoint (condition -> action)."""
        self.breakpoints[condition] = action

    def inspect_at_iteration(self, iteration):
        """Return the recorded state for `iteration`, or None if past the end."""
        return self.history[iteration] if iteration < len(self.history) else None

    def replay_iterations(self, start=0, end=None):
        """Print recorded states from `start` up to `end` (default: all)."""
        stop = len(self.history) if end is None else end
        for snapshot in self.history[start:stop]:
            print(snapshot)
# Usage
def process_with_inspection(data):
    """Square each item while recording state in a LoopInspector.

    Returns (squares, inspector) so callers can replay the loop's history.
    """
    inspector = LoopInspector()
    squares = []
    for idx, item in enumerate(data):
        squared = item ** 2
        # Snapshot input, output, and the running total after this item.
        inspector.record_state(
            idx,
            input=item,
            output=squared,
            running_sum=sum(squares) + squared
        )
        squares.append(squared)
        # Simple hard-coded breakpoint on large outputs.
        if squared > 50:
            print(f"Breakpoint hit at iteration {idx}: {squared}")
    return squares, inspector
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
results, inspector = process_with_inspection(data)
# Pull the recorded snapshot for one particular iteration.
state = inspector.inspect_at_iteration(5)
print(f"State at iteration 5: {state}")

# INEFFICIENT: Calling function in loop condition
def inefficient_filter():
    """Anti-pattern demo: len() re-evaluated on every loop iteration.

    Kept deliberately wasteful for contrast with efficient_filter().
    """
    data = list(range(1000))
    copied = []
    for i in range(len(data)):  # Bad: len() called each iteration
        if i < len(data):  # Bad: repeated function call
            copied.append(data[i])
    return copied
# EFFICIENT: Cache values before loop
def efficient_filter():
    """Counterpart demo: compute len() once and reuse it inside the loop."""
    data = list(range(1000))
    copied = []
    n = len(data)  # Good: length computed exactly once
    for i in range(n):
        if i < n:  # Good: reuses the cached value
            copied.append(data[i])
    return copied
# INEFFICIENT: Type checking inside tight loop
def inefficient_type_check():
    """Anti-pattern demo: isinstance() on every one of a million items."""
    for item in range(1000000):
        if isinstance(item, int):  # re-checked on every iteration
            result = item ** 2
# EFFICIENT: Check once before loop
def efficient_type_check():
    """Counterpart demo: check the container's type once, then loop freely.

    Bug fix: the original tested isinstance(data, (list, tuple)) against a
    range object — which is neither — so the guarded loop never executed
    and the "efficient" variant silently did no work. `range` is included
    now so this demo performs the same computation as the inefficient one.
    """
    data = range(1000000)
    if isinstance(data, (list, tuple, range)):  # single check, outside the loop
        for item in data:
            result = item ** 2
# INEFFICIENT: String concatenation in loop
def inefficient_concatenation():
    """Anti-pattern demo: += on str may build a new string each time (O(n^2))."""
    result = ""
    for i in range(1000):
        result += str(i) + ", "  # Creates new string each time
    return result
# EFFICIENT: Use list and join
def efficient_concatenation():
"""List append + join is O(n)."""
result = []
for i in range(1000):
result.append(str(i))
return ", ".join(result)| Tool | Purpose | Overhead | Best For |
|---|---|---|---|
| cProfile | Function timing | Low | Overall profiling |
| line_profiler | Line-by-line timing | Medium | Finding bottlenecks |
| memory_profiler | Memory usage | High | Memory analysis |
| timeit | Micro-benchmarking | Medium | Comparing approaches |
| tracemalloc | Memory tracking | Medium | Memory leaks |
Explore further advanced topics — such as line-level profiling, JIT compilation, and native extensions — to build on these techniques.
Master debugging and unlock optimal loop performance!
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward