How to Scrape the Wayback Machine: CDX API & Python Guide (2026)

The Internet Archive's Wayback Machine holds over 900 billion archived web pages going back to 1996. It is the largest publicly accessible web archive on the planet, and unlike most valuable data sources, it comes with a proper API — no scraping tricks required for the vast majority of use cases.

If you need historical versions of web pages, want to trace how a competitor's pricing page changed over time, recover deleted content, or build a dataset of how the web looked at a particular moment, the Wayback Machine's CDX API is your starting point. This guide covers every major use case: URL enumeration, snapshot retrieval, WARC parsing, change tracking, and scaling with proxy rotation.

Why the Wayback Machine Matters for Data Work

Before diving into code, let's establish what makes the Wayback Machine genuinely useful rather than just a curiosity:

Competitive intelligence. You can reconstruct a competitor's pricing history, product catalog evolution, or messaging changes by comparing snapshots across years. No other public source gives you this.

Content recovery. Pages that return 404 today may have years of archived versions. Blog posts, documentation, product pages — all recoverable if they were ever crawled.

Research datasets. Academics and journalists routinely build datasets from archived web content. The CDX API makes enumerating what exists feasible without downloading terabytes of raw WARC files first.

Link archaeology. What URLs did a domain expose in 2018? What sub-pages existed and when were they removed? The collapse=urlkey parameter answers this in one request.

The CDX API — URL Enumeration

The CDX (Capture Index) API lets you search for all archived snapshots of a URL pattern. It is the backbone of everything else in this guide.

import httpx

def search_cdx(url: str, limit: int = 100, **kwargs) -> list[dict]:
    """Search the Wayback Machine CDX API for archived snapshots."""
    params = {
        "url": url,
        "output": "json",
        "limit": limit,
        "fl": "timestamp,original,mimetype,statuscode,digest,length",
    }
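    params.update(kwargs)
    # "from" is a reserved word in Python, so callers pass from_; the API expects "from"
    if "from_" in params:
        params["from"] = params.pop("from_")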

    resp = httpx.get(
        "https://web.archive.org/cdx/search/cdx",
        params=params,
        timeout=30,
    )
    resp.raise_for_status()

    rows = resp.json()
    if not rows:
        return []

    headers = rows[0]
    return [dict(zip(headers, row)) for row in rows[1:]]

# Find all snapshots of a page
snapshots = search_cdx("example.com", limit=10)
for s in snapshots:
    print(f"{s['timestamp']} — {s['statuscode']} — {s['mimetype']}")

The response fields are:

- timestamp: 14-digit UTC datetime string in YYYYMMDDHHMMSS format
- original: the URL as it was crawled
- mimetype: content type at time of capture
- statuscode: HTTP status returned
- digest: Base32-encoded SHA-1 hash of the content; identical digests mean identical content
- length: size in bytes of the stored (compressed) capture record, not of the decompressed page
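Because the timestamp arrives as a plain string, most pipelines convert it before sorting or filtering by date. A minimal sketch using only the standard library; the helper name parse_cdx_timestamp is illustrative and not part of any Wayback client:

```python
from datetime import datetime, timezone

def parse_cdx_timestamp(ts: str) -> datetime:
    """Convert a 14-digit CDX timestamp (YYYYMMDDHHMMSS) to an aware UTC datetime."""
    return datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)

captured = parse_cdx_timestamp("20240115123045")
print(captured.isoformat())  # 2024-01-15T12:30:45+00:00
```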

Useful CDX Parameters

The CDX API accepts a rich set of filters. Using them aggressively reduces API load and speeds up your pipeline:

# All pages under a domain and its subdomains
# (matchType="domain" already implies the *. wildcard, so pass the bare domain)
snapshots = search_cdx("example.com", matchType="domain", limit=500)

# Only HTML pages (skip images, CSS, JS)
snapshots = search_cdx("example.com/*", filter="mimetype:text/html", limit=200)

# Only successful responses
snapshots = search_cdx("example.com", filter="statuscode:200")

# Date range — YYYYMMDD format
snapshots = search_cdx(
    "example.com",
    from_="20230101",
    to="20260101",
)

# Collapse by digest — removes duplicate content
snapshots = search_cdx("example.com", collapse="digest")

# Pagination for large result sets (pageSize is measured in index blocks, not rows)
snapshots = search_cdx("example.com", page=0, pageSize=50)

# Combine filters — only HTML 200 responses after 2024
snapshots = search_cdx(
    "example.com/*",
    filter=["mimetype:text/html", "statuscode:200"],
    from_="20240101",
    collapse="digest",
    limit=1000,
)

The collapse=digest parameter deserves special attention. If a page did not change between two crawls, both snapshots will have the same SHA-1 digest. Collapsing on digest means you only get snapshots where the content actually changed — invaluable for change-tracking workflows where you want meaningful versions rather than hundreds of identical captures.
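To see what collapsing does, here is a client-side sketch of the same idea. It assumes the server drops only adjacent rows with a repeated value, which is how the CDX server documentation describes collapsing; collapse_adjacent is a hypothetical helper, not an API call:

```python
def collapse_adjacent(snapshots: list[dict], field: str = "digest") -> list[dict]:
    """Keep a snapshot only when `field` differs from the previous row."""
    kept = []
    last = object()  # sentinel that never equals a real value
    for snap in snapshots:
        if snap[field] != last:
            kept.append(snap)
        last = snap[field]
    return kept

rows = [
    {"timestamp": "20240101000000", "digest": "AAA"},
    {"timestamp": "20240201000000", "digest": "AAA"},
    {"timestamp": "20240301000000", "digest": "BBB"},
    {"timestamp": "20240401000000", "digest": "AAA"},
]
changed = collapse_adjacent(rows)
print([r["timestamp"] for r in changed])
```

Under that assumption, a page that reverts to an earlier version (the April row here) shows up again, because only consecutive duplicates are removed.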

The matchType parameter controls how the URL pattern is interpreted:

- exact (default): match this exact URL only
- prefix: match URLs that start with this string
- host: match all URLs for this host
- domain: match all URLs for this domain and all subdomains
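When a query behaves unexpectedly, it helps to look at the URL that would actually be sent. The sketch below builds one with the standard library; build_cdx_url and its from_ renaming are illustrative conventions for this guide, not part of the API:

```python
from urllib.parse import urlencode

CDX_ENDPOINT = "https://web.archive.org/cdx/search/cdx"

def build_cdx_url(url: str, **options) -> str:
    """Build a CDX query URL; list values become repeated parameters."""
    params = {"url": url, "output": "json"}
    for key, value in options.items():
        # Python reserves "from", so accept from_ and strip the underscore
        params[key.rstrip("_")] = value
    return f"{CDX_ENDPOINT}?{urlencode(params, doseq=True)}"

query = build_cdx_url(
    "example.com",
    matchType="prefix",
    from_="20240101",
    filter=["mimetype:text/html", "statuscode:200"],
)
print(query)
```

Passing a list for filter produces one filter= parameter per entry, which is how multiple filters are combined in a single request.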

Counting Snapshots Before Downloading

For large domains, check how big the result set is before pulling it. The CDX API does not expose a direct row count, but showNumPages reports how many result pages a query spans, which is a usable proxy for size:

def count_cdx_pages(url: str, **kwargs) -> int:
    """Return the number of CDX result pages for a query (not the number of rows)."""
    params = {"url": url, "showNumPages": "true"}
    params.update(kwargs)

    resp = httpx.get("https://web.archive.org/cdx/search/cdx", params=params, timeout=30)
    resp.raise_for_status()

    # The response body is a bare integer: the page count for this query
    text = resp.text.strip()
    return int(text) if text.isdigit() else 0

pages = count_cdx_pages("example.com/*")
print(f"Result pages: {pages}")

Availability API — Quick Checks

Before pulling full page content, use the availability API to confirm a snapshot exists:

import httpx

def check_availability(url: str, timestamp: str | None = None) -> dict | None:
    """Check if a URL has been archived."""
    params = {"url": url}
    if timestamp:
        params["timestamp"] = timestamp  # up to 14 digits: YYYYMMDDHHMMSS, shorter prefixes allowed

    resp = httpx.get(
        "https://archive.org/wayback/available",
        params=params,
        timeout=10,
    )
    resp.raise_for_status()  # fail loudly instead of parsing an error page as JSON

    data = resp.json()
    snapshot = data.get("archived_snapshots", {}).get("closest")

    if snapshot and snapshot.get("available"):
        return {
            "url": snapshot["url"],
            "timestamp": snapshot["timestamp"],
            "status": snapshot["status"],
        }
    return None

# Check if a page was archived around a specific date
result = check_availability("example.com/pricing", "20250601")
if result:
    print(f"Found: {result['url']}")

The availability API returns the closest snapshot to the requested timestamp — it does not need to be exact. If you request June 2025 and the closest snapshot is from May, that is what you get. Check the returned timestamp before using the content.

Fetching Archived Page Content

Once you have timestamps from CDX, fetch the actual archived content:

import httpx
import time

def fetch_snapshot(url: str, timestamp: str, raw: bool = False) -> str | None:
    """Fetch an archived page from the Wayback Machine."""
    if raw:
        # id_ returns the original content without Wayback's toolbar/rewrites
        wayback_url = f"https://web.archive.org/web/{timestamp}id_/{url}"
    else:
        wayback_url = f"https://web.archive.org/web/{timestamp}/{url}"

    resp = httpx.get(wayback_url, timeout=30, follow_redirects=True)

    if resp.status_code == 200:
        return resp.text
    return None

def fetch_multiple_snapshots(
    url: str, timestamps: list[str], delay: float = 1.5
) -> list[dict]:
    """Fetch multiple historical versions of a page."""
    results = []

    for ts in timestamps:
        content = fetch_snapshot(url, ts, raw=True)
        if content:
            results.append({
                "timestamp": ts,
                "content_length": len(content),
                "content": content,
            })
            print(f"Fetched {ts}: {len(content)} characters")
        else:
            print(f"Failed {ts}")

        time.sleep(delay)

    return results

The id_ suffix matters. Without it, Wayback Machine injects its own toolbar into the HTML and rewrites all internal URLs so they point back through archive.org. With id_, you get the original HTML exactly as it was captured — essential for parsing, comparison, or rehosting archived content.

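Other modifier suffixes you may see in Wayback URLs. The Archive does not formally document these, so treat the descriptions as observed behavior rather than a contract:

- if_: return the page with rewritten links but without the toolbar, as used for iframe embedding
- im_, cs_, js_: mark the capture as an image, stylesheet, or script so the matching rewrite rules apply
- fw_: frame-wrapped variant used by the replay interface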
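Since every replay URL follows the same pattern, a tiny builder keeps the modifier handling in one place. This is a sketch; wayback_url is an illustrative name, and the function does not validate the modifier:

```python
def wayback_url(url: str, timestamp: str, modifier: str = "") -> str:
    """Build a replay URL; modifier is a suffix such as "id_" appended to the timestamp."""
    return f"https://web.archive.org/web/{timestamp}{modifier}/{url}"

print(wayback_url("example.com/pricing", "20250601000000"))
print(wayback_url("example.com/pricing", "20250601000000", modifier="id_"))
```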

Bulk URL Enumeration

One of the most powerful CDX use cases: discovering every URL ever archived under a domain.

import httpx

def enumerate_domain_urls(domain: str, output_file: str | None = None) -> list[str]:
    """Get unique URLs archived for a domain (capped by the limit below)."""
    params = {
        "url": domain,
        "matchType": "domain",  # the domain plus all of its subdomains
        "output": "json",
        "fl": "original",
        "collapse": "urlkey",  # one entry per unique URL
        "filter": "statuscode:200",
        "limit": 10000,
    }

    resp = httpx.get(
        "https://web.archive.org/cdx/search/cdx",
        params=params,
        timeout=120,
    )
    resp.raise_for_status()

    rows = resp.json()
    urls = [row[0] for row in rows[1:]]  # skip header

    if output_file:
        with open(output_file, "w") as f:
            f.write("\n".join(urls))
        print(f"Saved {len(urls)} URLs to {output_file}")

    return urls

# Discover all archived URLs for a domain
urls = enumerate_domain_urls("example.com", "example_urls.txt")
print(f"Found {len(urls)} unique URLs")

Use cases for URL enumeration:

- Security research: find forgotten API endpoints, old admin panels, and staging URLs that were crawled before being secured
- SEO analysis: identify what pages a competitor has published and removed over time
- Content archaeology: discover articles, product pages, or documentation that no longer exist on the live site
- Link validation: verify which historic URLs are still live versus lost
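A long flat list of URLs is hard to read, so a first pass often groups it by site section. The sketch below counts URLs by their first path segment; top_sections is a hypothetical helper and the sample URLs are made up:

```python
from collections import Counter
from urllib.parse import urlsplit

def top_sections(urls: list[str], n: int = 5) -> list[tuple[str, int]]:
    """Count URLs by first path segment, most common first."""
    counts = Counter()
    for url in urls:
        path = urlsplit(url).path
        first = path.strip("/").split("/")[0]
        counts["/" + first] += 1
    return counts.most_common(n)

sample = [
    "https://example.com/blog/post-1",
    "https://example.com/blog/post-2",
    "https://example.com/admin/login",
    "https://example.com/",
]
print(top_sections(sample))
```

Sections with only a handful of URLs, such as /admin here, tend to be the ones worth a closer look in security or archaeology work.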

For very large domains with millions of archived pages, paginate through the CDX results rather than fetching everything at once:

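def paginate_cdx(url_pattern: str, page_size: int = 1000) -> list[dict]:
    """Paginate through all CDX results for a URL pattern."""
    # CDX pages are measured in index blocks, not rows, so a short or empty
    # page does not mean the end. Ask the server for the page count instead.
    resp = httpx.get(
        "https://web.archive.org/cdx/search/cdx",
        params={"url": url_pattern, "showNumPages": "true"},
        timeout=30,
    )
    resp.raise_for_status()
    num_pages = int(resp.text.strip())

    all_results = []
    for page in range(num_pages):
        results = search_cdx(
            url_pattern,
            limit=page_size,  # caps rows per request; raise it if pages look truncated
            page=page,
            collapse="urlkey",
            filter="statuscode:200",
        )
        all_results.extend(results)
        print(f"Page {page}: fetched {len(results)} results (total: {len(all_results)})")
        time.sleep(0.5)

    return all_results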

SQLite Schema for Snapshot Storage

Before going further into pipeline code, set up a proper storage layer. SQLite handles millions of rows easily for this kind of workload:

import sqlite3
import json

def init_wayback_db(db_path: str = "wayback.db") -> sqlite3.Connection:
    """Initialize SQLite database for storing Wayback Machine data."""
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS snapshots (
            timestamp TEXT NOT NULL,
            url TEXT NOT NULL,
            mimetype TEXT,
            statuscode TEXT,
            digest TEXT,
            length INTEGER,
            fetched INTEGER DEFAULT 0,
            content TEXT,
            fetched_at TIMESTAMP,
            PRIMARY KEY (timestamp, url)
        );

        CREATE INDEX IF NOT EXISTS idx_snapshots_url
            ON snapshots(url);

        CREATE INDEX IF NOT EXISTS idx_snapshots_digest
            ON snapshots(digest);

        CREATE INDEX IF NOT EXISTS idx_snapshots_fetched
            ON snapshots(fetched);

        CREATE TABLE IF NOT EXISTS domains (
            domain TEXT PRIMARY KEY,
            total_snapshots INTEGER,
            unique_urls INTEGER,
            earliest_snapshot TEXT,
            latest_snapshot TEXT,
            last_enumerated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """)
    conn.commit()
    return conn


def save_cdx_results(conn: sqlite3.Connection, results: list[dict]):
    """Bulk insert CDX results into the database."""
    conn.executemany(
        """INSERT OR IGNORE INTO snapshots
           (timestamp, url, mimetype, statuscode, digest, length)
           VALUES (?, ?, ?, ?, ?, ?)""",
        [
            (
                r["timestamp"], r["original"], r["mimetype"],
                r["statuscode"], r["digest"],
                int(r["length"]) if r["length"].isdigit() else 0,
            )
            for r in results
        ],
    )
    conn.commit()


def mark_fetched(conn: sqlite3.Connection, timestamp: str, url: str, content: str):
    """Update a snapshot record with fetched content."""
    conn.execute(
        """UPDATE snapshots
           SET fetched = 1, content = ?, fetched_at = CURRENT_TIMESTAMP
           WHERE timestamp = ? AND url = ?""",
        (content, timestamp, url),
    )
    conn.commit()


def get_unfetched(conn: sqlite3.Connection, limit: int = 100) -> list[dict]:
    """Get snapshot records that haven't been fetched yet."""
    rows = conn.execute(
        """SELECT timestamp, url FROM snapshots
           WHERE fetched = 0 AND statuscode = '200'
           AND mimetype LIKE 'text/html%'
           ORDER BY timestamp DESC
           LIMIT ?""",
        (limit,),
    ).fetchall()
    return [{"timestamp": r[0], "url": r[1]} for r in rows]

This schema separates the enumeration phase (CDX metadata) from the fetch phase (actual content), so you can resume interrupted jobs and query what you have before deciding what to download.
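Querying what you have can be as simple as one aggregate over the fetched flag. The sketch below runs against an in-memory table that carries only the columns the query needs; fetch_progress is a hypothetical helper, and the rows are made-up sample data:

```python
import sqlite3

def fetch_progress(conn: sqlite3.Connection) -> dict:
    """Summarize how many snapshot rows have been fetched so far."""
    total, fetched = conn.execute(
        "SELECT COUNT(*), COALESCE(SUM(fetched), 0) FROM snapshots"
    ).fetchone()
    return {"total": total, "fetched": fetched, "pending": total - fetched}

# Demo on an in-memory table with only the columns this query needs
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE snapshots (timestamp TEXT, url TEXT, fetched INTEGER DEFAULT 0)")
conn.executemany(
    "INSERT INTO snapshots (timestamp, url, fetched) VALUES (?, ?, ?)",
    [
        ("20240101000000", "https://example.com/", 1),
        ("20240201000000", "https://example.com/", 0),
        ("20240301000000", "https://example.com/pricing", 0),
    ],
)
print(fetch_progress(conn))  # {'total': 3, 'fetched': 1, 'pending': 2}
```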

Parsing WARC Files

For bulk downloads, the Internet Archive provides WARC (Web ARChive) files — the raw capture format. If you need millions of pages, download WARCs rather than fetching individual snapshots through the API:

import gzip
import re
from dataclasses import dataclass

@dataclass
class WARCRecord:
    url: str
    date: str
    status: int
    content_type: str
    body: bytes

def parse_warc(filepath: str) -> list[WARCRecord]:
    """Parse a WARC file and extract HTTP responses."""
    records = []

    opener = gzip.open if filepath.endswith(".gz") else open

    with opener(filepath, "rb") as f:
        content = f.read()

    # Split into WARC records; accept both WARC/1.0 and WARC/1.1 version lines
    parts = re.split(rb"WARC/1\.[01]\r\n", content)

    for part in parts[1:]:  # skip empty first split
        # Only process response records
        if b"WARC-Type: response" not in part:
            continue

        # Extract WARC headers
        url_match = re.search(rb"WARC-Target-URI: (.+?)\r\n", part)
        date_match = re.search(rb"WARC-Date: (.+?)\r\n", part)

        if not url_match:
            continue

        # Split HTTP headers from body
        http_start = part.find(b"HTTP/")
        if http_start == -1:
            continue

        http_part = part[http_start:]
        header_end = http_part.find(b"\r\n\r\n")
        if header_end == -1:
            continue

        http_headers = http_part[:header_end].decode("utf-8", errors="replace")
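        body = http_part[header_end + 4:]
        # Drop the two CRLFs that terminate every WARC record
        if body.endswith(b"\r\n\r\n"):
            body = body[:-4]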

        # Parse status code
        status_match = re.search(r"HTTP/[\d.]+ (\d+)", http_headers)
        status = int(status_match.group(1)) if status_match else 0

        # Parse content type
        ct_match = re.search(r"Content-Type: (.+?)[\r\n]", http_headers, re.I)
        content_type = ct_match.group(1).strip() if ct_match else "unknown"

        records.append(WARCRecord(
            url=url_match.group(1).decode("utf-8"),
            date=date_match.group(1).decode("utf-8") if date_match else "",
            status=status,
            content_type=content_type,
            body=body,
        ))

    return records

# Parse a downloaded WARC file
records = parse_warc("example.warc.gz")
for r in records:
    print(f"{r.status} {r.url} ({r.content_type}, {len(r.body)} bytes)")

Error Handling and Retries

The Wayback Machine API is generally reliable, but it does go down for maintenance, returns occasional 503s during peak load, and sometimes returns empty results for queries that should have matches. Robust retry logic is non-negotiable for production pipelines:

import time
import random
from typing import Any, Callable

import httpx

def with_retry(
    func: Callable,
    max_attempts: int = 5,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
) -> Any:
    """
    Execute a function with exponential backoff retry.
    Retry on HTTP 429, 500, 502, 503, 504 and network errors.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status in (429, 500, 502, 503, 504) and attempt < max_attempts - 1:
                delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
                print(f"  HTTP {status} on attempt {attempt + 1}, retrying in {delay:.1f}s")
                time.sleep(delay)
                continue
            raise
        except (httpx.ConnectError, httpx.TimeoutException) as e:
            if attempt < max_attempts - 1:
                delay = min(base_delay * (2 ** attempt), max_delay)
                print(f"  Network error: {e}, retrying in {delay:.1f}s")
                time.sleep(delay)
                continue
            raise

    return None


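# Usage. fetch_snapshot() returns None on any non-200 response, so with_retry
# would never see an HTTP error from it. This wrapper raises on retryable statuses.
def fetch_with_retry(url: str, timestamp: str) -> str | None:
    def attempt() -> str | None:
        resp = httpx.get(
            f"https://web.archive.org/web/{timestamp}id_/{url}",
            timeout=30,
            follow_redirects=True,
        )
        if resp.status_code in (429, 500, 502, 503, 504):
            resp.raise_for_status()  # raises httpx.HTTPStatusError for with_retry
        return resp.text if resp.status_code == 200 else None

    return with_retry(attempt, max_attempts=5, base_delay=3.0)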
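To judge whether the retry settings are reasonable, it helps to list the delays they produce. This sketch reproduces the exponential formula used above without the random jitter; backoff_schedule is an illustrative helper:

```python
def backoff_schedule(
    max_attempts: int = 5, base_delay: float = 2.0, max_delay: float = 60.0
) -> list[float]:
    """Delays (without jitter) slept between attempts: one fewer than max_attempts."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_attempts - 1)]

print(backoff_schedule())        # [2.0, 4.0, 8.0, 16.0]
print(sum(backoff_schedule(8)))  # worst-case seconds spent waiting across 8 attempts
```

The cap matters once attempts grow: from the sixth attempt onward each wait is held at max_delay instead of continuing to double.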

For CDX queries on large domains, also handle empty results that might indicate a transient issue rather than a genuine absence:

def safe_cdx_search(url: str, **kwargs) -> list[dict]:
    """CDX search with retry on empty results that look suspicious."""
    for attempt in range(3):
        results = search_cdx(url, **kwargs)
        if results or attempt == 2:
            return results
        # Empty result from a URL we expect to have history: retry, up to three attempts in total
        print(f"  Empty CDX result on attempt {attempt + 1}, retrying...")
        time.sleep(5)
    return []

Rate Limits and Anti-Bot Measures

The Internet Archive is a non-profit running on donations. They are generous with API access but will throttle aggressive scrapers.

Current limits as of 2026:

- CDX API: approximately 15 requests per minute per IP for large queries
- Snapshot fetching: 10-15 pages per minute
- Bulk operations: the Archive asks you to use their S3-like download interface for large datasets
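These rates translate directly into job duration, which is worth estimating before starting a large fetch. A back-of-the-envelope sketch; estimate_minutes is a hypothetical helper, and the figure it returns is a lower bound that ignores retries and response time:

```python
def estimate_minutes(num_requests: int, requests_per_minute: float) -> float:
    """Minimum wall-clock minutes to issue num_requests at a fixed rate."""
    if requests_per_minute <= 0:
        raise ValueError("requests_per_minute must be positive")
    return num_requests / requests_per_minute

minutes = estimate_minutes(5000, 12)
print(f"{minutes / 60:.1f} hours")  # 6.9 hours
```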

import time

class WaybackThrottle:
    """Simple rate limiter for Wayback Machine requests."""

    def __init__(self, requests_per_minute: int = 12):
        self.delay = 60.0 / requests_per_minute
        self.last_request = 0

    def wait(self):
        elapsed = time.time() - self.last_request
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self.last_request = time.time()

For large-scale enumeration across many domains, route requests through rotating proxies to avoid IP-based throttling. ThorData's datacenter proxies are a good fit here — the Wayback Machine does not use aggressive fingerprinting, so clean datacenter IPs work fine and are cheaper than residential for this use case.

import random

PROXY_LIST = [
    "http://USER:PASS@PROXY_HOST:9000",  # placeholder credentials and host
    # Add more as needed
]

def random_proxy() -> str:
    """Pick one proxy URL at random from PROXY_LIST."""
    return random.choice(PROXY_LIST)

def fetch_snapshot_proxied(url: str, timestamp: str) -> str | None:
    """Fetch snapshot through a randomly selected proxy."""
    # httpx 0.26+ takes a single proxy URL via proxy=; older releases used proxies={...}
    with httpx.Client(proxy=random_proxy(), timeout=30, follow_redirects=True) as client:
        resp = client.get(f"https://web.archive.org/web/{timestamp}id_/{url}")
        if resp.status_code == 200:
            return resp.text
    return None

Do not hammer the Internet Archive. It is a public resource that benefits researchers, journalists, and historians worldwide. If you need bulk access to millions of pages, use their bulk download options at archive.org/developers or consider donating to offset your usage costs.

Tracking Page Changes Over Time

A practical example — monitoring how a pricing page or product catalog changed across years:

from selectolax.parser import HTMLParser
from difflib import unified_diff

def track_page_changes(url: str, from_date: str, to_date: str):
    """Compare how a page changed between two dates."""
    snapshots = search_cdx(
        url,
        from_=from_date,
        to=to_date,
        filter="statuscode:200",
        collapse="digest",
        limit=50,
    )

    throttle = WaybackThrottle(requests_per_minute=10)

    previous = None
    previous_ts = None
    changes = []

    for snap in snapshots:
        throttle.wait()
        content = fetch_snapshot(url, snap["timestamp"], raw=True)
        if not content:
            continue  # keep the last good version as the comparison baseline

        if previous:
            tree = HTMLParser(content)
            text = tree.body.text(separator="\n", strip=True) if tree.body else ""

            prev_tree = HTMLParser(previous)
            prev_text = prev_tree.body.text(separator="\n", strip=True) if prev_tree.body else ""

            if text != prev_text:
                diff = list(unified_diff(
                    prev_text.splitlines(),
                    text.splitlines(),
                    fromfile=f"version-{previous_ts}",
                    tofile=f"version-{snap['timestamp']}",
                    lineterm="",
                ))
                if diff:
                    changes.append({
                        "timestamp": snap["timestamp"],
                        "diff_lines": len(diff),
                        "diff": "\n".join(diff[:50]),  # first 50 lines
                    })

        previous = content
        previous_ts = snap["timestamp"]

    return changes

# Track pricing page changes
changes = track_page_changes("example.com/pricing", "20240101", "20260101")
for c in changes:
    print(f"\n=== Change at {c['timestamp']} ({c['diff_lines']} lines) ===")
    print(c["diff"])
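If you would rather not depend on selectolax, the standard library can do a rougher version of the text extraction step. This is a sketch, not a drop-in replacement: the class and function names are illustrative, and the output may differ from selectolax's text() on messy markup:

```python
from html.parser import HTMLParser as StdlibHTMLParser

class VisibleText(StdlibHTMLParser):
    """Collect text nodes, skipping script and style contents."""
    SKIP = {"script", "style"}

    def __init__(self):
        super().__init__()
        self.chunks = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        text = data.strip()
        if text and not self._skip_depth:
            self.chunks.append(text)

def visible_text(markup: str) -> str:
    """Return the visible text of an HTML document, one text node per line."""
    parser = VisibleText()
    parser.feed(markup)
    parser.close()
    return "\n".join(parser.chunks)

page = "<html><body><h1>Pro plan</h1><script>var x = 1;</script><p>$10 / month</p></body></html>"
print(visible_text(page))
```

Diffing extracted text rather than raw HTML keeps the comparison focused on what a visitor would have read, instead of on markup or script churn.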

Downloading PDFs and Static Assets

The Wayback Machine archives PDFs, images, and other static content too. The CDX API handles these the same way as HTML:

import os

def download_archived_pdfs(domain: str, output_dir: str = "pdfs"):
    """Download all archived PDFs from a domain."""
    os.makedirs(output_dir, exist_ok=True)

    snapshots = search_cdx(
        f"*.{domain}/*",
        matchType="domain",
        filter=["mimetype:application/pdf", "statuscode:200"],
        collapse="urlkey",
        limit=5000,
    )

    print(f"Found {len(snapshots)} unique PDFs")
    throttle = WaybackThrottle(requests_per_minute=8)

    for snap in snapshots:
        throttle.wait()

        safe_name = snap["original"].replace("https://", "").replace("http://", "")
        safe_name = safe_name.replace("/", "_").replace("?", "_")[:200] + ".pdf"
        out_path = os.path.join(output_dir, safe_name)

        if os.path.exists(out_path):
            continue

        url = f"https://web.archive.org/web/{snap['timestamp']}id_/{snap['original']}"
        with httpx.Client(timeout=60, follow_redirects=True) as client:
            resp = client.get(url)
            if resp.status_code == 200:
                with open(out_path, "wb") as f:
                    f.write(resp.content)
                print(f"  Downloaded: {safe_name}")
            else:
                print(f"  Failed: {resp.status_code} {snap['original']}")
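Two small safeguards are worth considering for this kind of download loop: confirming that the payload really is a PDF, and making filenames resistant to collisions after truncation. Both helpers below are hypothetical sketches using only the standard library:

```python
import hashlib

def looks_like_pdf(data: bytes) -> bool:
    """True when the payload starts with the PDF magic bytes."""
    return data.lstrip()[:5] == b"%PDF-"

def pdf_filename(original_url: str, max_len: int = 80) -> str:
    """Readable name plus a short URL hash so truncated names are unlikely to collide."""
    stem = original_url.split("://", 1)[-1]
    stem = "".join(c if c.isalnum() or c in "-." else "_" for c in stem)[:max_len]
    suffix = hashlib.sha1(original_url.encode("utf-8")).hexdigest()[:8]
    return f"{stem}_{suffix}.pdf"

print(looks_like_pdf(b"%PDF-1.7\n..."))           # True
print(looks_like_pdf(b"<html>Not found</html>"))  # False
print(pdf_filename("https://example.com/docs/report.pdf?v=2"))
```

The magic-byte check catches captures that were indexed as application/pdf but actually hold an HTML error page.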

Complete Pipeline Example

Putting it all together — a pipeline that enumerates a domain's history, stores CDX metadata, and fetches content in priority order:

import sqlite3
import time

def run_wayback_pipeline(domain: str, db_path: str = "wayback.db"):
    """
    Full pipeline:
    1. Enumerate all HTML snapshots for a domain
    2. Store CDX metadata in SQLite
    3. Fetch content for pages with high priority (most recent, 200 only)
    4. Save content to database
    """
    conn = init_wayback_db(db_path)

    # Phase 1: Enumerate
    print(f"Enumerating CDX records for {domain}...")
    throttle = WaybackThrottle(requests_per_minute=10)

    throttle.wait()
    results = search_cdx(
        f"*.{domain}/*",
        matchType="domain",
        filter=["mimetype:text/html", "statuscode:200"],
        collapse="digest",
        limit=10000,
        from_="20230101",
    )

    print(f"Found {len(results)} unique HTML snapshots")
    save_cdx_results(conn, results)

    # Phase 2: Fetch content
    print("Fetching snapshot content...")
    pending = get_unfetched(conn, limit=50)

    for item in pending:
        throttle.wait()
        content = fetch_snapshot(item["url"], item["timestamp"], raw=True)

        if content:
            mark_fetched(conn, item["timestamp"], item["url"], content)
            print(f"  Fetched: {item['timestamp']} {item['url'][:60]}")
        else:
            print(f"  Skipped: {item['url'][:60]}")

    conn.close()
    print("Pipeline complete.")

# Run it
run_wayback_pipeline("competitor.com")

The Internet Archive is a non-profit library and the Wayback Machine is a public service. Using the CDX API and fetching archived snapshots is legal and explicitly supported. The Archive's robots.txt exclusion system means some pages were never crawled by request of site owners — the CDX API will simply return no results for those URLs.

Republishing archived content commercially requires care. The original copyright holders retain their rights even after content is archived. Academic, journalistic, and research use is generally fine; building a competing product that republishes archived content at scale is a different matter.

Wrapping Up

The Wayback Machine's CDX API is one of the genuinely useful tools on the public internet. URL enumeration with wildcards, content deduplication with digest collapsing, and raw content access with the id_ modifier cover most use cases.

For one-off lookups, the availability API is fastest. For historical analysis, CDX with collapse=digest shows only snapshots where content actually changed. For bulk research, WARC parsing gives everything in one download. For production pipelines, the SQLite-backed approach above handles interruptions cleanly and keeps your work resumable.

Keep request rates reasonable — this is a public resource. If you are pulling more than a few hundred pages, add delays and consider whether the bulk download interface would serve you better.