Scraping Paginated Content with Python: Offset, Cursor, and Infinite Scroll (2026)

You hit "next page" and the URL changes to ?page=2. Easy enough. But then you run into an API that returns a cursor token, or a site that loads more items as you scroll. Each pagination pattern needs a different scraping strategy.

This guide covers the pagination patterns you're most likely to encounter in 2026, with working Python code, error handling, deduplication, and async patterns for high-volume scraping.

The Five Pagination Patterns

Most paginated sources fall into one of these categories:

  1. URL offset — the page number or item offset sits in the URL (?page=2, ?offset=20). You can predict every page URL before you start.
  2. Cursor/token — each response contains a token pointing to the next batch (cursor=abc123, after=eyJpZCI6NDJ9). You must follow the chain; you can't skip ahead.
  3. Link header / HTML next link — the server tells you the next URL via an HTTP Link header or an HTML <a rel="next"> element.
  4. Infinite scroll — JavaScript loads content as the user scrolls. No URL changes. Requires a real browser.
  5. Load More button — a button triggers an AJAX request to load the next batch. Also requires a browser.

Let's build a scraper for each, from simplest to most complex.

Setup

pip install httpx beautifulsoup4 playwright
python -m playwright install chromium

We'll use httpx for HTTP requests (async-friendly, with optional HTTP/2 support if you install the httpx[http2] extra and pass http2=True), BeautifulSoup for HTML parsing, and Playwright for browser-based patterns.

Pattern 1: URL Offset Pagination

This is the simplest case. The URL contains a page number or offset you increment in a loop.

import httpx
import time
import random

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
}


def scrape_page_number_pagination(
    base_url: str,
    items_key: str = "results",
    max_pages: int = 100,
    delay: float = 1.5,
) -> list[dict]:
    """Scrape a JSON API with ?page=N pagination."""
    all_items = []

    with httpx.Client(headers=HEADERS, timeout=15, follow_redirects=True) as client:
        for page in range(1, max_pages + 1):
            resp = client.get(base_url, params={"page": page})

            if resp.status_code == 404:
                print(f"Page {page}: 404 — reached end")
                break
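            if resp.status_code == 429:
                # Retry-After may be an HTTP date rather than seconds; fall back to 30s
                retry_after = resp.headers.get("Retry-After", "")
                wait = int(retry_after) if retry_after.isdigit() else 30
                print(f"Rate limited. Sleeping {wait}s...")
                time.sleep(wait)
                # Retry the same page once; a bare `continue` here would skip it
                resp = client.get(base_url, params={"page": page})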

            resp.raise_for_status()

            try:
                data = resp.json()
            except Exception:
                print(f"Page {page}: non-JSON response")
                break

            items = data.get(items_key, [])
            if not items:
                print(f"Page {page}: empty — done")
                break

            all_items.extend(items)
            total = data.get("total_count") or data.get("total") or data.get("count")

            print(f"Page {page}: {len(items)} items (running total: {len(all_items)}"
                  + (f" / {total}" if total else "") + ")")

            # Stop if we know the total and have reached it
            if total and len(all_items) >= int(total):
                break

            time.sleep(random.uniform(delay * 0.7, delay * 1.5))

    return all_items


def scrape_offset_pagination(
    base_url: str,
    page_size: int = 20,
    items_key: str = "results",
    max_items: int = 5000,
) -> list[dict]:
    """Scrape a JSON API with ?offset=N&limit=N pagination."""
    all_items = []

    with httpx.Client(headers=HEADERS, timeout=15) as client:
        offset = 0
        while len(all_items) < max_items:
            resp = client.get(base_url, params={"offset": offset, "limit": page_size})
            resp.raise_for_status()
            data = resp.json()

            items = data.get(items_key, [])
            if not items:
                break

            all_items.extend(items)
            total = data.get("count") or data.get("total")
            if total and offset + page_size >= int(total):
                break

            offset += page_size
            time.sleep(random.uniform(1, 2))

    return all_items[:max_items]

Tip: Check the first response for total_count or total_pages. If it's there, you know exactly how many requests you need.
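
As a rough sketch of that arithmetic, assuming the API reports a total item count and you know the page size (the function name pages_needed is illustrative, not part of any library):

```python
import math


def pages_needed(total_items: int, page_size: int) -> int:
    """Number of requests needed to fetch total_items at page_size per request."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    return math.ceil(total_items / page_size)


# With URL-offset pagination, every offset can be generated up front
offsets = [page * 20 for page in range(pages_needed(95, 20))]
print(offsets)  # [0, 20, 40, 60, 80]
```

Knowing the request count in advance also lets you estimate run time and split the work across workers.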

Pattern 2: Cursor-Based Pagination

APIs increasingly use cursors instead of page numbers. Each response gives you a token to fetch the next batch. You can't jump to page 5 — you follow the chain from the start.

Cursors appear under many names: next_cursor, after, page_token, continuation, nextPageToken. Some APIs encode them as base64 strings. Don't try to decode them — treat them as opaque tokens.

def scrape_cursor_pagination(
    api_url: str,
    items_key: str = "results",
    cursor_field: str | None = None,  # auto-detect if None
    page_size: int = 100,
) -> list[dict]:
    """Follow cursor tokens until the API says we're done."""
    all_items = []
    cursor = None

    # Common cursor field names to try (in order of preference)
    cursor_candidates = (
        [cursor_field] if cursor_field
        else ["next_cursor", "after", "cursor", "page_token",
              "nextPageToken", "continuation_token", "next"]
    )

    with httpx.Client(headers=HEADERS, timeout=15) as client:
        while True:
            params = {"limit": page_size, "per_page": page_size}
            if cursor:
                # Assumes the request parameter shares the name of the first
                # candidate field; pass cursor_field explicitly if the API differs
                params[cursor_candidates[0]] = cursor

            resp = client.get(api_url, params=params)
            if resp.status_code == 429:
                time.sleep(random.uniform(10, 30))
                continue
            resp.raise_for_status()
            data = resp.json()

            items = data.get(items_key, [])
            all_items.extend(items)
            print(f"Fetched {len(items)} items (total: {len(all_items)})")

            # Find the next cursor — try all common field names
            next_cursor = None
            for field in cursor_candidates:
                # Check top level
                if data.get(field):
                    next_cursor = data[field]
                    break
                # Check nested pagination/meta objects
                for meta_key in ("pagination", "meta", "paging", "page_info"):
                    if isinstance(data.get(meta_key), dict):
                        val = data[meta_key].get(field)
                        if val:
                            next_cursor = val
                            break
                if next_cursor:
                    break

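            # Also check for an explicit "has more" flag. Chaining the lookups
            # with `or` would turn an explicit False into None, so test each one.
            has_more_flags = [data.get(k) for k in ("has_more", "hasMore", "has_next_page")]
            if any(flag is False for flag in has_more_flags):
                break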

            if not next_cursor or not items:
                break

            cursor = next_cursor
            time.sleep(random.uniform(0.5, 1.5))

    return all_items
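
The cursor lookup inside that loop can also be pulled out into a small pure function, which makes it easy to check against sample payloads before pointing the scraper at a live API. This is a minimal sketch; find_next_cursor and the payloads below are made up for illustration.

```python
from typing import Any

META_KEYS = ("pagination", "meta", "paging", "page_info")


def find_next_cursor(data: dict[str, Any], candidates: list[str]) -> Any:
    """Return the first non-empty cursor found at the top level or in a nested meta object."""
    for field in candidates:
        if data.get(field):
            return data[field]
        for meta_key in META_KEYS:
            meta = data.get(meta_key)
            if isinstance(meta, dict) and meta.get(field):
                return meta[field]
    return None


# Hypothetical payloads, shaped like common API responses
top_level = {"results": [1, 2], "next_cursor": "abc123"}
nested = {"results": [3], "meta": {"after": "eyJpZCI6NDJ9"}}
last_page = {"results": [4], "next_cursor": None}

print(find_next_cursor(top_level, ["next_cursor", "after"]))  # abc123
print(find_next_cursor(nested, ["next_cursor", "after"]))     # eyJpZCI6NDJ9
print(find_next_cursor(last_page, ["next_cursor", "after"]))  # None
```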

GraphQL Cursor Pagination (Relay Pattern)

Many modern APIs use GraphQL with the Relay pagination spec. Each edge carries a cursor, and the response includes pageInfo.endCursor and pageInfo.hasNextPage:

def scrape_graphql_relay(
    graphql_url: str,
    query_template: str,
    variables: dict,
    edge_path: str = "data.items.edges",
    page_info_path: str = "data.items.pageInfo",
) -> list[dict]:
    """Paginate through a GraphQL API using Relay cursor pattern."""
    import operator
    from functools import reduce

    def deep_get(d, path):
        return reduce(operator.getitem, path.split("."), d)

    all_nodes = []
    after_cursor = None

    with httpx.Client(headers=HEADERS, timeout=20) as client:
        while True:
            vars_with_cursor = {**variables}
            if after_cursor:
                vars_with_cursor["after"] = after_cursor

            resp = client.post(graphql_url, json={
                "query": query_template,
                "variables": vars_with_cursor,
            })
            resp.raise_for_status()
            data = resp.json()

            if "errors" in data:
                print(f"GraphQL errors: {data['errors']}")
                break

            edges = deep_get(data, edge_path)
            nodes = [e.get("node", e) for e in edges]
            all_nodes.extend(nodes)

            page_info = deep_get(data, page_info_path)
            has_next = page_info.get("hasNextPage", False)
            after_cursor = page_info.get("endCursor")

            print(f"Fetched {len(nodes)} items (total: {len(all_nodes)})")

            if not has_next or not after_cursor:
                break

            time.sleep(random.uniform(0.5, 1.5))

    return all_nodes
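
To make the expected inputs concrete, here is a sketch of a query template and a response in the Relay connection shape. The items field, the title attribute, and the $first variable are placeholders; a real schema will use its own names, and edge_path and page_info_path must be adjusted to match.

```python
import operator
from functools import reduce

# Hypothetical query: `items` and `title` stand in for whatever
# connection and fields the target schema actually exposes.
QUERY_TEMPLATE = """
query Items($first: Int!, $after: String) {
  items(first: $first, after: $after) {
    edges { cursor node { id title } }
    pageInfo { hasNextPage endCursor }
  }
}
"""
VARIABLES = {"first": 50}

# A made-up response in the Relay connection shape
sample_response = {
    "data": {"items": {
        "edges": [
            {"cursor": "c1", "node": {"id": 1, "title": "a"}},
            {"cursor": "c2", "node": {"id": 2, "title": "b"}},
        ],
        "pageInfo": {"hasNextPage": True, "endCursor": "c2"},
    }}
}


def deep_get(d: dict, path: str):
    """Walk a dotted path such as 'data.items.edges' through nested dicts."""
    return reduce(operator.getitem, path.split("."), d)


nodes = [edge["node"] for edge in deep_get(sample_response, "data.items.edges")]
page_info = deep_get(sample_response, "data.items.pageInfo")
print(len(nodes), page_info["endCursor"])  # 2 c2
```

The endCursor value is what gets sent back as the after variable on the next request.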

Pattern 3: Link Header and HTML Next Link

Some APIs put the next URL in the HTTP Link header (used by GitHub, GitLab, and many other REST APIs), while HTML sites put a "Next" link in the page.

import re
from urllib.parse import urljoin
from bs4 import BeautifulSoup


def parse_link_header(link_header: str) -> dict[str, str]:
    """Parse an HTTP Link header into a dict of {rel: url}."""
    links = {}
    for part in link_header.split(","):
        part = part.strip()
        match = re.match(r'<([^>]+)>;\s*rel="([^"]+)"', part)
        if match:
            links[match.group(2)] = match.group(1)
    return links


def scrape_link_header_pagination(
    start_url: str,
    items_key: str = "results",
) -> list[dict]:
    """Follow Link header pagination (common on GitHub, GitLab, etc.)."""
    all_items = []
    url = start_url

    with httpx.Client(headers=HEADERS, timeout=15) as client:
        while url:
            resp = client.get(url)
            resp.raise_for_status()

            data = resp.json()
            items = data if isinstance(data, list) else data.get(items_key, [])
            all_items.extend(items)

            link_header = resp.headers.get("Link", "")
            links = parse_link_header(link_header)
            url = links.get("next")  # None if no next page

            print(f"Fetched {len(items)} items from {resp.url}")
            time.sleep(random.uniform(0.5, 1.5))

    return all_items


def scrape_html_next_link(
    start_url: str,
    item_selector: str = "article, .result, .card",
    next_link_selector: str = 'a[rel="next"], .next-page a, .pagination .next a',
) -> list[str]:
    """Follow HTML 'next page' links."""
    all_items = []
    url = start_url
    visited = set()

    with httpx.Client(headers=HEADERS, timeout=15, follow_redirects=True) as client:
        while url and url not in visited:
            visited.add(url)
            resp = client.get(url)
            resp.raise_for_status()

            soup = BeautifulSoup(resp.text, "html.parser")

            # Extract items
            for item in soup.select(item_selector):
                all_items.append(item.get_text(strip=True))

            # Find next link
            next_el = soup.select_one(next_link_selector)
            url = urljoin(str(resp.url), next_el["href"]) if next_el and next_el.get("href") else None

            print(f"Page {len(visited)}: {len(soup.select(item_selector))} items")
            time.sleep(random.uniform(1.5, 3))

    return all_items
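
The regex in parse_link_header expects rel to be the first parameter after the URL. As a hedged alternative, the sketch below accepts rel in any position. The name parse_link_header_loose and the example.com URLs are illustrative, and it still doesn't handle commas or semicolons inside quoted parameter values.

```python
import re


def parse_link_header_loose(link_header: str) -> dict[str, str]:
    """Parse a Link header into {rel: url}, allowing rel to follow other params."""
    links = {}
    # Each entry looks like: <url>; param="value"; param="value"
    for url, params in re.findall(r'<([^>]+)>((?:\s*;\s*[^;,]+)*)', link_header):
        rel = re.search(r'rel="?([^";,]+)"?', params)
        if rel:
            # A rel value may list several space-separated relation types
            for name in rel.group(1).split():
                links[name] = url
    return links


header = (
    '<https://api.example.com/items?page=2>; rel="next", '
    '<https://api.example.com/items?page=9>; title="end"; rel="last"'
)
links = parse_link_header_loose(header)
print(links["next"])  # https://api.example.com/items?page=2
print(links["last"])  # https://api.example.com/items?page=9
```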

Pattern 4: Infinite Scroll with Playwright

When content loads as you scroll and the URL doesn't change, you need a real browser. Playwright handles this efficiently with its async API.

import asyncio
from playwright.async_api import async_playwright, Page


async def scrape_infinite_scroll(
    url: str,
    item_selector: str,
    max_scrolls: int = 50,
    scroll_pause_ms: int = 2000,
    proxy_url: str | None = None,
) -> list[str]:
    """Scroll down repeatedly until no new content loads."""
    launch_kwargs = {}
    if proxy_url:
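        # Convert the proxy URL into Playwright's proxy dict format
        from urllib.parse import urlparse
        parsed = urlparse(proxy_url)
        server = f"{parsed.scheme}://{parsed.hostname}"
        if parsed.port:  # omit the port when the URL doesn't specify one
            server += f":{parsed.port}"
        launch_kwargs["proxy"] = {"server": server}
        if parsed.username:
            launch_kwargs["proxy"]["username"] = parsed.username
            launch_kwargs["proxy"]["password"] = parsed.password or ""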

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            **launch_kwargs,
        )
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
            viewport={"width": 1280, "height": 900},
        )
        page = await context.new_page()
        await page.goto(url, wait_until="networkidle", timeout=30000)

        previous_height = 0
        previous_count = 0

        for scroll_num in range(max_scrolls):
            # Scroll to bottom
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await page.wait_for_timeout(scroll_pause_ms)

            current_height = await page.evaluate("document.body.scrollHeight")
            current_count = await page.locator(item_selector).count()

            print(f"Scroll {scroll_num + 1}: height={current_height}, items={current_count}")

            if current_height == previous_height and current_count == previous_count:
                print("No new content loaded — done scrolling")
                break

            previous_height = current_height
            previous_count = current_count

        # Extract all loaded items
        items = await page.locator(item_selector).all()
        results = []
        for item in items:
            text = await item.inner_text()
            results.append(text.strip())

        await browser.close()
        return results


# Run it:
# results = asyncio.run(scrape_infinite_scroll(
#     "https://example.com/feed",
#     item_selector=".post-card",
#     max_scrolls=30
# ))

Intercepting XHR on Infinite Scroll Pages

A smarter approach: instead of scraping rendered HTML, intercept the API calls the page makes as you scroll. This gives you clean JSON directly:

async def intercept_scroll_api(
    url: str,
    api_pattern: str,  # e.g., "**/api/feed**"
    scroll_count: int = 20,
) -> list[dict]:
    """Intercept XHR/fetch calls made during infinite scroll."""
    captured_responses = []

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        # Set up response interception
        async def handle_response(response):
            if api_pattern.replace("**", "") in response.url:
                try:
                    body = await response.json()
                    captured_responses.append(body)
                    print(f"Captured API response from: {response.url}")
                except Exception:
                    pass

        page.on("response", handle_response)

        await page.goto(url, wait_until="networkidle")

        # Trigger scrolling to fire API calls
        for _ in range(scroll_count):
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await page.wait_for_timeout(2000)

        await browser.close()

    return captured_responses
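
The substring check in handle_response strips the wildcards and ignores where they sat in the pattern. If you want the pattern to behave like a glob, one option is the standard library's fnmatch. This is a simplified sketch: url_matches is a hypothetical helper, and it treats ** and * identically, so both can span slashes.

```python
from fnmatch import fnmatchcase


def url_matches(url: str, pattern: str) -> bool:
    """Glob-style URL match; '**' is collapsed to '*', which may span '/'."""
    return fnmatchcase(url, pattern.replace("**", "*"))


print(url_matches("https://example.com/api/feed?page=2", "**/api/feed**"))  # True
print(url_matches("https://example.com/static/app.js", "**/api/feed**"))    # False
# Without a trailing wildcard the URL must end at /api/feed
print(url_matches("https://example.com/api/feed?page=2", "**/api/feed"))    # False
```

Note that ? and [ are also wildcards in fnmatch, so patterns containing a literal query string need care.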

Pattern 5: Load More Button

Similar to infinite scroll, but triggered by a button click instead of scroll position.

async def scrape_load_more(
    url: str,
    item_selector: str,
    button_selector: str = "button",
    button_text_contains: str = "load more",
    max_clicks: int = 50,
) -> list[str]:
    """Click 'Load More' until the button disappears or max_clicks is reached."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url, wait_until="networkidle")

        clicks = 0
        while clicks < max_clicks:
            # Find the load more button
            btn = None
            for candidate in await page.query_selector_all(button_selector):
                text = (await candidate.inner_text()).lower()
                if button_text_contains.lower() in text:
                    btn = candidate
                    break

            if not btn:
                print(f"No more 'Load More' button after {clicks} clicks")
                break

            is_visible = await btn.is_visible()
            is_enabled = await btn.is_enabled()
            if not is_visible or not is_enabled:
                break

            prev_count = await page.locator(item_selector).count()

            await btn.click()
            await page.wait_for_timeout(2000)

            new_count = await page.locator(item_selector).count()
            print(f"Click {clicks + 1}: {prev_count} -> {new_count} items")

            if new_count == prev_count:
                print("No new items loaded after click — done")
                break

            clicks += 1

        # Extract all items
        items = []
        for el in await page.query_selector_all(item_selector):
            items.append(await el.inner_text())

        await browser.close()
        return items

Rate Limiting: Don't Get Blocked

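Rapid-fire requests are the fastest way to get your IP banned. The core rules: add randomized delays between requests, honor the Retry-After header when the server sends one, and back off exponentially when it doesn't.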

import random
import time

import httpx


def polite_delay(min_sec: float = 1.0, max_sec: float = 3.0) -> None:
    """Sleep for a random duration to mimic human browsing."""
    time.sleep(random.uniform(min_sec, max_sec))


def handle_rate_limit_response(
    resp: httpx.Response,
    attempt: int,
    max_attempts: int = 5
) -> float:
    """
    Returns seconds to wait, or raises if max attempts exceeded.
    """
    if attempt >= max_attempts:
        resp.raise_for_status()

    # Respect Retry-After header
    retry_after = resp.headers.get("Retry-After")
    if retry_after:
        try:
            return float(retry_after)
        except ValueError:
            pass  # Could be an HTTP date string

    # Exponential backoff with jitter
    return (2 ** attempt) + random.uniform(0, 2)

For large-scale crawls (10,000+ pages), use rotating residential proxies to prevent any single IP from hitting rate limits. ThorData provides residential proxies that work well for this — pass the proxy URL into httpx.Client:

# USER, PASS, and PROXY_HOST are placeholders for your provider's credentials and gateway
proxy_url = "http://USER:PASS@PROXY_HOST:9000"

client = httpx.Client(
    headers=HEADERS,
    proxy=proxy_url,
    timeout=20,
    follow_redirects=True,
)

Deduplication: Avoid Scraping the Same Data Twice

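Pages can overlap, most often with offset-based APIs: when items are inserted or deleted between fetches, every later page shifts. Cursor-based APIs are less prone to this, but deduplicating is still a cheap safeguard.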

def deduplicate_items(
    items: list[dict],
    id_field: str = "id"
) -> list[dict]:
    """Remove duplicate items by ID field."""
    seen = set()
    unique = []
    for item in items:
        item_id = item.get(id_field)
        if item_id is None:
            unique.append(item)  # can't deduplicate without ID
        elif item_id not in seen:
            seen.add(item_id)
            unique.append(item)
    return unique
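
When items have no ID field at all, one fallback is to deduplicate on a hash of the item's content. This is a minimal sketch, assuming the items are JSON-serializable dicts and that two items with identical fields really are the same record. Both function names are made up for illustration.

```python
import hashlib
import json


def item_fingerprint(item: dict) -> str:
    """Stable hash of an item's content, independent of key order."""
    canonical = json.dumps(item, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def deduplicate_by_content(items: list[dict]) -> list[dict]:
    """Keep the first occurrence of each distinct item, preserving order."""
    seen: set[str] = set()
    unique = []
    for item in items:
        fingerprint = item_fingerprint(item)
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(item)
    return unique


rows = [
    {"title": "A", "price": 5},
    {"price": 5, "title": "A"},  # same content, different key order
    {"title": "B", "price": 7},
]
print(len(deduplicate_by_content(rows)))  # 2
```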

For resumable scraping across multiple runs, persist seen IDs to SQLite:

import sqlite3


def load_seen_ids(db_path: str, table: str = "seen_ids") -> set[str]:
    conn = sqlite3.connect(db_path)
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (id TEXT PRIMARY KEY)")
    rows = conn.execute(f"SELECT id FROM {table}").fetchall()
    conn.close()
    return {r[0] for r in rows}


def save_seen_ids(ids: set[str], db_path: str, table: str = "seen_ids") -> None:
    conn = sqlite3.connect(db_path)
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (id TEXT PRIMARY KEY)")
    conn.executemany(
        f"INSERT OR IGNORE INTO {table} (id) VALUES (?)",
        [(id_,) for id_ in ids]
    )
    conn.commit()
    conn.close()
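
A variation that may suit incremental runs is to record and filter in one step, so each run only processes IDs it hasn't stored before. In the sketch below, filter_unseen is a hypothetical helper. It relies on INSERT OR IGNORE reporting a rowcount of 1 only when a row was actually inserted, and it assumes the table name comes from trusted code, since it's interpolated into the SQL.

```python
import sqlite3


def filter_unseen(ids: list[str], db_path: str, table: str = "seen_ids") -> list[str]:
    """Record ids in SQLite and return only those not seen in earlier calls."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (id TEXT PRIMARY KEY)")
        fresh = []
        for id_ in ids:
            cur = conn.execute(
                f"INSERT OR IGNORE INTO {table} (id) VALUES (?)", (id_,)
            )
            if cur.rowcount == 1:  # the row was inserted, so this id is new
                fresh.append(id_)
        conn.commit()
        return fresh
    finally:
        conn.close()
```

Called twice against the same database file, the second call returns only the IDs that weren't present in the first.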

Async Pagination for High Volume

For scraping many independent pages concurrently, asyncio with httpx's async client dramatically speeds things up:

import asyncio
import random

import httpx


async def async_paginate(
    base_url: str,
    max_pages: int = 100,
    concurrency: int = 5,
    delay: float = 0.5,
) -> list[dict]:
    """Fetch multiple pages concurrently with a semaphore for rate limiting."""
    semaphore = asyncio.Semaphore(concurrency)
    all_items = []

    async def fetch_page(client: httpx.AsyncClient, page: int) -> list[dict]:
        async with semaphore:
            await asyncio.sleep(delay * random.uniform(0.5, 1.5))
            resp = await client.get(base_url, params={"page": page})
            if resp.status_code == 404:
                return []
            resp.raise_for_status()
            data = resp.json()
            return data.get("results", [])

    async with httpx.AsyncClient(headers=HEADERS, timeout=20) as client:
        # First, find total pages
        first_resp = await client.get(base_url, params={"page": 1})
        first_resp.raise_for_status()
        first_data = first_resp.json()
        total = first_data.get("total_count", 0)
        page_size = len(first_data.get("results", [])) or 20
        total_pages = min(max_pages, (total + page_size - 1) // page_size)

        all_items.extend(first_data.get("results", []))

        # Fetch remaining pages concurrently
        tasks = [fetch_page(client, p) for p in range(2, total_pages + 1)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in results:
            if isinstance(result, list):
                all_items.extend(result)
            elif isinstance(result, Exception):
                print(f"Page fetch error: {result}")

    return all_items
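
To see what the semaphore does without touching a network, the sketch below swaps the HTTP call for a short sleep and records how many fake fetches run at once. Everything here (run_with_limit, fake_fetch) is illustrative scaffolding rather than part of the scraper.

```python
import asyncio


async def run_with_limit(n_tasks: int, concurrency: int) -> int:
    """Run n_tasks fake fetches and return the peak number running at once."""
    semaphore = asyncio.Semaphore(concurrency)
    running = 0
    peak = 0

    async def fake_fetch(page: int) -> int:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stands in for the network round trip
            running -= 1
            return page

    pages = await asyncio.gather(*(fake_fetch(p) for p in range(n_tasks)))
    assert pages == list(range(n_tasks))  # gather preserves submission order
    return peak


print(asyncio.run(run_with_limit(n_tasks=20, concurrency=5)))  # 5
```

All 20 tasks are created immediately, but at most 5 are ever inside the semaphore, which is the property that keeps the request rate bounded.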

Choosing the Right Approach

Pattern              | Detection                         | Tool        | Speed
---------------------|-----------------------------------|-------------|-------
URL offset (?page=N) | URL has ?page=, ?offset=          | httpx       | Fast
Cursor (next_cursor) | Response has cursor token         | httpx       | Fast
Link header          | HTTP Link: header with rel="next" | httpx       | Fast
HTML next link       | <a rel="next"> in page            | httpx + BS4 | Medium
Infinite scroll      | Height grows on scroll            | Playwright  | Slow
Load more button     | Button triggers AJAX              | Playwright  | Slow
XHR interception     | Network tab shows API calls       | Playwright  | Medium

Start with the simplest approach that works. If the site has an internal JSON API (check DevTools Network tab), target that directly — it's faster and more stable than parsing HTML. If you must use a browser, Playwright's async API keeps things efficient.

Whatever the pattern, the core loop is identical: fetch a batch, extract items, find the pointer to the next batch, repeat until done. The only thing that changes is how you find "next."
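
That shared loop can be written once and parameterized. The sketch below is one way to express it; paginate, its three callables, and the in-memory FAKE_API are all made up for illustration, with a dict lookup standing in for the HTTP request.

```python
from typing import Any, Callable, Iterator, Optional


def paginate(
    fetch: Callable[[Any], Any],
    extract_items: Callable[[Any], list],
    find_next: Callable[[Any], Optional[Any]],
    start: Any,
    max_batches: int = 1000,
) -> Iterator[Any]:
    """Yield items batch by batch until the batch is empty or find_next returns None."""
    pointer = start
    for _ in range(max_batches):
        batch = fetch(pointer)
        items = extract_items(batch)
        if not items:
            return
        yield from items
        pointer = find_next(batch)
        if pointer is None:
            return


# In-memory stand-in for an API: three linked batches keyed by cursor
FAKE_API = {
    "start": {"results": [1, 2], "next": "c1"},
    "c1": {"results": [3, 4], "next": "c2"},
    "c2": {"results": [5], "next": None},
}

items = list(paginate(
    fetch=FAKE_API.__getitem__,
    extract_items=lambda batch: batch["results"],
    find_next=lambda batch: batch["next"],
    start="start",
))
print(items)  # [1, 2, 3, 4, 5]
```

For page-number pagination, find_next would return the current page plus one. For Link headers, it would return the parsed next URL.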