
## Pythonic Patterns and Modern Concurrency in Python

Professional Python development requires an understanding of best practices, design patterns, and modern concurrency paradigms. This section covers Pythonic idioms, functional approaches, and concurrent programming.
# Principle 1: Prefer readability over cleverness
# LESS Pythonic: Overcomplicated
result = [y for x in [[1, 2], [3, 4]] for y in x if y % 2 == 0 if y > 1]
# MORE Pythonic: Clear and readable
nested_lists = [[1, 2], [3, 4]]
result = [
    y
    for x in nested_lists
    for y in x
    if y % 2 == 0 and y > 1
]
# Principle 2: Use built-ins and standard library
# LESS Pythonic: Manual implementation
def find_max(items):
    """Return the largest element of *items*.

    Raises:
        ValueError: if *items* is empty (mirrors built-in max()).
    """
    # Guard: items[0] below would otherwise raise a confusing IndexError.
    if not items:
        raise ValueError("find_max() arg is an empty sequence")
    max_val = items[0]
    for item in items[1:]:
        if item > max_val:
            max_val = item
    return max_val


# MORE Pythonic: Use built-in
items = [3, 1, 4, 1, 5]  # sample data so the snippet is actually runnable
max_val = max(items)
# Principle 3: Explicit loops for complex logic
# Sample helpers/data so the snippet is self-contained and runnable
def valid(x):
    """Keep only non-negative values."""
    return x >= 0


def process(x):
    """Transformation applied to large values."""
    return x * 2


def fallback(x):
    """Transformation applied to small values (identity here)."""
    return x


data = [5, 12, -3, 20]

# LESS Pythonic: Forced comprehension
result = [process(x) if x > 10 else fallback(x) for x in data if valid(x)]
# MORE Pythonic: Explicit loop for clarity
result = []
for x in data:
    if valid(x):
        if x > 10:
            result.append(process(x))
        else:
            result.append(fallback(x))
# Principle 4: Use enumerate for index and value
items = ['a', 'b', 'c']  # sample data so the snippet is actually runnable

# LESS Pythonic: Manual indexing
for i in range(len(items)):
    print(i, items[i])
# MORE Pythonic: Use enumerate
for i, item in enumerate(items):
    print(i, item)
# Principle 5: Use zip for parallel iteration
names = ['Alice', 'Bob']  # sample data so the snippet is actually runnable
ages = [30, 25]

# LESS Pythonic: Index-based
for i in range(len(names)):
    print(f"{names[i]}: {ages[i]}")
# MORE Pythonic: Use zip
for name, age in zip(names, ages):
    print(f"{name}: {age}")

from itertools import count, cycle, repeat, takewhile, dropwhile, combinations, permutations
# IDIOM 1: Infinite iteration with count
def fibonacci_iterator():
    """Yield Fibonacci numbers forever: 0, 1, 1, 2, 3, 5, ..."""
    current, following = 0, 1
    while True:
        yield current
        current, following = following, current + following


# Use with takewhile for safety
from itertools import takewhile

fibs = takewhile(lambda x: x < 1000, fibonacci_iterator())
print(list(fibs))  # [0, 1, 1, 2, 3, 5, 8, 13, ...]
# IDIOM 2: Cycle for round-robin scheduling
tasks = ['task1', 'task2', 'task3']
scheduler = cycle(tasks)
for _round in range(10):
    current = next(scheduler)  # wraps back to task1 after task3
    print(f"Running {current}")
# IDIOM 3: Repeat for constant values
print(list(repeat('x', 5)))  # ['x', 'x', 'x', 'x', 'x']
# IDIOM 4: Combinations and permutations
items = [1, 2, 3]
# All 2-element combinations (order doesn't matter)
for pair in combinations(items, 2):
    print(pair)  # (1, 2), (1, 3), (2, 3)
# All 2-element permutations (order matters)
for arrangement in permutations(items, 2):
    print(arrangement)  # (1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2)

# IDIOM 1: Inverting dictionary
data = {'a': 1, 'b': 2, 'c': 3}
# Swap keys and values (assumes values are unique and hashable)
inverted = {value: key for key, value in data.items()}
print(inverted)  # {1: 'a', 2: 'b', 3: 'c'}
# IDIOM 2: Grouping by key
from itertools import groupby
from operator import itemgetter

people = [
    {'name': 'Alice', 'age': 25},
    {'name': 'Bob', 'age': 30},
    {'name': 'Charlie', 'age': 25},
]
# Group by age.
# BUG FIX: groupby() only groups *consecutive* items with equal keys, so
# the input must be sorted by the same key first; otherwise the two
# age-25 entries above land in two separate groups (the second
# overwriting the first in by_age).
by_age = {}
sorted_people = sorted(people, key=itemgetter('age'))
for age, group in groupby(sorted_people, key=itemgetter('age')):
    by_age[age] = list(group)
# IDIOM 3: Collecting duplicates
from collections import Counter

items = [1, 2, 2, 3, 3, 3, 4]
counts = Counter(items)
# Keep only the values that occur more than once.
duplicates = {value: freq for value, freq in counts.items() if freq > 1}
print(duplicates)  # {2: 2, 3: 3}
# IDIOM 4: Set operations (method spelling of the |, &, -, ^ operators)
a = {1, 2, 3, 4}
b = {3, 4, 5, 6}
union = a.union(b)                          # {1, 2, 3, 4, 5, 6}
intersection = a.intersection(b)            # {3, 4}
difference = a.difference(b)                # {1, 2}
symmetric_diff = a.symmetric_difference(b)  # {1, 2, 5, 6}

from functools import reduce
# Traditional approach
numbers = [1, 2, 3, 4, 5]
# Filtering
even_nums = [n for n in numbers if n % 2 == 0]
# Functional approach (often less Pythonic)
even_nums_func = list(filter(lambda n: n % 2 == 0, numbers))
# However, map can be useful
squared = list(map(lambda n: n ** 2, numbers))
# But comprehensions are usually clearer
squared_comp = [n ** 2 for n in numbers]
# Reduce is useful for aggregation
from functools import reduce

product = reduce(lambda acc, n: acc * n, numbers)
print(product)  # 120
# More Pythonic: use built-ins or comprehensions
product_builtin = 1
for n in numbers:
    product_builtin *= n
# Or with math.factorial for specific use case
import math

product_factorial = math.factorial(5)
# PATTERN 1: Pipeline of functions
def pipeline(*functions):
    """Compose functions left to right: pipeline(f, g)(x) == g(f(x))."""
    def pipe(value):
        result = value
        for step in functions:
            result = step(result)
        return result
    return pipe


# Example
double = lambda x: x * 2
add_one = lambda x: x + 1
square = lambda x: x ** 2

process = pipeline(double, add_one, square)
print(process(5))  # ((5 * 2) + 1) ** 2 = 121
# PATTERN 2: Decorator for function composition
def compose(*functions):
    """Compose functions right to left: compose(f, g)(x) == f(g(x))."""
    def composed(arg):
        result = arg
        for func in reversed(functions):
            result = func(result)
        return result
    return composed
# Example: compose applies right to left — double runs first, then add_one, then square
compose_process = compose(square, add_one, double)
print(compose_process(5))  # double: 10 -> add_one: 11 -> square: 121
# PATTERN 3: Using functools.reduce for composition
from functools import reduce
from operator import mul


def compose_reduce(*functions):
    """Right-to-left composition built with a single reduce call."""
    return reduce(lambda outer, inner: lambda x: outer(inner(x)), functions)


# More practical: reduce for data aggregation
values = [1, 2, 3, 4, 5]
product = reduce(mul, values)  # 120

import threading
import time
from queue import Queue


def process_queue_threaded(tasks):
    """Run each zero-arg callable in *tasks* on a small thread pool.

    Returns the results in input order.  BUG FIX: the original appended
    results in completion order, which is nondeterministic across
    threads; each task is now tagged with its input index and the
    results are sorted before returning.  Failed tasks are logged and
    omitted from the result list (as before).
    """
    results = []
    result_lock = threading.Lock()

    def worker(queue):
        """Worker thread: process (index, task) pairs until the sentinel."""
        while True:
            entry = queue.get()
            if entry is None:  # Sentinel value: shut this worker down
                break
            index, task = entry
            try:
                outcome = task()
                with result_lock:
                    results.append((index, outcome))
            except Exception as e:
                print(f"Error: {e}")
            finally:
                queue.task_done()

    # Create thread pool
    num_threads = 4
    queue = Queue()
    threads = []
    # Start workers
    for _ in range(num_threads):
        t = threading.Thread(target=worker, args=(queue,))
        t.start()
        threads.append(t)
    # Queue tasks tagged with their position
    for index, task in enumerate(tasks):
        queue.put((index, task))
    # One sentinel per worker so every thread exits
    for _ in range(num_threads):
        queue.put(None)
    # Wait for completion
    for t in threads:
        t.join()
    # Restore input order
    return [outcome for _, outcome in sorted(results, key=lambda pair: pair[0])]


# Usage
def make_task(x):
    """Build a zero-arg task that squares *x* (x captured by closure)."""
    return lambda: x ** 2


tasks = [make_task(x) for x in range(10)]
results = process_queue_threaded(tasks)
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

import multiprocessing
from multiprocessing import Pool


def cpu_intensive_task(x):
    """Simulate CPU-intensive work: sum the integers 0..x*1_000_000-1."""
    total = 0
    for value in range(x * 1000000):
        total += value
    return total


def process_with_multiprocessing():
    """Fan cpu_intensive_task out over a pool of worker processes."""
    inputs = list(range(1, 5))
    # Pool handles worker startup, distribution, and teardown.
    with Pool(processes=4) as pool:
        return pool.map(cpu_intensive_task, inputs)


# Usage
# results = process_with_multiprocessing()
# For more control over workers
def process_with_process_pool():
    """Lower-level process pool control.

    BUG FIX: the original imported queue.Queue, a *thread* queue —
    items put by the parent are never visible to child processes, so
    every worker (and the parent's output_queue.get()) would block
    forever.  Inter-process communication requires
    multiprocessing.Queue.

    NOTE(review): worker is a nested function, which Process can only
    target under the 'fork' start method (not 'spawn') — confirm the
    target platform, or hoist worker to module level.
    """
    from multiprocessing import Process, Queue

    def worker(input_queue, output_queue):
        # Pull (index, value) tasks until the None sentinel arrives.
        while True:
            task = input_queue.get()
            if task is None:
                break
            index, value = task
            output_queue.put((index, cpu_intensive_task(value)))

    # Create processes
    input_queue = Queue()
    output_queue = Queue()
    num_workers = 4
    processes = []
    for _ in range(num_workers):
        p = Process(target=worker, args=(input_queue, output_queue))
        p.start()
        processes.append(p)
    # Queue work
    data = list(range(1, 5))
    for i, value in enumerate(data):
        input_queue.put((i, value))
    # Signal end: one sentinel per worker
    for _ in range(num_workers):
        input_queue.put(None)
    # Collect results (one per queued task)
    results = [output_queue.get() for _ in range(len(data))]
    # Clean up
    for p in processes:
        p.join()
    return sorted(results, key=lambda pair: pair[0])

import asyncio
async def fetch_data(url, delay=1):
    """Simulate async I/O operation."""
    print(f"Fetching {url}")
    await asyncio.sleep(delay)  # Simulate network delay
    payload = f"data from {url}"
    print(f"Got {payload}")
    return payload


async def fetch_multiple_sequential(urls):
    """Fetch URLs one at a time (slow: the delays add up)."""
    return [await fetch_data(url) for url in urls]


async def fetch_multiple_concurrent(urls):
    """Fetch URLs concurrently (fast: the delays overlap)."""
    # gather schedules every coroutine at once and waits for all of them.
    return await asyncio.gather(*(fetch_data(url) for url in urls))


async def fetch_with_timeout(urls):
    """Fetch concurrently, giving up after 5 seconds."""
    try:
        return await asyncio.wait_for(
            fetch_multiple_concurrent(urls), timeout=5.0
        )
    except asyncio.TimeoutError:
        print("Fetch operation timed out")
        return []
# Usage
async def main():
    """Demo driver; the actual fetch calls are left commented out."""
    demo_urls = [f"http://api{i}.example.com" for i in range(3)]
    # Sequential (takes ~3 seconds)
    print("Sequential fetch:")
    # results = await fetch_multiple_sequential(demo_urls)
    # Concurrent (takes ~1 second)
    print("Concurrent fetch:")
    # results = await fetch_multiple_concurrent(demo_urls)
    # With timeout
    # results = await fetch_with_timeout(demo_urls)


# Run async code
# asyncio.run(main())
# Async generators
async def async_generator(start, end):
    """Yield start..end-1, pausing 0.1s between items (streaming demo)."""
    for value in range(start, end):
        await asyncio.sleep(0.1)
        yield value


async def iterate_async_generator():
    """Consume the async generator with an `async for` loop."""
    collected = []
    async for item in async_generator(1, 5):
        collected.append(item)
    return collected


# asyncio.run(iterate_async_generator())
# Async comprehension
async def async_comprehension():
    """Same consumption, written as an async list comprehension."""
    return [item async for item in async_generator(1, 5)]
# asyncio.run(async_comprehension())


class CustomIterator:
    """Implement the iterator protocol (__iter__/__next__) by hand."""

    def __init__(self, data):
        self.data = data
        self.index = 0  # position of the next item to yield

    def __iter__(self):
        # An iterator is its own iterable.
        return self

    def __next__(self):
        # EAFP: indexing past the end means the iterator is exhausted.
        try:
            value = self.data[self.index]
        except IndexError:
            raise StopIteration from None
        self.index += 1
        return value


# Usage
for item in CustomIterator([1, 2, 3]):
    print(item)

from abc import ABC, abstractmethod
class ProcessingStrategy(ABC):
    """Abstract base for processing strategies."""

    @abstractmethod
    def process(self, items):
        """Transform *items* and return the results."""


class SequentialProcessing(ProcessingStrategy):
    """Process items one after another in the current thread."""

    def process(self, items):
        return [self._transform(item) for item in items]

    @staticmethod
    def _transform(item):
        return item ** 2


class ParallelProcessing(ProcessingStrategy):
    """Process items in parallel across worker processes."""

    def process(self, items):
        from multiprocessing import Pool
        with Pool() as pool:
            return pool.map(self._transform, items)

    @staticmethod
    def _transform(item):
        return item ** 2


class AsyncProcessing(ProcessingStrategy):
    """Process items concurrently; note process() is a coroutine here."""

    async def process(self, items):
        pending = [self._transform_async(item) for item in items]
        return await asyncio.gather(*pending)

    @staticmethod
    async def _transform_async(item):
        await asyncio.sleep(0.1)
        return item ** 2


# Usage
class DataProcessor:
    """Context object that delegates all work to the injected strategy."""

    def __init__(self, strategy: ProcessingStrategy):
        self.strategy = strategy

    def process(self, items):
        return self.strategy.process(items)


# Choose strategy at runtime
processor = DataProcessor(SequentialProcessing())
results = processor.process([1, 2, 3, 4, 5])
print(results)

# PATTERN 1: Comprehensive error handling
def robust_loop(items, error_handler=None):
    """Apply process_item to each item, collecting per-item failures.

    Returns (results, errors): *errors* holds one dict per ValueError
    with 'index', 'item', and 'error' keys; *error_handler*, if given,
    is called with each such dict.  Any other exception is wrapped in a
    RuntimeError and re-raised.
    """
    results, errors = [], []
    for index, item in enumerate(items):
        try:
            results.append(process_item(item))
        except ValueError as exc:
            record = {'index': index, 'item': item, 'error': str(exc)}
            errors.append(record)
            if error_handler:
                error_handler(record)
        except Exception as exc:
            # Unexpected failures should surface, not be silently collected.
            raise RuntimeError(f"Unexpected error at index {index}: {exc}") from exc
    return results, errors


def process_item(item):
    """Square *item*; reject non-int input with ValueError."""
    if not isinstance(item, int):
        raise ValueError(f"Expected int, got {type(item)}")
    return item ** 2
# PATTERN 2: Using contextlib for cleanup
import contextlib
@contextlib.contextmanager
def loop_context(name):
"""Context manager for loop lifecycle."""
print(f"Starting {name}")
try:
yield
finally:
print(f"Finishing {name}")
def looping_with_context(items):
with loop_context("data processing"):
for item in items:
print(f"Processing {item}")| Pattern | Use Case | Complexity | Performance |
|---|---|---|---|
| List comprehension | Simple transformations | Low | Fast |
| Generator expression | Large data sets | Low | Efficient |
| Functional pipeline | Chained operations | Medium | Good |
| Threading | I/O-bound tasks | Medium | Concurrent |
| Multiprocessing | CPU-bound tasks | High | Parallel |
| Async/await | I/O-bound tasks | High | Very efficient |
| Custom iterators | Complex iteration | Medium | Flexible |
Explore these advanced topics further on your own — master Pythonic patterns and unlock professional-grade loop design!
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward