
Design scalable, efficient data processing systems for real-world applications.
from typing import Any, Callable, Optional

class DataPipeline:
    """Composable data processing pipeline."""

    def __init__(self):
        self.steps = []

    def add_step(self, func: Callable, name: Optional[str] = None):
        """Add a processing step; returns self so steps can be chained."""
        self.steps.append({'func': func, 'name': name or func.__name__})
        return self

    def execute(self, data: Any):
        """Execute all steps in sequence."""
        result = data
        for step in self.steps:
            print(f"Executing: {step['name']}")
            result = step['func'](result)
        return result
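
Because add_step returns self, a pipeline can also be built and run as a single chained expression; a minimal sketch reusing the class above (the steps here are illustrative, not part of the order example below):

summary = (
    DataPipeline()
    .add_step(lambda nums: [n for n in nums if n > 0], "Keep positives")
    .add_step(sum, "Sum")
    .execute([3, -1, 4])
)
# Prints both step names, then summary == 7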

# Example: E-commerce order processing
def validate_orders(orders):
    """Drop orders with a missing or non-positive amount."""
    return [o for o in orders if o.get('amount', 0) > 0]

def calculate_tax(orders):
    return [{**o, 'tax': o['amount'] * 0.1} for o in orders]

def calculate_total(orders):
    return [{**o, 'total': o['amount'] + o['tax']} for o in orders]

pipeline = DataPipeline()
pipeline.add_step(validate_orders, "Validation")
pipeline.add_step(calculate_tax, "Tax Calculation")
pipeline.add_step(calculate_total, "Total Calculation")

raw_orders = [{'id': 1, 'amount': 100}, {'id': 2, 'amount': -50}]
processed = pipeline.execute(raw_orders)
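
Running this prints each step name in order and leaves processed as [{'id': 1, 'amount': 100, 'tax': 10.0, 'total': 110.0}]; the negative-amount order is dropped at the validation step.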

Handle large datasets without loading everything into memory.

def read_csv_lazy(filepath, chunk_size=1000):
    """Read a CSV file in chunks for memory efficiency."""
    with open(filepath) as f:
        batch = []
        for line in f:
            batch.append(line.strip().split(','))
            if len(batch) >= chunk_size:
                yield batch
                batch = []
        if batch:
            yield batch

# Process the file without materializing it in memory
def process_large_file(filepath):
    """Average the third column across the whole file."""
    total = 0
    count = 0
    for chunk in read_csv_lazy(filepath):
        for row in chunk:
            if len(row) > 2:
                total += int(row[2])
                count += 1
    return total / count if count > 0 else 0
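
One caveat: line.strip().split(',') breaks on quoted fields that contain commas. The standard library's csv module handles those cases, and the same chunked pattern works with csv.reader plus itertools.islice; a sketch (read_csv_chunked is an illustrative name, not part of the original):

import csv
from itertools import islice

def read_csv_chunked(filepath, chunk_size=1000):
    """Yield lists of parsed rows, handling quoted fields correctly."""
    with open(filepath, newline='') as f:
        reader = csv.reader(f)
        while True:
            batch = list(islice(reader, chunk_size))
            if not batch:
                break
            yield batch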

# Generator for filtering and transforming
def filter_transform(data, predicate, transformer):
    """Memory-efficient filter and map."""
    for item in data:
        if predicate(item):
            yield transformer(item)

# Usage
filtered = filter_transform(
    range(1000000),
    lambda x: x % 2 == 0,  # Even numbers
    lambda x: x ** 2       # Square them
)
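
Nothing is computed until the generator is consumed, so the million-element range above costs almost nothing to set up. For example, taking the first ten results pulls only what it needs:

from itertools import islice

# Consume lazily: only the first ten matching items are ever produced
first_ten = list(islice(filtered, 10))
total = sum(first_ten)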

Real-world class distributions are often skewed; rebalance them before modeling.

import pandas as pd
def handle_class_imbalance(df, target_col, strategy='oversample'):
    """Balance a binary dataset (assumes classes are coded 0 = majority, 1 = minority)."""
    if strategy == 'oversample':
        # Oversample the minority class with replacement
        df_majority = df[df[target_col] == 0]
        df_minority = df[df[target_col] == 1]
        df_minority_oversampled = df_minority.sample(
            n=len(df_majority),
            replace=True,
            random_state=42
        )
        return pd.concat([df_majority, df_minority_oversampled])
    elif strategy == 'undersample':
        # Undersample the majority class
        df_majority = df[df[target_col] == 0]
        df_minority = df[df[target_col] == 1]
        df_majority_undersampled = df_majority.sample(
            n=len(df_minority),
            random_state=42
        )
        return pd.concat([df_majority_undersampled, df_minority])
    elif strategy == 'smote':
        # Synthetic Minority Over-sampling Technique (requires the imbalanced-learn package)
        from imblearn.over_sampling import SMOTE
        X = df.drop(target_col, axis=1)
        y = df[target_col]
        smote = SMOTE()
        X_balanced, y_balanced = smote.fit_resample(X, y)
        return pd.DataFrame(X_balanced).assign(**{target_col: y_balanced})
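
A quick usage sketch on a hypothetical toy frame, with classes coded 0/1 as the function assumes:

df = pd.DataFrame({
    'feature': range(10),
    'label': [0] * 8 + [1] * 2,   # 8 majority rows, 2 minority rows
})
balanced = handle_class_imbalance(df, 'label', strategy='oversample')
print(balanced['label'].value_counts())  # both classes now have 8 rows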

Validate incoming records against an explicit schema before they enter the pipeline.

from dataclasses import dataclass
@dataclass
class OrderSchema:
    """Schema validation for orders."""
    id: int
    amount: float
    customer_id: int
    status: str = 'pending'

    def validate(self):
        """Validate order data."""
        if self.amount <= 0:
            raise ValueError("Amount must be positive")
        if self.id <= 0:
            raise ValueError("ID must be positive")
        if self.status not in ('pending', 'processing', 'completed'):
            raise ValueError("Invalid status")
        return True

class DataValidator:
    """Validate data against a schema."""

    def __init__(self, schema):
        self.schema = schema

    def validate_row(self, row_dict):
        """Validate a single row; returns (is_valid, object_or_error_message)."""
        try:
            obj = self.schema(**row_dict)
            obj.validate()
            return True, obj
        except Exception as e:
            return False, str(e)

    def validate_batch(self, rows):
        """Validate multiple rows."""
        results = []
        for row in rows:
            valid, result = self.validate_row(row)
            results.append({'valid': valid, 'result': result, 'row': row})
        return results

# Usage
validator = DataValidator(OrderSchema)
orders = [
    {'id': 1, 'amount': 100, 'customer_id': 1},
    {'id': 2, 'amount': -50, 'customer_id': 2},  # Invalid: negative amount
    {'id': 3, 'amount': 75, 'customer_id': 3}
]
results = validator.validate_batch(orders)
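
From here it is straightforward to route good and bad rows separately, for example:

valid_orders = [r['result'] for r in results if r['valid']]
rejected = [(r['row'], r['result']) for r in results if not r['valid']]
print(f"{len(valid_orders)} valid, {len(rejected)} rejected")  # 2 valid, 1 rejected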

Optimize pandas workloads with vectorization, chunking, caching, and dtype tuning.

import pandas as pd
from functools import lru_cache

# Strategy 1: vectorization vs. loops
def slow_aggregate(df):
    """Inefficient: iterates row by row in Python."""
    totals = {}
    for idx, row in df.iterrows():
        if row['category'] not in totals:
            totals[row['category']] = 0
        totals[row['category']] += row['amount']
    return totals

def fast_aggregate(df):
    """Efficient: vectorized groupby."""
    return df.groupby('category')['amount'].sum().to_dict()
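
Both functions return identical dictionaries; on frames of any real size, the vectorized version is typically faster by orders of magnitude. A quick way to check on your own data (timings will vary by machine):

import time

df = pd.DataFrame({'category': ['a', 'b'] * 50000, 'amount': range(100000)})

start = time.perf_counter()
slow_aggregate(df)
print(f"loop:       {time.perf_counter() - start:.3f}s")

start = time.perf_counter()
fast_aggregate(df)
print(f"vectorized: {time.perf_counter() - start:.3f}s")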

# Strategy 2: chunking large operations
def process_in_chunks(df, chunk_size=10000):
    """Process a large DataFrame in chunks, then combine the partial results."""
    results = []
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i+chunk_size]
        results.append(chunk.groupby('category').agg({'amount': 'sum'}))
    return pd.concat(results).groupby(level=0).sum()

# Strategy 3: caching expensive operations
@lru_cache(maxsize=128)
def expensive_lookup(value):
    """Cache results of an expensive computation (digit sum as a stand-in)."""
    return sum(int(d) for d in str(value))
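
Repeated calls with the same argument are served from the cache; cache_info() makes the effect visible:

for v in [12345, 67890, 12345, 12345]:
    expensive_lookup(v)
print(expensive_lookup.cache_info())  # hits=2, misses=2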

# Strategy 4: data type optimization
def optimize_dtypes(df):
    """Reduce memory usage with narrower types (note: float32 loses precision)."""
    for col in df.columns:
        col_type = df[col].dtype
        if col_type == 'float64':
            df[col] = df[col].astype('float32')
        elif col_type == 'int64':
            min_val, max_val = df[col].min(), df[col].max()
            # Only downcast to unsigned types when all values are non-negative
            if min_val >= 0 and max_val < 2**8:
                df[col] = df[col].astype('uint8')
            elif min_val >= 0 and max_val < 2**16:
                df[col] = df[col].astype('uint16')
    return df
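
To see the effect, compare memory_usage before and after (illustrative numbers only):

df = pd.DataFrame({'small_ints': range(1000), 'floats': [0.5] * 1000})
before = df.memory_usage(deep=True).sum()
optimize_dtypes(df)
after = df.memory_usage(deep=True).sum()
print(f"{before} bytes -> {after} bytes")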

Production pipelines meet bad data; handle errors without losing the whole batch.

import logging
from typing import Any, Callable
logger = logging.getLogger(__name__)

class ResilientProcessor:
    """Process data with error handling and recovery."""

    def __init__(self, error_strategy='log'):
        self.error_strategy = error_strategy
        self.errors = []

    def process_with_fallback(self, func: Callable, data: Any, fallback: Any = None):
        """Execute func; on error, log/raise/skip according to the strategy."""
        try:
            return func(data)
        except Exception as e:
            logger.error(f"Processing error: {e}")
            self.errors.append({'error': str(e), 'data': data})
            if self.error_strategy == 'log':
                return fallback
            elif self.error_strategy == 'raise':
                raise
            elif self.error_strategy == 'skip':
                return None

    def batch_process_safe(self, items, processor, skip_errors=True):
        """Process items, optionally skipping the ones that fail."""
        results = []
        for item in items:
            try:
                results.append(processor(item))
            except Exception as e:
                if skip_errors:
                    logger.warning(f"Skipped item due to error: {e}")
                    continue
                raise
        return results

# Example: Robust CSV processing
processor = ResilientProcessor()

def safe_convert_amount(row):
    try:
        return float(row['amount'])
    except ValueError:
        return None  # Unparseable amounts become None instead of raising

results = processor.batch_process_safe(
    [{'amount': '100'}, {'amount': 'invalid'}, {'amount': '200'}],
    safe_convert_amount
)
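
Because safe_convert_amount swallows the ValueError itself, the batch completes and results is [100.0, None, 200.0]; a final filter drops the failures:

amounts = [a for a in results if a is not None]  # [100.0, 200.0]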

Use processes for CPU-bound work and threads for I/O-bound work.

from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd

# Multiprocessing for CPU-bound tasks
def expensive_computation(x):
    """Simulate an expensive calculation."""
    return sum(i**2 for i in range(x))

def parallel_process(data, func, n_workers=4):
    """Map func over data across worker processes."""
    with Pool(n_workers) as pool:
        return pool.map(func, data)
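
One caveat worth flagging: on platforms that spawn rather than fork worker processes (Windows, and macOS by default since Python 3.8), multiprocessing code must be guarded so workers don't re-execute the module on import:

if __name__ == '__main__':
    # Guard required for the spawn start method; workers re-import this module
    results = parallel_process([10000, 20000, 30000], expensive_computation)
    print(results)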

# Thread-based processing for I/O-bound tasks
def fetch_data(url):
    """Simulate an I/O operation."""
    import time
    time.sleep(0.1)
    return len(url)

def parallel_fetch(urls, n_workers=4):
    """Fetch multiple URLs concurrently with threads."""
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        return list(executor.map(fetch_data, urls))

# Partitioned DataFrame processing across worker processes
def process_large_dataframe(df, func, n_workers=4):
    """Apply func to DataFrame partitions in parallel."""
    chunks = np.array_split(df, n_workers)
    with Pool(n_workers) as pool:
        results = pool.map(func, chunks)
    return pd.concat(results, ignore_index=True)

Explore more: Advanced CSV Handling | Advanced Pandas