
Python
Master techniques for handling massive CSV files and streaming data efficiently, including files too large to fit in memory.
import pandas as pd

def process_large_csv(filepath, chunk_size=10000, processor=None):
    """Process a CSV in chunks without loading the entire file."""
    chunks_processed = 0
    results = []
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        chunks_processed += 1
        print(f"Processing chunk {chunks_processed}...")
        if processor:
            results.append(processor(chunk))
        else:
            results.append(chunk)
    return pd.concat(results, ignore_index=True) if results else None

# Example processor: filter and transform each chunk
def process_chunk(chunk):
    chunk['total'] = chunk['quantity'] * chunk['price']
    return chunk[chunk['total'] > 100]

df = process_large_csv('large_sales.csv', chunk_size=50000, processor=process_chunk)
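Note that concatenating the surviving chunks still builds one DataFrame in memory at the end. When all you need is an aggregate, you can fold each chunk into a running result instead; here is a minimal sketch, assuming hypothetical category and amount columns:

import pandas as pd

def aggregate_large_csv(filepath, chunk_size=50000):
    """Sum 'amount' per 'category' without holding the whole file in memory."""
    totals = None
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        partial = chunk.groupby('category')['amount'].sum()
        # Fold this chunk's partial sums into the running totals
        totals = partial if totals is None else totals.add(partial, fill_value=0)
    return totals

Peak memory now scales with the number of categories rather than the number of rows.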
pandas can also convert types as it reads: the dtype, parse_dates, and converters arguments handle most parsing at load time.

import pandas as pd
from io import StringIO

csv_data = '''date,amount,category,tags
2024-01-01,100.50,Sales,"urgent,review"
2024-01-02,200.75,Marketing,"quarterly"
2024-01-03,50.25,Operations,"routine,archived"
'''

df = pd.read_csv(
    StringIO(csv_data),
    dtype={'amount': 'float32', 'category': 'category'},
    parse_dates=['date'],
    converters={
        'tags': lambda x: x.split(',')  # Convert to a list
    }
)

print(df.dtypes)
print(df['tags'].iloc[0])  # ['urgent', 'review']

# Custom parsing function for messy values
def parse_currency(x):
    if not x or pd.isna(x):  # converters receive '' for missing values
        return 0.0
    return float(str(x).replace('$', '').replace(',', ''))

df = pd.read_csv(
    'prices.csv',
    converters={'price': parse_currency}
)
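Converters run as plain Python on every value, which can dominate load time on huge files. Reading only the columns you need with compact dtypes is often the bigger win; a sketch, with illustrative column names:

import pandas as pd

df = pd.read_csv(
    'large_sales.csv',
    usecols=['date', 'amount', 'category'],  # load just these columns
    dtype={'amount': 'float32', 'category': 'category'},
    parse_dates=['date'],
)
print(df.memory_usage(deep=True))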
When even chunked pandas is more than you need, the standard-library csv module streams one row at a time with near-constant memory.

import csv

class CSVStreamProcessor:
    """Process a CSV row by row with minimal memory."""

    def __init__(self, filepath):
        self.filepath = filepath

    def process(self, processor_func, output_file=None):
        """Stream-process rows and optionally write the kept rows out."""
        output_writer = None
        output_file_obj = None
        with open(self.filepath, 'r', newline='') as infile:
            reader = csv.DictReader(infile)
            fieldnames = reader.fieldnames
            if output_file:
                output_file_obj = open(output_file, 'w', newline='')
                output_writer = csv.DictWriter(output_file_obj, fieldnames=fieldnames)
                output_writer.writeheader()
            try:
                for row in reader:
                    processed = processor_func(row)
                    if processed and output_writer:
                        output_writer.writerow(processed)
            finally:
                if output_file_obj:
                    output_file_obj.close()

# Usage
processor = CSVStreamProcessor('large_data.csv')

def filter_and_transform(row):
    """Process an individual row; return None to drop it."""
    try:
        amount = float(row['amount'])
        if amount > 100:
            row['category'] = 'high_value'
            return row
    except ValueError:
        return None

processor.process(filter_and_transform, output_file='filtered_data.csv')
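The same row-at-a-time pattern also works as a plain generator, which composes with other iterators and keeps memory flat; a minimal sketch reusing the amount column from above:

import csv

def stream_rows(filepath):
    """Yield one dict per row so downstream code never holds the whole file."""
    with open(filepath, newline='') as f:
        yield from csv.DictReader(f)

# Nothing is read until the generator is consumed
high_value = (row for row in stream_rows('large_data.csv')
              if float(row['amount']) > 100)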
Multiple CSV files can be combined either by stacking rows or by joining on a shared column.

import pandas as pd
from pathlib import Path

def merge_csv_files(directory, pattern='*.csv', on=None):
    """Merge multiple CSV files into one DataFrame."""
    csv_files = sorted(Path(directory).glob(pattern))
    dfs = []
    for file in csv_files:
        print(f"Reading {file.name}...")
        dfs.append(pd.read_csv(file))
    if not dfs:
        return None
    if on:
        # Join on a common column
        result = dfs[0]
        for df in dfs[1:]:
            result = pd.merge(result, df, on=on, how='outer')
        return result
    # Concatenate (stack rows)
    return pd.concat(dfs, ignore_index=True)

# Merge sales data from monthly files
monthly_data = merge_csv_files('./data/sales/', pattern='sales_*.csv')
print(f"Combined {len(monthly_data)} records")
The reverse operation splits one DataFrame into a separate file per value of a column.

import pandas as pd
from pathlib import Path

def partition_and_save(df, partition_column, output_dir):
    """Split a DataFrame and save each partition as its own CSV."""
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    for value in df[partition_column].unique():
        subset = df[df[partition_column] == value]
        filename = f"{output_dir}/{partition_column}_{value}.csv"
        subset.to_csv(filename, index=False)
        print(f"Saved {len(subset)} rows to {filename}")

# Example: partition sales by region
df = pd.DataFrame({
    'region': ['North', 'South', 'North', 'East', 'South'],
    'sales': [100, 150, 120, 200, 180]
})
partition_and_save(df, 'region', './output/by_region')
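Filtering by each unique value rescans the DataFrame once per partition; groupby makes a single pass. An equivalent one-pass version of the loop above:

from pathlib import Path

out = Path('./output/by_region')
out.mkdir(parents=True, exist_ok=True)
for value, subset in df.groupby('region'):  # one pass instead of one scan per value
    subset.to_csv(out / f"region_{value}.csv", index=False)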
CSV exports often contain sensitive fields. Hashing, redaction, and encryption cover the common protection levels.

import pandas as pd
import hashlib

class SecureCSVHandler:
    """Handle sensitive data in CSVs."""

    @staticmethod
    def anonymize_column(df, column, preserve_length=False):
        """Replace values with a SHA-256 hash (one-way)."""
        def hash_value(val):
            hashed = hashlib.sha256(str(val).encode()).hexdigest()
            return hashed[:len(str(val))] if preserve_length else hashed
        df[column] = df[column].apply(hash_value)
        return df

    @staticmethod
    def redact_column(df, column, placeholder='***'):
        """Replace every value in a column with a placeholder."""
        df[column] = placeholder
        return df

    @staticmethod
    def encrypt_sensitive_columns(df, columns, key):
        """Encrypt columns with a symmetric Fernet key (reversible)."""
        from cryptography.fernet import Fernet
        cipher = Fernet(key)
        for col in columns:
            df[col] = df[col].apply(
                lambda x: cipher.encrypt(str(x).encode()).decode()
            )
        return df

# Usage
df = pd.DataFrame({
    'name': ['Alice', 'Bob'],
    'ssn': ['123-45-6789', '987-65-4321'],
    'salary': [50000, 60000]
})

# Anonymize SSN
df = SecureCSVHandler.anonymize_column(df, 'ssn')
# Redact salary
df = SecureCSVHandler.redact_column(df, 'salary')
print(df)
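Unlike hashing, Fernet encryption is reversible with the same key, so store the key separately from the data. A quick round trip (requires the cryptography package):

from cryptography.fernet import Fernet

key = Fernet.generate_key()  # losing this key makes the data unrecoverable
cipher = Fernet(key)

token = cipher.encrypt(b'123-45-6789').decode()
print(cipher.decrypt(token.encode()).decode())  # 123-45-6789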
pandas also makes it easy to convert CSV to and from other storage formats.

import pandas as pd

# CSV to Parquet (efficient columnar format; needs pyarrow or fastparquet)
df = pd.read_csv('data.csv')
df.to_parquet('data.parquet', index=False)

# Parquet back to CSV
df = pd.read_parquet('data.parquet')
df.to_csv('data.csv', index=False)

# CSV to JSON
df.to_json('data.json', orient='records')

# CSV to Excel (needs openpyxl)
df.to_excel('data.xlsx', sheet_name='Data', index=False)

# CSV to SQL database
from sqlalchemy import create_engine
engine = create_engine('sqlite:///database.db')
df.to_sql('table_name', engine, if_exists='append', index=False)
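For files too large to load at once, the chunked reader from earlier combines with to_sql so the conversion streams through memory; a sketch reusing the sqlite engine above:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('sqlite:///database.db')
for chunk in pd.read_csv('large_file.csv', chunksize=50000):
    chunk.to_sql('table_name', engine, if_exists='append', index=False)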
Finally, a quick benchmark comparing the three loading strategies.

import pandas as pd
import time

filepath = 'large_file.csv'

# Method 1: Load the entire file (memory intensive)
start = time.time()
df1 = pd.read_csv(filepath)
time1 = time.time() - start

# Method 2: Chunked reading, then concatenate (bounded read buffers,
# but the final DataFrame still ends up fully in memory)
start = time.time()
chunks = pd.read_csv(filepath, chunksize=50000)
df2 = pd.concat(chunks, ignore_index=True)
time2 = time.time() - start

# Method 3: Stream without materializing the full DataFrame (most memory efficient)
start = time.time()
count = 0
for chunk in pd.read_csv(filepath, chunksize=50000):
    count += len(chunk)
time3 = time.time() - start

print(f"Full load: {time1:.2f}s")
print(f"Chunked: {time2:.2f}s")
print(f"Streaming: {time3:.2f}s")