
Storing Scraped Data: SQLite, PostgreSQL, CSV, and JSON Compared (2026)

Your scraper works. Data is flowing. Now where does it go?

This sounds like a trivial decision until you're three weeks into a project, your CSV has 400K rows with mangled unicode, duplicate entries everywhere, and you can't tell which URLs you've already visited. Storage choice matters more than most scrapers realize — it determines whether your scraper can resume after crashes, whether you can deduplicate records, and whether anyone else can actually query the data you've collected.

The wrong choice doesn't just cause inconvenience. It can cost you days of wasted scraping time when a partial run overwrites your complete dataset, or weeks of cleanup when you realize your flat file has no way to handle the schema evolution that happens as target sites change their HTML structure. I've seen teams lose weeks of collected data because they stored everything in a single JSON file that got corrupted on a crash. I've watched scrapers re-fetch 50,000 pages because there was no deduplication layer to say "you already have this one."

This guide covers the four most common storage options for scraped data — CSV, JSON/JSONL, SQLite, and PostgreSQL — with complete, working Python code for each. We'll cover deduplication patterns, incremental scraping, schema design, error handling, and the real-world tradeoffs that determine which option fits your project. By the end, you'll have a clear decision framework and copy-paste code for whichever storage backend you choose.

Whether you're scraping product listings for price monitoring, collecting research data from academic sites, or building datasets for machine learning, the storage layer is what turns raw HTML extractions into usable, queryable data. Let's get it right from the start.

The Four Options at a Glance

CSV: best for one-off scripts, Excel handoff, and flat data; worst for nested data, deduplication, and large scale. Max practical size: ~500K rows. Dedup support: none built-in.

JSON/JSONL: best for semi-structured data, nested fields, and streaming; worst for queries, deduplication, and tabular analysis. Max practical size: ~1M records. Dedup support: manual only.

SQLite: best for local projects, dedup, and incremental scraping; worst for team access and concurrent writes. Max practical size: ~10M rows. Dedup support: built-in (PRIMARY KEY).

PostgreSQL: best for team access, large scale, and cloud deployment; worst for quick scripts and zero-setup needs. Max practical size: billions of rows. Dedup support: built-in (UPSERT).

Pick based on what happens after scraping, not during. If someone's opening the output in Excel — CSV. If you're building a pipeline that runs daily and must skip already-scraped URLs — SQLite minimum.
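That framework can be encoded as a tiny helper. This is a sketch, not gospel: the function name and the thresholds are mine, lifted from the ballpark figures in the table above.

```python
def choose_backend(rows, nested=False, needs_dedup=False,
                   team_access=False, excel_handoff=False):
    """Rough storage picker; thresholds mirror the table's ballpark figures."""
    if team_access or rows > 10_000_000:
        return "postgresql"   # concurrent writers, or beyond SQLite's comfort zone
    if needs_dedup or rows > 1_000_000:
        return "sqlite"       # built-in dedup and resume via PRIMARY KEY
    if nested:
        return "jsonl"        # nested fields don't flatten cleanly
    if excel_handoff and rows <= 500_000:
        return "csv"          # someone will double-click this in Excel
    return "sqlite"           # safe default when nothing else decides it
```

Treat the cutoffs as soft: SQLite comfortably handles far more than 10M rows in practice; the point is where each format stops being convenient.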

CSV: Simple but Fragile

Basic CSV Storage

For a one-off scrape where you just need a file to hand off:

import csv
import os
from datetime import datetime

def save_to_csv(records, filename="output.csv"):
    """Append records to CSV, creating headers if file is new."""
    if not records:
        return

    file_exists = os.path.exists(filename)

    with open(filename, "a", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=records[0].keys())
        if not file_exists or os.path.getsize(filename) == 0:
            writer.writeheader()
        writer.writerows(records)
    print(f"Saved {len(records)} records to {filename}")

The utf-8-sig encoding adds a BOM (Byte Order Mark) that tells Excel to read UTF-8 correctly. Without it, names with accents, non-Latin characters, or emoji turn into garbage when someone double-clicks the file in Excel. Small detail, saves you an email from whoever opens the file.
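If you want to confirm the BOM is actually being written, here is a quick check (it creates a scratch file named demo.csv):

```python
import csv

with open("demo.csv", "w", newline="", encoding="utf-8-sig") as f:
    csv.writer(f).writerow(["name", "café"])

with open("demo.csv", "rb") as f:
    raw = f.read()

# The first three bytes are the UTF-8 BOM that tells Excel how to decode.
assert raw[:3] == b"\xef\xbb\xbf"

# Reading back with utf-8-sig strips the BOM transparently.
with open("demo.csv", encoding="utf-8-sig") as f:
    row = next(csv.reader(f))
assert row == ["name", "café"]
```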

Handling Edge Cases in CSV

CSV has several gotchas that bite scrapers:

import csv
import os
import re
from datetime import datetime

def clean_for_csv(value):
    """Clean a value for safe CSV storage."""
    if value is None:
        return ""
    if isinstance(value, str):
        # Remove control characters that break CSV parsers
        value = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', value)
        # Normalize whitespace (newlines in cells cause issues)
        value = ' '.join(value.split())
        # Truncate extremely long values
        if len(value) > 32000:
            value = value[:32000] + "... [truncated]"
    return value

def save_products_csv(products, filename="products.csv"):
    """Save product data with proper cleaning and error handling."""
    if not products:
        print("No products to save")
        return

    fieldnames = ["url", "title", "price", "currency", "description",
                  "category", "rating", "review_count", "scraped_at"]

    file_exists = os.path.exists(filename) and os.path.getsize(filename) > 0

    with open(filename, "a", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(
            f, fieldnames=fieldnames,
            extrasaction="ignore",  # Skip fields not in fieldnames
            quoting=csv.QUOTE_ALL   # Quote everything to avoid delimiter issues
        )
        if not file_exists:
            writer.writeheader()

        for product in products:
            cleaned = {k: clean_for_csv(v) for k, v in product.items()}
            cleaned["scraped_at"] = datetime.now().isoformat()
            writer.writerow(cleaned)

    print(f"Appended {len(products)} products to {filename}")

Reading CSV Back Efficiently

When you need to process a large CSV without loading it all into memory:

def read_csv_lazy(filename):
    """Read CSV lazily — one row at a time, no memory explosion."""
    with open(filename, "r", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

def csv_to_dataframe(filename):
    """Read CSV into pandas with proper type handling."""
    import pandas as pd
    df = pd.read_csv(
        filename,
        encoding="utf-8-sig",
        dtype={"price": float, "review_count": "Int64"},
        parse_dates=["scraped_at"],
        na_values=["", "N/A", "null", "None"]
    )
    return df

# Process a large CSV in chunks
def process_large_csv(filename, chunk_size=10000):
    """Process a huge CSV in memory-efficient chunks."""
    import pandas as pd
    total_rows = 0
    for chunk in pd.read_csv(filename, chunksize=chunk_size, encoding="utf-8-sig"):
        # Process each chunk
        total_rows += len(chunk)
        print(f"Processed {total_rows} rows so far...")
        # Do analysis on chunk here
    return total_rows

When CSV Falls Apart

CSV falls apart when your data has:

- Nested structures: variants, image lists, spec dictionaries
- Fields full of embedded newlines, quotes, or delimiters
- A need for deduplication or resumable, incremental scraping
- More than a few hundred thousand rows

If any of these apply, move to JSONL or SQLite.

JSON and JSONL: Flexible Structure

Regular JSON vs JSONL

Regular JSON stores everything in one array:

[{"title": "Product A", "price": 29.99}, {"title": "Product B", "price": 39.99}]

To append a record, you must read the entire file, parse it, append to the list, and write it all back. With a 2GB file, that means loading 2GB into RAM just to add one record.

JSONL (JSON Lines) puts one JSON object per line:

{"title": "Product A", "price": 29.99}
{"title": "Product B", "price": 39.99}

You can append one record at a time. Your scraper can crash at any point and you don't lose everything before the crash. This is the format you want for web scraping.
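The difference shows up directly in the append path. A sketch comparing the two (file names are illustrative; run it in a scratch directory):

```python
import json
import os

def append_json_array(path, record):
    """Regular JSON: read-modify-write the whole file on every append."""
    data = []
    if os.path.exists(path):
        with open(path, encoding="utf-8") as f:
            data = json.load(f)          # entire file into RAM
    data.append(record)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f)               # entire file rewritten

def append_jsonl(path, record):
    """JSONL: one O(1) append, no matter how large the file is."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

# Start clean, then append three records to each format
for p in ("demo.json", "demo.jsonl"):
    if os.path.exists(p):
        os.remove(p)

for i in range(3):
    append_json_array("demo.json", {"id": i})
    append_jsonl("demo.jsonl", {"id": i})

with open("demo.json", encoding="utf-8") as f:
    array_records = json.load(f)
with open("demo.jsonl", encoding="utf-8") as f:
    jsonl_records = [json.loads(line) for line in f]
```

Both end with the same three records, but the JSON-array version re-parsed and re-serialized the entire file on every append; at gigabyte scale that cost dominates the scrape itself.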

JSONL Writer with Rotation and Validation

import json
import os
from datetime import datetime

class JSONLWriter:
    """Append-mode JSONL writer with validation and file rotation."""

    def __init__(self, filename="output.jsonl", max_size_mb=500):
        self.filename = filename
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.records_written = 0

    def _get_current_file(self):
        """Rotate file if it exceeds max size."""
        if os.path.exists(self.filename):
            size = os.path.getsize(self.filename)
            if size > self.max_size_bytes:
                base, ext = os.path.splitext(self.filename)
                i = 1
                while os.path.exists(f"{base}.{i}{ext}"):
                    i += 1
                os.rename(self.filename, f"{base}.{i}{ext}")
                print(f"Rotated {self.filename} -> {base}.{i}{ext}")
        return self.filename

    def write(self, record):
        """Write a single record to JSONL file."""
        if not isinstance(record, dict):
            raise ValueError("Record must be a dictionary")

        record["_scraped_at"] = datetime.now().isoformat()

        filepath = self._get_current_file()
        with open(filepath, "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
        self.records_written += 1

    def write_batch(self, records):
        """Write multiple records efficiently."""
        filepath = self._get_current_file()
        with open(filepath, "a", encoding="utf-8") as f:
            for record in records:
                record["_scraped_at"] = datetime.now().isoformat()
                f.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
        self.records_written += len(records)
        print(f"Wrote {len(records)} records (total: {self.records_written})")

# Usage
writer = JSONLWriter("products.jsonl")
writer.write({"url": "https://example.com/p/1", "title": "Widget", "price": 29.99})
writer.write_batch([
    {"url": "https://example.com/p/2", "title": "Gadget", "price": 49.99},
    {"url": "https://example.com/p/3", "title": "Doohickey", "price": 19.99},
])

JSONL Reader with Filtering and Error Recovery

def read_jsonl(filename, filter_fn=None):
    """Read JSONL lazily with optional filtering and error recovery."""
    with open(filename, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
                if filter_fn is None or filter_fn(record):
                    yield record
            except json.JSONDecodeError as e:
                print(f"Skipping malformed line {line_num}: {e}")

# Read all records
all_records = list(read_jsonl("products.jsonl"))

# Read only expensive products
expensive = list(read_jsonl(
    "products.jsonl",
    filter_fn=lambda r: r.get("price", 0) > 100
))

# Read products from a specific category
electronics = list(read_jsonl(
    "products.jsonl",
    filter_fn=lambda r: r.get("category") == "Electronics"
))

# Convert to pandas for analysis
import pandas as pd
df = pd.DataFrame(read_jsonl("products.jsonl"))
print(df.describe())

JSONL Deduplication (Post-Processing)

Since JSONL has no built-in dedup, you need to handle it yourself:

def deduplicate_jsonl(input_file, output_file, key_field="url"):
    """Remove duplicate records from a JSONL file based on a key field."""
    seen = set()
    dupes = 0
    kept = 0

    with open(output_file, "w", encoding="utf-8") as out:
        for record in read_jsonl(input_file):
            key = record.get(key_field)
            if key and key not in seen:
                seen.add(key)
                out.write(json.dumps(record, ensure_ascii=False) + "\n")
                kept += 1
            else:
                dupes += 1

    print(f"Kept {kept} records, removed {dupes} duplicates")
    return kept, dupes

Handling Nested Data — Where JSONL Shines

This is where JSONL dominates CSV:

# Product with variants, images, and nested specs — all stored naturally
product = {
    "url": "https://store.example.com/laptop-pro-16",
    "title": "Laptop Pro 16",
    "price": 1499.99,
    "variants": [
        {"sku": "LP16-256", "storage": "256GB", "price": 1499.99},
        {"sku": "LP16-512", "storage": "512GB", "price": 1699.99},
        {"sku": "LP16-1TB", "storage": "1TB", "price": 1999.99},
    ],
    "images": [
        "https://cdn.example.com/img/lp16-front.jpg",
        "https://cdn.example.com/img/lp16-side.jpg",
    ],
    "specs": {
        "display": "16 inch Retina",
        "cpu": "M3 Pro",
        "ram": "18GB",
        "battery": "22 hours",
    },
    "reviews_summary": {
        "average": 4.7,
        "count": 1243,
        "distribution": {"5": 890, "4": 210, "3": 80, "2": 40, "1": 23}
    }
}

writer = JSONLWriter("laptops.jsonl")
writer.write(product)

Try representing that in CSV. You'd end up with columns like variant_1_sku, variant_1_storage, variant_2_sku... it's a mess that breaks when a product has 4 variants instead of 3.
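If you ever do need a CSV view of nested data, you end up writing something like this (a sketch; the helper name is mine). The flattening itself is easy; the problem is that the header set changes with every record:

```python
def flatten_for_csv(record, prefix="", sep="_"):
    """Flatten nested dicts/lists into CSV-safe columns.

    Lists become numbered columns (variants_1_sku, variants_2_sku, ...),
    so the header set grows whenever any product has more items.
    """
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_for_csv(value, name, sep))
        elif isinstance(value, list):
            for i, item in enumerate(value, 1):
                if isinstance(item, dict):
                    flat.update(flatten_for_csv(item, f"{name}{sep}{i}", sep))
                else:
                    flat[f"{name}{sep}{i}"] = item
        else:
            flat[name] = value
    return flat

flat = flatten_for_csv({
    "url": "https://example.com/p/1",
    "specs": {"cpu": "M3", "ram": "18GB"},
    "variants": [{"sku": "A"}, {"sku": "B"}],
})
# The columns now depend on how many variants this particular product has.
```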

SQLite: The Sweet Spot for Most Scrapers

SQLite needs no server, ships with Python, and gives you what flat files can't: deduplication and resume capability. This is where most scrapers should land.

Schema Design for Web Scraping

import sqlite3
from datetime import datetime
from contextlib import contextmanager

def init_db(db_path="scraper.db"):
    """Initialize database with production-quality schema for web scraping."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")  # Better concurrent read performance
    conn.execute("PRAGMA foreign_keys=ON")
    conn.execute("PRAGMA busy_timeout=5000")  # Wait up to 5s on locked DB

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT UNIQUE NOT NULL,
            title TEXT,
            price REAL,
            currency TEXT DEFAULT 'USD',
            description TEXT,
            category TEXT,
            rating REAL,
            review_count INTEGER,
            in_stock INTEGER DEFAULT 1,
            raw_html TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_products_category ON products(category);
        CREATE INDEX IF NOT EXISTS idx_products_price ON products(price);
        CREATE INDEX IF NOT EXISTS idx_products_scraped ON products(scraped_at);

        CREATE TABLE IF NOT EXISTS product_images (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_url TEXT NOT NULL,
            image_url TEXT NOT NULL,
            position INTEGER DEFAULT 0,
            FOREIGN KEY (product_url) REFERENCES products(url),
            UNIQUE(product_url, image_url)
        );

        CREATE TABLE IF NOT EXISTS scrape_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT NOT NULL,
            status_code INTEGER,
            success INTEGER DEFAULT 1,
            error_message TEXT,
            response_time_ms INTEGER,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS scrape_queue (
            url TEXT PRIMARY KEY,
            priority INTEGER DEFAULT 0,
            added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            claimed_at TIMESTAMP,
            completed_at TIMESTAMP,
            retry_count INTEGER DEFAULT 0
        );
    """)
    conn.commit()
    return conn

@contextmanager
def get_db(db_path="scraper.db"):
    """Context manager for database connections."""
    conn = init_db(db_path)
    try:
        yield conn
    finally:
        conn.close()

The Core Deduplication Pattern

The INSERT ... ON CONFLICT pattern is the heart of any serious scraper:

def save_product(conn, product):
    """Save or update a product with deduplication."""
    conn.execute("""
        INSERT INTO products (url, title, price, currency, description,
                              category, rating, review_count, in_stock)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(url) DO UPDATE SET
            title = excluded.title,
            price = excluded.price,
            description = excluded.description,
            rating = excluded.rating,
            review_count = excluded.review_count,
            in_stock = excluded.in_stock,
            updated_at = CURRENT_TIMESTAMP
    """, (
        product["url"], product.get("title"), product.get("price"),
        product.get("currency", "USD"), product.get("description"),
        product.get("category"), product.get("rating"),
        product.get("review_count"), product.get("in_stock", True)
    ))
    conn.commit()

def save_products_batch(conn, products):
    """Batch insert with deduplication — 10x faster than individual inserts."""
    conn.executemany("""
        INSERT INTO products (url, title, price, currency, description,
                              category, rating, review_count)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(url) DO UPDATE SET
            title = excluded.title,
            price = excluded.price,
            updated_at = CURRENT_TIMESTAMP
    """, [
        (p["url"], p.get("title"), p.get("price"), p.get("currency", "USD"),
         p.get("description"), p.get("category"), p.get("rating"),
         p.get("review_count"))
        for p in products
    ])
    conn.commit()
    print(f"Upserted {len(products)} products")

def already_scraped(conn, url):
    """Check whether a URL has already been scraped."""
    row = conn.execute(
        "SELECT 1 FROM products WHERE url = ?", (url,)
    ).fetchone()
    return row is not None

def get_stale_urls(conn, hours=24):
    """Find products that haven't been updated recently — for re-scraping."""
    return [row[0] for row in conn.execute("""
        SELECT url FROM products
        WHERE updated_at < datetime('now', ? || ' hours')
        ORDER BY updated_at ASC
    """, (f"-{hours}",)).fetchall()]

Resumable Scraping with a Queue

This is the pattern that makes your scraper production-ready. It can crash, get killed, or hit rate limits — and pick up exactly where it left off:

def add_to_queue(conn, urls, priority=0):
    """Add URLs to the scrape queue."""
    conn.executemany("""
        INSERT OR IGNORE INTO scrape_queue (url, priority)
        VALUES (?, ?)
    """, [(url, priority) for url in urls])
    conn.commit()
    print(f"Added {len(urls)} URLs to queue")

def get_next_url(conn):
    """Get the next URL to scrape (highest priority, oldest first)."""
    row = conn.execute("""
        SELECT url FROM scrape_queue
        WHERE completed_at IS NULL
          AND (claimed_at IS NULL OR claimed_at < datetime('now', '-10 minutes'))
          AND retry_count < 5
        ORDER BY priority DESC, added_at ASC
        LIMIT 1
    """).fetchone()
    if row:
        conn.execute(
            "UPDATE scrape_queue SET claimed_at = CURRENT_TIMESTAMP WHERE url = ?",
            (row[0],)
        )
        conn.commit()
        return row[0]
    return None

def mark_completed(conn, url):
    """Mark a URL as successfully scraped."""
    conn.execute(
        "UPDATE scrape_queue SET completed_at = CURRENT_TIMESTAMP WHERE url = ?",
        (url,)
    )
    conn.commit()

def mark_failed(conn, url, error):
    """Increment retry count for a failed URL so it can be retried."""
    conn.execute("""
        UPDATE scrape_queue
        SET claimed_at = NULL, retry_count = retry_count + 1
        WHERE url = ?
    """, (url,))
    conn.execute(
        "INSERT INTO scrape_log (url, success, error_message) VALUES (?, 0, ?)",
        (url, str(error))
    )
    conn.commit()

def queue_stats(conn):
    """Get overview of queue progress."""
    total = conn.execute("SELECT COUNT(*) FROM scrape_queue").fetchone()[0]
    done = conn.execute(
        "SELECT COUNT(*) FROM scrape_queue WHERE completed_at IS NOT NULL"
    ).fetchone()[0]
    failed = conn.execute(
        "SELECT COUNT(*) FROM scrape_queue WHERE retry_count >= 5"
    ).fetchone()[0]
    pending = total - done - failed
    print(f"Queue: {done}/{total} done, {pending} pending, {failed} permanently failed")
    return {"total": total, "done": done, "pending": pending, "failed": failed}

# Main scraping loop — fully resumable
def run_scraper(conn, scrape_fn):
    """Run scraper with automatic resume capability."""
    while True:
        url = get_next_url(conn)
        if not url:
            stats = queue_stats(conn)
            if stats["pending"] == 0:
                print("Queue empty — scraping complete")
                break
            print("No URLs available right now, waiting...")
            import time
            time.sleep(30)
            continue

        try:
            print(f"Scraping: {url}")
            product = scrape_fn(url)
            save_product(conn, product)
            mark_completed(conn, url)
        except Exception as e:
            print(f"Failed: {url} — {e}")
            mark_failed(conn, url, e)

Price History Tracking

A common use case — tracking price changes over time for deal alerts:

def init_price_tracking(conn):
    """Add price history table for monitoring."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_url TEXT NOT NULL,
            price REAL NOT NULL,
            currency TEXT DEFAULT 'USD',
            recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (product_url) REFERENCES products(url)
        )
    """)
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_price_url ON price_history(product_url)"
    )
    conn.commit()

def record_price(conn, url, price, currency="USD"):
    """Record a price point, only if it actually changed."""
    last_price = conn.execute("""
        SELECT price FROM price_history
        WHERE product_url = ?
        ORDER BY recorded_at DESC LIMIT 1
    """, (url,)).fetchone()

    if last_price is None or abs(last_price[0] - price) > 0.01:
        conn.execute(
            "INSERT INTO price_history (product_url, price, currency) VALUES (?, ?, ?)",
            (url, price, currency)
        )
        conn.commit()
        if last_price:
            change = price - last_price[0]
            direction = "UP" if change > 0 else "DOWN"
            print(f"Price {direction}: ${last_price[0]:.2f} -> ${price:.2f} "
                  f"({'+' if change > 0 else ''}{change:.2f})")

def get_price_drops(conn, min_drop_pct=10):
    """Find products with significant recent price drops — deal finder."""
    return conn.execute("""
        WITH ranked AS (
            SELECT product_url, price,
                   LAG(price) OVER (
                       PARTITION BY product_url ORDER BY recorded_at
                   ) as prev_price,
                   recorded_at
            FROM price_history
        )
        SELECT product_url, prev_price, price,
               ROUND((prev_price - price) / prev_price * 100, 1) as drop_pct
        FROM ranked
        WHERE prev_price IS NOT NULL
          AND (prev_price - price) / prev_price * 100 >= ?
        ORDER BY drop_pct DESC
    """, (min_drop_pct,)).fetchall()

SQLite Analytics and Reporting

def scraping_stats(conn):
    """Get a full overview of scraping progress and health."""
    stats = {}
    stats["total_products"] = conn.execute(
        "SELECT COUNT(*) FROM products"
    ).fetchone()[0]
    stats["scraped_today"] = conn.execute("""
        SELECT COUNT(*) FROM products
        WHERE DATE(scraped_at) = DATE('now')
    """).fetchone()[0]
    stats["queue_remaining"] = conn.execute("""
        SELECT COUNT(*) FROM scrape_queue
        WHERE completed_at IS NULL
    """).fetchone()[0]
    stats["failed_today"] = conn.execute("""
        SELECT COUNT(*) FROM scrape_log
        WHERE success = 0 AND DATE(scraped_at) = DATE('now')
    """).fetchone()[0]
    stats["avg_price"] = conn.execute(
        "SELECT ROUND(AVG(price), 2) FROM products WHERE price > 0"
    ).fetchone()[0]
    stats["categories"] = conn.execute(
        "SELECT COUNT(DISTINCT category) FROM products WHERE category IS NOT NULL"
    ).fetchone()[0]

    for key, value in stats.items():
        print(f"  {key}: {value}")
    return stats

def category_breakdown(conn):
    """Product count and price stats by category."""
    return conn.execute("""
        SELECT category,
               COUNT(*) as count,
               ROUND(AVG(price), 2) as avg_price,
               ROUND(MIN(price), 2) as min_price,
               ROUND(MAX(price), 2) as max_price
        FROM products
        WHERE category IS NOT NULL AND price > 0
        GROUP BY category
        ORDER BY count DESC
    """).fetchall()

def export_to_csv(conn, query, filename):
    """Export any SQL query result to CSV."""
    import csv
    cur = conn.execute(query)
    columns = [desc[0] for desc in cur.description]
    rows = cur.fetchall()

    with open(filename, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        writer.writerows(rows)
    print(f"Exported {len(rows)} rows to {filename}")

PostgreSQL: Team and Scale

When multiple people need to query the data, or you're deploying scrapers to the cloud, PostgreSQL is the move. The deduplication pattern is the same — ON CONFLICT DO NOTHING replaces INSERT OR IGNORE.
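Concretely, here is the same dedup insert in both dialects. The SQLite half runs as-is; the PostgreSQL statement appears as a comment for comparison, and SQLite 3.24+ also accepts the ON CONFLICT spelling:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scrape_queue (url TEXT PRIMARY KEY)")

# SQLite dialect: INSERT OR IGNORE (duplicate URL silently skipped)
conn.executemany(
    "INSERT OR IGNORE INTO scrape_queue (url) VALUES (?)",
    [("https://example.com/a",), ("https://example.com/a",)]
)

# PostgreSQL equivalent (same behavior, different spelling):
#   INSERT INTO scrape_queue (url) VALUES (%s)
#   ON CONFLICT (url) DO NOTHING;
# SQLite 3.24+ accepts the ON CONFLICT form too:
conn.execute(
    "INSERT INTO scrape_queue (url) VALUES (?) ON CONFLICT (url) DO NOTHING",
    ("https://example.com/a",)
)

count = conn.execute("SELECT COUNT(*) FROM scrape_queue").fetchone()[0]
# count is 1: all three inserts collapsed into one row
```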

Setup and Connection Management

import psycopg2
from psycopg2.extras import execute_values, RealDictCursor
from contextlib import contextmanager

DATABASE_URL = "postgresql://scraper:password@localhost:5432/scraping"

@contextmanager
def get_pg_conn():
    """PostgreSQL connection with auto-cleanup."""
    conn = psycopg2.connect(DATABASE_URL)
    try:
        yield conn
    finally:
        conn.close()

def init_pg_schema(conn):
    """Create the scraping schema in PostgreSQL."""
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS products (
                url TEXT PRIMARY KEY,
                title TEXT,
                price NUMERIC(10, 2),
                currency TEXT DEFAULT 'USD',
                description TEXT,
                category TEXT,
                rating NUMERIC(3, 2),
                review_count INTEGER,
                in_stock BOOLEAN DEFAULT true,
                metadata JSONB DEFAULT '{}',
                scraped_at TIMESTAMPTZ DEFAULT NOW(),
                updated_at TIMESTAMPTZ DEFAULT NOW()
            );

            CREATE INDEX IF NOT EXISTS idx_products_category
                ON products(category);
            CREATE INDEX IF NOT EXISTS idx_products_metadata
                ON products USING GIN(metadata);
            CREATE INDEX IF NOT EXISTS idx_products_price
                ON products(price);

            CREATE TABLE IF NOT EXISTS price_history (
                id SERIAL PRIMARY KEY,
                product_url TEXT REFERENCES products(url),
                price NUMERIC(10, 2) NOT NULL,
                recorded_at TIMESTAMPTZ DEFAULT NOW()
            );

            CREATE INDEX IF NOT EXISTS idx_price_url_time
                ON price_history(product_url, recorded_at DESC);
        """)
    conn.commit()

Batch Upsert — Fast Bulk Operations

def upsert_products_pg(conn, products):
    """Batch upsert products — much faster than individual inserts."""
    import json
    with conn.cursor() as cur:
        execute_values(cur, """
            INSERT INTO products (url, title, price, currency, category, metadata)
            VALUES %s
            ON CONFLICT (url) DO UPDATE SET
                title = EXCLUDED.title,
                price = EXCLUDED.price,
                category = EXCLUDED.category,
                metadata = products.metadata || EXCLUDED.metadata,
                updated_at = NOW()
        """, [
            (p["url"], p.get("title"), p.get("price"),
             p.get("currency", "USD"), p.get("category"),
             json.dumps(p.get("metadata", {})))
            for p in products
        ])
    conn.commit()
    print(f"Upserted {len(products)} products")

JSONB for Flexible Metadata

PostgreSQL's JSONB type lets you store semi-structured data without schema changes — perfect for scraping where different pages have different fields:

import json

def save_with_metadata(conn, url, title, price, extra_data):
    """Store structured fields + flexible metadata in JSONB."""
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO products (url, title, price, metadata)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (url) DO UPDATE SET
                title = EXCLUDED.title,
                price = EXCLUDED.price,
                metadata = products.metadata || EXCLUDED.metadata,
                updated_at = NOW()
        """, (url, title, price, json.dumps(extra_data)))
    conn.commit()

# Query JSONB fields — find products by metadata
def find_by_metadata(conn, key, value):
    """Find products by any metadata field."""
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT url, title, price, metadata->>%s as field_value
            FROM products
            WHERE metadata->>%s = %s
        """, (key, key, value))
        return cur.fetchall()

# Full-text search within JSONB
def search_metadata(conn, search_term):
    """Search across all metadata fields."""
    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT url, title, price
            FROM products
            WHERE metadata::text ILIKE %s
        """, (f"%{search_term}%",))
        return cur.fetchall()

Async PostgreSQL with asyncpg

For async scrapers using httpx or aiohttp — critical for high-throughput scraping:

import asyncio
import asyncpg

async def init_pool():
    """Create a connection pool for async operations."""
    return await asyncpg.create_pool(
        DATABASE_URL,
        min_size=5,
        max_size=20
    )

async def save_product_async(pool, product):
    """Async product save with connection pool."""
    async with pool.acquire() as conn:
        await conn.execute("""
            INSERT INTO products (url, title, price, category)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (url) DO UPDATE SET
                title = EXCLUDED.title,
                price = EXCLUDED.price,
                updated_at = NOW()
        """, product["url"], product.get("title"),
             product.get("price"), product.get("category"))

async def scrape_and_store_async(pool, urls, scrape_fn):
    """Scrape multiple URLs concurrently with async storage."""
    semaphore = asyncio.Semaphore(10)  # Limit concurrency

    async def process_url(url):
        async with semaphore:
            product = await scrape_fn(url)
            await save_product_async(pool, product)

    await asyncio.gather(*[process_url(url) for url in urls])

Using Proxies with Your Storage Layer

When scraping at scale, you need reliable IP rotation to avoid blocks. Your storage layer should track which proxy was used for each request — this helps debug issues when certain IP ranges get blocked.
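The scrape_log table defined earlier in this guide doesn't record the proxy. Here is a sketch of a variant that does; the proxy_label column and the helper names are mine, not part of the earlier schema:

```python
import sqlite3

def init_log_with_proxy(conn):
    """scrape_log variant with a proxy_label column (name is illustrative)."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS scrape_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT NOT NULL,
            status_code INTEGER,
            success INTEGER DEFAULT 1,
            proxy_label TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()

def log_request(conn, url, status_code, success, proxy_label):
    """Record one request outcome, tagged with the proxy that made it."""
    conn.execute(
        "INSERT INTO scrape_log (url, status_code, success, proxy_label) "
        "VALUES (?, ?, ?, ?)",
        (url, status_code, int(success), proxy_label)
    )
    conn.commit()

def block_rate_by_proxy(conn):
    """Share of failed requests per proxy label: spots bad IP ranges."""
    return conn.execute("""
        SELECT proxy_label,
               ROUND(AVG(CASE WHEN success = 0 THEN 1.0 ELSE 0.0 END), 2)
        FROM scrape_log
        GROUP BY proxy_label
    """).fetchall()

conn = sqlite3.connect(":memory:")
init_log_with_proxy(conn)
log_request(conn, "https://example.com/a", 200, True, "pool-us")
log_request(conn, "https://example.com/b", 403, False, "pool-us")
log_request(conn, "https://example.com/c", 200, True, "pool-de")
rates = dict(block_rate_by_proxy(conn))
```

When one label's failure rate climbs while the others stay flat, you know the blocks are coming from that IP range, not from your parsing or pacing.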

ThorData residential proxies integrate cleanly with any storage backend:

import requests
import time
import random

THORDATA_PROXY = "http://USERNAME:[email protected]:9000"

def scrape_with_tracking(conn, url):
    """Scrape via ThorData proxy with result logging."""
    proxies = {"http": THORDATA_PROXY, "https": THORDATA_PROXY}
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
    }

    time.sleep(random.uniform(1.5, 4.0))
    start = time.time()

    try:
        resp = requests.get(url, headers=headers, proxies=proxies, timeout=30)
        elapsed = int((time.time() - start) * 1000)
        resp.raise_for_status()

        conn.execute(
            "INSERT INTO scrape_log (url, status_code, success, response_time_ms) "
            "VALUES (?, ?, 1, ?)",
            (url, resp.status_code, elapsed)
        )
        conn.commit()
        return resp.text

    except requests.RequestException as e:
        elapsed = int((time.time() - start) * 1000)
        conn.execute(
            "INSERT INTO scrape_log (url, status_code, success, error_message, "
            "response_time_ms) VALUES (?, ?, 0, ?, ?)",
            (url, getattr(e.response, 'status_code', None), str(e), elapsed)
        )
        conn.commit()
        raise

def scrape_with_retry(conn, url, max_retries=3):
    """Scrape with exponential backoff through ThorData proxy rotation."""
    for attempt in range(max_retries):
        try:
            return scrape_with_tracking(conn, url)
        except requests.RequestException as e:
            wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1}/{max_retries} failed: {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {wait:.1f}s...")
                time.sleep(wait)
            else:
                raise

Using httpx with Async Proxy Rotation

import httpx
import asyncio
import random

async def scrape_batch_with_proxies(conn, urls, proxy_url):
    """Scrape multiple URLs concurrently through ThorData proxies."""
    async with httpx.AsyncClient(
        proxy=proxy_url,
        timeout=30,
        headers={
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                          "AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
            "Accept-Language": "en-US,en;q=0.9",
        },
        follow_redirects=True,
        limits=httpx.Limits(max_connections=10, max_keepalive_connections=5)
    ) as client:
        for url in urls:
            if already_scraped(conn, url):
                continue
            try:
                await asyncio.sleep(random.uniform(2, 5))
                resp = await client.get(url)
                resp.raise_for_status()
                product = parse_product(url, resp.text)
                save_product(conn, product)
                mark_completed(conn, url)
                print(f"OK: {url}")
            except httpx.HTTPError as e:
                mark_failed(conn, url, e)
                print(f"FAIL: {url} — {e}")

Anti-Detection: Headers, Delays, and Fingerprint Spoofing

Realistic Request Headers

A bare requests.get(url) sends a python-requests/2.x User-Agent that screams "bot." Always set realistic headers:

import random

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
]

def get_headers(referer=None):
    """Generate realistic browser headers with rotation."""
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "Cache-Control": "max-age=0",
    }
    if referer:
        headers["Referer"] = referer
        headers["Sec-Fetch-Site"] = "same-origin"
    return headers

Smart Delay Patterns

Fixed delays are detectable. Real users have variable timing:

import time
import random

def human_delay(min_seconds=1.5, max_seconds=5.0):
    """Sleep for a random, human-like duration."""
    delay = random.uniform(min_seconds, max_seconds)
    # Occasionally take a longer pause (like a user reading a page)
    if random.random() < 0.1:
        delay += random.uniform(5, 15)
    time.sleep(delay)

def adaptive_delay(response_time_ms):
    """Adjust delay based on server response time — slow server = back off more."""
    base_delay = max(2.0, response_time_ms / 1000 * 2)
    jitter = random.uniform(0.5, 2.0)
    time.sleep(base_delay + jitter)

Output Schemas and Data Validation

Define what your scraped data should look like before you start scraping:

from dataclasses import dataclass, field, asdict
from typing import Optional
from datetime import datetime

@dataclass
class ScrapedProduct:
    """Schema for scraped product data."""
    url: str
    title: str
    price: Optional[float] = None
    currency: str = "USD"
    description: Optional[str] = None
    category: Optional[str] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None
    in_stock: bool = True
    images: list[str] = field(default_factory=list)
    variants: list[dict] = field(default_factory=list)
    scraped_at: str = field(default_factory=lambda: datetime.now().isoformat())

    def validate(self):
        """Validate before storage — catch data quality issues early."""
        errors = []
        if not self.url or not self.url.startswith("http"):
            errors.append(f"Invalid URL: {self.url}")
        if self.price is not None and self.price < 0:
            errors.append(f"Negative price: {self.price}")
        if self.rating is not None and not (0 <= self.rating <= 5):
            errors.append(f"Rating out of range: {self.rating}")
        if self.title and len(self.title) > 1000:
            errors.append(f"Title too long: {len(self.title)} chars")
        if errors:
            raise ValueError(f"Validation errors: {'; '.join(errors)}")
        return True

    def to_dict(self):
        return asdict(self)

    def to_csv_row(self):
        """Flatten for CSV output (skip nested fields)."""
        d = self.to_dict()
        d["images"] = "|".join(d["images"])
        d.pop("variants")
        return d

# Usage
product = ScrapedProduct(
    url="https://store.example.com/widget",
    title="Super Widget",
    price=29.99,
    category="Widgets",
    rating=4.5,
    review_count=128,
    images=["https://cdn.example.com/img1.jpg"]
)
product.validate()
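In a pipeline, one failed validate() shouldn't kill the whole run. A sketch of a quarantine pattern (validate_batch and the quarantine structure are illustrative names, not from earlier code) that keeps bad records around for later inspection instead of silently dropping them:

```python
def validate_batch(products):
    """Split a batch into valid records and quarantined failures.
    Works with any object exposing validate() and to_dict()."""
    valid, quarantined = [], []
    for p in products:
        try:
            p.validate()
            valid.append(p)
        except ValueError as e:
            # Keep the record plus the reason, for debugging later
            quarantined.append({"record": p.to_dict(), "error": str(e)})
    return valid, quarantined
```

Save the valid list to your normal table and dump the quarantined list to a JSONL file — a week in, that file tells you exactly which selectors started returning garbage.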

Example Output Schemas

E-commerce product:

{
    "url": "https://store.example.com/product-123",
    "title": "Wireless Bluetooth Headphones",
    "price": 79.99,
    "currency": "USD",
    "description": "Noise-cancelling over-ear headphones with 30h battery",
    "category": "Electronics > Audio > Headphones",
    "rating": 4.3,
    "review_count": 2841,
    "in_stock": true,
    "images": [
        "https://cdn.example.com/products/123/main.jpg",
        "https://cdn.example.com/products/123/side.jpg"
    ],
    "variants": [
        {"color": "Black", "sku": "BT-HP-BLK", "price": 79.99, "in_stock": true},
        {"color": "White", "sku": "BT-HP-WHT", "price": 79.99, "in_stock": false}
    ],
    "scraped_at": "2026-03-30T14:22:33.456789"
}

Job listing:

{
    "url": "https://jobs.example.com/posting/456",
    "title": "Senior Python Developer",
    "company": "TechCorp",
    "location": "Remote (US)",
    "salary_min": 150000,
    "salary_max": 200000,
    "salary_currency": "USD",
    "employment_type": "full-time",
    "skills": ["Python", "FastAPI", "PostgreSQL", "Docker"],
    "posted_date": "2026-03-28",
    "scraped_at": "2026-03-30T14:22:33.456789"
}

Real estate listing:

{
    "url": "https://realty.example.com/listing/789",
    "title": "3-Bedroom Apartment in Downtown",
    "price": 450000,
    "currency": "USD",
    "property_type": "apartment",
    "bedrooms": 3,
    "bathrooms": 2,
    "area_sqft": 1200,
    "address": {
        "street": "123 Main St",
        "city": "Austin",
        "state": "TX",
        "zip": "78701"
    },
    "features": ["parking", "pool", "gym", "balcony"],
    "scraped_at": "2026-03-30T14:22:33.456789"
}
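The real-estate schema above nests address — fine for JSON, impossible for CSV. Flattening first solves it; a sketch (the underscore separator and pipe-joined lists are conventions chosen here, not a standard):

```python
def flatten(record, parent_key="", sep="_"):
    """Recursively flatten nested dicts: {"address": {"city": "Austin"}}
    becomes {"address_city": "Austin"}. Lists are joined with '|'."""
    flat = {}
    for key, value in record.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, new_key, sep))
        elif isinstance(value, list):
            flat[new_key] = "|".join(str(v) for v in value)
        else:
            flat[new_key] = value
    return flat
```

Run every record through flatten() before handing it to csv.DictWriter, and the nested address becomes address_street, address_city, and so on.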

Error Handling and Data Integrity

Safe Data Extraction

from bs4 import BeautifulSoup
import re

def safe_extract(soup, selector, attribute=None, default=None):
    """Extract data with fallback — never crash on missing elements."""
    element = soup.select_one(selector)
    if element is None:
        return default
    if attribute:
        return element.get(attribute, default)
    text = element.get_text(strip=True)
    return text if text else default

def parse_price(price_str):
    """Parse price from various international formats."""
    if not price_str:
        return None
    cleaned = re.sub(r'[^\d.,]', '', price_str)
    if ',' in cleaned and '.' in cleaned:
        # Whichever separator appears last is the decimal point
        if cleaned.rfind(',') > cleaned.rfind('.'):
            cleaned = cleaned.replace('.', '').replace(',', '.')  # 1.299,99 -> 1299.99
        else:
            cleaned = cleaned.replace(',', '')  # 1,299.99 -> 1299.99
    elif ',' in cleaned and len(cleaned.split(',')[-1]) == 2:
        cleaned = cleaned.replace(',', '.')  # 29,99 -> 29.99
    elif ',' in cleaned:
        cleaned = cleaned.replace(',', '')  # 1,299 -> 1299
    try:
        return round(float(cleaned), 2)
    except ValueError:
        return None

Transaction Safety for Batch Operations

def atomic_batch_save(conn, products):
    """Save a batch atomically — all succeed or none do."""
    try:
        for product in products:
            conn.execute("""
                INSERT INTO products (url, title, price, category)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(url) DO UPDATE SET
                    title = excluded.title,
                    price = excluded.price,
                    updated_at = CURRENT_TIMESTAMP
            """, (product["url"], product.get("title"),
                  product.get("price"), product.get("category")))
        conn.commit()
        print(f"Saved batch of {len(products)} products")
    except Exception as e:
        conn.rollback()
        print(f"Batch save failed, rolled back: {e}")
        raise
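For large batches, one execute() call per row adds Python overhead; executemany pushes the loop into the driver. A sketch with the same upsert shape (the updated_at column from the example above is omitted here for brevity):

```python
import sqlite3

def atomic_batch_save_many(conn, products):
    """Batch upsert via a single executemany call."""
    rows = [(p["url"], p.get("title"), p.get("price"), p.get("category"))
            for p in products]
    try:
        conn.executemany("""
            INSERT INTO products (url, title, price, category)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(url) DO UPDATE SET
                title = excluded.title,
                price = excluded.price
        """, rows)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
```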

Real-World Use Cases with Code

1. Price Monitoring Dashboard

Track prices across multiple e-commerce sites and alert on drops:

def run_price_monitor(conn, product_urls):
    """Daily price check with change detection and alerting."""
    price_changes = []
    for url in product_urls:
        try:
            html = scrape_with_retry(conn, url)
            product = parse_product(url, html)
            if product.get("price"):
                record_price(conn, url, product["price"])
                save_product(conn, product)
        except Exception as e:
            print(f"Failed to check {url}: {e}")

    drops = get_price_drops(conn, min_drop_pct=15)
    if drops:
        print(f"\n{len(drops)} significant price drops detected!")
        for url, old, new, pct in drops:
            print(f"  {url}: ${old} -> ${new} (-{pct}%)")

2. Job Board Aggregator

def aggregate_jobs(conn, sources):
    """Scrape multiple job boards into unified storage."""
    for source_name, scrape_fn in sources.items():
        print(f"Scraping {source_name}...")
        jobs = scrape_fn()
        for job in jobs:
            job["source"] = source_name
            conn.execute("""
                INSERT INTO jobs (url, title, company, location, salary_min,
                                  salary_max, source, scraped_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(url) DO UPDATE SET
                    salary_min = excluded.salary_min,
                    salary_max = excluded.salary_max,
                    updated_at = CURRENT_TIMESTAMP
            """, (job["url"], job["title"], job["company"],
                  job["location"], job.get("salary_min"),
                  job.get("salary_max"), source_name))
        conn.commit()
        print(f"  Saved {len(jobs)} jobs from {source_name}")

3. Content Change Monitor

import hashlib

def monitor_pages(conn, urls):
    """Detect content changes on monitored pages."""
    changes = []
    for url in urls:
        html = scrape_with_retry(conn, url)
        content_hash = hashlib.sha256(html.encode()).hexdigest()

        last = conn.execute(
            "SELECT content_hash FROM snapshots WHERE url = ? "
            "ORDER BY scraped_at DESC LIMIT 1", (url,)
        ).fetchone()

        if last is None or last[0] != content_hash:
            conn.execute(
                "INSERT INTO snapshots (url, content, content_hash) VALUES (?, ?, ?)",
                (url, html, content_hash)
            )
            conn.commit()
            if last:
                changes.append(url)
                print(f"CHANGED: {url}")

    return changes
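One caveat on the monitor above: hashing raw HTML flags a "change" every time a CSRF token, timestamp, or rotating ad changes, even when the content didn't. Normalizing before hashing cuts those false positives; a sketch (the regexes are illustrative — tune them to the volatile tokens your targets actually embed):

```python
import hashlib
import re

def normalized_hash(html):
    """Hash page content after stripping parts that change on every
    request: script/style blocks, HTML comments, whitespace runs."""
    text = re.sub(r'(?is)<(script|style)[^>]*>.*?</\1>', '', html)
    text = re.sub(r'<!--.*?-->', '', text, flags=re.S)
    text = re.sub(r'\s+', ' ', text).strip()
    return hashlib.sha256(text.encode()).hexdigest()
```

Swap content_hash = hashlib.sha256(...) in monitor_pages for normalized_hash(html) and the snapshots table stops filling up with phantom changes.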

4. Research Data Pipeline

def research_pipeline(conn, queries, output_csv="research_data.csv"):
    """Collect, deduplicate, and export research data."""
    for query in queries:
        results = search_and_scrape(query)
        for r in results:
            conn.execute("""
                INSERT INTO research (url, title, content, query, source)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(url) DO UPDATE SET updated_at = CURRENT_TIMESTAMP
            """, (r["url"], r["title"], r["content"], query, r["source"]))
        conn.commit()
        print(f"Collected {len(results)} results for '{query}'")

    # Export deduplicated results
    export_to_csv(conn, "SELECT * FROM research ORDER BY scraped_at DESC", output_csv)

5. Real Estate Listing Tracker

def track_listings(conn, search_urls):
    """Track new listings and price changes in real estate."""
    for search_url in search_urls:
        listings = scrape_listing_page(search_url)
        for listing in listings:
            existing = conn.execute(
                "SELECT price FROM listings WHERE url = ?", (listing["url"],)
            ).fetchone()
            if existing is None:
                print(f"NEW: {listing['title']} — ${listing['price']:,.0f}")
            elif existing[0] is not None and abs(existing[0] - listing["price"]) > 1:
                print(f"PRICE: {listing['title']}: "
                      f"${existing[0]:,.0f} -> ${listing['price']:,.0f}")
            conn.execute("""
                INSERT INTO listings (url, title, price, bedrooms, location)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(url) DO UPDATE SET
                    price = excluded.price, updated_at = CURRENT_TIMESTAMP
            """, (listing["url"], listing["title"], listing["price"],
                  listing.get("bedrooms"), listing.get("location")))
        conn.commit()

CAPTCHA and Rate Limiting Strategies

Rate Limit Detection and Backoff

def rate_limit_aware_scrape(conn, url, session=None):
    """Scrape with automatic rate limit detection and backoff."""
    if session is None:
        session = requests.Session()
        session.headers.update(get_headers())

    resp = session.get(url, timeout=30)

    if resp.status_code == 429:
        retry_after = int(resp.headers.get("Retry-After", 60))
        print(f"Rate limited. Waiting {retry_after}s...")
        time.sleep(retry_after)
        resp = session.get(url, timeout=30)

    if resp.status_code == 403:
        print(f"Blocked on {url}. Consider using residential proxies.")
        # ThorData provides automatic IP rotation to avoid blocks
        # https://thordata.partnerstack.com/partner/0a0x4nzh
        raise requests.HTTPError(f"403 Forbidden: {url}")

    resp.raise_for_status()
    return resp
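One caveat on the 429 handler: Retry-After can also be an HTTP-date rather than a number of seconds, and int() will choke on that form. A hedged helper covering both, using only the standard library:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def parse_retry_after(value, default=60):
    """Retry-After is either delay-seconds ('120') or an HTTP-date
    ('Wed, 21 Oct 2026 07:28:00 GMT'). Return seconds to wait."""
    if value is None:
        return default
    try:
        return int(value)
    except ValueError:
        pass
    try:
        dt = parsedate_to_datetime(value)
        return max(0, int((dt - datetime.now(timezone.utc)).total_seconds()))
    except (TypeError, ValueError):
        return default
```

In rate_limit_aware_scrape, replace the int(...) line with retry_after = parse_retry_after(resp.headers.get("Retry-After")).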

CAPTCHA Detection

def check_for_captcha(html):
    """Detect common CAPTCHA challenges in response HTML."""
    captcha_indicators = [
        "captcha", "recaptcha", "hcaptcha", "cf-challenge",
        "challenge-running", "turnstile", "just a moment",
        "checking your browser", "verify you are human",
    ]
    html_lower = html.lower()
    return any(indicator in html_lower for indicator in captcha_indicators)

def scrape_with_captcha_detection(conn, url):
    """Scrape with CAPTCHA detection and proxy fallback."""
    html = scrape_with_tracking(conn, url)

    if check_for_captcha(html):
        print(f"CAPTCHA detected on {url} — switching to residential proxy")
        # Retry through ThorData residential proxy
        html = scrape_with_retry(conn, url)

        if check_for_captcha(html):
            print("Still blocked after proxy — marking for manual review")
            mark_failed(conn, url, "CAPTCHA not bypassed")
            return None

    return html

The Decision Flowchart

Quick reference for choosing your storage backend:

  1. One-off scrape, someone wants a spreadsheet? → CSV with utf-8-sig
  2. Nested/semi-structured data, or streaming output? → JSONL
  3. Runs more than once, needs dedup, local project? → SQLite
  4. Team access, cloud deployment, millions of rows? → PostgreSQL
  5. Need flexible schema + SQL queries? → PostgreSQL with JSONB
  6. Don't want to manage storage at all? → Managed platform

Migration: SQLite to PostgreSQL

Start with SQLite. It handles 90% of scraping projects. When you outgrow it, migration is straightforward because the SQL is nearly identical:

def migrate_sqlite_to_pg(sqlite_path, pg_conn_string):
    """Migrate scraped data from SQLite to PostgreSQL."""
    import sqlite3
    import psycopg2
    from psycopg2.extras import execute_values

    sqlite_conn = sqlite3.connect(sqlite_path)
    sqlite_conn.row_factory = sqlite3.Row
    pg_conn = psycopg2.connect(pg_conn_string)

    rows = sqlite_conn.execute("SELECT * FROM products").fetchall()
    if not rows:
        print("No data to migrate")
        return

    columns = rows[0].keys()
    with pg_conn.cursor() as cur:
        execute_values(cur, f"""
            INSERT INTO products ({', '.join(columns)})
            VALUES %s
            ON CONFLICT (url) DO NOTHING
        """, [tuple(row) for row in rows])

    pg_conn.commit()
    print(f"Migrated {len(rows)} products from SQLite to PostgreSQL")
    sqlite_conn.close()
    pg_conn.close()

The deduplication pattern is the core: INSERT OR IGNORE in SQLite becomes ON CONFLICT DO NOTHING in PostgreSQL. INSERT ... ON CONFLICT DO UPDATE works identically in both. If you start with SQLite and design your schema well, upgrading to PostgreSQL later is just changing the connection string and a few SQL keywords. Your scraping logic stays exactly the same.

Start with SQLite. Seriously. The deduplication pattern is three lines of SQL, it handles concurrent reads via WAL mode, and migrating to PostgreSQL later is a 20-minute job when you actually need it.