Advanced OpenSea Scraping: Floor Prices, Rarity Tools, and Whale Wallets in 2026

Building an NFT analytics dashboard means tracking floor prices across hundreds of collections, flagging whale movements, and correlating rarity scores before a mint. This guide covers the pieces you need: the OpenSea API v2, the Reservoir aggregator for cross-marketplace data, whale wallet tracking, rate limit handling, and storing it all as time-series data in SQLite.

OpenSea's Anti-Bot Measures

OpenSea runs Cloudflare in front of its web frontend. Any attempt to scrape HTML pages with a plain requests call hits a JS challenge almost immediately. The challenge fingerprints TLS handshake details and evaluates browser environment via JavaScript. Standard headless Chromium gets flagged within a few page loads unless you're spoofing browser fingerprints carefully.

The API side is more workable but still restricted:

- The public API v2 enforces a hard limit of 4 requests per second per API key
- Some endpoints, particularly anything touching wallet-specific listings, require wallet-based authentication via signed messages (EIP-191)
- API keys are required for all v2 endpoints; apply through the OpenSea developer portal (approval typically takes 1-2 days)
- Keys tied to suspicious usage patterns (burst requests, unusual query patterns) get revoked
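One way to stay under the 4 requests per second limit is a small client-side throttle that enforces a minimum gap between calls. This is a minimal single-threaded sketch; the `Throttle` class and its injectable `clock` and `sleep` parameters are illustrative names, not part of any OpenSea SDK.

```python
import time


class Throttle:
    """Enforce a minimum gap between calls (client-side, single-threaded)."""

    def __init__(self, max_per_second: float, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = 1.0 / max_per_second
        self._clock = clock      # injectable so the class can be tested without waiting
        self._sleep = sleep
        self._last_call = None   # no call made yet

    def wait(self) -> float:
        """Sleep just long enough to respect the limit; return seconds slept."""
        now = self._clock()
        delay = 0.0
        if self._last_call is not None:
            delay = max(0.0, self.min_interval - (now - self._last_call))
        if delay > 0:
            self._sleep(delay)
        self._last_call = now + delay
        return delay
```

Calling `throttle.wait()` before each request replaces hand-tuned `time.sleep` values and only sleeps when the previous call was too recent.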

OpenSea API v2: Core Endpoints

The v2 API is REST-based with JSON responses and covers the core use cases: collection stats, listings, offers, and transfer events.

import httpx
import time
import random
from typing import Optional

OPENSEA_API_KEY = "your_api_key_here"
BASE_URL = "https://api.opensea.io/api/v2"
HEADERS = {
    "accept": "application/json",
    "x-api-key": OPENSEA_API_KEY,
}


def get_collection_stats(slug: str) -> Optional[dict]:
    """Fetch floor price, volume, and owner count for a collection."""
    url = f"{BASE_URL}/collections/{slug}/stats"
    resp = httpx.get(url, headers=HEADERS, timeout=10)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    data = resp.json()

    # Stats come in multiple time intervals: 1h, 6h, 24h, 7d, 30d, all
    intervals = {i["interval"]: i for i in data.get("intervals", [])}
    total = data.get("total", {})

    return {
        "floor_price_eth": total.get("floor_price"),
        "market_cap_eth": total.get("market_cap"),
        "volume_all_time_eth": total.get("volume"),
        "num_owners": total.get("num_owners"),
        "num_listed": total.get("num_listed"),
        "average_price_eth": total.get("average_price"),
        "sales_24h": intervals.get("one_day", {}).get("sales"),
        "volume_24h_eth": intervals.get("one_day", {}).get("volume"),
        "floor_change_24h_pct": intervals.get("one_day", {}).get("floor_price_percentage_change"),
        "sales_7d": intervals.get("seven_day", {}).get("sales"),
        "volume_7d_eth": intervals.get("seven_day", {}).get("volume"),
    }


def get_collection_metadata(slug: str) -> Optional[dict]:
    """Fetch collection name, description, creator, contract address."""
    url = f"{BASE_URL}/collections/{slug}"
    resp = httpx.get(url, headers=HEADERS, timeout=10)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    data = resp.json()
    return {
        "name": data.get("name"),
        "slug": data.get("collection"),
        "description": data.get("description"),
        "owner": data.get("owner"),
        "created_date": data.get("created_date"),
        "contracts": data.get("contracts", []),
        "category": data.get("category"),
        "total_supply": data.get("total_supply"),
        "image_url": data.get("image_url"),
        "banner_image_url": data.get("banner_image_url"),
        "twitter_username": data.get("twitter_username"),
        "discord_url": data.get("discord_url"),
        "opensea_url": data.get("opensea_url"),
    }


def get_recent_listings(
    slug: str,
    limit: int = 50,
    next_cursor: str | None = None
) -> dict:
    """Fetch recent listings for a collection."""
    url = f"{BASE_URL}/listings/collection/{slug}/all"
    params = {"limit": min(limit, 100)}
    if next_cursor:
        params["next"] = next_cursor

    resp = httpx.get(url, headers=HEADERS, params=params, timeout=10)
    resp.raise_for_status()
    data = resp.json()

    listings = []
    for listing in data.get("listings", []):
        price_data = listing.get("price", {}).get("current", {})
        listings.append({
            "order_hash": listing.get("order_hash"),
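            "token_id": (listing.get("protocol_data", {}).get("parameters", {}).get("offer") or [{}])[0].get("identifierOrCriteria"),
            # "value" is in base units (wei for ETH); int() also accepts a numeric string
            "price_eth": int(price_data["value"]) / 10 ** int(price_data.get("decimals", 18)) if price_data.get("value") else None,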
            "currency": price_data.get("currency"),
            "expiration": listing.get("protocol_data", {}).get("parameters", {}).get("endTime"),
            "maker": listing.get("protocol_data", {}).get("parameters", {}).get("offerer"),
        })

    return {
        "listings": listings,
        "next": data.get("next"),  # cursor for next page
    }


def get_nft_details(contract_address: str, token_id: str) -> Optional[dict]:
    """Fetch metadata and traits for a specific NFT (takes the contract address, not the slug)."""
    url = f"{BASE_URL}/chain/ethereum/contract/{contract_address}/nfts/{token_id}"
    resp = httpx.get(url, headers=HEADERS, timeout=10)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()
    data = resp.json().get("nft", {})
    return {
        "identifier": data.get("identifier"),
        "name": data.get("name"),
        "description": data.get("description"),
        "image_url": data.get("image_url"),
        "traits": data.get("traits", []),
        "rarity": data.get("rarity", {}),
        "owners": data.get("owners", []),
        "last_sale": data.get("last_sale"),
    }
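The listing function above returns one page plus a `next` cursor, so walking a whole collection needs a loop around it. The generator below is a hedged sketch of that loop; `iter_pages` and its `fetch_page` callback are hypothetical names, and it assumes only that each page is a dict with an items list and an optional `next` value.

```python
from typing import Callable, Iterator, Optional


def iter_pages(
    fetch_page: Callable[[Optional[str]], dict],
    items_key: str,
    max_items: int = 1000,
) -> Iterator[dict]:
    """Yield items across cursor-paginated pages until exhausted or capped."""
    cursor = None
    yielded = 0
    while yielded < max_items:
        page = fetch_page(cursor)
        items = page.get(items_key, [])
        if not items:
            return
        for item in items:
            if yielded >= max_items:
                return
            yield item
            yielded += 1
        cursor = page.get("next")
        if not cursor:  # no further pages
            return
```

With the functions in this article it could be used as `iter_pages(lambda c: get_recent_listings("azuki", next_cursor=c), "listings")`, with a rate-limit pause added inside the callback.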

Tracking Collection Events (Sales, Transfers, Mints)

def get_collection_events(
    slug: str,
    event_types: list[str] | None = None,
    after_timestamp: int | None = None,
    limit: int = 100,
    next_cursor: str | None = None,
) -> dict:
    """
    Fetch events for a collection.
    event_types: ["sale", "transfer", "listing", "offer", "cancel", "redeem"]
    after_timestamp: unix timestamp to filter events after a certain time
    """
    url = f"{BASE_URL}/events/collection/{slug}"
    params = {"limit": min(limit, 100)}
    if event_types:
        params["event_type"] = event_types  # httpx handles list params
    if after_timestamp:
        params["after"] = after_timestamp
    if next_cursor:
        params["next"] = next_cursor

    resp = httpx.get(url, headers=HEADERS, params=params, timeout=10)
    resp.raise_for_status()
    data = resp.json()

    events = []
    for event in data.get("asset_events", []):
        events.append({
            "event_type": event.get("event_type"),
            "event_timestamp": event.get("event_timestamp"),
            "nft_identifier": event.get("nft", {}).get("identifier"),
            "from_address": event.get("from_address"),
            "to_address": event.get("to_address"),
            "transaction": event.get("transaction"),
            "payment": event.get("payment", {}),
        })

    return {"events": events, "next": data.get("next")}


def paginate_events(
    slug: str,
    event_types: list[str] | None = None,
    max_events: int = 1000,
    after_timestamp: int | None = None,
) -> list[dict]:
    """Fetch all events across pages with cursor pagination."""
    all_events = []
    cursor = None

    while len(all_events) < max_events:
        result = get_collection_events(
            slug,
            event_types=event_types,
            after_timestamp=after_timestamp,
            next_cursor=cursor,
        )
        batch = result["events"]
        if not batch:
            break
        all_events.extend(batch)
        cursor = result.get("next")
        if not cursor:
            break
        time.sleep(0.26)  # stay under 4 req/sec

    return all_events[:max_events]

Reservoir Protocol: Cross-Marketplace Data

Reservoir aggregates listings and sales across OpenSea, Blur, LooksRare, X2Y2, and smaller marketplaces. For floor price tracking it's often more accurate than OpenSea alone because it captures cross-marketplace activity. Their API is free at moderate volumes with a key, and rate limits are less punishing than OpenSea's.

RESERVOIR_API_KEY = "your_reservoir_key"
RESERVOIR_BASE = "https://api.reservoir.tools"
RES_HEADERS = {
    "accept": "application/json",
    "x-api-key": RESERVOIR_API_KEY,
}


def reservoir_collection_floor(contract_address: str) -> Optional[float]:
    """Get the current floor price from the Reservoir aggregator."""
    resp = httpx.get(
        f"{RESERVOIR_BASE}/collections/v7",
        headers=RES_HEADERS,
        params={"id": contract_address, "includeTopBid": "false"},
        timeout=10,
    )
    resp.raise_for_status()
    cols = resp.json().get("collections", [])
    if not cols:
        return None
    return cols[0].get("floorAsk", {}).get("price", {}).get("amount", {}).get("decimal")


def reservoir_collection_stats(contract_address: str) -> dict:
    """Get comprehensive collection stats from Reservoir."""
    resp = httpx.get(
        f"{RESERVOIR_BASE}/collections/v7",
        headers=RES_HEADERS,
        params={
            "id": contract_address,
            "includeTopBid": "true",
            "includeAttributes": "false",
        },
        timeout=10,
    )
    resp.raise_for_status()
    cols = resp.json().get("collections", [])
    if not cols:
        return {}

    col = cols[0]
    return {
        "name": col.get("name"),
        "floor_price_eth": col.get("floorAsk", {}).get("price", {}).get("amount", {}).get("decimal"),
        "top_bid_eth": col.get("topBid", {}).get("price", {}).get("amount", {}).get("decimal"),
        "volume_24h_eth": col.get("volume", {}).get("1day"),
        "volume_7d_eth": col.get("volume", {}).get("7day"),
        "volume_30d_eth": col.get("volume", {}).get("30day"),
        "volume_all_time_eth": col.get("volume", {}).get("allTime"),
        "floor_sale_24h_change": col.get("floorSaleChange", {}).get("1day"),
        "num_owners": col.get("ownerCount"),
        "num_tokens": col.get("tokenCount"),
        # counts may arrive as strings, so coerce before dividing
        "supply_percent_listed": int(col.get("onSaleCount") or 0) / max(int(col.get("tokenCount") or 1), 1),
    }


def reservoir_recent_sales(
    contract_address: str,
    limit: int = 100,
) -> list[dict]:
    """Get recent sales across all marketplaces from Reservoir."""
    resp = httpx.get(
        f"{RESERVOIR_BASE}/sales/v6",
        headers=RES_HEADERS,
        params={
            "collection": contract_address,
            "limit": limit,
            "sortBy": "time",
            "includeTokenMetadata": "true",
        },
        timeout=10,
    )
    resp.raise_for_status()
    sales = resp.json().get("sales", [])

    return [
        {
            "token_id": s.get("token", {}).get("tokenId"),
            "price_eth": s.get("price", {}).get("amount", {}).get("decimal"),
            "marketplace": s.get("fillSource", {}).get("name"),
            "timestamp": s.get("timestamp"),
            "from": s.get("from"),
            "to": s.get("to"),
            "tx_hash": s.get("txHash"),
        }
        for s in sales
    ]

Run both OpenSea and Reservoir in parallel for collections where cross-marketplace discrepancy matters. If OpenSea shows 0.5 ETH but Reservoir shows 0.42 ETH, someone listed cheaper on Blur and arbitrage bots are already on it.
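To make that comparison concrete, the gap can be expressed as a percentage of the OpenSea floor. The helper below is a small sketch; `floor_discrepancy_pct` is an illustrative name and it simply returns `None` when either source has no floor.

```python
from typing import Optional


def floor_discrepancy_pct(
    opensea_floor: Optional[float],
    reservoir_floor: Optional[float],
) -> Optional[float]:
    """Percent difference of the Reservoir floor relative to the OpenSea floor."""
    if not opensea_floor or reservoir_floor is None:
        return None  # missing or zero baseline: nothing meaningful to compare
    return (reservoir_floor - opensea_floor) / opensea_floor * 100


# The 0.5 ETH vs 0.42 ETH case from the text is a gap of roughly -16%.
gap = floor_discrepancy_pct(0.5, 0.42)
```

A negative value means the aggregated floor is below OpenSea's, which is the case worth alerting on.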

Tracking Whale Wallets

The most actionable signal is tracking wallets with large positions. When a wallet holding 15+ NFTs in a collection starts offloading, that's worth knowing before it hits the floor price.

def build_ownership_map_from_events(
    slug: str,
    max_events: int = 5000
) -> dict[str, int]:
    """
    Reconstruct approximate holdings by replaying transfer events.
    Returns {wallet_address: estimated_count}.
    Note: This is approximate. For exact holdings, use Reservoir's owner endpoint.
    """
    events = paginate_events(
        slug,
        event_types=["transfer"],
        max_events=max_events,
    )

    ownership: dict[str, int] = {}
    ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"

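    # Replay oldest-first (assumes the API returns newest-first) so the
    # clamp at zero below does not erase tokens a wallet later sold.
    for event in reversed(events):
        to_addr = (event.get("to_address") or "").lower()
        from_addr = (event.get("from_address") or "").lower()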

        if to_addr and to_addr != ZERO_ADDRESS:
            ownership[to_addr] = ownership.get(to_addr, 0) + 1
        if from_addr and from_addr != ZERO_ADDRESS:
            ownership[from_addr] = max(0, ownership.get(from_addr, 0) - 1)

    return ownership


def get_whale_wallets(
    slug: str,
    min_owned: int = 10,
    max_events: int = 10000,
) -> list[tuple[str, int]]:
    """Return wallets holding >= min_owned tokens, sorted by count desc."""
    ownership = build_ownership_map_from_events(slug, max_events)
    whales = [
        (addr, count)
        for addr, count in ownership.items()
        if count >= min_owned
    ]
    return sorted(whales, key=lambda x: x[1], reverse=True)


def get_exact_owners_reservoir(
    contract_address: str,
    limit: int = 500,
) -> list[dict]:
    """
    Get exact current owners from Reservoir (more accurate than event replay).
    """
    resp = httpx.get(
        f"{RESERVOIR_BASE}/owners/v2",
        headers=RES_HEADERS,
        params={
            "collection": contract_address,
            "limit": limit,
            "sortBy": "tokenCount",
            "sortDirection": "desc",
        },
        timeout=15,
    )
    resp.raise_for_status()
    owners = resp.json().get("owners", [])
    return [
        {
            "address": o.get("address"),
            "token_count": o.get("ownership", {}).get("tokenCount"),
            "top_bid": o.get("ownership", {}).get("topBid", {}).get("value"),
        }
        for o in owners
    ]
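Holdings on their own are static; the offloading signal comes from comparing two snapshots taken some time apart. The sketch below assumes each snapshot is a `{wallet_address: token_count}` dict like the one returned by the ownership map above. The `diff_holdings` name and both thresholds are illustrative choices, and the output keys mirror the `whale_alerts` table columns used later.

```python
def diff_holdings(
    previous: dict[str, int],
    current: dict[str, int],
    min_position: int = 15,  # only watch wallets that held at least this many
    min_change: int = 3,     # ignore small reshuffles
) -> list[dict]:
    """Flag large holders whose token count dropped between two snapshots."""
    alerts = []
    for wallet, before in previous.items():
        if before < min_position:
            continue
        after = current.get(wallet, 0)  # missing wallet means it now holds nothing
        change = after - before
        if change <= -min_change:
            alerts.append({
                "wallet_address": wallet,
                "event_type": "offload",
                "token_count_change": change,
            })
    # Largest sell-offs first (most negative change)
    return sorted(alerts, key=lambda a: a["token_count_change"])
```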

Handling Rate Limits with Proxies

At 4 requests per second per key, monitoring 200 collections on a 5-minute interval is fine. But sub-minute polling, cross-marketplace scraping, or pulling web data alongside the API will exhaust limits fast.

The answer is residential proxy rotation. Cloudflare blocks datacenter IPs aggressively on the frontend, and the API has IP-level limits layered under key-level ones. Residential IPs sidestep the bans while your key handles auth.

ThorData provides a residential pool that holds up well for sustained NFT scraping. The key feature for pagination-heavy scraping is sticky sessions — maintaining the same IP across a paginated sequence to avoid anomaly detection mid-pagination.

PROXY_USER = "your_proxy_user"
PROXY_PASS = "your_proxy_pass"
PROXY_HOST = "proxy.thordata.com"
PROXY_PORT = 9000


def build_proxy_url(session_id: str | None = None) -> str:
    """
    Build proxy URL with optional sticky session.
    session_id=None → rotating (new IP per request)
    session_id="abc123" → sticky (same IP for this session)
    """
    if session_id:
        user = f"{PROXY_USER}-session-{session_id}"
    else:
        user = f"{PROXY_USER}-rotate"
    return f"http://{user}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"


def make_proxied_request(
    url: str,
    params: dict | None = None,
    session_id: str | None = None,
    extra_headers: dict | None = None,
) -> Optional[dict]:
    """Make a single request through ThorData proxy."""
    proxy_url = build_proxy_url(session_id)
    headers = {**HEADERS, **(extra_headers or {})}

    try:
        with httpx.Client(proxy=proxy_url, timeout=15) as client:
            resp = client.get(url, headers=headers, params=params)
            resp.raise_for_status()
            return resp.json()
    except (httpx.HTTPStatusError, httpx.ConnectError) as e:
        print(f"Request failed: {e}")
        return None
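With or without a proxy, a rate-limited response is usually worth retrying after a pause rather than dropping. The function below sketches exponential backoff with full jitter. Whether the API sends a `Retry-After` header on 429 responses is an assumption here, so the header is treated as optional, and `backoff_delay` and its parameters are hypothetical names.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 0.5,
    cap: float = 30.0,
    retry_after: Optional[str] = None,
    rng=random.random,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Honor a numeric Retry-After value, bounded by the cap
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # header was an HTTP date or malformed; fall back to backoff
    ceiling = min(cap, base * (2 ** attempt))
    return ceiling * rng()  # full jitter: uniform in [0, ceiling)
```

A retry loop would call `time.sleep(backoff_delay(attempt, retry_after=resp.headers.get("retry-after")))` after each 429 and give up after a fixed number of attempts.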

Building a Floor Price Tracker

This polls a list of collections every five minutes and stores each result as a timestamped row in SQLite:

import sqlite3
import json
from datetime import datetime, timezone


def init_nft_db(path: str = "nft_floors.db") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS collections (
            slug TEXT PRIMARY KEY,
            name TEXT,
            contract_address TEXT,
            total_supply INTEGER,
            added_at TEXT
        );

        CREATE TABLE IF NOT EXISTS floor_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            slug TEXT NOT NULL,
            floor_price_eth REAL,
            top_bid_eth REAL,
            volume_24h_eth REAL,
            num_owners INTEGER,
            num_listed INTEGER,
            floor_change_24h_pct REAL,
            source TEXT DEFAULT 'opensea',
            recorded_at TEXT NOT NULL,
            FOREIGN KEY(slug) REFERENCES collections(slug)
        );

        CREATE TABLE IF NOT EXISTS whale_alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            slug TEXT NOT NULL,
            wallet_address TEXT NOT NULL,
            event_type TEXT,
            token_count_change INTEGER,
            recorded_at TEXT NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_floor_slug_time
            ON floor_snapshots(slug, recorded_at);
    """)
    conn.commit()
    return conn


def record_floor_snapshot(
    conn: sqlite3.Connection,
    slug: str,
    stats: dict,
    source: str = "opensea",
) -> None:
    conn.execute("""
        INSERT INTO floor_snapshots
        (slug, floor_price_eth, top_bid_eth, volume_24h_eth, num_owners,
         num_listed, floor_change_24h_pct, source, recorded_at)
        VALUES (?,?,?,?,?,?,?,?,?)
    """, (
        slug,
        stats.get("floor_price_eth"),
        stats.get("top_bid_eth"),
        stats.get("volume_24h_eth"),
        stats.get("num_owners"),
        stats.get("num_listed"),
        stats.get("floor_change_24h_pct"),
        source,
        datetime.now(timezone.utc).isoformat(),
    ))
    conn.commit()


def get_floor_history(
    conn: sqlite3.Connection,
    slug: str,
    hours: int = 168,  # 7 days
) -> list[dict]:
    """Get floor price history for plotting."""
    rows = conn.execute("""
        SELECT recorded_at, floor_price_eth, volume_24h_eth, num_owners
        FROM floor_snapshots
        WHERE slug = ?
          AND datetime(recorded_at) >= datetime('now', ?)
        ORDER BY recorded_at
    """, (slug, f"-{hours} hours")).fetchall()

    return [
        {
            "timestamp": r[0],
            "floor_eth": r[1],
            "volume_24h": r[2],
            "num_owners": r[3],
        }
        for r in rows
    ]


def detect_floor_anomalies(
    conn: sqlite3.Connection,
    drop_threshold_pct: float = 10.0,
    spike_threshold_pct: float = 15.0,
) -> list[dict]:
    """Detect significant floor price changes across all collections."""
    rows = conn.execute("""
        WITH ranked AS (
            SELECT
                slug,
                floor_price_eth,
                recorded_at,
                LAG(floor_price_eth) OVER (
                    PARTITION BY slug ORDER BY recorded_at
                ) AS prev_floor
            FROM floor_snapshots
        )
        SELECT slug, prev_floor, floor_price_eth,
               ((floor_price_eth - prev_floor) / prev_floor * 100) AS pct_change,
               recorded_at
        FROM ranked
        WHERE prev_floor IS NOT NULL
          AND prev_floor > 0
          AND ((floor_price_eth - prev_floor) / prev_floor * 100 <= -?
               OR (floor_price_eth - prev_floor) / prev_floor * 100 >= ?)
        ORDER BY ABS((floor_price_eth - prev_floor) / prev_floor * 100) DESC
    """, (drop_threshold_pct, spike_threshold_pct)).fetchall()

    return [
        {
            "slug": r[0],
            "prev_floor": r[1],
            "current_floor": r[2],
            "pct_change": r[3],
            "recorded_at": r[4],
        }
        for r in rows
    ]


def run_floor_tracker(
    collections: list[str],
    poll_interval_seconds: int = 300,  # 5 minutes
    proxy_url: str | None = None,
) -> None:
    """Main loop: poll collections and store floor snapshots."""
    conn = init_nft_db()

    print(f"Tracking {len(collections)} collections every {poll_interval_seconds}s")

    while True:
        poll_start = time.time()

        for slug in collections:
            try:
                stats = get_collection_stats(slug)
                if stats:
                    record_floor_snapshot(conn, slug, stats, source="opensea")
                    print(f"{slug}: floor={stats.get('floor_price_eth')} ETH, "
                          f"change={(stats.get('floor_change_24h_pct') or 0):.1f}%")
            except Exception as e:
                print(f"Error fetching {slug}: {e}")

            time.sleep(0.26)  # 4 req/sec limit

        # Check for anomalies after each polling round
        anomalies = detect_floor_anomalies(conn, drop_threshold_pct=10)
        for a in anomalies[:5]:
            print(f"ALERT: {a['slug']} floor changed {a['pct_change']:.1f}%")

        elapsed = time.time() - poll_start
        sleep_time = max(0, poll_interval_seconds - elapsed)
        print(f"Poll complete in {elapsed:.1f}s. Next poll in {sleep_time:.0f}s")
        time.sleep(sleep_time)


if __name__ == "__main__":
    COLLECTIONS = [
        "boredapeyachtclub",
        "pudgypenguins",
        "azuki",
        "milady",
        "cryptopunks",
        "doodles-official",
    ]
    run_floor_tracker(COLLECTIONS, poll_interval_seconds=300)

Rarity Scoring

For collections with on-chain or IPFS metadata, compute rarity scores locally:

from collections import Counter
import math


def compute_rarity_scores(nfts: list[dict]) -> list[dict]:
    """
    Compute statistical rarity for each NFT using trait frequency.
    Higher score = rarer (rank 1 is the rarest).
    """
    total = len(nfts)
    if total == 0:
        return nfts

    # Count trait value frequencies
    trait_counts: dict[str, Counter] = {}
    for nft in nfts:
        for trait in nft.get("traits", []):
            trait_type = trait.get("trait_type", "unknown")
            trait_value = str(trait.get("value", ""))
            if trait_type not in trait_counts:
                trait_counts[trait_type] = Counter()
            trait_counts[trait_type][trait_value] += 1

    # Score each NFT
    scored_nfts = []
    for nft in nfts:
        score = 0.0
        trait_scores = []
        for trait in nft.get("traits", []):
            trait_type = trait.get("trait_type", "unknown")
            trait_value = str(trait.get("value", ""))
            count = trait_counts.get(trait_type, Counter()).get(trait_value, 1)
            frequency = count / total
            trait_score = 1.0 / frequency  # inverse frequency
            score += trait_score
            trait_scores.append({
                "type": trait_type,
                "value": trait_value,
                "frequency": frequency,
                "score": trait_score,
            })

        scored_nfts.append({
            **nft,
            "rarity_score": score,
            "trait_scores": trait_scores,
        })

    # Add rank
    scored_nfts.sort(key=lambda x: x["rarity_score"], reverse=True)
    for rank, nft in enumerate(scored_nfts, 1):
        nft["rarity_rank"] = rank

    return scored_nfts
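A tiny worked example makes the arithmetic easier to check by hand. The sketch below is a compact restatement of inverse-frequency scoring on a made-up four-token collection; the `inverse_frequency_scores` helper and the sample traits are illustrative only.

```python
from collections import Counter


def inverse_frequency_scores(collection: dict[str, dict[str, str]]) -> dict[str, float]:
    """Sum of (total / count) over each token's (trait_type, value) pairs."""
    total = len(collection)
    counts = Counter(
        (trait_type, value)
        for traits in collection.values()
        for trait_type, value in traits.items()
    )
    return {
        token_id: sum(total / counts[(t, v)] for t, v in traits.items())
        for token_id, traits in collection.items()
    }


sample = {
    "1": {"fur": "gold", "hat": "none"},    # gold fur appears once in four tokens
    "2": {"fur": "brown", "hat": "none"},
    "3": {"fur": "brown", "hat": "crown"},  # crown appears once in four tokens
    "4": {"fur": "brown", "hat": "none"},
}
scores = inverse_frequency_scores(sample)
```

Token 1 scores 4/1 + 4/3 (about 5.33) while token 2 scores 4/3 + 4/3 (about 2.67), so the token carrying a one-of-a-kind trait ends up with the larger score.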

Common Gotchas

OpenSea slugs vs contract addresses: The stats and events endpoints use the slug (e.g., boredapeyachtclub). Reservoir endpoints use the contract address (e.g., 0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d). Don't confuse them.
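A cheap guard against mixing the two up is to check the identifier's shape before building a URL. This is a minimal sketch; `is_contract_address` is a hypothetical helper that only checks the `0x` plus 40 hex characters format, not that a contract actually exists at the address.

```python
import re

# 0x followed by exactly 40 hexadecimal characters
_ADDRESS_RE = re.compile(r"0x[0-9a-fA-F]{40}")


def is_contract_address(identifier: str) -> bool:
    """True if the string is shaped like an EVM address rather than a slug."""
    return _ADDRESS_RE.fullmatch(identifier) is not None
```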

ETH price units in events: The raw payment data from OpenSea events comes in wei, the smallest unit of ETH (1 ETH = 10^18 wei). Divide by 1e18 to get ETH. The stats endpoint returns ETH directly.
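Float division is fine for display, but `Decimal` avoids rounding surprises when amounts are summed or compared. The helper below is a sketch; `to_token_units` is an illustrative name, and the default of 18 decimals is an assumption that holds for ETH and WETH but not for every payment token.

```python
from decimal import Decimal


def to_token_units(raw_quantity, decimals: int = 18) -> Decimal:
    """Convert a base-unit amount (int or numeric string) to whole tokens."""
    return Decimal(str(raw_quantity)) / (Decimal(10) ** decimals)


price = to_token_units("1500000000000000000")  # 1.5 ETH expressed in wei
```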

Burned tokens in ownership tracking: Tokens sent to 0x000...0000 or 0xdead... are burned. The ownership reconstruction function above handles the zero address, but watch for the dead address too.
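One way to cover both cases is to check addresses against a small set of known burn destinations. The sketch below lists only the zero address and the common `...dead` address; `BURN_ADDRESSES` and `is_burn_address` are hypothetical names, and individual collections may use other burn addresses that would need adding.

```python
# Built with string repetition so the 40-hex-character length is easy to verify
BURN_ADDRESSES = frozenset({
    "0x" + "0" * 40,           # zero address
    "0x" + "0" * 36 + "dead",  # widely used "dead" address
})


def is_burn_address(address) -> bool:
    """True for known burn destinations; tolerates None and mixed case."""
    return (address or "").lower() in BURN_ADDRESSES
```

Swapping the `!= ZERO_ADDRESS` comparisons for `not is_burn_address(...)` keeps burned tokens out of the ownership counts.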

API key rotation: You can apply for multiple API keys under different accounts. For sustained high-volume work, rotate between keys to stay under per-key rate limits.

Reservoir vs OpenSea discrepancy: Blur listings don't always appear on OpenSea and vice versa. For floor price accuracy, use Reservoir as the authoritative source since it aggregates all marketplaces.

Wrapping Up

The OpenSea API v2 covers most use cases cleanly within rate limits. Reservoir fills the gaps for cross-marketplace coverage. For anything beyond casual polling — dashboards, alert systems, high-frequency monitoring — proxy rotation via ThorData is part of the infrastructure, not an afterthought.

The whale tracking approach above is approximate — production use would want checkpointed ownership state and handling for off-marketplace transfers. But as a starting point it surfaces the signal that matters: large position holders moving before the floor price moves.