Ojasa Mirai


✍️ Advanced Writing Strategies — Safe and Efficient

Master atomic writes, async I/O, and concurrent file operations for robustness and performance.


🎯 Atomic File Writing

Atomic writes prevent data corruption if the program crashes mid-write: the target file is replaced in a single step, so readers see either the complete old version or the complete new one, never a half-written mix.

import os
import tempfile
from pathlib import Path

def atomic_write(file_path, content):
    """Write a file atomically (all or nothing)"""
    file_path = Path(file_path)

    # Write to a temporary file in the same directory, so the
    # rename below stays on one filesystem and remains atomic
    with tempfile.NamedTemporaryFile(
        mode="w", dir=file_path.parent, delete=False
    ) as tmp:
        tmp.write(content)
        tmp_name = tmp.name

    # Atomic rename: replaces the target in one step, no partial writes
    os.replace(tmp_name, file_path)

# Safe even if crash occurs
atomic_write("config.json", '{"safe": true}')
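One gap in the sketch above: if the write itself raises, the temporary file is left behind, and the data may still sit in OS buffers when the rename happens. A variant (the helper name `atomic_write_safe` is ours, not from the original) that cleans up on error and forces the data to disk first:

```python
import os
import tempfile
from pathlib import Path

def atomic_write_safe(file_path, content):
    """Atomic write that fsyncs and removes the temp file on failure."""
    file_path = Path(file_path)
    tmp = tempfile.NamedTemporaryFile(
        mode="w", dir=file_path.parent, delete=False
    )
    try:
        tmp.write(content)
        tmp.flush()
        os.fsync(tmp.fileno())  # push the data to disk before the rename
        tmp.close()
        os.replace(tmp.name, file_path)  # atomic on POSIX and Windows
    except BaseException:
        tmp.close()
        os.unlink(tmp.name)  # don't leave orphaned temp files around
        raise

atomic_write_safe("config.json", '{"safe": true}')
print(Path("config.json").read_text())  # {"safe": true}
```

The fsync matters because a rename can outlive the data: without it, a power loss shortly after the rename can leave the new name pointing at an empty file.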

💡 Async File I/O for Concurrency

import asyncio
from pathlib import Path

def read_text(path):
    """Blocking read; runs in a worker thread below"""
    return Path(path).read_text()

async def read_file_async(path):
    """Non-blocking file read using asyncio"""
    # asyncio has no native async I/O for regular files,
    # so hand the blocking read to a thread pool
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, read_text, path)

async def process_multiple_files(file_paths):
    """Read multiple files concurrently"""
    tasks = [read_file_async(path) for path in file_paths]
    results = await asyncio.gather(*tasks)
    return results

# Concurrent execution
asyncio.run(process_multiple_files(['file1.txt', 'file2.txt']))
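On Python 3.9+, asyncio.to_thread wraps the same thread-pool pattern with less ceremony. A minimal sketch (the helper names here are illustrative):

```python
import asyncio
from pathlib import Path

def _read_text(path):
    """Blocking read, dispatched to a worker thread below."""
    return Path(path).read_text()

async def process_files(file_paths):
    """Run each blocking read in a worker thread, concurrently."""
    return await asyncio.gather(
        *(asyncio.to_thread(_read_text, p) for p in file_paths)
    )

# Create two small files, then read them concurrently
Path("a.txt").write_text("alpha")
Path("b.txt").write_text("beta")
results = asyncio.run(process_files(["a.txt", "b.txt"]))
print(results)  # ['alpha', 'beta']
```

asyncio.gather preserves input order, so the results line up with the paths regardless of which read finishes first.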

🎨 Batch Writing Optimization

import io

def batch_write_optimized(file_path, data_generator, buffer_size=1024*1024):
    """Accumulate lines in memory, then write in large chunks"""
    buffer = io.StringIO()

    with open(file_path, "w") as file:
        for item in data_generator:
            buffer.write(str(item) + "\n")

            # Flush once the buffered text reaches the target size
            # (buffer.tell() is the number of characters written so far)
            if buffer.tell() >= buffer_size:
                file.write(buffer.getvalue())
                buffer = io.StringIO()

        # Final flush for whatever is left in the buffer
        if buffer.tell() > 0:
            file.write(buffer.getvalue())

# Benchmark: 10000 writes
def generator():
    for i in range(10000):
        yield f"Line {i}"

import timeit

# Individual writes (slow): one write() call per line
def write_slow(file_path, data_generator):
    with open(file_path, "w") as file:
        for item in data_generator:
            file.write(str(item) + "\n")

slow = timeit.timeit(
    lambda: write_slow('out.txt', generator()),
    number=1
)

# Buffered writes (fast)
fast = timeit.timeit(
    lambda: batch_write_optimized('out.txt', generator()),
    number=1
)

print(f"Speedup: {slow/fast:.1f}x")
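Before hand-rolling a StringIO buffer, note that open() already buffers writes itself; its buffering parameter controls the chunk size. A quick sketch of that built-in knob, which is often enough:

```python
# In text mode, buffering > 1 sets the size (in bytes) of the
# underlying binary buffer; writes accumulate until it fills.
with open("buffered.txt", "w", buffering=1024 * 1024) as f:
    for i in range(1000):
        f.write(f"Line {i}\n")  # collected in the 1 MiB buffer

with open("buffered.txt") as f:
    lines = f.readlines()
print(len(lines))  # 1000
```

Manual batching like batch_write_optimized mainly pays off when each item needs formatting work you want to amortize, or when you need explicit control over flush points.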

📊 Performance Comparison

| Method            | Speed  | Safety | Use Case      |
|-------------------|--------|--------|---------------|
| Individual writes | Slow   | Medium | Interactive   |
| Buffered writes   | Fast   | Medium | Batch         |
| Async I/O         | Medium | Medium | Concurrent    |
| Atomic writes     | Slow   | Best   | Critical data |

🔑 Key Takeaways

  • ✅ Atomic writes for data integrity
  • ✅ Batch buffering for performance
  • ✅ Async I/O for concurrency
  • ✅ Temporary files prevent corruption
  • ✅ Flush strategically based on data importance
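The last takeaway, flushing strategically, can look like this in practice: flush() empties Python's buffer into the OS page cache, while os.fsync() forces the OS to commit to physical disk. Reserve fsync for records you cannot afford to lose, since it is far slower. A sketch (the append_record helper is illustrative):

```python
import os

def append_record(file, record, durable=False):
    """Append one line; optionally force it to disk immediately."""
    file.write(record + "\n")
    if durable:
        file.flush()              # Python buffer -> OS page cache
        os.fsync(file.fileno())   # OS page cache -> physical disk

with open("journal.log", "w") as log:
    append_record(log, "routine event")             # buffered, fast
    append_record(log, "payment=42", durable=True)  # committed now

with open("journal.log") as f:
    contents = f.read()
print(contents)
```

This is the same durability idea the atomic-write section relies on: flush decides when other processes can see the data, fsync decides when a crash can no longer erase it.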



Resources

Python Docs


© 2026 Ojasa Mirai. All rights reserved.
