🏗️ Advanced Data Processing Overview — Building Production Pipelines

Design scalable, efficient data processing systems for real-world applications.


🎯 Advanced Pipeline Architecture

from typing import Any, Callable

class DataPipeline:
    """Composable data processing pipeline"""

    def __init__(self):
        self.steps = []

    def add_step(self, func: Callable, name: str = None):
        """Add processing step"""
        self.steps.append({'func': func, 'name': name or func.__name__})
        return self

    def execute(self, data: Any):
        """Execute all steps in sequence"""
        result = data
        for step in self.steps:
            print(f"Executing: {step['name']}")
            result = step['func'](result)
        return result

# Example: E-commerce order processing
def validate_orders(orders):
    """Keep only orders with a positive amount (orders missing 'amount' are dropped)"""
    return [o for o in orders if o.get('amount', 0) > 0]

def calculate_tax(orders):
    return [{**o, 'tax': o['amount'] * 0.1} for o in orders]

def calculate_total(orders):
    return [{**o, 'total': o['amount'] + o['tax']} for o in orders]

pipeline = DataPipeline()
pipeline.add_step(validate_orders, "Validation")
pipeline.add_step(calculate_tax, "Tax Calculation")
pipeline.add_step(calculate_total, "Total Calculation")

raw_orders = [{'id': 1, 'amount': 100}, {'id': 2, 'amount': -50}]
processed = pipeline.execute(raw_orders)
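
Because add_step returns self, the same pipeline can also be built fluently. A minimal sketch using the step functions defined above:

# Equivalent fluent construction, since add_step returns self
pipeline = (
    DataPipeline()
    .add_step(validate_orders, "Validation")
    .add_step(calculate_tax, "Tax Calculation")
    .add_step(calculate_total, "Total Calculation")
)

processed = pipeline.execute(raw_orders)
print(processed)
# [{'id': 1, 'amount': 100, 'tax': 10.0, 'total': 110.0}]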

🔄 Lazy Evaluation and Generators

Handle large datasets without loading everything into memory.

def read_csv_lazy(filepath, chunk_size=1000):
    """Read CSV in chunks for memory efficiency"""
    with open(filepath) as f:
        batch = []
        for line in f:
            batch.append(line.strip().split(','))
            if len(batch) >= chunk_size:
                yield batch
                batch = []
        if batch:
            yield batch

# Process in memory-efficient way
def process_large_file(filepath):
    total = 0
    count = 0
    for chunk in read_csv_lazy(filepath):
        for row in chunk:
            count += 1
            if len(row) > 2:
                total += int(row[2])
    return total / count if count > 0 else 0

# Generator for filtering and transforming
def filter_transform(data, predicate, transformer):
    """Memory-efficient filter and map"""
    for item in data:
        if predicate(item):
            yield transformer(item)

# Usage
filtered = filter_transform(
    range(1000000),
    lambda x: x % 2 == 0,  # Even numbers
    lambda x: x ** 2        # Square them
)
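
Nothing above has been computed yet; the generator only does work as it is consumed. A short sketch of pulling results on demand:

from itertools import islice

# Pull just the first five results; the remaining ~500,000 are never computed
first_five = list(islice(filtered, 5))
print(first_five)  # [0, 4, 16, 36, 64]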

📊 Handling Imbalanced Data

import pandas as pd
import numpy as np

def handle_class_imbalance(df, target_col, strategy='oversample'):
    """Balance a binary dataset (assumes the target is coded 0 = majority, 1 = minority)"""

    if strategy == 'oversample':
        # Oversample minority class
        df_majority = df[df[target_col] == 0]
        df_minority = df[df[target_col] == 1]

        df_minority_oversampled = df_minority.sample(
            n=len(df_majority),
            replace=True,
            random_state=42
        )
        return pd.concat([df_majority, df_minority_oversampled])

    elif strategy == 'undersample':
        # Undersample majority class
        df_majority = df[df[target_col] == 0]
        df_minority = df[df[target_col] == 1]

        df_majority_undersampled = df_majority.sample(
            n=len(df_minority),
            random_state=42
        )
        return pd.concat([df_majority_undersampled, df_minority])

    elif strategy == 'smote':
        # Synthetic Minority Over-sampling Technique
        # (requires the third-party imbalanced-learn package)
        from imblearn.over_sampling import SMOTE
        X = df.drop(target_col, axis=1)
        y = df[target_col]
        smote = SMOTE()
        X_balanced, y_balanced = smote.fit_resample(X, y)
        return pd.DataFrame(X_balanced).assign(**{target_col: y_balanced})
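
A quick sanity check of the oversampling path on a tiny synthetic frame (the column names here are purely illustrative):

# Hypothetical toy dataset: 6 majority rows (label 0), 2 minority rows (label 1)
toy = pd.DataFrame({
    'feature': range(8),
    'label':   [0, 0, 0, 0, 0, 0, 1, 1]
})

balanced = handle_class_imbalance(toy, 'label', strategy='oversample')
print(balanced['label'].value_counts())
# Both classes now have 6 rows each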

🔐 Data Validation and Schema

from dataclasses import dataclass
from typing import Optional

@dataclass
class OrderSchema:
    """Schema validation for orders"""
    id: int
    amount: float
    customer_id: int
    status: str = 'pending'

    def validate(self):
        """Validate order data"""
        if self.amount <= 0:
            raise ValueError("Amount must be positive")
        if self.id <= 0:
            raise ValueError("ID must be positive")
        if self.status not in ['pending', 'processing', 'completed']:
            raise ValueError("Invalid status")
        return True

class DataValidator:
    """Validate data against schema"""

    def __init__(self, schema):
        self.schema = schema

    def validate_row(self, row_dict):
        """Validate single row"""
        try:
            obj = self.schema(**row_dict)
            obj.validate()
            return True, obj
        except Exception as e:
            return False, str(e)

    def validate_batch(self, rows):
        """Validate multiple rows"""
        results = []
        for row in rows:
            valid, result = self.validate_row(row)
            results.append({'valid': valid, 'result': result, 'row': row})
        return results

# Usage
validator = DataValidator(OrderSchema)
orders = [
    {'id': 1, 'amount': 100, 'customer_id': 1},
    {'id': 2, 'amount': -50, 'customer_id': 2},  # Invalid
    {'id': 3, 'amount': 75, 'customer_id': 3}
]
results = validator.validate_batch(orders)
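
The results list makes it easy to separate clean rows from rejects, for example:

valid_orders = [r['result'] for r in results if r['valid']]
rejected = [(r['row'], r['result']) for r in results if not r['valid']]

print(f"{len(valid_orders)} valid, {len(rejected)} rejected")
# 2 valid, 1 rejected -- the second order fails with "Amount must be positive"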

⚡ Performance Optimization

import pandas as pd
import numpy as np
from functools import lru_cache

# Strategy 1: Vectorization vs loops
def slow_aggregate(df):
    """Inefficient: loops"""
    totals = {}
    for idx, row in df.iterrows():
        if row['category'] not in totals:
            totals[row['category']] = 0
        totals[row['category']] += row['amount']
    return totals

def fast_aggregate(df):
    """Efficient: vectorized"""
    return df.groupby('category')['amount'].sum().to_dict()

# Strategy 2: Chunking large operations
def process_in_chunks(df, chunk_size=10000):
    """Process large DataFrame in chunks"""
    results = []
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i+chunk_size]
        # Process chunk
        results.append(chunk.groupby('category').agg({'amount': 'sum'}))
    return pd.concat(results).groupby(level=0).sum()

# Strategy 3: Caching expensive operations
@lru_cache(maxsize=128)
def expensive_lookup(value):
    """Cache results of expensive computation"""
    return sum(int(d) for d in str(value))

# Strategy 4: Data type optimization
def optimize_dtypes(df):
    """Reduce memory usage with appropriate types"""
    for col in df.columns:
        col_type = df[col].dtype

        if col_type == 'float64':
            df[col] = df[col].astype('float32')
        elif col_type == 'int64':
            min_val = df[col].min()
            max_val = df[col].max()
            # Unsigned types are only safe when every value is non-negative
            if min_val >= 0 and max_val < 2**8:
                df[col] = df[col].astype('uint8')
            elif min_val >= 0 and max_val < 2**16:
                df[col] = df[col].astype('uint16')

    return df
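
A rough way to see the effect is to compare memory usage before and after downcasting. The data below is made up for illustration:

df = pd.DataFrame({
    'category': np.random.randint(0, 5, 100_000),   # small non-negative ints
    'amount': np.random.rand(100_000) * 100          # float64 by default
})

before = df.memory_usage(deep=True).sum()
df = optimize_dtypes(df)
after = df.memory_usage(deep=True).sum()
print(f"{before:,} bytes -> {after:,} bytes")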

🔄 Error Handling and Resilience

import logging
from typing import Callable, Any

logger = logging.getLogger(__name__)

class ResilientProcessor:
    """Process data with error handling and recovery"""

    def __init__(self, error_strategy='log'):
        self.error_strategy = error_strategy
        self.errors = []

    def process_with_fallback(self, func: Callable, data: Any, fallback: Any = None):
        """Execute with fallback on error"""
        try:
            return func(data)
        except Exception as e:
            logger.error(f"Processing error: {e}")
            self.errors.append({'error': str(e), 'data': data})

            if self.error_strategy == 'log':
                return fallback
            elif self.error_strategy == 'raise':
                raise
            elif self.error_strategy == 'skip':
                return None

    def batch_process_safe(self, items, processor, skip_errors=True):
        """Process items, optionally skipping errors"""
        results = []
        for item in items:
            try:
                result = processor(item)
                results.append(result)
            except Exception as e:
                if skip_errors:
                    logger.warning(f"Skipped item due to error: {e}")
                    continue
                else:
                    raise
        return results

# Example: Robust CSV processing
processor = ResilientProcessor()
def safe_convert_amount(row):
    try:
        return float(row['amount'])
    except ValueError:
        return None

results = processor.batch_process_safe(
    [{'amount': '100'}, {'amount': 'invalid'}, {'amount': '200'}],
    safe_convert_amount
)
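
process_with_fallback works the same way for single values; a small sketch using the processor created above:

# Falls back to 0.0 when conversion fails, and records the error
value = processor.process_with_fallback(
    lambda row: float(row['amount']),
    {'amount': 'not-a-number'},
    fallback=0.0
)
print(value, len(processor.errors))  # 0.0 1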

📊 Parallel Processing

from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd

# Multi-processing for CPU-bound tasks
def expensive_computation(x):
    """Simulate expensive calculation"""
    return sum(i**2 for i in range(x))

def parallel_process(data, func, n_workers=4):
    """Process data in parallel"""
    with Pool(n_workers) as pool:
        results = pool.map(func, data)
    return results

# Thread-based processing for I/O-bound tasks
def fetch_data(url):
    """Simulate I/O operation"""
    import time
    time.sleep(0.1)
    return len(url)

def parallel_fetch(urls, n_workers=4):
    """Fetch multiple URLs in parallel"""
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        results = list(executor.map(fetch_data, urls))
    return results

# Distributed DataFrame processing
def process_large_dataframe(df, func, n_workers=4):
    """Apply function to DataFrame partitions in parallel"""
    chunks = np.array_split(df, n_workers)
    with Pool(n_workers) as pool:
        results = pool.map(func, chunks)
    return pd.concat(results, ignore_index=True)
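
Worker pools should be started from a main-module guard, which is required on platforms that spawn rather than fork new processes (e.g. Windows and macOS). A minimal sketch, with placeholder URLs:

if __name__ == '__main__':
    # CPU-bound work across processes
    squares = parallel_process([10_000, 20_000, 30_000], expensive_computation)

    # I/O-bound work across threads (URLs are placeholders)
    lengths = parallel_fetch(['https://example.com/a', 'https://example.com/b'])

    print(squares[:1], lengths)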

🔑 Key Takeaways

  • ✅ Design modular, composable pipelines
  • ✅ Use generators for memory-efficient processing
  • ✅ Validate data with schema definitions
  • ✅ Vectorize operations instead of looping
  • ✅ Chunk large datasets for memory efficiency
  • ✅ Implement error handling and recovery strategies
  • ✅ Use parallel processing for I/O and CPU-bound tasks

Explore more: Advanced CSV Handling | Advanced Pandas

