Ojasa Mirai

Ojasa Mirai

Python

Loading...

Learning Level

🟢 Beginner · 🔵 Advanced
📊 Why Databases Matter · 🗄️ SQLite Basics · 📋 Creating Tables · ➕ Inserting Data · 🔍 Querying Data · ✏️ Updating & Deleting · 🔗 Joins & Relations · 🐍 SQLite with Python · ⚡ Performance & Best Practices
Python/Database Basics/Sqlite With Python

🚀 Advanced Python SQLite Patterns

Scale your SQLite usage with connection pooling, async operations, caching, and custom type adapters for production-grade applications.

🔌 Connection Pooling

Reuse database connections to improve performance and resource efficiency.

Example 1: Simple Connection Pool

import sqlite3
from queue import Queue
from threading import Lock

class SQLitePool:
    """Fixed-size pool of SQLite connections handed out through a queue.

    Every connection is opened eagerly in ``__init__`` with
    ``check_same_thread=False`` (so it may be used from a thread other than
    the one that created it) and with ``sqlite3.Row`` as its row factory.
    """

    def __init__(self, database, pool_size=5):
        """Open ``pool_size`` connections to ``database`` and queue them.

        Args:
            database: Path (or ``':memory:'``) passed to ``sqlite3.connect``.
            pool_size: Number of connections to create; must be at least 1.

        Raises:
            ValueError: If ``pool_size`` is less than 1. Such a pool would
                hold no connections and ``get_connection`` would block
                forever.
        """
        if pool_size < 1:
            raise ValueError(f"pool_size must be at least 1, got {pool_size!r}")

        self.database = database
        self.pool = Queue(maxsize=pool_size)
        # Not used by the pool itself (Queue does its own locking); kept so
        # existing code that reads this attribute keeps working.
        self.lock = Lock()

        # Pre-create connections. If one fails to open, close the ones that
        # were already created instead of leaking them.
        try:
            for _ in range(pool_size):
                conn = sqlite3.connect(database, check_same_thread=False)
                conn.row_factory = sqlite3.Row
                self.pool.put(conn)
        except Exception:
            self.close_all()
            raise

    def get_connection(self, timeout=None):
        """Take a connection from the pool.

        Args:
            timeout: Seconds to wait for a free connection. ``None`` (the
                default) waits indefinitely, as before.

        Raises:
            queue.Empty: If ``timeout`` elapses with no connection available.
        """
        return self.pool.get(timeout=timeout)

    def return_connection(self, conn):
        """Put a previously acquired connection back into the pool."""
        self.pool.put(conn)

    def close_all(self):
        """Close every connection currently sitting idle in the pool.

        Connections that are checked out at the time of the call are not in
        the queue and are therefore not closed here.
        """
        from queue import Empty

        # get_nowait() + Empty avoids the empty()/get() race: with a blocking
        # get(), another thread could take the last connection between the
        # check and the call and leave this loop waiting forever.
        while True:
            try:
                conn = self.pool.get_nowait()
            except Empty:
                break
            conn.close()

# Usage
pool = SQLitePool('bookstore.db', pool_size=10)

try:
    # Acquire before entering the inner try: if acquisition itself fails,
    # there is no connection to hand back and `conn` would be unbound.
    conn = pool.get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM books")
        count = cursor.fetchone()[0]
        print(f"Total books: {count}")
    finally:
        pool.return_connection(conn)
finally:
    # Runs even when the query above raises, so pooled connections are closed.
    pool.close_all()

📝 Custom Type Adapters and Converters

Map Python objects to SQL types and vice versa.

Example 2: Custom Adapter for Objects

import sqlite3
from datetime import datetime
import json

class Book:
    """Simple record holding a book's title, author and price."""

    def __init__(self, title, author, price):
        """Store the three values on the instance unchanged."""
        self.title, self.author, self.price = title, author, price

    def __repr__(self):
        """Return a ``Book('<title>', '<author>', $<price>)`` display string."""
        return f"Book('{self.title}', '{self.author}', ${self.price})"

# Register adapter to convert Book to string for storage
def book_adapter(book):
    """Serialize a book's title, author and price into a JSON object string."""
    payload = {name: getattr(book, name) for name in ('title', 'author', 'price')}
    return json.dumps(payload)

# From here on, sqlite3 calls book_adapter for any Book bound as a query parameter.
sqlite3.register_adapter(Book, book_adapter)

# Register converter to restore Book from stored string
def book_converter(json_string):
    """Rebuild a Book from the JSON text written by the adapter."""
    fields = json.loads(json_string)
    title, author, price = (fields[key] for key in ('title', 'author', 'price'))
    return Book(title, author, price)

# Columns declared with type "book" are passed through book_converter on read.
sqlite3.register_converter("book", book_converter)

# Usage
# PARSE_DECLTYPES tells sqlite3 to choose a converter from the declared
# column type, which is what links the "book" column below to book_converter.
conn = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
try:
    cursor = conn.cursor()

    cursor.execute("CREATE TABLE books (id INTEGER PRIMARY KEY, data book)")
    book = Book("1984", "George Orwell", 19.99)
    cursor.execute("INSERT INTO books (data) VALUES (?)", (book,))
    conn.commit()

    cursor.execute("SELECT data FROM books")
    retrieved_book = cursor.fetchone()[0]
    print(f"Retrieved: {retrieved_book}")
finally:
    # Close the connection even if one of the statements above fails.
    conn.close()

Example 3: Custom Converter for Dates

import sqlite3
from datetime import datetime

# Store datetime parameters as their ISO 8601 text form
sqlite3.register_adapter(datetime, datetime.isoformat)

# Register converter to restore datetime
def datetime_converter(date_string):
    """Parse an ISO-format value into a datetime; accepts bytes or str."""
    if isinstance(date_string, bytes):
        date_string = date_string.decode()
    return datetime.fromisoformat(date_string)

# Columns declared with type "timestamp" are passed through datetime_converter on read.
sqlite3.register_converter("timestamp", datetime_converter)

# Usage
conn = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
try:
    cursor = conn.cursor()

    cursor.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, event_date timestamp)")

    now = datetime.now()
    cursor.execute("INSERT INTO events (event_date) VALUES (?)", (now,))
    conn.commit()

    cursor.execute("SELECT event_date FROM events")
    retrieved_date = cursor.fetchone()[0]
    print(f"Stored: {now}")
    print(f"Retrieved: {retrieved_date}")
    print(f"Type: {type(retrieved_date)}")
finally:
    # Close the connection even if one of the statements above fails.
    conn.close()

⚡ Async Operations with asyncio and aiosqlite

Non-blocking database operations for async applications.

Example 4: Async Database Operations

import asyncio
import aiosqlite

async def setup_database(db_path='async_db.db'):
    """Create the ``users`` table if it does not exist yet.

    Args:
        db_path: Database file to open. Defaults to ``'async_db.db'``, the
            path this function previously had hard-coded.
    """
    create_users_sql = """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE
            )
        """
    async with aiosqlite.connect(db_path) as db:
        await db.execute(create_users_sql)
        await db.commit()
        print("Database setup complete")

async def insert_user(name, email, db_path='async_db.db'):
    """Insert one row into ``users`` and commit it.

    Args:
        name: Value for the ``name`` column.
        email: Value for the ``email`` column.
        db_path: Database file to open. Defaults to ``'async_db.db'``, the
            path this function previously had hard-coded.
    """
    async with aiosqlite.connect(db_path) as db:
        # Values are bound as parameters, never formatted into the SQL text.
        await db.execute(
            "INSERT INTO users (name, email) VALUES (?, ?)",
            (name, email)
        )
        await db.commit()
        print(f"Inserted user: {name}")

async def query_users(db_path='async_db.db'):
    """Print the name and email of every row in ``users``.

    Args:
        db_path: Database file to open. Defaults to ``'async_db.db'``, the
            path this function previously had hard-coded.
    """
    async with aiosqlite.connect(db_path) as db:
        # Row factory enables access by column name (row['name']) below.
        db.row_factory = aiosqlite.Row
        async with db.execute("SELECT * FROM users") as cursor:
            for row in await cursor.fetchall():
                print(f"  {row['name']} ({row['email']})")

async def main():
    """Set up the database, insert three sample users concurrently, list all users.

    Insert failures are reported per user instead of aborting the run. The
    ``users.email`` column is UNIQUE, so running this example a second time
    against the same file makes every insert fail; without
    ``return_exceptions=True`` the first failure would propagate out of
    ``gather`` and the listing below would never be printed.
    """
    await setup_database()

    sample_users = [
        ("Alice", "alice@example.com"),
        ("Bob", "bob@example.com"),
        ("Charlie", "charlie@example.com"),
    ]

    # Insert multiple users concurrently
    outcomes = await asyncio.gather(
        *(insert_user(name, email) for name, email in sample_users),
        return_exceptions=True,
    )
    for (name, _email), outcome in zip(sample_users, outcomes):
        if isinstance(outcome, Exception):
            print(f"Could not insert user {name}: {outcome}")

    print("All users:")
    await query_users()

# Run async operations
# asyncio.run() creates an event loop, runs main() to completion, then closes the loop.
asyncio.run(main())

💾 Query Result Caching

Cache query results to reduce database load.

Example 5: Simple Query Cache

import sqlite3
import hashlib
import json
from functools import wraps
from time import time

class QueryCache:
    """In-memory store of query results that expire after a fixed time-to-live."""

    def __init__(self, ttl_seconds=300):
        """Start with an empty cache whose entries live for ``ttl_seconds``."""
        self.ttl = ttl_seconds
        self.cache = {}

    def _make_key(self, query, params):
        """Return the MD5 hex digest identifying a query/parameters pair."""
        serialized_params = json.dumps(params, default=str)
        raw = f"{query}:{serialized_params}".encode()
        return hashlib.md5(raw).hexdigest()

    def get(self, query, params):
        """Return the cached result for the pair, or None if absent or expired.

        An expired entry is removed from the cache as a side effect.
        """
        key = self._make_key(query, params)
        entry = self.cache.get(key)
        if entry is None:
            return None

        result, stored_at = entry
        still_fresh = time() - stored_at < self.ttl
        if not still_fresh:
            # Expired
            del self.cache[key]
            return None

        print(f"Cache hit for: {query[:50]}...")
        return result

    def set(self, query, params, result):
        """Store ``result`` for the pair together with the current time."""
        self.cache[self._make_key(query, params)] = (result, time())

    def clear(self):
        """Drop every cached entry."""
        self.cache.clear()

# Usage
cache = QueryCache(ttl_seconds=60)
conn = sqlite3.connect('bookstore.db')
cursor = conn.cursor()

def cached_query(query, params=()):
    """Return the rows for ``query``, served from ``cache`` when possible.

    On a miss the query is executed on the module-level ``cursor``, all rows
    are fetched, and the result is stored in ``cache`` before being returned.

    Args:
        query: SQL text, with ``?`` placeholders for any parameters.
        params: Values bound to the placeholders.

    Returns:
        The list of rows produced by ``cursor.fetchall()``, or the cached
        list from an earlier identical call.
    """
    # Check cache first
    cached_result = cache.get(query, params)
    if cached_result is not None:
        return cached_result

    # Execute query if not cached
    cursor.execute(query, params)
    result = cursor.fetchall()
    cache.set(query, params, result)
    print(f"Query executed and cached: {query[:50]}...")
    return result

try:
    # First call: executes query
    result1 = cached_query("SELECT * FROM books WHERE price > ?", (20,))

    # Second call: returns from cache
    result2 = cached_query("SELECT * FROM books WHERE price > ?", (20,))
finally:
    # Close the connection even if a query above fails.
    conn.close()

🔐 Prepared Statements and Statement Caching

Prepare and reuse compiled statements for better performance.

Example 6: Statement Caching

import sqlite3

class PreparedStatements:
    """Run parameterized queries through cursors cached under caller-chosen names.

    Each name maps to one cursor on a single connection; the cursor is created
    on first use and reused for later calls with the same name. Instances can
    be used as context managers, in which case ``close()`` runs on exit.

    NOTE(review): nothing in this class compiles SQL ahead of time. Any reuse
    of compiled statements presumably comes from the sqlite3 module's own
    per-connection statement cache (see ``cached_statements`` on
    ``sqlite3.connect``) - confirm against the sqlite3 documentation.
    """

    def __init__(self, database):
        """Open a connection to ``database`` with an empty cursor map."""
        self.conn = sqlite3.connect(database)
        self.statements = {}

    def __enter__(self):
        """Return this instance for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursors and connection; exceptions are not suppressed."""
        self.close()

    def prepare(self, name, query):
        """Return the cursor cached under ``name``, creating it if needed.

        ``query`` is accepted for interface compatibility but is not used
        here; the SQL is only looked at when ``execute`` runs it.
        """
        if name not in self.statements:
            self.statements[name] = self.conn.cursor()
        return self.statements[name]

    def execute(self, name, query, params=()):
        """Run ``query`` with ``params`` on the cursor for ``name`` and return it.

        Because the cursor is shared per name, executing again under the same
        name replaces any result set still pending on that cursor.
        """
        cursor = self.prepare(name, query)
        cursor.execute(query, params)
        return cursor

    def fetchall(self, name, query, params=()):
        """Run the query and return all of its rows as a list."""
        cursor = self.execute(name, query, params)
        return cursor.fetchall()

    def close(self):
        """Close every cached cursor, then the connection.

        The connection is closed and the cursor map emptied even if closing
        one of the cursors raises.
        """
        try:
            for cursor in self.statements.values():
                cursor.close()
        finally:
            self.statements.clear()
            self.conn.close()

# Usage
ps = PreparedStatements('bookstore.db')

try:
    # Run the same SQL text twice under one name, changing only the parameter
    results1 = ps.fetchall(
        'get_books_by_price',
        "SELECT title FROM books WHERE price > ?",
        (20,)
    )

    results2 = ps.fetchall(
        'get_books_by_price',
        "SELECT title FROM books WHERE price > ?",
        (30,)
    )

    print(f"Books over $20: {len(results1)}")
    print(f"Books over $30: {len(results2)}")
finally:
    # Release cursors and the connection even if a query above fails.
    ps.close()

📊 Bulk Operations and Batch Processing

Optimize performance for large-scale data operations.

Example 7: Bulk Insert with Optimization

import sqlite3
import time

def bulk_insert_naive(conn, data):
    """Insert rows one at a time, committing after each (deliberately slow).

    This is the baseline for the comparison with ``bulk_insert_optimized``;
    the per-row commit is the point of the demonstration and is kept.

    Args:
        conn: Open sqlite3 connection with an ``items(name, value)`` table.
        data: Iterable of mappings with ``'name'`` and ``'value'`` keys.

    Returns:
        Elapsed time in seconds as a float.
    """
    cursor = conn.cursor()
    # perf_counter() is the clock meant for measuring intervals; time.time()
    # follows the wall clock, can jump, and may be too coarse for short runs.
    start = time.perf_counter()

    for item in data:
        cursor.execute(
            "INSERT INTO items (name, value) VALUES (?, ?)",
            (item['name'], item['value'])
        )
        conn.commit()  # Too many commits!

    return time.perf_counter() - start

def bulk_insert_optimized(conn, data):
    """Insert all rows with one ``executemany`` call and a single commit.

    Args:
        conn: Open sqlite3 connection with an ``items(name, value)`` table.
        data: Iterable of mappings with ``'name'`` and ``'value'`` keys.

    Returns:
        Elapsed time in seconds as a float.
    """
    cursor = conn.cursor()
    # perf_counter() is the clock meant for measuring intervals; time.time()
    # follows the wall clock, can jump, and may be too coarse for short runs.
    start = time.perf_counter()

    # Insert all data; the generator feeds rows to executemany one by one
    # instead of building a second full list in memory first.
    cursor.executemany(
        "INSERT INTO items (name, value) VALUES (?, ?)",
        ((item['name'], item['value']) for item in data)
    )

    # Single commit
    conn.commit()

    return time.perf_counter() - start

# Test data
test_data = [{'name': f'Item {i}', 'value': i} for i in range(10000)]

conn = sqlite3.connect(':memory:')
try:
    conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, value INTEGER)")

    # Compare performance
    naive_time = bulk_insert_naive(conn, test_data)
    print(f"Naive approach: {naive_time:.4f}s")

    # Reset for optimized test. Commit the DELETE now so that its cost is not
    # folded into the commit timed inside bulk_insert_optimized.
    conn.execute("DELETE FROM items")
    conn.commit()
    optimized_time = bulk_insert_optimized(conn, test_data)
    print(f"Optimized approach: {optimized_time:.4f}s")

    # A measured time of zero would make the ratio raise ZeroDivisionError.
    if optimized_time > 0:
        print(f"Speedup: {naive_time/optimized_time:.1f}x faster")
    else:
        print("Speedup: optimized run was too fast to measure")
finally:
    # Close the connection even if one of the steps above fails.
    conn.close()

Example 8: Batch Processing with Progress

import sqlite3
from itertools import islice

def batch_insert_with_progress(conn, data, batch_size=1000):
    """Insert rows in batches, committing and printing progress after each.

    Args:
        conn: Open sqlite3 connection with an ``items(name, value)`` table.
        data: Any iterable of ``(name, value)`` rows - a list, a generator,
            or another iterator.
        batch_size: Maximum number of rows per ``executemany``/commit;
            must be at least 1.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    cursor = conn.cursor()
    total = 0

    # islice() only advances an iterator. Given a list directly it would start
    # from the first element on every pass and the loop would never end, so
    # take one iterator up front and consume that.
    rows = iter(data)

    # Process in batches
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            break

        cursor.executemany(
            "INSERT INTO items (name, value) VALUES (?, ?)",
            batch
        )
        conn.commit()

        total += len(batch)
        print(f"Inserted {total} records...")

    print(f"Batch insert complete: {total} total records")

# Generator for large dataset
def data_generator(count=100000):
    """Yield ``count`` rows of the form ``('Item <i>', i)`` for i from 0 upward.

    Rows are produced lazily, so the full dataset is never held in memory.
    """
    for i in range(count):
        yield (f'Item {i}', i)

# Usage
conn = sqlite3.connect(':memory:')
try:
    conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, value INTEGER)")
    batch_insert_with_progress(conn, data_generator(), batch_size=5000)
finally:
    # Close the connection even if the insert fails part-way through.
    conn.close()

🔍 Query Analysis and Optimization

Profile and optimize database queries.

Example 9: Query Profiling

import sqlite3
import time

conn = sqlite3.connect('bookstore.db')
try:
    cursor = conn.cursor()

    # Enable statement tracing. The sqlite3.Connection method for this is
    # set_trace_callback(); Connection has no set_trace() method, so the
    # previous call raised AttributeError before any query ran.
    conn.set_trace_callback(lambda stmt: print(f"SQL: {stmt}"))

    # Query with EXPLAIN QUERY PLAN
    cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM books WHERE author_id = 5")
    plan = cursor.fetchall()
    print("Query Execution Plan:")
    for row in plan:
        print(f"  {row}")

    # Time complex query. perf_counter() is the clock meant for measuring
    # intervals; time.time() follows the wall clock and can jump.
    start = time.perf_counter()
    cursor.execute("""
    SELECT b.title, a.name, COUNT(r.review_id) as review_count
    FROM books b
    JOIN authors a ON b.author_id = a.author_id
    LEFT JOIN reviews r ON b.book_id = r.book_id
    GROUP BY b.book_id
    ORDER BY review_count DESC
""")
    results = cursor.fetchall()
    elapsed = time.perf_counter() - start

    print(f"Query returned {len(results)} rows in {elapsed:.4f}s")
finally:
    # Close the connection even if a query above fails.
    conn.close()

🔑 Key Takeaways

  • **Connection pooling** improves performance for concurrent applications
  • **Custom adapters and converters** enable seamless Python-to-SQL type mapping
  • **Async operations** (aiosqlite) support non-blocking database access
  • **Query caching** reduces database load for frequently accessed data
  • **Prepared statements**: keep the SQL text identical and pass values as `?` parameters so the same statement can be reused across calls for better performance
  • **Batch operations** with executemany() are significantly faster than individual inserts
  • **Query profiling** with EXPLAIN QUERY PLAN identifies performance bottlenecks
  • **Transactions and commits**: for bulk operations, group many writes into one transaction and commit once (or once per batch) rather than after every row

📚 Further Learning

Master performance optimization:

  • [Performance Optimization](/courses/python/database_basics/advanced/database_performance) - Query profiling and index design
  • [Advanced Querying](/courses/python/database_basics/advanced/querying_data) - Complex SQL patterns
  • [Challenges & Quizzes](/#) - Practice advanced SQLite patterns

Resources

Python Docs

Ojasa Mirai

Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep • Build Real • Verify Skills • Launch Forward

Courses

Python · FastAPI · ReactJS · Cloud

© 2026 Ojasa Mirai. All rights reserved.

Twitter · GitHub · LinkedIn