Ojasa Mirai

Ojasa Mirai

Python

Loading...

Learning Level

🟢 Beginner · 🔵 Advanced
REST API BasicsHTTP RequestsStatus CodesJSON SerializationError HandlingAPI AuthenticationRate LimitingBuilding APIsWeb Scraping Basics
Python/Apis Json/Webscraping Basics

🕷️ Advanced Web Scraping — Large-Scale Data Extraction

Scale beyond simple parsing. Master JavaScript rendering, distributed scraping, proxy rotation, and building production-grade data pipelines with Scrapy and Selenium.


🤖 Selenium for JavaScript-Heavy Sites

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

# Headless browser setup: no visible window; the sandbox and /dev/shm
# flags are needed in containerized/CI environments.
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')

driver = webdriver.Chrome(options=chrome_options)

try:
    # Navigate to page
    driver.get('https://example.com/dynamic-content')

    # Wait (up to 10 seconds) for the JS-rendered product cards to appear.
    # presence_of_all_elements_located returns the matched elements, so
    # reuse that list instead of querying the DOM a second time.
    items = WebDriverWait(driver, 10).until(
        EC.presence_of_all_elements_located((By.CLASS_NAME, 'product'))
    )

    # Extract data from each product card
    products = []
    for item in items:
        product = {
            'title': item.find_element(By.CLASS_NAME, 'title').text,
            'price': item.find_element(By.CLASS_NAME, 'price').text,
            'url': item.find_element(By.TAG_NAME, 'a').get_attribute('href')
        }
        products.append(product)

    print(f"Scraped {len(products)} products")

finally:
    # Always release the browser process, even if scraping failed
    driver.quit()

🔄 Scrapy Framework

import scrapy
from scrapy.crawler import CrawlerProcess

# Define spider
class ProductSpider(scrapy.Spider):
    """Scrapy spider that crawls product listing pages and yields product data."""

    name = 'products'
    start_urls = ['https://example.com/products']

    def parse(self, response):
        """Parse a listing page: follow each product link, then paginate."""

        # Extract product links. response.follow resolves relative URLs
        # against the current page (scrapy.Request requires absolute URLs
        # and raises on None), so guard against anchors without an href.
        for product in response.css('div.product'):
            href = product.css('a::attr(href)').get()
            if href:
                yield response.follow(href, callback=self.parse_product)

        # Handle pagination by following the "next" link, if any
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_product(self, response):
        """Parse an individual product page into a flat item dict."""

        yield {
            'name': response.css('h1::text').get(),
            'price': response.css('span.price::text').get(),
            'description': response.css('div.description::text').getall(),
            'rating': response.css('span.rating::attr(data-rating)').get(),
            'url': response.url
        }

# Run spider in-process (blocks until the crawl finishes)
# NOTE(review): ROBOTSTXT_OBEY is disabled here; for production scraping
# prefer True so the crawler honors the target site's robots.txt rules.
process = CrawlerProcess({
    'USER_AGENT': 'Mozilla/5.0...',
    'ROBOTSTXT_OBEY': False
})

process.crawl(ProductSpider)
process.start()

🔀 Proxy Rotation

import requests
from itertools import cycle

class ProxyRotator:
    """Round-robin rotation through a fixed list of HTTP(S) proxies.

    Hands out proxies in order, forever, and retries failed requests
    through successive proxies.
    """

    def __init__(self, proxies):
        # cycle() yields the proxies in order indefinitely
        self.proxy_pool = cycle(proxies)
        # Last proxy handed out (None until get_next_proxy is called)
        self.current_proxy = None

    def get_next_proxy(self):
        """Advance to and return the next proxy in the pool."""
        self.current_proxy = next(self.proxy_pool)
        return self.current_proxy

    def make_request(self, url, max_retries=3):
        """GET *url*, switching to a fresh proxy on each attempt.

        Returns the first response with status 200. Raises RuntimeError
        (chained to the last network error, if any) after *max_retries*
        failed attempts.
        """
        last_error = None
        for attempt in range(max_retries):
            proxy = self.get_next_proxy()
            try:
                response = requests.get(
                    url,
                    proxies={
                        'http': proxy,
                        'https': proxy
                    },
                    timeout=5
                )

                if response.status_code == 200:
                    return response

            except requests.exceptions.RequestException as exc:
                # Remember why this attempt failed so the final error
                # is diagnosable instead of silently swallowed.
                last_error = exc

        # RuntimeError is still an Exception subclass, so existing
        # callers that catch Exception keep working.
        raise RuntimeError(f'Failed after {max_retries} attempts') from last_error

# Example: fetch a page while rotating across three proxies
proxies = [
    'http://proxy1.com:8080',
    'http://proxy2.com:8080',
    'http://proxy3.com:8080',
]

rotator = ProxyRotator(proxies)
response = rotator.make_request('https://example.com')

🚫 Handling Anti-Scraping Measures

import requests
from time import sleep
import random

class RespectfulScraper:
    """Scraper that respects site limits and anti-scraping measures.

    Adds randomized delays between requests, rotates realistic browser
    User-Agent strings, and can print robots.txt for review before scraping.
    """

    def __init__(self, base_url, delay_range=(2, 5)):
        self.base_url = base_url
        # (min, max) seconds to sleep before each request
        self.delay_range = delay_range
        # Session reuses connections and persists cookies across requests
        self.session = requests.Session()

        # Realistic user agents, rotated per request to look less bot-like
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]

    def get_headers(self):
        """Return browser-like request headers with a random User-Agent."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }

    def fetch_url(self, url):
        """Fetch *url* after a randomized polite delay; returns the Response."""

        # Random delay so request timing does not look machine-generated
        delay = random.uniform(*self.delay_range)
        sleep(delay)

        # Make request with realistic headers via the shared session
        response = self.session.get(
            url,
            headers=self.get_headers(),
            timeout=10
        )

        return response

    def check_robots_txt(self):
        """Print the site's robots.txt so its rules can be reviewed before scraping."""

        robots_url = f'{self.base_url}/robots.txt'

        try:
            # Use the shared session for consistency with fetch_url
            response = self.session.get(robots_url, timeout=5)
            print(f"robots.txt:\n{response.text}")

        except requests.exceptions.RequestException:
            # Narrow except: a bare except would also swallow
            # KeyboardInterrupt and SystemExit.
            print("Could not fetch robots.txt")

# Usage: 2-5 second random pause before each request
scraper = RespectfulScraper('https://example.com', delay_range=(2, 5))

# Review the site's crawling rules before fetching any pages
scraper.check_robots_txt()

response = scraper.fetch_url('https://example.com/page1')
print(response.status_code)

📊 Distributed Scraping with Celery

from celery import Celery
from bs4 import BeautifulSoup
import requests

# Celery app backed by a local Redis broker for distributing scrape tasks
app = Celery('scraper', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def scrape_url(self, url):
    """Celery task: fetch *url* and return a summary dict of the page."""

    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()

        page = BeautifulSoup(resp.content, 'html.parser')
        anchors = page.find_all('a', href=True)

        return {
            'url': url,
            'title': page.title.string if page.title else None,
            'links': [a['href'] for a in anchors],
            'text_length': len(page.get_text()),
        }

    except Exception as exc:
        # Exponential backoff between retries: 1s, 2s, 4s, ...
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

@app.task
def process_urls(urls):
    """Fan *urls* out to parallel scrape_url tasks and collect the results.

    Returns a list of per-URL result dicts in input order.
    """

    # Bug fix: 'group' was used without being imported anywhere, which
    # raised NameError at runtime. Import it locally so the task is
    # self-contained.
    from celery import group

    # Queue all scraping tasks as one parallel group
    job = group([scrape_url.s(url) for url in urls])

    # Execute all tasks in parallel
    result = job.apply_async()

    # NOTE(review): calling .get() inside a task can deadlock the worker
    # pool; Celery requires disable_sync_subtasks=False to allow it.
    # Consider a chord or returning the group id instead — confirm intent.
    return result.get()

# Usage
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3'
]

# Start scraping asynchronously; .delay() only enqueues the task on the
# broker — a separate Celery worker process performs the actual work.
process_urls.delay(urls)

💾 Data Pipeline and Storage

from datetime import datetime
import sqlite3
import json

class ScrapingPipeline:
    """Store scraped items in SQLite and track their processing state.

    Each method opens a short-lived connection and always closes it
    (even on error), so the pipeline is safe to use from code that may
    raise mid-operation.
    """

    def __init__(self, db_path='scraped_data.db'):
        # Path to the SQLite database file; created on first connect
        self.db_path = db_path
        self.setup_database()

    def _connect(self):
        """Open a new connection to the pipeline database."""
        return sqlite3.connect(self.db_path)

    def setup_database(self):
        """Create the scraped_items table if it does not already exist."""

        conn = self._connect()
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS scraped_items (
                    id INTEGER PRIMARY KEY,
                    source_url TEXT,
                    title TEXT,
                    data JSON,
                    scraped_at TIMESTAMP,
                    processed INTEGER DEFAULT 0
                )
            ''')
            conn.commit()
        finally:
            # Close even if table creation raised
            conn.close()

    def add_item(self, source_url, title, data):
        """Insert one scraped item; *data* is JSON-serialized for storage."""

        conn = self._connect()
        try:
            conn.execute('''
                INSERT INTO scraped_items (source_url, title, data, scraped_at)
                VALUES (?, ?, ?, ?)
            ''', (source_url, title, json.dumps(data),
                  # ISO-8601 string: sortable, portable, and avoids the
                  # datetime adapter deprecated in Python 3.12
                  datetime.now().isoformat()))
            conn.commit()
        finally:
            conn.close()

    def get_unprocessed_items(self, limit=100):
        """Return up to *limit* (id, source_url, title, data) rows with processed=0."""

        conn = self._connect()
        try:
            cursor = conn.execute('''
                SELECT id, source_url, title, data
                FROM scraped_items
                WHERE processed = 0
                LIMIT ?
            ''', (limit,))
            return cursor.fetchall()
        finally:
            conn.close()

    def mark_processed(self, item_id):
        """Flag the item with *item_id* as processed."""

        conn = self._connect()
        try:
            conn.execute('''
                UPDATE scraped_items
                SET processed = 1
                WHERE id = ?
            ''', (item_id,))
            conn.commit()
        finally:
            conn.close()

# Example: store one scraped item, then drain the unprocessed queue
pipeline = ScrapingPipeline()

pipeline.add_item(
    'https://example.com/product/1',
    'Product Title',
    {'price': 99.99, 'rating': 4.5},
)

# Process stored data later, marking each item done as we go
for item_id, url, title, data in pipeline.get_unprocessed_items():
    print(f"Processing: {title}")
    pipeline.mark_processed(item_id)

🔍 JavaScript Execution with Playwright

from playwright.async_api import async_playwright
import asyncio

async def scrape_with_playwright():
    """Scrape a JavaScript-rendered page with an async headless Chromium."""

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        tab = await browser.new_page()

        # Load the target page
        await tab.goto('https://example.com')

        # Block until the dynamic product cards have rendered (5s cap)
        await tab.wait_for_selector('.product', timeout=5000)

        # Run JavaScript in the page context to pull out structured data
        extracted = await tab.evaluate('''
            () => {
                return {
                    title: document.title,
                    products: Array.from(document.querySelectorAll('.product')).map(p => ({
                        name: p.querySelector('.name').textContent,
                        price: p.querySelector('.price').textContent
                    }))
                }
            }
        ''')

        print(extracted)

        await browser.close()

# Drive the async scraper from synchronous code
asyncio.run(scrape_with_playwright())

✅ Key Takeaways

| Tool | Use Case | Advantage |
| --- | --- | --- |
| BeautifulSoup | Static HTML | Simple, lightweight |
| Selenium | JavaScript-heavy sites | Handles dynamic content |
| Scrapy | Large scale | Framework, distributed |
| Playwright | Modern sites | Fast, reliable |
| Proxy Rotation | Anti-scraping | Avoids blocking |
| Celery | Distributed | Parallel processing |
| Pipeline | Data storage | Organized processing |

🎓 Conclusion

You've mastered APIs and web scraping:

1. ✅ REST API fundamentals

2. ✅ HTTP communication

3. ✅ JSON serialization

4. ✅ Error handling

5. ✅ Authentication

6. ✅ Rate limiting

7. ✅ Building APIs

8. ✅ Web scraping

Next: Apply these skills to real-world projects!


Ready for more? Continue with the advanced challenges.


Resources

Python Docs

Ojasa Mirai

Master AI-powered development skills through structured learning, real projects, and verified credentials. Whether you're upskilling your team or launching your career, we deliver the skills companies actually need.

Learn Deep • Build Real • Verify Skills • Launch Forward

Courses

PythonFastapiReactJSCloud

© 2026 Ojasa Mirai. All rights reserved.

TwitterGitHubLinkedIn