← Back to blog

How to Scrape OpenSea NFT Data in 2026: Listings, Floor Prices & Collection Stats

How to Scrape OpenSea NFT Data in 2026: Listings, Floor Prices & Collection Stats

OpenSea remains the dominant NFT marketplace — over 80 million NFTs across 2 million collections, with daily trading volume that still dwarfs most competitors. Whether you're tracking floor prices for trading bots, analyzing collection trends, or building a portfolio tracker, OpenSea's data is the starting point.

The platform offers an official API (v2), but it comes with tight rate limits and gaps in historical data. For anything beyond basic collection stats, you'll need to combine API calls with GraphQL queries and occasional web scraping.

What Data Can You Extract?

Between the API and direct scraping, you can get:

OpenSea's Anti-Bot Measures

OpenSea has gotten significantly more aggressive with bot detection since 2024:

  1. API rate limiting — The v2 API allows 4 requests per second with an API key. Burst above this and you get 429 responses, then temporary bans if you persist.
  2. Cloudflare protection — Web pages sit behind Cloudflare with JavaScript challenges. Simple HTTP requests get blocked immediately.
  3. GraphQL fingerprinting — Their internal GraphQL endpoint checks request headers, TLS fingerprints, and cookie state. Requests that don't look like a real browser get 403'd.
  4. IP reputation scoring — Datacenter IPs are flagged almost instantly. Shared proxy IPs that other scrapers have burned are also blocked.
  5. Wallet-linked API keys — API keys are tied to your wallet, so abuse gets your key revoked permanently.

For any serious data collection, residential proxies are essential. A service like ThorData provides residential IPs that pass OpenSea's reputation checks — datacenter proxies get blocked within minutes on OpenSea.

Setting Up: OpenSea API v2

Get an API key at docs.opensea.io. It's free but requires a wallet signature.

pip install requests beautifulsoup4 httpx

Fetching Collection Stats

import requests
import time

# Credentials and endpoint shared by every API helper in this file.
OPENSEA_API_KEY = "YOUR_API_KEY"  # placeholder: substitute your own key
BASE_URL = "https://api.opensea.io/api/v2"

# Default request headers: ask for JSON and authenticate with the API key.
HEADERS = {"accept": "application/json", "x-api-key": OPENSEA_API_KEY}

def get_collection_stats(collection_slug: str) -> dict:
    """Fetch collection-level stats from OpenSea API v2.

    Args:
        collection_slug: OpenSea collection slug, e.g. ``"boredapeyachtclub"``.

    Returns:
        A flat dict with the all-time totals (``floor_price``,
        ``floor_price_symbol``, ``total_volume``, ``total_sales``,
        ``num_owners``, ``total_supply``, ``market_cap``) plus ``volume_24h``
        and ``sales_24h``. Any field missing from the response is ``None``.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    url = f"{BASE_URL}/collections/{collection_slug}/stats"
    resp = requests.get(url, headers=HEADERS, timeout=15)
    resp.raise_for_status()
    data = resp.json()

    # `or` (rather than a .get default) also covers an explicit JSON null.
    total = data.get("total") or {}
    intervals = data.get("intervals") or []

    # Pick the daily window by its label instead of trusting list order; fall
    # back to the first entry so unlabelled responses behave as before.
    # NOTE(review): assumes the daily entry is tagged interval == "one_day" --
    # confirm against the OpenSea API reference.
    daily = next(
        (entry for entry in intervals if entry.get("interval") == "one_day"),
        intervals[0] if intervals else {},
    )

    return {
        "slug": collection_slug,
        "floor_price": total.get("floor_price"),
        "floor_price_symbol": total.get("floor_price_symbol"),
        "total_volume": total.get("volume"),
        "total_sales": total.get("sales"),
        "num_owners": total.get("num_owners"),
        "total_supply": total.get("supply"),
        "market_cap": total.get("market_cap"),
        "volume_24h": daily.get("volume"),
        "sales_24h": daily.get("sales"),
    }


# Example: Bored Ape Yacht Club -- fetch once, then print a three-line summary.
stats = get_collection_stats("boredapeyachtclub")
print(
    f"Floor: {stats['floor_price']} {stats['floor_price_symbol']}",
    f"Owners: {stats['num_owners']} | Supply: {stats['total_supply']}",
    f"24h Volume: {stats['volume_24h']}",
    sep="\n",
)

Listing NFTs in a Collection

def get_listings(collection_slug: str, limit: int = 50) -> list:
    """Fetch active listings for a collection, following pagination cursors.

    Args:
        collection_slug: OpenSea collection slug.
        limit: Maximum number of listings to return. The per-request page
            size is capped at 100; further pages are fetched until ``limit``
            listings are collected or the API stops returning a cursor.

    Returns:
        A list of dicts with ``token_id``, ``price_wei`` (raw value as sent by
        the API), ``price_eth`` (raw value scaled by the listing's decimals),
        ``currency``, ``seller``, ``listing_date`` and ``expiry_date``.

    Raises:
        requests.HTTPError: If any page request returns a 4xx/5xx status.
    """
    url = f"{BASE_URL}/listings/collection/{collection_slug}/all"
    params = {"limit": min(limit, 100)}

    all_listings = []

    while len(all_listings) < limit:
        resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
        resp.raise_for_status()
        data = resp.json()

        listings = data.get("listings") or []
        if not listings:
            break

        for item in listings:
            # `or {}` guards against keys that are present but null.
            price_info = (item.get("price") or {}).get("current") or {}
            params_data = (item.get("protocol_data") or {}).get("parameters") or {}
            offer = params_data.get("offer") or [{}]

            raw_value = price_info.get("value")
            # Scale by the decimals the API reports for this currency instead
            # of assuming 18; 18 remains the fallback when none is given.
            decimals = price_info.get("decimals")
            decimals = 18 if decimals is None else int(decimals)
            price = int(raw_value) / (10 ** decimals) if raw_value else None

            all_listings.append({
                "token_id": offer[0].get("identifierOrCriteria"),
                "price_wei": raw_value,
                "price_eth": price,
                "currency": price_info.get("currency"),
                "seller": params_data.get("offerer"),
                "listing_date": params_data.get("startTime"),
                "expiry_date": params_data.get("endTime"),
            })

        next_cursor = data.get("next")
        # Stop before sleeping when there is nothing left to fetch.
        if not next_cursor or len(all_listings) >= limit:
            break
        params["next"] = next_cursor
        time.sleep(0.3)  # Stay under 4 req/s

    return all_listings[:limit]

Fetching Sales History via Events

def get_sales_history(collection_slug: str, limit: int = 50, after_timestamp: int = None) -> list:
    """Fetch recent sales events for a collection.

    Args:
        collection_slug: OpenSea collection slug.
        limit: Maximum number of sales to return. Pages hold at most 50
            events; additional pages are followed via the ``next`` cursor
            until ``limit`` is reached. A non-positive limit returns ``[]``
            without calling the API.
        after_timestamp: Optional Unix timestamp; when given (including 0),
            it is sent as the ``after`` filter.

    Returns:
        A list of dicts with ``token_id``, ``name``, ``price_raw``,
        ``price_eth`` (raw quantity scaled by the payment token's decimals),
        ``currency``, ``seller``, ``buyer``, ``timestamp``, ``transaction``
        and ``chain``.

    Raises:
        requests.HTTPError: If any page request returns a 4xx/5xx status.
    """
    url = f"{BASE_URL}/events/collection/{collection_slug}"
    params = {
        "event_type": "sale",
        "limit": min(limit, 50),
    }
    # `is not None` so that an explicit 0 is still forwarded.
    if after_timestamp is not None:
        params["after"] = after_timestamp

    sales = []
    while len(sales) < limit:
        resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
        resp.raise_for_status()
        data = resp.json()

        events = data.get("asset_events") or []
        if not events:
            break

        for event in events:
            payment = event.get("payment") or {}
            nft = event.get("nft") or {}
            quantity = int(payment.get("quantity") or 0)
            decimals = payment.get("decimals")
            decimals = 18 if decimals is None else int(decimals)

            sales.append({
                "token_id": nft.get("identifier"),
                "name": nft.get("name"),
                "price_raw": quantity,
                # decimals == 0 is a valid token precision (divide by 1).
                "price_eth": quantity / (10 ** decimals),
                "currency": payment.get("symbol"),
                "seller": event.get("seller"),
                "buyer": event.get("buyer"),
                "timestamp": event.get("event_timestamp"),
                "transaction": event.get("transaction"),
                "chain": event.get("chain"),
            })

        next_cursor = data.get("next")
        if not next_cursor or len(sales) >= limit:
            break
        params["next"] = next_cursor
        time.sleep(0.3)  # Stay under 4 req/s

    return sales[:limit]

Fetching Individual NFT Metadata

# slug -> (chain, contract address); avoids one lookup per token when many
# NFTs of the same collection are requested.
_CONTRACT_CACHE = {}


def _resolve_contract(collection_slug: str) -> tuple:
    """Return ``(chain, contract_address)`` for a slug or a raw 0x address."""
    # A 42-character 0x string is already a contract address; keep the
    # original behaviour of querying it on Ethereum.
    if collection_slug.startswith("0x") and len(collection_slug) == 42:
        return "ethereum", collection_slug

    if collection_slug not in _CONTRACT_CACHE:
        resp = requests.get(
            f"{BASE_URL}/collections/{collection_slug}",
            headers=HEADERS,
            timeout=15,
        )
        resp.raise_for_status()
        # NOTE(review): relies on the collection payload listing its
        # contracts as [{"address": ..., "chain": ...}] -- confirm in the API docs.
        contracts = resp.json().get("contracts") or []
        if not contracts or not contracts[0].get("address"):
            raise ValueError(f"No contract found for collection {collection_slug!r}")
        primary = contracts[0]
        _CONTRACT_CACHE[collection_slug] = (
            primary.get("chain") or "ethereum",
            primary["address"],
        )
    return _CONTRACT_CACHE[collection_slug]


def get_nft_details(collection_slug: str, token_id: str) -> dict:
    """Fetch metadata, traits, owner and rarity for a specific NFT.

    The NFT endpoint is addressed by chain and contract address, so a
    collection slug is first resolved to its primary contract (one extra,
    cached request). Passing a 0x contract address directly is also accepted.

    Args:
        collection_slug: Collection slug, or a 0x contract address.
        token_id: Token identifier within the contract.

    Returns:
        A dict with ``token_id``, ``name``, ``description``, ``image_url``,
        ``traits`` (each with ``type``, ``value`` and ``rarity``, the latter
        taken from the trait's ``trait_count``), ``owner`` (first listed
        owner address or ``None``), ``rarity_rank`` and ``rarity_score``.

    Raises:
        requests.HTTPError: If a request returns a 4xx/5xx status.
        ValueError: If the collection exposes no contract address.
    """
    chain, address = _resolve_contract(collection_slug)
    url = f"{BASE_URL}/chain/{chain}/contract/{address}/nfts/{token_id}"
    resp = requests.get(url, headers=HEADERS, timeout=15)
    resp.raise_for_status()
    nft = resp.json().get("nft") or {}

    # Both fields may be present but null, hence `or` instead of a default.
    rarity = nft.get("rarity") or {}
    owners = nft.get("owners") or []

    return {
        "token_id": nft.get("identifier"),
        "name": nft.get("name"),
        "description": nft.get("description"),
        "image_url": nft.get("image_url"),
        "traits": [
            {"type": t.get("trait_type"), "value": t.get("value"), "rarity": t.get("trait_count")}
            for t in nft.get("traits") or []
        ],
        "owner": owners[0].get("address") if owners else None,
        "rarity_rank": rarity.get("rank"),
        "rarity_score": rarity.get("score"),
    }

Scraping Trait Rarity Data

The API gives you traits per NFT, but for collection-wide rarity breakdowns, scraping the collection page is faster:

import random
from bs4 import BeautifulSoup
import json

def get_trait_counts(collection_slug: str, proxy: str = None) -> dict:
    """Scrape trait count data from an OpenSea collection page.

    The page HTML is fetched with browser-like headers and the JSON inside
    the ``__NEXT_DATA__`` script tag is parsed.

    Args:
        collection_slug: OpenSea collection slug.
        proxy: Optional HTTPS proxy URL, e.g. ``'http://USER:PASS@PROXY_HOST:9000'``.

    Returns:
        ``{"name", "slug", "trait_data"}`` when the embedded JSON is found and
        parses; an empty dict when the script tag is missing, empty, or does
        not contain valid JSON.
    """
    url = f"https://opensea.io/collection/{collection_slug}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "en-US,en;q=0.9",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
    }

    proxies = {"https": proxy} if proxy else None
    resp = requests.get(url, headers=headers, proxies=proxies, timeout=20)

    # OpenSea embeds collection data in __NEXT_DATA__ JSON
    soup = BeautifulSoup(resp.text, "html.parser")
    script = soup.find("script", id="__NEXT_DATA__")

    # `.string` is None for an empty tag; json.loads(None) would raise.
    if script is None or not script.string:
        return {}

    try:
        data = json.loads(script.string)
    except json.JSONDecodeError:
        # Treat an unparsable payload like a missing one.
        return {}

    # NOTE(review): assumes the layout props -> pageProps -> collection -> traitData;
    # verify against a live page.
    props = (data.get("props") or {}).get("pageProps") or {}
    collection_data = props.get("collection") or {}
    return {
        "name": collection_data.get("name"),
        "slug": collection_slug,
        "trait_data": collection_data.get("traitData", {}),
    }

Bulk Collection Monitoring with Proxies

For tracking multiple collections continuously, route requests through rotating IPs to avoid IP-based blocking. The API's per-key rate limit still applies, so keep pacing your requests:

def monitor_floor_prices(slugs: list, proxy_url: str, interval: int = 300, max_iterations: int = None):
    """Monitor floor prices for multiple collections.

    Each round fetches the stats of every slug through the proxy, appends one
    JSON line to ``floor_prices.jsonl`` and prints a progress line.

    Args:
        slugs: Collection slugs to poll.
        proxy_url: Rotating residential proxy endpoint, used for both HTTP
            and HTTPS traffic.
        interval: Seconds to wait between rounds.
        max_iterations: Optional number of rounds to run; ``None`` (the
            default) keeps polling forever.

    Per-collection failures (network errors, non-2xx responses, malformed
    payloads) do not stop the loop; they are stored in the snapshot as
    ``{"error": "<message>"}``.
    """
    import json
    from datetime import datetime, timezone

    proxies = {"https": proxy_url, "http": proxy_url}
    completed = 0

    while max_iterations is None or completed < max_iterations:
        # Naive UTC ISO string: same format utcnow() produced, without
        # relying on the deprecated call.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        snapshot = {"timestamp": now_utc.isoformat(), "collections": {}}

        for slug in slugs:
            try:
                resp = requests.get(
                    f"{BASE_URL}/collections/{slug}/stats",
                    headers=HEADERS,
                    proxies=proxies,
                    timeout=15,
                )
                # Without this, a 429/403 body would be logged as "no data".
                resp.raise_for_status()
                data = resp.json()
                floor = (data.get("total") or {}).get("floor_price")
                intervals = data.get("intervals") or []
                volume_24h = intervals[0].get("volume") if intervals else None

                snapshot["collections"][slug] = {
                    # `is not None` keeps a genuine 0 instead of dropping it.
                    "floor": float(floor) if floor is not None else None,
                    "volume_24h": float(volume_24h) if volume_24h is not None else None,
                }
            except Exception as e:
                # Best-effort polling: record the failure and move on.
                snapshot["collections"][slug] = {"error": str(e)}

            time.sleep(random.uniform(0.3, 0.8))

        with open("floor_prices.jsonl", "a", encoding="utf-8") as f:
            # A real newline terminator, so the file is valid JSON Lines.
            f.write(json.dumps(snapshot) + "\n")

        print(f"[{snapshot['timestamp']}] Tracked {len(slugs)} collections")

        completed += 1
        if max_iterations is not None and completed >= max_iterations:
            break
        time.sleep(interval)


# Track top collections every 5 minutes
# Placeholder proxy URL: substitute the user, password and host of your
# provider's rotating-proxy endpoint before enabling the call below.
PROXY = "http://USER:PASS@PROXY_HOST:9000"
collections = ["boredapeyachtclub", "mutant-ape-yacht-club", "azuki", "pudgypenguins", "doodles-official"]
# monitor_floor_prices(collections, PROXY)

Storing Data in SQLite

For time-series floor price analysis, SQLite is the right choice:

import sqlite3
from datetime import datetime

def init_nft_db(db_path: str = "nft_data.db") -> sqlite3.Connection:
    """Open the SQLite database and make sure the schema exists.

    Creates three tables if they are missing: ``floor_snapshots`` (keyed by
    collection slug and timestamp), ``sales_history`` (keyed by transaction
    hash) and ``nft_metadata`` (keyed by collection slug and token id).

    Args:
        db_path: Path of the database file; ``":memory:"`` also works.

    Returns:
        The open connection. The caller is responsible for closing it.
    """
    schema = (
        """
        CREATE TABLE IF NOT EXISTS floor_snapshots (
            collection_slug TEXT,
            timestamp TEXT,
            floor_price REAL,
            floor_price_symbol TEXT,
            total_volume REAL,
            num_owners INTEGER,
            total_supply INTEGER,
            volume_24h REAL,
            sales_24h INTEGER,
            PRIMARY KEY (collection_slug, timestamp)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS sales_history (
            transaction_hash TEXT PRIMARY KEY,
            collection_slug TEXT,
            token_id TEXT,
            name TEXT,
            price_eth REAL,
            currency TEXT,
            seller TEXT,
            buyer TEXT,
            event_timestamp TEXT,
            scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS nft_metadata (
            collection_slug TEXT,
            token_id TEXT,
            name TEXT,
            rarity_rank INTEGER,
            rarity_score REAL,
            traits TEXT,
            last_updated TEXT,
            PRIMARY KEY (collection_slug, token_id)
        )
        """,
    )

    conn = sqlite3.connect(db_path)
    for statement in schema:
        conn.execute(statement)
    conn.commit()
    return conn


def save_floor_snapshot(conn: sqlite3.Connection, slug: str, stats: dict):
    """Insert (or replace) one floor-price snapshot stamped with the current UTC time.

    Args:
        conn: Open connection to a database containing ``floor_snapshots``.
        slug: Collection slug the stats belong to.
        stats: Dict with the keys ``floor_price``, ``floor_price_symbol``,
            ``total_volume``, ``num_owners``, ``total_supply``, ``volume_24h``
            and ``sales_24h``; missing keys are stored as NULL.
    """
    from datetime import datetime, timezone

    # Naive UTC ISO string: the same stored format utcnow() produced, without
    # the deprecated call.
    captured_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    # `with conn` commits on success and rolls back if the insert fails.
    with conn:
        conn.execute(
            # Explicit column list so the insert does not depend on the
            # table's column order.
            "INSERT OR REPLACE INTO floor_snapshots ("
            "collection_slug, timestamp, floor_price, floor_price_symbol, "
            "total_volume, num_owners, total_supply, volume_24h, sales_24h"
            ") VALUES (?,?,?,?,?,?,?,?,?)",
            (
                slug,
                captured_at,
                stats.get("floor_price"),
                stats.get("floor_price_symbol"),
                stats.get("total_volume"),
                stats.get("num_owners"),
                stats.get("total_supply"),
                stats.get("volume_24h"),
                stats.get("sales_24h"),
            ),
        )


def get_floor_trend(conn: sqlite3.Connection, slug: str, days: int = 30) -> list[dict]:
    """Get floor price history for a collection over the last ``days`` days.

    Args:
        conn: Open connection to a database containing ``floor_snapshots``.
        slug: Collection slug to look up.
        days: Size of the look-back window, counted back from the current
            UTC time.

    Returns:
        Snapshots ordered by timestamp, each as
        ``{"timestamp", "floor", "volume_24h", "sales_24h"}``. Rows whose
        timestamp SQLite cannot parse are excluded.
    """
    # Stored timestamps are ISO strings with a "T" separator, while
    # datetime('now', ...) yields "YYYY-MM-DD HH:MM:SS". Normalising the column
    # through datetime() makes this a real time comparison rather than a
    # lexical one that misjudges rows on the cutoff day.
    rows = conn.execute("""
        SELECT timestamp, floor_price, volume_24h, sales_24h
        FROM floor_snapshots
        WHERE collection_slug = ?
          AND datetime(timestamp) >= datetime('now', ?)
        ORDER BY timestamp
    """, (slug, f"-{days} days")).fetchall()

    return [
        {"timestamp": r[0], "floor": r[1], "volume_24h": r[2], "sales_24h": r[3]}
        for r in rows
    ]

Identifying Floor Price Anomalies

Once you have historical data, you can detect unusual price movements:

import statistics

def detect_floor_anomalies(conn: sqlite3.Connection, slug: str, z_score_threshold: float = 2.0) -> list[dict]:
    """Find floor price snapshots that are statistical outliers.

    Each non-NULL floor price of the collection is compared with the mean of
    all of them; a snapshot is reported when it lies at least
    ``z_score_threshold`` sample standard deviations away.

    Args:
        conn: Open connection to a database containing ``floor_snapshots``.
        slug: Collection slug to analyse.
        z_score_threshold: Minimum absolute z-score to report.

    Returns:
        Outliers in timestamp order, each with ``timestamp``, ``price``,
        ``mean`` (rounded to 4 places), ``z_score`` (rounded to 2 places) and
        ``direction`` (``"spike"`` above the mean, ``"crash"`` otherwise).
        Empty when fewer than 5 snapshots exist or all prices are equal.
    """
    rows = conn.execute(
        "SELECT timestamp, floor_price FROM floor_snapshots WHERE collection_slug = ? AND floor_price IS NOT NULL ORDER BY timestamp",
        (slug,)
    ).fetchall()

    if len(rows) < 5:
        return []

    prices = [price for _, price in rows]
    mean_price = statistics.mean(prices)
    stdev = statistics.stdev(prices)

    # A zero spread means no value can be an outlier; this check does not
    # depend on the row, so it is done once up front.
    if stdev <= 0:
        return []

    anomalies = []
    for timestamp, price in rows:
        z_score = abs(price - mean_price) / stdev
        if z_score < z_score_threshold:
            continue
        anomalies.append({
            "timestamp": timestamp,
            "price": price,
            "mean": round(mean_price, 4),
            "z_score": round(z_score, 2),
            "direction": "spike" if price > mean_price else "crash",
        })

    return anomalies

OpenSea's Terms of Service prohibit scraping, but their public API is explicitly designed for developer access. Blockchain data itself is public — anyone can query Ethereum for the same transaction data. Use the official API where possible, avoid scraping at volumes that degrade service, and don't republish their UI or proprietary ranking data. Building analytical tools on top of publicly available blockchain data is generally accepted — tools like NFTScan, Dune Analytics, and Reservoir all operate in this space.

Key Takeaways

Querying On-Chain Data Directly

For historical transaction data that OpenSea's API doesn't expose, query Ethereum directly:

def get_onchain_transfers(
    contract_address: str,
    from_block: int = 0,
    rpc_url: str = "https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
) -> list[dict]:
    """Query ERC-721 Transfer events directly from Ethereum.

    Sends a single ``eth_getLogs`` JSON-RPC call for the Transfer event topic
    on ``contract_address`` from ``from_block`` to the latest block.

    Args:
        contract_address: Address of the token contract to filter on.
        from_block: First block to scan (decimal; sent hex-encoded).
        rpc_url: JSON-RPC endpoint, e.g. an Alchemy or Infura URL. The
            default contains a placeholder key and must be replaced.

    Returns:
        One dict per log with ``block``, ``tx_hash``, ``from_addr``,
        ``to_addr`` and ``token_id`` (``None`` when the log has no fourth
        topic). Logs with fewer than three topics are skipped.

    Raises:
        requests.HTTPError: If the endpoint returns a 4xx/5xx status.
        RuntimeError: If the JSON-RPC response carries an ``error`` member
            (previously such failures were indistinguishable from "no logs").
    """
    # ERC-721 Transfer topic
    transfer_topic = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

    payload = {
        "jsonrpc": "2.0",
        "method": "eth_getLogs",
        "params": [{
            "fromBlock": hex(from_block),
            "toBlock": "latest",
            "address": contract_address,
            "topics": [transfer_topic],
        }],
        "id": 1,
    }

    resp = requests.post(rpc_url, json=payload, timeout=30)
    resp.raise_for_status()
    body = resp.json()
    if body.get("error"):
        raise RuntimeError(f"eth_getLogs failed: {body['error']}")
    logs = body.get("result") or []

    transfers = []
    for log in logs:
        topics = log["topics"]
        # from/to are read from topics 1 and 2; without them the log cannot
        # be decoded here.
        if len(topics) < 3:
            continue
        token_id = int(topics[3], 16) if len(topics) > 3 else None
        transfers.append({
            "block": int(log["blockNumber"], 16),
            "tx_hash": log["transactionHash"],
            # Topics are 32-byte words; the address is the last 20 bytes.
            "from_addr": "0x" + topics[1][-40:],
            "to_addr": "0x" + topics[2][-40:],
            "token_id": token_id,
        })

    return transfers

Tracking Collection Velocity

Collection velocity -- how quickly new listings appear and sell -- is a leading indicator of market heat:

def track_listing_velocity(collection_slug: str, db_path: str = "nft_data.db"):
    """Track how fast new listings are appearing and selling.

    Fetches up to 200 active listings and up to 50 recent sales, then
    summarises how old the listings are.

    Args:
        collection_slug: OpenSea collection slug.
        db_path: Database file whose schema is initialised as a side effect.
            No metrics are written to it by this function.

    Returns:
        A dict with ``collection``, ``timestamp`` (UTC, ISO format),
        ``active_listings``, ``recent_sales_count``,
        ``avg_listing_age_hours`` (``None`` when no listing has a usable
        start time) and ``fresh_listings_24h``.
    """
    from datetime import datetime, timezone

    # Keep the schema-initialising side effect, but release the handle right
    # away: nothing below uses the connection, and leaving it open leaked it.
    init_nft_db(db_path).close()

    # Get current listings
    current_listings = get_listings(collection_slug, limit=200)

    # Get recent sales
    recent_sales = get_sales_history(collection_slug, limit=50)

    now = datetime.now(timezone.utc)

    # Calculate listing age distribution
    ages_hours = []
    for listing in current_listings:
        listing_time = listing.get("listing_date")
        if not listing_time:
            continue
        try:
            lt = datetime.fromtimestamp(int(listing_time), tz=timezone.utc)
        except (ValueError, TypeError, OverflowError, OSError):
            # Unparsable or out-of-range start time: leave it out of the stats.
            continue
        ages_hours.append((now - lt).total_seconds() / 3600)

    metrics = {
        "collection": collection_slug,
        "timestamp": now.isoformat(),
        "active_listings": len(current_listings),
        "recent_sales_count": len(recent_sales),
        "avg_listing_age_hours": round(sum(ages_hours) / len(ages_hours), 1) if ages_hours else None,
        "fresh_listings_24h": sum(1 for a in ages_hours if a <= 24),
    }

    return metrics


# Monitor several collections, pausing briefly between them.
for slug in ("boredapeyachtclub", "azuki", "pudgypenguins"):
    velocity = track_listing_velocity(slug)
    print(
        f"{slug}: {velocity['active_listings']} listings, {velocity['recent_sales_count']} recent sales, {velocity['fresh_listings_24h']} new in 24h"
    )
    time.sleep(0.5)

Comparing Rarity Tiers

NFT collections have rarity tiers (common/uncommon/rare/legendary). Mapping floor prices per tier reveals pricing inefficiencies:

def analyze_rarity_pricing(
    collection_slug: str,
    max_items: int = 200,
    total_supply: int = 10000,
    sample_size: int = 50,
) -> dict:
    """Analyze listing prices across rarity tiers.

    Listings are bucketed by rarity rank as a percentage of ``total_supply``:
    top 1% "legendary", top 5% "rare", top 20% "uncommon", the rest "common".

    Args:
        collection_slug: OpenSea collection slug.
        max_items: Maximum number of listings to fetch.
        total_supply: Collection size used to turn a rank into a percentage.
        sample_size: How many of the fetched listings get a per-token rarity
            lookup (one API request each).

    Returns:
        ``{tier: {"count": int, "avg_floor": float | None}}`` for the four
        tiers, where ``avg_floor`` is the mean listed price rounded to 4
        places, or ``None`` for an empty tier.

    Raises:
        ValueError: If ``total_supply`` is not positive.
    """
    import logging
    from statistics import mean

    if total_supply <= 0:
        raise ValueError("total_supply must be positive")

    log = logging.getLogger(__name__)
    listings = get_listings(collection_slug, limit=max_items)

    rarity_buckets = {"legendary": [], "rare": [], "uncommon": [], "common": []}

    for listing in listings[:sample_size]:  # Sample to avoid rate limits
        token_id = listing.get("token_id")
        if not token_id:
            continue

        try:
            nft = get_nft_details(collection_slug, token_id)
        except Exception as exc:
            # Best-effort sampling: skip this token, but leave a trace
            # instead of hiding the failure.
            log.warning("Skipping token %s of %s: %s", token_id, collection_slug, exc)
            continue
        finally:
            # Pace every request, including failed ones.
            time.sleep(0.3)

        rank = nft.get("rarity_rank")
        price_eth = listing.get("price_eth", 0)
        if not rank or not price_eth:
            continue

        # Approximate tiers based on rank percentage
        pct = rank / total_supply * 100
        if pct <= 1:
            bucket = "legendary"
        elif pct <= 5:
            bucket = "rare"
        elif pct <= 20:
            bucket = "uncommon"
        else:
            bucket = "common"
        rarity_buckets[bucket].append(price_eth)

    return {
        tier: {"count": len(prices), "avg_floor": round(mean(prices), 4) if prices else None}
        for tier, prices in rarity_buckets.items()
    }

Using Reservoir as a Data Supplement

Reservoir is an open-source NFT data aggregator that often has better historical data than OpenSea's API:

def get_reservoir_floor_history(collection: str, days: int = 30) -> list[dict]:
    """Get daily floor/volume history from the Reservoir daily-volumes endpoint.

    Args:
        collection: Collection identifier, sent as the ``id`` query parameter.
        days: Number of daily entries to request (sent as ``limit``).

    Returns:
        One dict per returned entry with ``date``, ``floor_price_eth``,
        ``volume_eth`` and ``sales_count``; a field the response lacks is
        ``None``.

    Raises:
        requests.HTTPError: If the API returns a 4xx/5xx status.
    """
    url = "https://api.reservoir.tools/collections/daily-volumes/v1"
    params = {
        "id": collection,
        "limit": days,
    }
    headers = {"accept": "*/*"}

    resp = requests.get(url, params=params, headers=headers, timeout=15)
    resp.raise_for_status()
    data = resp.json()

    # NOTE(review): this endpoint appears to return snake_case fields
    # (floor_sell_value, sales_count) -- confirm against the Reservoir
    # reference. The camelCase names used previously remain as fallbacks, so
    # either shape is handled.
    return [
        {
            "date": entry.get("timestamp"),
            "floor_price_eth": entry.get("floor_sell_value", entry.get("floorSalePrice")),
            "volume_eth": entry.get("volume"),
            "sales_count": entry.get("sales_count", entry.get("salesCount")),
        }
        for entry in data.get("collections") or []
    ]

Automated Portfolio Tracker

Build a wallet-based portfolio tracker that monitors your NFT holdings:

def track_wallet_portfolio(wallet_address: str) -> dict:
    """Get all NFTs owned by a wallet with current floor prices.

    Pages through the wallet's NFTs on Ethereum, looks up the floor price of
    each distinct collection once, and sums one floor price per NFT held.

    Args:
        wallet_address: Wallet address to inspect.

    Returns:
        A dict with ``wallet``, ``total_nfts``, ``portfolio_value_eth``
        (rounded to 4 places), ``collections`` (number of distinct slugs) and
        ``nfts`` (each with ``collection``, ``collection_slug``, ``token_id``,
        ``name``, ``image_url`` and ``floor_price_eth``). Floor prices are
        summed as-is, so collections priced in another currency skew the
        total. A collection whose stats lookup fails gets a floor of ``None``
        and contributes nothing.

    Raises:
        requests.HTTPError: If a request for the wallet's NFTs fails.
    """
    import logging

    log = logging.getLogger(__name__)
    url = f"{BASE_URL}/chain/ethereum/account/{wallet_address}/nfts"
    params = {"limit": 200}

    all_nfts = []

    while True:
        resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
        resp.raise_for_status()
        data = resp.json()

        for nft in data.get("nfts") or []:
            # The `collection` field may be a plain slug string rather than
            # an object; calling .get() on a string would raise.
            # NOTE(review): confirm the exact shape in the API reference.
            collection = nft.get("collection")
            if isinstance(collection, dict):
                coll_name = collection.get("name")
                coll_slug = collection.get("slug")
            else:
                # Only the slug is available; use it as the display name too.
                coll_name = coll_slug = collection

            all_nfts.append({
                "collection": coll_name,
                "collection_slug": coll_slug,
                "token_id": nft.get("identifier"),
                "name": nft.get("name"),
                "image_url": nft.get("image_url"),
            })

        next_cursor = data.get("next")
        if not next_cursor:
            break
        params["next"] = next_cursor
        time.sleep(0.3)

    # Enrich with floor prices
    slugs = list({nft["collection_slug"] for nft in all_nfts if nft["collection_slug"]})
    floor_prices = {}

    for slug in slugs:
        try:
            floor_prices[slug] = get_collection_stats(slug).get("floor_price")
        except Exception as exc:
            # Best-effort enrichment: one failing collection must not abort
            # the whole portfolio, but the failure is logged.
            log.warning("No floor price for %s: %s", slug, exc)
            floor_prices[slug] = None
        finally:
            # Pace every request, including failed ones.
            time.sleep(0.3)

    # Calculate portfolio value
    portfolio_value = 0
    for nft in all_nfts:
        floor = floor_prices.get(nft.get("collection_slug"))
        nft["floor_price_eth"] = floor
        if floor:
            portfolio_value += floor

    return {
        "wallet": wallet_address,
        "total_nfts": len(all_nfts),
        "portfolio_value_eth": round(portfolio_value, 4),
        "collections": len(slugs),
        "nfts": all_nfts,
    }

Performance Tips for High-Volume Scraping

When tracking 100+ collections continuously, these optimizations matter:

import asyncio
import httpx

async def fetch_stats_async(session: httpx.AsyncClient, slug: str) -> dict:
    """Fetch one collection's stats through an existing async client.

    Returns a dict with ``slug``, ``floor_price`` (from the ``total`` section)
    and ``volume_24h`` (volume of the first ``intervals`` entry, or ``None``
    when there are no intervals). Raises on a 4xx/5xx response.
    """
    response = await session.get(f"https://api.opensea.io/api/v2/collections/{slug}/stats")
    response.raise_for_status()
    payload = response.json()

    totals = payload.get("total", {})
    floor_price = totals.get("floor_price")

    if payload.get("intervals"):
        volume_24h = payload.get("intervals", [{}])[0].get("volume")
    else:
        volume_24h = None

    return {"slug": slug, "floor_price": floor_price, "volume_24h": volume_24h}


async def bulk_fetch_stats(slugs: list[str], api_key: str) -> list[dict]:
    """Fetch stats for many collections concurrently, paced to 4 requests/second.

    Args:
        slugs: Collection slugs to fetch.
        api_key: OpenSea API key sent as ``x-api-key``.

    Returns:
        The stats dicts of the collections that were fetched successfully, in
        the order of ``slugs``. Collections whose request failed are omitted.
    """
    headers = {"accept": "application/json", "x-api-key": api_key}
    # Caps simultaneous connections only; it does not bound requests per
    # second, which is why request starts are staggered below.
    limits = httpx.Limits(max_connections=4, max_keepalive_connections=4)
    min_spacing = 0.25  # seconds between request starts => at most 4 req/s

    async def _paced_fetch(session, position, slug):
        # Delay each task by its position so starts are evenly spread out.
        await asyncio.sleep(position * min_spacing)
        return await fetch_stats_async(session, slug)

    async with httpx.AsyncClient(headers=headers, limits=limits) as session:
        tasks = [_paced_fetch(session, i, slug) for i, slug in enumerate(slugs)]
        # return_exceptions=True: one failure must not cancel the others.
        results = await asyncio.gather(*tasks, return_exceptions=True)

    return [r for r in results if isinstance(r, dict)]


# Run the async bulk fetch to completion, then report each floor price.
results = asyncio.run(
    bulk_fetch_stats(
        ["boredapeyachtclub", "azuki", "pudgypenguins"],
        OPENSEA_API_KEY,
    )
)
for r in results:
    print(f"{r['slug']}: floor {r['floor_price']} ETH")