Storing Scraped Data: SQLite, PostgreSQL, CSV, and JSON Compared (2026)
Your scraper works. Data is flowing. Now where does it go?
This sounds like a trivial decision until you're three weeks into a project, your CSV has 400K rows with mangled unicode, duplicate entries everywhere, and you can't tell which URLs you've already visited. Storage choice matters more than most scrapers realize — it determines whether your scraper can resume after crashes, whether you can deduplicate records, and whether anyone else can actually query the data you've collected.
The wrong choice doesn't just cause inconvenience. It can cost you days of wasted scraping time when a partial run overwrites your complete dataset, or weeks of cleanup when you realize your flat file has no way to handle the schema evolution that happens as target sites change their HTML structure. I've seen teams lose weeks of collected data because they stored everything in a single JSON file that got corrupted on a crash. I've watched scrapers re-fetch 50,000 pages because there was no deduplication layer to say "you already have this one."
This guide covers the four most common storage options for scraped data — CSV, JSON/JSONL, SQLite, and PostgreSQL — with complete, working Python code for each. We'll cover deduplication patterns, incremental scraping, schema design, error handling, and the real-world tradeoffs that determine which option fits your project. By the end, you'll have a clear decision framework and copy-paste code for whichever storage backend you choose.
Whether you're scraping product listings for price monitoring, collecting research data from academic sites, or building datasets for machine learning, the storage layer is what turns raw HTML extractions into usable, queryable data. Let's get it right from the start.
The Four Options at a Glance
| Format | Best for | Worst for | Max practical size | Dedup support |
|---|---|---|---|---|
| CSV | One-off scripts, Excel handoff, flat data | Nested data, deduplication, large scale | ~500K rows | None built-in |
| JSON/JSONL | Semi-structured data, nested fields, streaming | Queries, deduplication, tabular analysis | ~1M records | Manual only |
| SQLite | Local projects, dedup, incremental scraping | Team access, concurrent writes | ~10M rows | Built-in (PRIMARY KEY) |
| PostgreSQL | Team access, large scale, cloud deployment | Quick scripts, zero-setup needs | Billions of rows | Built-in (UPSERT) |
Pick based on what happens after scraping, not during. If someone's opening the output in Excel — CSV. If you're building a pipeline that runs daily and must skip already-scraped URLs — SQLite minimum.
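That decision framework can be sketched as a tiny helper. This is a rough heuristic mirroring the table above, not a rule — the dictionary keys are invented here purely for illustration:

```python
def pick_storage(needs):
    """Map post-scrape requirements to a backend (illustrative heuristic only)."""
    if needs.get("team_access") or needs.get("rows", 0) > 10_000_000:
        return "postgresql"   # concurrent users or serious scale
    if needs.get("dedup") or needs.get("resume") or needs.get("rows", 0) > 500_000:
        return "sqlite"       # needs a real database, but local is fine
    if needs.get("nested_data"):
        return "jsonl"        # flexible structure, append-friendly
    return "csv"              # one-off, flat, hand-off to Excel

print(pick_storage({"dedup": True}))  # sqlite
```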
CSV: Simple but Fragile
Basic CSV Storage
For a one-off scrape where you just need a file to hand off:
import csv
import os
from datetime import datetime
def save_to_csv(records, filename="output.csv"):
"""Append records to CSV, creating headers if file is new."""
if not records:
return
file_exists = os.path.exists(filename)
with open(filename, "a", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=records[0].keys())
if not file_exists or os.path.getsize(filename) == 0:
writer.writeheader()
writer.writerows(records)
print(f"Saved {len(records)} records to {filename}")
The utf-8-sig encoding adds a BOM (Byte Order Mark) that tells Excel to read UTF-8 correctly. Without it, names with accents, non-Latin characters, or emoji turn into garbage when someone double-clicks the file in Excel. Small detail, saves you an email from whoever opens the file.
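You can see the BOM in the raw bytes — it's the three-byte sequence EF BB BF at the very start of the file:

```python
import os
import tempfile

# Write a small CSV with utf-8-sig, then inspect its first bytes
path = os.path.join(tempfile.gettempdir(), "bom_demo.csv")
with open(path, "w", encoding="utf-8-sig") as f:
    f.write("name\nJosé\n")

with open(path, "rb") as f:
    first_bytes = f.read(3)

print(first_bytes)  # b'\xef\xbb\xbf' — the UTF-8 BOM Excel looks for
```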
Handling Edge Cases in CSV
CSV has several gotchas that bite scrapers:
import csv
import re
def clean_for_csv(value):
"""Clean a value for safe CSV storage."""
if value is None:
return ""
if isinstance(value, str):
# Remove control characters that break CSV parsers
value = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', value)
# Normalize whitespace (newlines in cells cause issues)
value = ' '.join(value.split())
# Truncate extremely long values
if len(value) > 32000:
value = value[:32000] + "... [truncated]"
return value
def save_products_csv(products, filename="products.csv"):
"""Save product data with proper cleaning and error handling."""
if not products:
print("No products to save")
return
fieldnames = ["url", "title", "price", "currency", "description",
"category", "rating", "review_count", "scraped_at"]
file_exists = os.path.exists(filename) and os.path.getsize(filename) > 0
with open(filename, "a", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(
f, fieldnames=fieldnames,
extrasaction="ignore", # Skip fields not in fieldnames
quoting=csv.QUOTE_ALL # Quote everything to avoid delimiter issues
)
if not file_exists:
writer.writeheader()
for product in products:
cleaned = {k: clean_for_csv(v) for k, v in product.items()}
cleaned["scraped_at"] = datetime.now().isoformat()
writer.writerow(cleaned)
print(f"Appended {len(products)} products to {filename}")
Reading CSV Back Efficiently
When you need to process a large CSV without loading it all into memory:
def read_csv_lazy(filename):
"""Read CSV lazily — one row at a time, no memory explosion."""
with open(filename, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for row in reader:
yield row
def csv_to_dataframe(filename):
"""Read CSV into pandas with proper type handling."""
import pandas as pd
df = pd.read_csv(
filename,
encoding="utf-8-sig",
dtype={"price": float, "review_count": "Int64"},
parse_dates=["scraped_at"],
na_values=["", "N/A", "null", "None"]
)
return df
# Process a large CSV in chunks
def process_large_csv(filename, chunk_size=10000):
"""Process a huge CSV in memory-efficient chunks."""
import pandas as pd
total_rows = 0
for chunk in pd.read_csv(filename, chunksize=chunk_size, encoding="utf-8-sig"):
# Process each chunk
total_rows += len(chunk)
print(f"Processed {total_rows} rows so far...")
# Do analysis on chunk here
return total_rows
When CSV Falls Apart
CSV falls apart when your data has:
- Nested structures — product variants, multiple images, address objects. You end up with columns like image_1, image_2, image_3, which is fragile and wasteful.
- Schema changes — when the target site adds a new field, your CSV headers are frozen from the first write.
- Deduplication needs — there's no built-in way to check "did I already scrape this URL?"
- Large datasets — reading a 2GB CSV into memory for analysis kills most machines.
- Concurrent writes — two scraper processes writing to the same CSV simultaneously corrupts the file.
If any of these apply, move to JSONL or SQLite.
JSON and JSONL: Flexible Structure
Regular JSON vs JSONL
Regular JSON stores everything in one array:
[{"title": "Product A", "price": 29.99}, {"title": "Product B", "price": 39.99}]
To append a record, you must read the entire file, parse it, append to the list, and write it all back. With a 2GB file, that means loading 2GB into RAM just to add one record.
JSONL (JSON Lines) puts one JSON object per line:
{"title": "Product A", "price": 29.99}
{"title": "Product B", "price": 39.99}
You can append one record at a time, and if your scraper crashes mid-run, everything written before the crash is still intact on disk. This is the format you want for web scraping.
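If you're already sitting on a legacy JSON-array dump, a one-time conversion to JSONL is straightforward. A sketch — the filenames are placeholders, and the array still has to fit in memory once during conversion:

```python
import json

def json_array_to_jsonl(json_path, jsonl_path):
    """Convert a JSON-array file to JSONL, one record per line."""
    with open(json_path, "r", encoding="utf-8") as f:
        records = json.load(f)  # the one unavoidable full-file read
    with open(jsonl_path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
    return len(records)
```

After this, every future write is a cheap append instead of a full read-modify-rewrite.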
JSONL Writer with Rotation and Validation
import json
import os
from datetime import datetime
class JSONLWriter:
"""Append-mode JSONL writer with validation and file rotation."""
def __init__(self, filename="output.jsonl", max_size_mb=500):
self.filename = filename
self.max_size_bytes = max_size_mb * 1024 * 1024
self.records_written = 0
def _get_current_file(self):
"""Rotate file if it exceeds max size."""
if os.path.exists(self.filename):
size = os.path.getsize(self.filename)
if size > self.max_size_bytes:
base, ext = os.path.splitext(self.filename)
i = 1
while os.path.exists(f"{base}.{i}{ext}"):
i += 1
os.rename(self.filename, f"{base}.{i}{ext}")
print(f"Rotated {self.filename} -> {base}.{i}{ext}")
return self.filename
def write(self, record):
"""Write a single record to JSONL file."""
if not isinstance(record, dict):
raise ValueError("Record must be a dictionary")
record["_scraped_at"] = datetime.now().isoformat()
filepath = self._get_current_file()
with open(filepath, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
self.records_written += 1
def write_batch(self, records):
"""Write multiple records efficiently."""
filepath = self._get_current_file()
with open(filepath, "a", encoding="utf-8") as f:
for record in records:
record["_scraped_at"] = datetime.now().isoformat()
f.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
self.records_written += len(records)
print(f"Wrote {len(records)} records (total: {self.records_written})")
# Usage
writer = JSONLWriter("products.jsonl")
writer.write({"url": "https://example.com/p/1", "title": "Widget", "price": 29.99})
writer.write_batch([
{"url": "https://example.com/p/2", "title": "Gadget", "price": 49.99},
{"url": "https://example.com/p/3", "title": "Doohickey", "price": 19.99},
])
JSONL Reader with Filtering and Error Recovery
def read_jsonl(filename, filter_fn=None):
"""Read JSONL lazily with optional filtering and error recovery."""
with open(filename, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
if filter_fn is None or filter_fn(record):
yield record
except json.JSONDecodeError as e:
print(f"Skipping malformed line {line_num}: {e}")
# Read all records
all_records = list(read_jsonl("products.jsonl"))
# Read only expensive products
expensive = list(read_jsonl(
"products.jsonl",
filter_fn=lambda r: r.get("price", 0) > 100
))
# Read products from a specific category
electronics = list(read_jsonl(
"products.jsonl",
filter_fn=lambda r: r.get("category") == "Electronics"
))
# Convert to pandas for analysis
import pandas as pd
df = pd.DataFrame(read_jsonl("products.jsonl"))
print(df.describe())
JSONL Deduplication (Post-Processing)
Since JSONL has no built-in dedup, you need to handle it yourself:
def deduplicate_jsonl(input_file, output_file, key_field="url"):
"""Remove duplicate records from a JSONL file based on a key field."""
seen = set()
dupes = 0
kept = 0
with open(output_file, "w", encoding="utf-8") as out:
for record in read_jsonl(input_file):
key = record.get(key_field)
if key and key not in seen:
seen.add(key)
out.write(json.dumps(record, ensure_ascii=False) + "\n")
kept += 1
else:
dupes += 1
print(f"Kept {kept} records, removed {dupes} duplicates")
return kept, dupes
Handling Nested Data — Where JSONL Shines
This is where JSONL dominates CSV:
# Product with variants, images, and nested specs — all stored naturally
product = {
"url": "https://store.example.com/laptop-pro-16",
"title": "Laptop Pro 16",
"price": 1499.99,
"variants": [
{"sku": "LP16-256", "storage": "256GB", "price": 1499.99},
{"sku": "LP16-512", "storage": "512GB", "price": 1699.99},
{"sku": "LP16-1TB", "storage": "1TB", "price": 1999.99},
],
"images": [
"https://cdn.example.com/img/lp16-front.jpg",
"https://cdn.example.com/img/lp16-side.jpg",
],
"specs": {
"display": "16 inch Retina",
"cpu": "M3 Pro",
"ram": "18GB",
"battery": "22 hours",
},
"reviews_summary": {
"average": 4.7,
"count": 1243,
"distribution": {"5": 890, "4": 210, "3": 80, "2": 40, "1": 23}
}
}
writer = JSONLWriter("laptops.jsonl")
writer.write(product)
Try representing that in CSV. You'd end up with columns like variant_1_sku, variant_1_storage, variant_2_sku... it's a mess that breaks when a product has 4 variants instead of 3.
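For the record, here's roughly what a generic flattener has to do. A sketch, not a recommendation — note how the variant count changes the column set from product to product, which is exactly the breakage just described:

```python
def flatten(record, parent_key="", sep="_"):
    """Flatten nested dicts/lists into CSV-style columns — fragile by design."""
    flat = {}
    for key, value in record.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested objects: specs.cpu -> specs_cpu
            flat.update(flatten(value, new_key, sep))
        elif isinstance(value, list):
            # Number list items: variants[0].sku -> variants_1_sku
            for i, item in enumerate(value, 1):
                if isinstance(item, dict):
                    flat.update(flatten(item, f"{new_key}{sep}{i}", sep))
                else:
                    flat[f"{new_key}{sep}{i}"] = item
        else:
            flat[new_key] = value
    return flat
```

A product with two variants and one with four produce different header rows, so appending them to the same CSV silently misaligns columns.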
SQLite: The Sweet Spot for Most Scrapers
SQLite needs no server, ships with Python, and gives you what flat files can't: deduplication and resume capability. This is where most scrapers should land.
Schema Design for Web Scraping
import sqlite3
from datetime import datetime
from contextlib import contextmanager
def init_db(db_path="scraper.db"):
"""Initialize database with production-quality schema for web scraping."""
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL") # Better concurrent read performance
conn.execute("PRAGMA foreign_keys=ON")
conn.execute("PRAGMA busy_timeout=5000") # Wait up to 5s on locked DB
conn.executescript("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT UNIQUE NOT NULL,
title TEXT,
price REAL,
currency TEXT DEFAULT 'USD',
description TEXT,
category TEXT,
rating REAL,
review_count INTEGER,
in_stock INTEGER DEFAULT 1,
raw_html TEXT,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_products_category ON products(category);
CREATE INDEX IF NOT EXISTS idx_products_price ON products(price);
CREATE INDEX IF NOT EXISTS idx_products_scraped ON products(scraped_at);
CREATE TABLE IF NOT EXISTS product_images (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_url TEXT NOT NULL,
image_url TEXT NOT NULL,
position INTEGER DEFAULT 0,
FOREIGN KEY (product_url) REFERENCES products(url),
UNIQUE(product_url, image_url)
);
CREATE TABLE IF NOT EXISTS scrape_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL,
status_code INTEGER,
success INTEGER DEFAULT 1,
error_message TEXT,
response_time_ms INTEGER,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS scrape_queue (
url TEXT PRIMARY KEY,
priority INTEGER DEFAULT 0,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
claimed_at TIMESTAMP,
completed_at TIMESTAMP,
retry_count INTEGER DEFAULT 0
);
""")
conn.commit()
return conn
@contextmanager
def get_db(db_path="scraper.db"):
"""Context manager for database connections."""
conn = init_db(db_path)
try:
yield conn
finally:
conn.close()
The Core Deduplication Pattern
The INSERT ... ON CONFLICT pattern is the heart of any serious scraper:
def save_product(conn, product):
"""Save or update a product with deduplication."""
conn.execute("""
INSERT INTO products (url, title, price, currency, description,
category, rating, review_count, in_stock)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
title = excluded.title,
price = excluded.price,
description = excluded.description,
rating = excluded.rating,
review_count = excluded.review_count,
in_stock = excluded.in_stock,
updated_at = CURRENT_TIMESTAMP
""", (
product["url"], product.get("title"), product.get("price"),
product.get("currency", "USD"), product.get("description"),
product.get("category"), product.get("rating"),
product.get("review_count"), product.get("in_stock", True)
))
conn.commit()
def save_products_batch(conn, products):
"""Batch insert with deduplication — 10x faster than individual inserts."""
conn.executemany("""
INSERT INTO products (url, title, price, currency, description,
category, rating, review_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
title = excluded.title,
price = excluded.price,
updated_at = CURRENT_TIMESTAMP
""", [
(p["url"], p.get("title"), p.get("price"), p.get("currency", "USD"),
p.get("description"), p.get("category"), p.get("rating"),
p.get("review_count"))
for p in products
])
conn.commit()
print(f"Upserted {len(products)} products")
def already_scraped(conn, url):
"""Check if a URL has been scraped recently."""
row = conn.execute(
"SELECT 1 FROM products WHERE url = ?", (url,)
).fetchone()
return row is not None
def get_stale_urls(conn, hours=24):
"""Find products that haven't been updated recently — for re-scraping."""
return [row[0] for row in conn.execute("""
SELECT url FROM products
WHERE updated_at < datetime('now', ? || ' hours')
ORDER BY updated_at ASC
""", (f"-{hours}",)).fetchall()]
Resumable Scraping with a Queue
This is the pattern that makes your scraper production-ready. It can crash, get killed, or hit rate limits — and pick up exactly where it left off:
def add_to_queue(conn, urls, priority=0):
"""Add URLs to the scrape queue."""
conn.executemany("""
INSERT OR IGNORE INTO scrape_queue (url, priority)
VALUES (?, ?)
""", [(url, priority) for url in urls])
conn.commit()
print(f"Added {len(urls)} URLs to queue")
def get_next_url(conn):
"""Get the next URL to scrape (highest priority, oldest first)."""
row = conn.execute("""
SELECT url FROM scrape_queue
WHERE completed_at IS NULL
AND (claimed_at IS NULL OR claimed_at < datetime('now', '-10 minutes'))
AND retry_count < 5
ORDER BY priority DESC, added_at ASC
LIMIT 1
""").fetchone()
if row:
conn.execute(
"UPDATE scrape_queue SET claimed_at = CURRENT_TIMESTAMP WHERE url = ?",
(row[0],)
)
conn.commit()
return row[0]
return None
def mark_completed(conn, url):
"""Mark a URL as successfully scraped."""
conn.execute(
"UPDATE scrape_queue SET completed_at = CURRENT_TIMESTAMP WHERE url = ?",
(url,)
)
conn.commit()
def mark_failed(conn, url, error):
"""Increment retry count for a failed URL so it can be retried."""
conn.execute("""
UPDATE scrape_queue
SET claimed_at = NULL, retry_count = retry_count + 1
WHERE url = ?
""", (url,))
conn.execute(
"INSERT INTO scrape_log (url, success, error_message) VALUES (?, 0, ?)",
(url, str(error))
)
conn.commit()
def queue_stats(conn):
"""Get overview of queue progress."""
total = conn.execute("SELECT COUNT(*) FROM scrape_queue").fetchone()[0]
done = conn.execute(
"SELECT COUNT(*) FROM scrape_queue WHERE completed_at IS NOT NULL"
).fetchone()[0]
failed = conn.execute(
"SELECT COUNT(*) FROM scrape_queue WHERE retry_count >= 5"
).fetchone()[0]
pending = total - done - failed
print(f"Queue: {done}/{total} done, {pending} pending, {failed} permanently failed")
return {"total": total, "done": done, "pending": pending, "failed": failed}
# Main scraping loop — fully resumable
def run_scraper(conn, scrape_fn):
"""Run scraper with automatic resume capability."""
while True:
url = get_next_url(conn)
if not url:
stats = queue_stats(conn)
if stats["pending"] == 0:
print("Queue empty — scraping complete")
break
print("No URLs available right now, waiting...")
import time
time.sleep(30)
continue
try:
print(f"Scraping: {url}")
product = scrape_fn(url)
save_product(conn, product)
mark_completed(conn, url)
except Exception as e:
print(f"Failed: {url} — {e}")
mark_failed(conn, url, e)
Price History Tracking
A common use case — tracking price changes over time for deal alerts:
def init_price_tracking(conn):
"""Add price history table for monitoring."""
conn.execute("""
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_url TEXT NOT NULL,
price REAL NOT NULL,
currency TEXT DEFAULT 'USD',
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_url) REFERENCES products(url)
)
""")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_price_url ON price_history(product_url)"
)
conn.commit()
def record_price(conn, url, price, currency="USD"):
"""Record a price point, only if it actually changed."""
last_price = conn.execute("""
SELECT price FROM price_history
WHERE product_url = ?
ORDER BY recorded_at DESC LIMIT 1
""", (url,)).fetchone()
if last_price is None or abs(last_price[0] - price) > 0.01:
conn.execute(
"INSERT INTO price_history (product_url, price, currency) VALUES (?, ?, ?)",
(url, price, currency)
)
conn.commit()
if last_price:
change = price - last_price[0]
direction = "UP" if change > 0 else "DOWN"
print(f"Price {direction}: ${last_price[0]:.2f} -> ${price:.2f} "
f"({'+' if change > 0 else ''}{change:.2f})")
def get_price_drops(conn, min_drop_pct=10):
"""Find products with significant recent price drops — deal finder."""
return conn.execute("""
WITH ranked AS (
SELECT product_url, price,
LAG(price) OVER (
PARTITION BY product_url ORDER BY recorded_at
) as prev_price,
recorded_at
FROM price_history
)
SELECT product_url, prev_price, price,
ROUND((prev_price - price) / prev_price * 100, 1) as drop_pct
FROM ranked
WHERE prev_price IS NOT NULL
AND (prev_price - price) / prev_price * 100 >= ?
ORDER BY drop_pct DESC
""", (min_drop_pct,)).fetchall()
SQLite Analytics and Reporting
def scraping_stats(conn):
"""Get a full overview of scraping progress and health."""
stats = {}
stats["total_products"] = conn.execute(
"SELECT COUNT(*) FROM products"
).fetchone()[0]
stats["scraped_today"] = conn.execute("""
SELECT COUNT(*) FROM products
WHERE DATE(scraped_at) = DATE('now')
""").fetchone()[0]
stats["queue_remaining"] = conn.execute("""
SELECT COUNT(*) FROM scrape_queue
WHERE completed_at IS NULL
""").fetchone()[0]
stats["failed_today"] = conn.execute("""
SELECT COUNT(*) FROM scrape_log
WHERE success = 0 AND DATE(scraped_at) = DATE('now')
""").fetchone()[0]
stats["avg_price"] = conn.execute(
"SELECT ROUND(AVG(price), 2) FROM products WHERE price > 0"
).fetchone()[0]
stats["categories"] = conn.execute(
"SELECT COUNT(DISTINCT category) FROM products WHERE category IS NOT NULL"
).fetchone()[0]
for key, value in stats.items():
print(f" {key}: {value}")
return stats
def category_breakdown(conn):
"""Product count and price stats by category."""
return conn.execute("""
SELECT category,
COUNT(*) as count,
ROUND(AVG(price), 2) as avg_price,
ROUND(MIN(price), 2) as min_price,
ROUND(MAX(price), 2) as max_price
FROM products
WHERE category IS NOT NULL AND price > 0
GROUP BY category
ORDER BY count DESC
""").fetchall()
def export_to_csv(conn, query, filename):
"""Export any SQL query result to CSV."""
import csv
cur = conn.execute(query)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
with open(filename, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow(columns)
writer.writerows(rows)
print(f"Exported {len(rows)} rows to {filename}")
PostgreSQL: Team and Scale
When multiple people need to query the data, or you're deploying scrapers to the cloud, PostgreSQL is the move. The deduplication pattern is the same — ON CONFLICT DO NOTHING replaces INSERT OR IGNORE.
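Conveniently, SQLite also accepts the portable spelling (ON CONFLICT DO NOTHING arrived in SQLite 3.24), so you can verify migrated queries locally before touching PostgreSQL:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE seen (url TEXT PRIMARY KEY)")

url = "https://example.com/p/1"
# SQLite-specific spelling
conn.execute("INSERT OR IGNORE INTO seen (url) VALUES (?)", (url,))
# Portable spelling that PostgreSQL also understands
conn.execute("INSERT INTO seen (url) VALUES (?) ON CONFLICT DO NOTHING", (url,))

count = conn.execute("SELECT COUNT(*) FROM seen").fetchone()[0]
print(count)  # 1 — the duplicate was silently skipped both times
```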
Setup and Connection Management
import psycopg2
from psycopg2.extras import execute_values, RealDictCursor
from contextlib import contextmanager
DATABASE_URL = "postgresql://scraper:password@localhost:5432/scraping"
@contextmanager
def get_pg_conn():
"""PostgreSQL connection with auto-cleanup."""
conn = psycopg2.connect(DATABASE_URL)
try:
yield conn
finally:
conn.close()
def init_pg_schema(conn):
"""Create the scraping schema in PostgreSQL."""
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
url TEXT PRIMARY KEY,
title TEXT,
price NUMERIC(10, 2),
currency TEXT DEFAULT 'USD',
description TEXT,
category TEXT,
rating NUMERIC(3, 2),
review_count INTEGER,
in_stock BOOLEAN DEFAULT true,
metadata JSONB DEFAULT '{}',
scraped_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_products_category
ON products(category);
CREATE INDEX IF NOT EXISTS idx_products_metadata
ON products USING GIN(metadata);
CREATE INDEX IF NOT EXISTS idx_products_price
ON products(price);
CREATE TABLE IF NOT EXISTS price_history (
id SERIAL PRIMARY KEY,
product_url TEXT REFERENCES products(url),
price NUMERIC(10, 2) NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_price_url_time
ON price_history(product_url, recorded_at DESC);
""")
conn.commit()
Batch Upsert — Fast Bulk Operations
def upsert_products_pg(conn, products):
"""Batch upsert products — much faster than individual inserts."""
import json
with conn.cursor() as cur:
execute_values(cur, """
INSERT INTO products (url, title, price, currency, category, metadata)
VALUES %s
ON CONFLICT (url) DO UPDATE SET
title = EXCLUDED.title,
price = EXCLUDED.price,
category = EXCLUDED.category,
metadata = products.metadata || EXCLUDED.metadata,
updated_at = NOW()
""", [
(p["url"], p.get("title"), p.get("price"),
p.get("currency", "USD"), p.get("category"),
json.dumps(p.get("metadata", {})))
for p in products
])
conn.commit()
print(f"Upserted {len(products)} products")
JSONB for Flexible Metadata
PostgreSQL's JSONB type lets you store semi-structured data without schema changes — perfect for scraping where different pages have different fields:
import json
def save_with_metadata(conn, url, title, price, extra_data):
"""Store structured fields + flexible metadata in JSONB."""
with conn.cursor() as cur:
cur.execute("""
INSERT INTO products (url, title, price, metadata)
VALUES (%s, %s, %s, %s)
ON CONFLICT (url) DO UPDATE SET
title = EXCLUDED.title,
price = EXCLUDED.price,
metadata = products.metadata || EXCLUDED.metadata,
updated_at = NOW()
""", (url, title, price, json.dumps(extra_data)))
conn.commit()
# Query JSONB fields — find products by metadata
def find_by_metadata(conn, key, value):
"""Find products by any metadata field."""
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT url, title, price, metadata->>%s as field_value
FROM products
WHERE metadata->>%s = %s
""", (key, key, value))
return cur.fetchall()
# Full-text search within JSONB
def search_metadata(conn, search_term):
"""Search across all metadata fields."""
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT url, title, price
FROM products
WHERE metadata::text ILIKE %s
""", (f"%{search_term}%",))
return cur.fetchall()
Async PostgreSQL with asyncpg
For async scrapers using httpx or aiohttp — critical for high-throughput scraping:
import asyncio
import asyncpg
async def init_pool():
"""Create a connection pool for async operations."""
return await asyncpg.create_pool(
DATABASE_URL,
min_size=5,
max_size=20
)
async def save_product_async(pool, product):
"""Async product save with connection pool."""
async with pool.acquire() as conn:
await conn.execute("""
INSERT INTO products (url, title, price, category)
VALUES ($1, $2, $3, $4)
ON CONFLICT (url) DO UPDATE SET
title = EXCLUDED.title,
price = EXCLUDED.price,
updated_at = NOW()
""", product["url"], product.get("title"),
product.get("price"), product.get("category"))
async def scrape_and_store_async(pool, urls, scrape_fn):
"""Scrape multiple URLs concurrently with async storage."""
semaphore = asyncio.Semaphore(10) # Limit concurrency
async def process_url(url):
async with semaphore:
product = await scrape_fn(url)
await save_product_async(pool, product)
await asyncio.gather(*[process_url(url) for url in urls])
Using Proxies with Your Storage Layer
When scraping at scale, you need reliable IP rotation to avoid blocks. Your storage layer should track which proxy was used for each request — this helps debug issues when certain IP ranges get blocked.
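The scrape_log table defined earlier doesn't have a proxy column yet, so here's a small idempotent migration plus a logging helper. A sketch — the column name and helper are my additions, not part of the schema above:

```python
import sqlite3

def ensure_proxy_column(conn):
    """Add a proxy column to scrape_log if it's missing (safe to call repeatedly)."""
    cols = [row[1] for row in conn.execute("PRAGMA table_info(scrape_log)")]
    if "proxy" not in cols:
        conn.execute("ALTER TABLE scrape_log ADD COLUMN proxy TEXT")
        conn.commit()

def log_request(conn, url, status_code, proxy_url):
    """Record which proxy served each request, for diagnosing blocked IP ranges."""
    conn.execute(
        "INSERT INTO scrape_log (url, status_code, proxy) VALUES (?, ?, ?)",
        (url, status_code, proxy_url),
    )
    conn.commit()
```

With the proxy recorded, a simple GROUP BY proxy over failed rows shows whether blocks cluster on particular exit IPs.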
ThorData residential proxies integrate cleanly with any storage backend:
import requests
import time
import random
THORDATA_PROXY = "http://USERNAME:[email protected]:9000"
def scrape_with_tracking(conn, url):
"""Scrape via ThorData proxy with result logging."""
proxies = {"http": THORDATA_PROXY, "https": THORDATA_PROXY}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
}
time.sleep(random.uniform(1.5, 4.0))
start = time.time()
try:
resp = requests.get(url, headers=headers, proxies=proxies, timeout=30)
elapsed = int((time.time() - start) * 1000)
resp.raise_for_status()
conn.execute(
"INSERT INTO scrape_log (url, status_code, success, response_time_ms) "
"VALUES (?, ?, 1, ?)",
(url, resp.status_code, elapsed)
)
conn.commit()
return resp.text
except requests.RequestException as e:
elapsed = int((time.time() - start) * 1000)
conn.execute(
"INSERT INTO scrape_log (url, status_code, success, error_message, "
"response_time_ms) VALUES (?, ?, 0, ?, ?)",
(url, getattr(e.response, 'status_code', None), str(e), elapsed)
)
conn.commit()
raise
def scrape_with_retry(conn, url, max_retries=3):
"""Scrape with exponential backoff through ThorData proxy rotation."""
for attempt in range(max_retries):
try:
return scrape_with_tracking(conn, url)
except requests.RequestException as e:
wait = (2 ** attempt) + random.uniform(0, 1)
print(f"Attempt {attempt + 1}/{max_retries} failed: {e}")
if attempt < max_retries - 1:
print(f"Retrying in {wait:.1f}s...")
time.sleep(wait)
else:
raise
Using httpx with Async Proxy Rotation
import httpx
import asyncio
async def scrape_batch_with_proxies(conn, urls, proxy_url):
"""Scrape multiple URLs concurrently through ThorData proxies."""
async with httpx.AsyncClient(
proxy=proxy_url,
timeout=30,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
},
follow_redirects=True,
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5)
) as client:
for url in urls:
if already_scraped(conn, url):
continue
try:
await asyncio.sleep(random.uniform(2, 5))
resp = await client.get(url)
resp.raise_for_status()
product = parse_product(url, resp.text)
save_product(conn, product)
mark_completed(conn, url)
print(f"OK: {url}")
except httpx.HTTPError as e:
mark_failed(conn, url, e)
print(f"FAIL: {url} — {e}")
Anti-Detection: Headers, Delays, and Fingerprint Spoofing
Realistic Request Headers
Bare requests.get(url) sends a python-requests/2.31.0 User-Agent that screams "bot." Always set realistic headers:
import random
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
]
def get_headers(referer=None):
"""Generate realistic browser headers with rotation."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Cache-Control": "max-age=0",
}
if referer:
headers["Referer"] = referer
headers["Sec-Fetch-Site"] = "same-origin"
return headers
Smart Delay Patterns
Fixed delays are detectable. Real users have variable timing:
import time
import random
def human_delay(min_seconds=1.5, max_seconds=5.0):
"""Sleep for a random, human-like duration."""
delay = random.uniform(min_seconds, max_seconds)
# Occasionally take a longer pause (like a user reading a page)
if random.random() < 0.1:
delay += random.uniform(5, 15)
time.sleep(delay)
def adaptive_delay(response_time_ms):
"""Adjust delay based on server response time — slow server = back off more."""
base_delay = max(2.0, response_time_ms / 1000 * 2)
jitter = random.uniform(0.5, 2.0)
time.sleep(base_delay + jitter)
Output Schemas and Data Validation
Define what your scraped data should look like before you start scraping:
from dataclasses import dataclass, field, asdict
from typing import Optional
from datetime import datetime
@dataclass
class ScrapedProduct:
"""Schema for scraped product data."""
url: str
title: str
price: Optional[float] = None
currency: str = "USD"
description: Optional[str] = None
category: Optional[str] = None
rating: Optional[float] = None
review_count: Optional[int] = None
in_stock: bool = True
images: list[str] = field(default_factory=list)
variants: list[dict] = field(default_factory=list)
scraped_at: str = field(default_factory=lambda: datetime.now().isoformat())
def validate(self):
"""Validate before storage — catch data quality issues early."""
errors = []
if not self.url or not self.url.startswith("http"):
errors.append(f"Invalid URL: {self.url}")
if self.price is not None and self.price < 0:
errors.append(f"Negative price: {self.price}")
if self.rating is not None and not (0 <= self.rating <= 5):
errors.append(f"Rating out of range: {self.rating}")
if self.title and len(self.title) > 1000:
errors.append(f"Title too long: {len(self.title)} chars")
if errors:
raise ValueError(f"Validation errors: {'; '.join(errors)}")
return True
def to_dict(self):
return asdict(self)
def to_csv_row(self):
"""Flatten for CSV output (skip nested fields)."""
d = self.to_dict()
d["images"] = "|".join(d["images"])
d.pop("variants")
return d
# Usage
product = ScrapedProduct(
url="https://store.example.com/widget",
title="Super Widget",
price=29.99,
category="Widgets",
rating=4.5,
review_count=128,
images=["https://cdn.example.com/img1.jpg"]
)
product.validate()
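In a real run you rarely want one bad record to kill the whole scrape. A small partition helper keeps the run alive and the rejects reviewable — a sketch, where `split_valid` and the sample `reject_bad_prices` validator are hypothetical names; it works with any per-record validator that raises `ValueError`:

```python
def split_valid(records, validator):
    """Partition records into ones that pass `validator` and ones that raise ValueError."""
    valid, rejected = [], []
    for record in records:
        try:
            validator(record)
            valid.append(record)
        except ValueError as exc:
            # Keep the error message alongside the record for later review
            rejected.append((record, str(exc)))
    return valid, rejected

def reject_bad_prices(record):
    """Sample dict-based validator, same spirit as ScrapedProduct.validate()."""
    if record.get("price") is not None and record["price"] < 0:
        raise ValueError(f"Negative price: {record['price']}")

valid, rejected = split_valid(
    [{"url": "https://a.example.com", "price": 9.99},
     {"url": "https://b.example.com", "price": -1}],
    reject_bad_prices,
)
```

`ScrapedProduct` instances work the same way: pass `lambda p: p.validate()` as the validator.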
Example Output Schemas
E-commerce product:
{
"url": "https://store.example.com/product-123",
"title": "Wireless Bluetooth Headphones",
"price": 79.99,
"currency": "USD",
"description": "Noise-cancelling over-ear headphones with 30h battery",
"category": "Electronics > Audio > Headphones",
"rating": 4.3,
"review_count": 2841,
"in_stock": true,
"images": [
"https://cdn.example.com/products/123/main.jpg",
"https://cdn.example.com/products/123/side.jpg"
],
"variants": [
{"color": "Black", "sku": "BT-HP-BLK", "price": 79.99, "in_stock": true},
{"color": "White", "sku": "BT-HP-WHT", "price": 79.99, "in_stock": false}
],
"scraped_at": "2026-03-30T14:22:33.456789"
}
Job listing:
{
"url": "https://jobs.example.com/posting/456",
"title": "Senior Python Developer",
"company": "TechCorp",
"location": "Remote (US)",
"salary_min": 150000,
"salary_max": 200000,
"salary_currency": "USD",
"employment_type": "full-time",
"skills": ["Python", "FastAPI", "PostgreSQL", "Docker"],
"posted_date": "2026-03-28",
"scraped_at": "2026-03-30T14:22:33.456789"
}
Real estate listing:
{
"url": "https://realty.example.com/listing/789",
"title": "3-Bedroom Apartment in Downtown",
"price": 450000,
"currency": "USD",
"property_type": "apartment",
"bedrooms": 3,
"bathrooms": 2,
"area_sqft": 1200,
"address": {
"street": "123 Main St",
"city": "Austin",
"state": "TX",
"zip": "78701"
},
"features": ["parking", "pool", "gym", "balcony"],
"scraped_at": "2026-03-30T14:22:33.456789"
}
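The nested address object and features list above are exactly what flat CSV can't hold. A generic flattener bridges the gap — a sketch, where the dotted-key convention and the `|` list separator are assumptions, not a standard:

```python
def flatten(record, parent_key="", sep="."):
    """Flatten nested dicts into dotted keys and join lists with '|' for CSV export."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        elif isinstance(value, list):
            flat[full_key] = "|".join(str(v) for v in value)
        else:
            flat[full_key] = value
    return flat

row = flatten({
    "url": "https://realty.example.com/listing/789",
    "address": {"city": "Austin", "state": "TX"},
    "features": ["parking", "pool"],
})
# row has keys like "address.city" plus a joined "features" string
```

Lists of dicts (like the product variants earlier) still end up as stringified blobs under this scheme — those belong in JSONL or a child table, not CSV.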
Error Handling and Data Integrity
Safe Data Extraction
from bs4 import BeautifulSoup
import re
def safe_extract(soup, selector, attribute=None, default=None):
"""Extract data with fallback — never crash on missing elements."""
element = soup.select_one(selector)
if element is None:
return default
if attribute:
return element.get(attribute, default)
text = element.get_text(strip=True)
return text if text else default
def parse_price(price_str):
"""Parse price from various international formats."""
if not price_str:
return None
    cleaned = re.sub(r'[^\d.,]', '', price_str)
    if ',' in cleaned and '.' in cleaned:
        # Whichever separator comes last is the decimal point
        if cleaned.rfind(',') > cleaned.rfind('.'):
            cleaned = cleaned.replace('.', '').replace(',', '.')  # 1.299,99 -> 1299.99
        else:
            cleaned = cleaned.replace(',', '')  # 1,299.99 -> 1299.99
    elif ',' in cleaned and len(cleaned.split(',')[-1]) == 2:
        cleaned = cleaned.replace(',', '.')  # 29,99 -> 29.99
    elif ',' in cleaned:
        cleaned = cleaned.replace(',', '')  # 1,299 -> 1299
try:
return round(float(cleaned), 2)
except ValueError:
return None
Transaction Safety for Batch Operations
def atomic_batch_save(conn, products):
"""Save a batch atomically — all succeed or none do."""
try:
for product in products:
conn.execute("""
INSERT INTO products (url, title, price, category)
VALUES (?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
title = excluded.title,
price = excluded.price,
updated_at = CURRENT_TIMESTAMP
""", (product["url"], product.get("title"),
product.get("price"), product.get("category")))
conn.commit()
print(f"Saved batch of {len(products)} products")
except Exception as e:
conn.rollback()
print(f"Batch save failed, rolled back: {e}")
raise
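A quick in-memory check of the pattern, with the function condensed from the one above and a deliberately tiny schema (your real products table will have more columns):

```python
import sqlite3

def upsert_batch(conn, products):
    """All-or-nothing batch upsert keyed on url."""
    try:
        for p in products:
            conn.execute(
                """INSERT INTO products (url, title, price)
                   VALUES (?, ?, ?)
                   ON CONFLICT(url) DO UPDATE SET
                       title = excluded.title, price = excluded.price""",
                (p["url"], p.get("title"), p.get("price")),
            )
        conn.commit()
    except Exception:
        conn.rollback()
        raise

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (url TEXT PRIMARY KEY, title TEXT, price REAL)")
upsert_batch(conn, [
    {"url": "https://store.example.com/widget", "title": "Widget", "price": 29.99},
    {"url": "https://store.example.com/widget", "title": "Widget v2", "price": 24.99},
])
# Same URL twice in one batch: the second write updates the first, leaving one row
```

Note that `ON CONFLICT ... DO UPDATE` needs SQLite 3.24+, which every currently supported Python ships with.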
Real-World Use Cases with Code
1. Price Monitoring Dashboard
Track prices across multiple e-commerce sites and alert on drops:
def run_price_monitor(conn, product_urls):
"""Daily price check with change detection and alerting."""
price_changes = []
for url in product_urls:
try:
html = scrape_with_retry(conn, url)
product = parse_product(url, html)
if product.get("price"):
record_price(conn, url, product["price"])
save_product(conn, product)
except Exception as e:
print(f"Failed to check {url}: {e}")
drops = get_price_drops(conn, min_drop_pct=15)
if drops:
print(f"\n{len(drops)} significant price drops detected!")
for url, old, new, pct in drops:
print(f" {url}: ${old} -> ${new} (-{pct}%)")
2. Job Board Aggregator
def aggregate_jobs(conn, sources):
"""Scrape multiple job boards into unified storage."""
for source_name, scrape_fn in sources.items():
print(f"Scraping {source_name}...")
jobs = scrape_fn()
for job in jobs:
job["source"] = source_name
conn.execute("""
INSERT INTO jobs (url, title, company, location, salary_min,
salary_max, source, scraped_at)
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(url) DO UPDATE SET
salary_min = excluded.salary_min,
salary_max = excluded.salary_max,
updated_at = CURRENT_TIMESTAMP
""", (job["url"], job["title"], job["company"],
job["location"], job.get("salary_min"),
job.get("salary_max"), source_name))
conn.commit()
print(f" Saved {len(jobs)} jobs from {source_name}")
3. Content Change Monitor
import hashlib
def monitor_pages(conn, urls):
"""Detect content changes on monitored pages."""
changes = []
for url in urls:
html = scrape_with_retry(conn, url)
content_hash = hashlib.sha256(html.encode()).hexdigest()
last = conn.execute(
"SELECT content_hash FROM snapshots WHERE url = ? "
"ORDER BY scraped_at DESC LIMIT 1", (url,)
).fetchone()
if last is None or last[0] != content_hash:
conn.execute(
"INSERT INTO snapshots (url, content, content_hash) VALUES (?, ?, ?)",
(url, html, content_hash)
)
conn.commit()
if last:
changes.append(url)
print(f"CHANGED: {url}")
return changes
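One caveat with hashing raw HTML: pages often embed timestamps, CSRF tokens, or nonces that change on every fetch, so every check reports a "change." Normalizing before hashing filters that noise — the regexes below are illustrative examples to tune per site, not a general solution:

```python
import hashlib
import re

def stable_hash(html):
    """Hash page content after stripping volatile fragments that change on every fetch."""
    # Drop CSRF-style hidden inputs (pattern is an example — adjust for your target)
    normalized = re.sub(r'name="csrf[^"]*"\s+value="[^"]*"', '', html)
    # Drop ISO-8601 timestamps like 2026-03-30T14:22:33.456789
    normalized = re.sub(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(\.\d+)?', '', normalized)
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
```

With this in place, two fetches that differ only in a rendered timestamp hash identically, while a real content change still flips the hash.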
4. Research Data Pipeline
def research_pipeline(conn, queries, output_csv="research_data.csv"):
"""Collect, deduplicate, and export research data."""
for query in queries:
results = search_and_scrape(query)
for r in results:
conn.execute("""
INSERT INTO research (url, title, content, query, source)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET updated_at = CURRENT_TIMESTAMP
""", (r["url"], r["title"], r["content"], query, r["source"]))
conn.commit()
print(f"Collected {len(results)} results for '{query}'")
# Export deduplicated results
export_to_csv(conn, "SELECT * FROM research ORDER BY scraped_at DESC", output_csv)
5. Real Estate Listing Tracker
def track_listings(conn, search_urls):
"""Track new listings and price changes in real estate."""
for search_url in search_urls:
listings = scrape_listing_page(search_url)
for listing in listings:
existing = conn.execute(
"SELECT price FROM listings WHERE url = ?", (listing["url"],)
).fetchone()
if existing is None:
print(f"NEW: {listing['title']} — ${listing['price']:,.0f}")
elif abs(existing[0] - listing["price"]) > 1:
print(f"PRICE: {listing['title']}: "
f"${existing[0]:,.0f} -> ${listing['price']:,.0f}")
conn.execute("""
INSERT INTO listings (url, title, price, bedrooms, location)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
price = excluded.price, updated_at = CURRENT_TIMESTAMP
""", (listing["url"], listing["title"], listing["price"],
listing.get("bedrooms"), listing.get("location")))
conn.commit()
CAPTCHA and Rate Limiting Strategies
Rate Limit Detection and Backoff
def rate_limit_aware_scrape(conn, url, session=None):
"""Scrape with automatic rate limit detection and backoff."""
if session is None:
session = requests.Session()
session.headers.update(get_headers())
resp = session.get(url, timeout=30)
    if resp.status_code == 429:
        # Retry-After may be seconds or an HTTP date; default to 60s if unparseable
        retry_after = resp.headers.get("Retry-After", "60")
        wait = int(retry_after) if retry_after.strip().isdigit() else 60
        print(f"Rate limited. Waiting {wait}s...")
        time.sleep(wait)
        resp = session.get(url, timeout=30)
if resp.status_code == 403:
print(f"Blocked on {url}. Consider using residential proxies.")
# ThorData provides automatic IP rotation to avoid blocks
# https://thordata.partnerstack.com/partner/0a0x4nzh
raise requests.HTTPError(f"403 Forbidden: {url}")
resp.raise_for_status()
return resp
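A single retry handles the polite case; for repeated 429s, the usual escalation is exponential backoff with full jitter. A sketch — the retry loop that consumes these delays is up to you:

```python
import random

def backoff_delays(max_retries=5, base=2.0, cap=120.0):
    """Yield exponentially growing, fully jittered delays: up to ~2s, ~4s, ~8s..., capped."""
    for attempt in range(max_retries):
        yield random.uniform(0, min(cap, base * (2 ** attempt)))

# Typical use:
#   for delay in backoff_delays():
#       time.sleep(delay)
#       if retry_succeeds(): break
```

Full jitter (uniform between 0 and the exponential ceiling) spreads retries out so a fleet of scrapers doesn't hammer the server in lockstep after a shared rate-limit event.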
CAPTCHA Detection
def check_for_captcha(html):
"""Detect common CAPTCHA challenges in response HTML."""
captcha_indicators = [
"captcha", "recaptcha", "hcaptcha", "cf-challenge",
"challenge-running", "turnstile", "just a moment",
"checking your browser", "verify you are human"
]
html_lower = html.lower()
for indicator in captcha_indicators:
if indicator in html_lower:
return True
return False
def scrape_with_captcha_detection(conn, url):
"""Scrape with CAPTCHA detection and proxy fallback."""
html = scrape_with_tracking(conn, url)
if check_for_captcha(html):
print(f"CAPTCHA detected on {url} — switching to residential proxy")
# Retry through ThorData residential proxy
html = scrape_with_retry(conn, url)
if check_for_captcha(html):
print(f"Still blocked after proxy — marking for manual review")
mark_failed(conn, url, "CAPTCHA not bypassed")
return None
return html
The Decision Flowchart
Quick reference for choosing your storage backend:
- One-off scrape, someone wants a spreadsheet? → CSV with utf-8-sig
- Nested/semi-structured data, or streaming output? → JSONL
- Runs more than once, needs dedup, local project? → SQLite
- Team access, cloud deployment, millions of rows? → PostgreSQL
- Need flexible schema + SQL queries? → PostgreSQL with JSONB
- Don't want to manage storage at all? → Managed platform
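The same checklist as code, if you want it inline in a project template — purely a heuristic encoding of the bullets above, and `choose_backend` is a hypothetical helper:

```python
def choose_backend(one_off=False, nested=False, repeated_runs=False,
                   team_or_cloud=False, flexible_schema=False):
    """Heuristic mapping of project traits to a storage backend."""
    if flexible_schema:
        return "postgresql (JSONB for flexible schema)"
    if team_or_cloud:
        return "postgresql"
    if repeated_runs:
        return "sqlite"  # dedup and resume come for free
    if nested:
        return "jsonl"
    if one_off:
        return "csv (utf-8-sig)"
    return "sqlite"  # safe default
```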
Migration: SQLite to PostgreSQL
Start with SQLite. It handles 90% of scraping projects. When you outgrow it, migration is straightforward because the SQL is nearly identical:
def migrate_sqlite_to_pg(sqlite_path, pg_conn_string):
"""Migrate scraped data from SQLite to PostgreSQL."""
import sqlite3
import psycopg2
from psycopg2.extras import execute_values
sqlite_conn = sqlite3.connect(sqlite_path)
sqlite_conn.row_factory = sqlite3.Row
pg_conn = psycopg2.connect(pg_conn_string)
rows = sqlite_conn.execute("SELECT * FROM products").fetchall()
if not rows:
print("No data to migrate")
return
columns = rows[0].keys()
with pg_conn.cursor() as cur:
execute_values(cur, f"""
INSERT INTO products ({', '.join(columns)})
VALUES %s
ON CONFLICT (url) DO NOTHING
""", [tuple(row) for row in rows])
pg_conn.commit()
print(f"Migrated {len(rows)} products from SQLite to PostgreSQL")
sqlite_conn.close()
pg_conn.close()
The deduplication pattern is the core: INSERT OR IGNORE in SQLite becomes ON CONFLICT DO NOTHING in PostgreSQL. INSERT ... ON CONFLICT DO UPDATE works identically in both. If you start with SQLite and design your schema well, upgrading to PostgreSQL later is just changing the connection string and a few SQL keywords. Your scraping logic stays exactly the same.
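The keyword swap in one runnable snippet — SQLite shown, PostgreSQL equivalent in the comments:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (url TEXT PRIMARY KEY, title TEXT)")

# SQLite: INSERT OR IGNORE silently skips rows whose url already exists.
# PostgreSQL: INSERT INTO products (url, title) VALUES (%s, %s)
#             ON CONFLICT (url) DO NOTHING
for url, title in [("https://store.example.com/widget", "first"),
                   ("https://store.example.com/widget", "second")]:
    conn.execute("INSERT OR IGNORE INTO products (url, title) VALUES (?, ?)",
                 (url, title))
conn.commit()
# Only the first insert lands; the duplicate is ignored
```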
Start with SQLite. Seriously. The deduplication pattern is three lines of SQL, it handles concurrent reads via WAL mode, and migrating to PostgreSQL later is a 20-minute job when you actually need it.