
Python
Take your database performance to the next level with sophisticated profiling, analysis techniques, and optimization strategies for production systems.
Understanding query execution plans is key to optimization.
Example 1: Analyzing Query Plans
import sqlite3

conn = sqlite3.connect('bookstore.db')
cursor = conn.cursor()

# Demo schema plus the two indexes the plan-analysis queries rely on.
_SCHEMA_STATEMENTS = (
    """
CREATE TABLE IF NOT EXISTS books (
book_id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
author_id INTEGER,
price REAL,
year_published INTEGER
)
""",
    "CREATE INDEX IF NOT EXISTS idx_author_id ON books(author_id)",
    "CREATE INDEX IF NOT EXISTS idx_price ON books(price)",
)
for _stmt in _SCHEMA_STATEMENTS:
    cursor.execute(_stmt)
def analyze_query_plan(query, cur=None):
    """Run EXPLAIN QUERY PLAN for *query* and print an annotated plan.

    The query is only planned, never executed.

    Args:
        query: SQL text to analyze.
        cur: optional sqlite3 cursor to use; defaults to the module-level
            cursor created above.

    Returns:
        list[str]: the plan's detail strings in plan order (also printed).
    """
    if cur is None:
        cur = cursor  # fall back to the module-level cursor
    print(f"\nQuery: {query[:60]}...")
    cur.execute(f"EXPLAIN QUERY PLAN {query}")
    print("Execution Plan:")
    details = []
    for row in cur.fetchall():
        # row format: (id, parent, notused, detail)
        detail = row[3]
        details.append(detail)
        upper = detail.upper()
        # SQLite >= 3.36 emits "SCAN <table>" / "SEARCH <table>"; older
        # versions said "SCAN TABLE" / "SEARCH TABLE" -- prefix match
        # covers both (the original substring checks missed the new form).
        if upper.startswith("SCAN"):
            print(f"  [!] FULL TABLE SCAN - {detail}")
        elif upper.startswith("SEARCH"):
            print(f"  [ok] INDEX USED - {detail}")
        else:
            print(f"  {detail}")
    return details
# Compare execution plans across query shapes: indexed equality,
# un-indexed column, multi-column filter, and a function-wrapped column
# (wrapping a column in LOWER() prevents index use).
example_queries = (
    """
SELECT * FROM books WHERE author_id = 5
""",
    """
SELECT * FROM books WHERE title = 'The Great Gatsby'
""",
    """
SELECT * FROM books WHERE price > 20 AND author_id = 5
""",
    """
SELECT * FROM books WHERE LOWER(title) = 'the great gatsby'
""",
)
for example_query in example_queries:
    analyze_query_plan(example_query)
conn.close()

Example 2: Detailed Query Profiling
import sqlite3
import time
import statistics
class QueryProfiler:
    """Collects wall-clock timing statistics for read-only SQL queries."""

    def __init__(self, conn):
        self.conn = conn
        self.cursor = conn.cursor()
        # name -> metrics dict produced by profile_query()
        self.results = {}

    def profile_query(self, name, query, params=(), iterations=100):
        """Run *query* `iterations` times and record timing metrics.

        The connection is put in read-only mode (PRAGMA query_only) for the
        duration of the run so profiling cannot mutate data. Unlike the
        original, the pragma is restored in a finally block -- previously
        the connection was left permanently read-only.

        Args:
            name: label for the report.
            query: SQL text, may contain '?' placeholders.
            params: bind parameters for the placeholders.
            iterations: number of timed runs (must be >= 1).

        Returns:
            dict: timing metrics in milliseconds (min/max/mean/median/stdev),
            also stored under self.results[name].
        """
        if iterations < 1:
            raise ValueError("iterations must be >= 1")
        times = []
        # Guard against accidental writes while profiling.
        self.cursor.execute("PRAGMA query_only = ON")
        try:
            for _ in range(iterations):
                start = time.perf_counter()
                self.cursor.execute(query, params)
                # Include fetch cost in the measurement.
                self.cursor.fetchall()
                times.append((time.perf_counter() - start) * 1000)  # ms
        finally:
            # Restore writability for subsequent use of the connection.
            self.cursor.execute("PRAGMA query_only = OFF")
        metrics = {
            'name': name,
            'query': query[:60],
            'iterations': iterations,
            'min': min(times),
            'max': max(times),
            'mean': statistics.mean(times),
            'median': statistics.median(times),
            'stdev': statistics.stdev(times) if len(times) > 1 else 0,
        }
        self.results[name] = metrics
        return metrics

    def print_report(self):
        """Print a formatted report of every profiled query."""
        print("\n" + "=" * 70)
        print("QUERY PROFILING REPORT")
        print("=" * 70)
        for name, metrics in self.results.items():
            print(f"\n{name}:")
            print(f"  Query: {metrics['query']}")
            print(f"  Iterations: {metrics['iterations']}")
            print(f"  Min: {metrics['min']:.3f} ms")
            print(f"  Max: {metrics['max']:.3f} ms")
            print(f"  Mean: {metrics['mean']:.3f} ms")
            print(f"  Median: {metrics['median']:.3f} ms")
            print(f"  StdDev: {metrics['stdev']:.3f} ms")
# Usage
conn = sqlite3.connect('bookstore.db')
profiler = QueryProfiler(conn)

# Profile different queries.
# FIX: the query uses a '?' placeholder, so bind parameters must be
# supplied -- the original call omitted them, which raises
# sqlite3.ProgrammingError ("Incorrect number of bindings").
profiler.profile_query(
    "Simple WHERE",
    "SELECT COUNT(*) FROM books WHERE author_id = ?",
    params=(5,),
)
# NOTE(review): assumes `authors` and `reviews` tables exist in
# bookstore.db -- they are not created in this tutorial; verify against
# your schema before running.
profiler.profile_query(
    "Complex JOIN",
    """
SELECT b.title, a.name, COUNT(r.id)
FROM books b
JOIN authors a ON b.author_id = a.author_id
LEFT JOIN reviews r ON b.book_id = r.book_id
GROUP BY b.book_id
""",
)
profiler.print_report()
conn.close()

Example 3: Composite Indexes and Index Design
import sqlite3

conn = sqlite3.connect('ecommerce.db')
cursor = conn.cursor()

# Orders table plus the index variants discussed below.
cursor.execute("""
CREATE TABLE IF NOT EXISTS orders (
order_id INTEGER PRIMARY KEY,
customer_id INTEGER NOT NULL,
order_date TEXT NOT NULL,
total_amount REAL,
status TEXT,
shipping_address TEXT
)
""")

index_ddl = (
    # Single-column indexes for frequent WHERE clauses.
    "CREATE INDEX IF NOT EXISTS idx_customer ON orders(customer_id)",
    "CREATE INDEX IF NOT EXISTS idx_status ON orders(status)",
    # Composite index -- column order matters: most selective column first.
    """
CREATE INDEX IF NOT EXISTS idx_customer_date
ON orders(customer_id, order_date DESC)
""",
    # Covering index: holds every column the query needs, avoiding the
    # extra table lookup entirely.
    """
CREATE INDEX IF NOT EXISTS idx_covering
ON orders(customer_id, order_date, total_amount)
""",
)
for ddl in index_ddl:
    cursor.execute(ddl)
conn.commit()
# These queries benefit from the indexes created above.
queries = [
    # Uses idx_customer
    "SELECT * FROM orders WHERE customer_id = 42",
    # Uses idx_customer_date
    "SELECT order_id, order_date FROM orders WHERE customer_id = 42 ORDER BY order_date DESC",
    # Uses idx_covering (no table lookup needed!)
    "SELECT order_id, total_amount FROM orders WHERE customer_id = 42",
]
for query in queries:
    cursor.execute(f"EXPLAIN QUERY PLAN {query}")
    print(f"\nQuery: {query}")
    for row in cursor.fetchall():
        # Only highlight plan steps that mention an index lookup.
        # FIX: the original printed a mojibake character here.
        if "SEARCH" in row[3] or "INDEX" in row[3]:
            print(f"  -> {row[3]}")
conn.close()

Example 4: Partial Indexes
import sqlite3

conn = sqlite3.connect('orders.db')
cursor = conn.cursor()

cursor.execute("""
CREATE TABLE IF NOT EXISTS orders (
order_id INTEGER PRIMARY KEY,
status TEXT,
created_at TEXT,
amount REAL
)
""")

partial_index_ddl = (
    # Indexes only active orders -- smaller and faster than a full index.
    """
CREATE INDEX IF NOT EXISTS idx_active_orders
ON orders(created_at)
WHERE status IN ('pending', 'processing')
""",
    # Skips cancelled records, which are rarely queried.
    """
CREATE INDEX IF NOT EXISTS idx_live_orders
ON orders(order_id, amount)
WHERE status != 'cancelled'
""",
)
for ddl in partial_index_ddl:
    cursor.execute(ddl)
conn.commit()

# Show that the pending-orders query can use the partial index.
cursor.execute("""
EXPLAIN QUERY PLAN
SELECT * FROM orders
WHERE status = 'pending'
ORDER BY created_at DESC
""")
plan_rows = cursor.fetchall()
print("Partial Index Query Plan:")
for plan_row in plan_rows:
    print(f" {plan_row[3]}")
conn.close()

Example 5: Maintenance Operations
import sqlite3
import os

conn = sqlite3.connect('bookstore.db')
cursor = conn.cursor()

# Snapshot the on-disk size so we can report how much VACUUM reclaimed.
db_size_before = os.path.getsize('bookstore.db')
print(f"Database size before: {db_size_before / 1024:.1f} KB")

# Each maintenance step paired with its completion message:
#  - VACUUM rebuilds the file to reclaim free pages (run after bulk deletes)
#  - ANALYZE refreshes the statistics the query planner uses
#  - PRAGMA optimize lets SQLite run whatever optimizations it deems useful
maintenance_steps = (
    ("VACUUM", "VACUUM completed - unused space reclaimed"),
    ("ANALYZE", "ANALYZE completed - query optimization stats updated"),
    ("PRAGMA optimize", "PRAGMA optimize completed"),
)
for sql, message in maintenance_steps:
    cursor.execute(sql)
    print(message)

db_size_after = os.path.getsize('bookstore.db')
print(f"Database size after: {db_size_after / 1024:.1f} KB")
print(f"Space reclaimed: {(db_size_before - db_size_after) / 1024:.1f} KB")
conn.close()

Example 6: Connection Pooling with Monitoring
import sqlite3
from queue import Queue
from threading import Lock, Thread
import time
class MonitoredConnectionPool:
    """Fixed-size SQLite connection pool that tracks usage statistics.

    Thread-safety: Queue is already thread-safe; the stats dict is guarded
    by self.lock. FIX: the original created the lock but mutated stats
    without holding it, so concurrent callers could lose updates.
    """

    def __init__(self, database, pool_size=5):
        self.database = database
        self.pool = Queue(maxsize=pool_size)
        self.lock = Lock()  # guards self.stats
        self.stats = {
            'connections_created': 0,
            'connections_reused': 0,
            'queries_executed': 0,
            'total_query_time': 0,
        }
        # Pre-open every connection; check_same_thread=False allows pooled
        # connections to be used from any thread.
        for _ in range(pool_size):
            conn = sqlite3.connect(database, check_same_thread=False)
            self.pool.put(conn)
            with self.lock:
                self.stats['connections_created'] += 1

    def get_connection(self):
        """Check a connection out of the pool (blocks if the pool is empty)."""
        conn = self.pool.get()
        with self.lock:
            # NOTE: counts every checkout, including a connection's first use.
            self.stats['connections_reused'] += 1
        return conn

    def return_connection(self, conn):
        """Return a connection to the pool."""
        self.pool.put(conn)

    def execute_query(self, query, params=()):
        """Execute a query on a pooled connection.

        Returns:
            (rows, elapsed): fetched rows and the execute() time in seconds
            (fetch time is not included, matching the original behavior).

        The connection is always returned to the pool, even on error.
        """
        conn = self.get_connection()
        try:
            start = time.perf_counter()
            cursor = conn.cursor()
            cursor.execute(query, params)
            elapsed = time.perf_counter() - start
            with self.lock:
                self.stats['queries_executed'] += 1
                self.stats['total_query_time'] += elapsed
            return cursor.fetchall(), elapsed
        finally:
            self.return_connection(conn)

    def print_stats(self):
        """Print connection pool statistics."""
        with self.lock:
            avg_time = (self.stats['total_query_time'] /
                        self.stats['queries_executed'] if self.stats['queries_executed'] > 0 else 0)
            print(f"""
Connection Pool Statistics:
  Connections created: {self.stats['connections_created']}
  Connections reused: {self.stats['connections_reused']}
  Queries executed: {self.stats['queries_executed']}
  Total query time: {self.stats['total_query_time']:.3f}s
  Average query time: {avg_time*1000:.3f}ms
""")
# Usage: exercise the pool so the statistics report has data to show.
pool = MonitoredConnectionPool('bookstore.db', pool_size=5)

# Repeating the same query makes every run after the first reuse a
# pooled connection.
for _ in range(10):
    results, elapsed = pool.execute_query("SELECT COUNT(*) FROM books")
pool.print_stats()

Example 7: Optimized Batch Processing
import sqlite3
def optimize_batch_operation(conn, data, batch_size=10000):
    """Bulk-insert (name, value) rows into `items` using fast-write settings.

    Temporarily relaxes durability (synchronous=OFF, journal_mode=MEMORY,
    larger cache) for speed. FIX: the settings are now restored in a
    finally block even if a batch fails -- the original skipped restoration
    when an insert raised. Each batch runs in its own transaction.

    Args:
        conn: open sqlite3 connection with an `items(name, value)` table.
        data: sequence of (name, value) tuples.
        batch_size: rows per transaction.

    Returns:
        int: number of rows successfully inserted.

    Raises:
        Re-raises any insert error after rolling back the failing batch.
    """
    cursor = conn.cursor()
    # Trade durability for write speed during the bulk load.
    cursor.execute("PRAGMA synchronous = OFF")
    cursor.execute("PRAGMA journal_mode = MEMORY")
    cursor.execute("PRAGMA cache_size = -64000")  # negative => KiB, i.e. 64MB
    total = 0
    try:
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            # One transaction per batch keeps rollbacks cheap and bounded.
            cursor.execute("BEGIN TRANSACTION")
            try:
                cursor.executemany(
                    "INSERT INTO items (name, value) VALUES (?, ?)",
                    batch
                )
                cursor.execute("COMMIT")
                total += len(batch)
                print(f"Inserted {total} records...")
            except Exception as e:
                cursor.execute("ROLLBACK")
                print(f"Error: {e}")
                raise
    finally:
        # Always restore safe defaults, even if a batch failed.
        cursor.execute("PRAGMA synchronous = FULL")
        cursor.execute("PRAGMA journal_mode = DELETE")
    return total
# Usage: build an in-memory database and bulk-load 50k synthetic rows.
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
cursor.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, value INTEGER)")
conn.commit()

# Synthetic (name, value) pairs to insert.
data = [(f'Item {n}', n) for n in range(50000)]

total = optimize_batch_operation(conn, data, batch_size=5000)
print(f"Total inserted: {total}")
conn.close()

Become a database expert:
Resources
Ojasa Mirai
Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.
Learn Deep • Build Real • Verify Skills • Launch Forward