How to Scrape OpenSea NFT Data in 2026: Listings, Floor Prices & Collection Stats
OpenSea remains the dominant NFT marketplace — over 80 million NFTs across 2 million collections, with daily trading volume that still dwarfs most competitors. Whether you're tracking floor prices for trading bots, analyzing collection trends, or building a portfolio tracker, OpenSea's data is the starting point.
The platform offers an official API (v2), but it comes with tight rate limits and gaps in historical data. For anything beyond basic collection stats, you'll need to combine API calls with GraphQL queries and occasional web scraping.
What Data Can You Extract?
Between the API and direct scraping, you can get:
- Collection stats — floor price, total volume, number of owners, total supply
- Individual listings — current price, seller, listing date, token metadata
- Sales history — past transactions with price, buyer, seller, and timestamp
- Trait data — rarity breakdowns for each trait in a collection
- Offers and bids — current best offer, bid history
- Account activity — items owned, listed, or sold by a specific wallet
- Collection rankings — trending collections by volume, floor changes, new listings
OpenSea's Anti-Bot Measures
OpenSea has gotten significantly more aggressive with bot detection since 2024:
- API rate limiting — The v2 API allows 4 requests per second with an API key. Burst above this and you get 429 responses, then temporary bans if you persist.
- Cloudflare protection — Web pages sit behind Cloudflare with JavaScript challenges. Simple HTTP requests get blocked immediately.
- GraphQL fingerprinting — Their internal GraphQL endpoint checks request headers, TLS fingerprints, and cookie state. Requests that don't look like a real browser get 403'd.
- IP reputation scoring — Datacenter IPs are flagged almost instantly. Shared proxy IPs that other scrapers have burned are also blocked.
- Wallet-linked API keys — API keys are tied to your wallet, so abuse gets your key revoked permanently.
For any serious data collection, residential proxies are essential. A service like ThorData provides residential IPs that pass OpenSea's reputation checks — datacenter proxies get blocked within minutes on OpenSea.
Setting Up: OpenSea API v2
Get an API key at docs.opensea.io. It's free but requires a wallet signature.
pip install requests beautifulsoup4 httpx
Fetching Collection Stats
import requests
import time
# OpenSea API v2 credentials and shared request configuration.
OPENSEA_API_KEY = "YOUR_API_KEY"  # obtained from docs.opensea.io (requires a wallet signature)
BASE_URL = "https://api.opensea.io/api/v2"
# Headers sent with every API call; the x-api-key header is mandatory on v2.
HEADERS = {
    "accept": "application/json",
    "x-api-key": OPENSEA_API_KEY,
}
def get_collection_stats(collection_slug: str) -> dict:
    """Fetch collection-level stats from OpenSea API v2.

    Args:
        collection_slug: OpenSea collection slug, e.g. "boredapeyachtclub".

    Returns:
        Flat dict with floor price, lifetime volume/sales, owner count,
        supply, market cap, and the first interval's volume/sales
        (presumably the 24h window -- confirm against the API docs).

    Raises:
        requests.HTTPError: on non-2xx responses (including 429 rate limits).
    """
    url = f"{BASE_URL}/collections/{collection_slug}/stats"
    resp = requests.get(url, headers=HEADERS, timeout=15)
    resp.raise_for_status()
    data = resp.json()
    # Hoist the repeated nested lookups once instead of eight times.
    total = data.get("total", {})
    intervals = data.get("intervals") or []
    first_interval = intervals[0] if intervals else {}
    return {
        "slug": collection_slug,
        "floor_price": total.get("floor_price"),
        "floor_price_symbol": total.get("floor_price_symbol"),
        "total_volume": total.get("volume"),
        "total_sales": total.get("sales"),
        "num_owners": total.get("num_owners"),
        "total_supply": total.get("supply"),
        "market_cap": total.get("market_cap"),
        "volume_24h": first_interval.get("volume"),
        "sales_24h": first_interval.get("sales"),
    }
# Example: Bored Ape Yacht Club
# NOTE: executes a live API call at import time; requires a valid API key.
stats = get_collection_stats("boredapeyachtclub")
print(f"Floor: {stats['floor_price']} {stats['floor_price_symbol']}")
print(f"Owners: {stats['num_owners']} | Supply: {stats['total_supply']}")
print(f"24h Volume: {stats['volume_24h']}")
Listing NFTs in a Collection
def get_listings(collection_slug: str, limit: int = 50) -> list:
    """Fetch active listings for a collection, following cursor pagination.

    Args:
        collection_slug: OpenSea collection slug.
        limit: total listings to return (pages are capped at 100 by the API).

    Returns:
        List of dicts with token id, price (wei and ETH), seller, and the
        listing's start/end unix timestamps (Seaport protocol fields).

    Raises:
        requests.HTTPError: on non-2xx responses.
    """
    url = f"{BASE_URL}/listings/collection/{collection_slug}/all"
    params = {"limit": min(limit, 100)}  # API caps a single page at 100
    all_listings = []
    next_cursor = None
    while len(all_listings) < limit:
        if next_cursor:
            params["next"] = next_cursor
        resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
        resp.raise_for_status()
        data = resp.json()
        listings = data.get("listings", [])
        if not listings:
            break
        for item in listings:
            price_info = item.get("price", {}).get("current", {})
            params_data = item.get("protocol_data", {}).get("parameters", {})
            offer = params_data.get("offer", [{}])
            raw_value = price_info.get("value")  # wei, as a string
            all_listings.append({
                "token_id": offer[0].get("identifierOrCriteria") if offer else None,
                "price_wei": raw_value,
                # 1 ETH = 1e18 wei; leave None when no value is present
                "price_eth": int(raw_value) / 1e18 if raw_value else None,
                "currency": price_info.get("currency"),
                "seller": params_data.get("offerer"),
                "listing_date": params_data.get("startTime"),
                "expiry_date": params_data.get("endTime"),
            })
        next_cursor = data.get("next")
        if not next_cursor:
            break
        time.sleep(0.3)  # stay under the 4 req/s API limit
    return all_listings[:limit]
Fetching Sales History via Events
def get_sales_history(collection_slug: str, limit: int = 50, after_timestamp: int = None) -> list:
    """Fetch recent sale events for a collection.

    Args:
        collection_slug: OpenSea collection slug.
        limit: max events to request (the events endpoint caps a page at 50).
        after_timestamp: optional unix timestamp; only return events after it.

    Returns:
        List of dicts with token, price (raw and ETH), buyer/seller,
        timestamp, transaction hash, and chain.

    Raises:
        requests.HTTPError: on non-2xx responses.
    """
    url = f"{BASE_URL}/events/collection/{collection_slug}"
    params = {
        "event_type": "sale",
        "limit": min(limit, 50),  # endpoint page cap
    }
    if after_timestamp:
        params["after"] = after_timestamp
    resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
    resp.raise_for_status()
    data = resp.json()
    sales = []
    for event in data.get("asset_events", []):
        # Guard against explicit nulls in the payload, not just missing keys.
        payment = event.get("payment") or {}
        nft = event.get("nft") or {}
        quantity = int(payment.get("quantity", 0))
        decimals = int(payment.get("decimals", 18))
        sales.append({
            "token_id": nft.get("identifier"),
            "name": nft.get("name"),
            "price_raw": quantity,
            # decimals == 0 yields None; presumably never happens for sales
            "price_eth": quantity / (10 ** decimals) if decimals else None,
            "currency": payment.get("symbol"),
            "seller": event.get("seller"),
            "buyer": event.get("buyer"),
            "timestamp": event.get("event_timestamp"),
            "transaction": event.get("transaction"),
            "chain": event.get("chain"),
        })
    return sales
Fetching Individual NFT Metadata
def get_nft_details(collection_slug: str, token_id: str) -> dict:
    """Fetch metadata, traits, and rarity info for a specific NFT.

    NOTE(review): the URL template is /chain/ethereum/contract/{address}/nfts/...,
    which expects the collection's CONTRACT ADDRESS, not its slug -- passing a
    slug here likely 404s. Confirm against the OpenSea v2 docs and pass the
    contract address in this argument.

    Args:
        collection_slug: value interpolated into the contract segment of the URL.
        token_id: token identifier within the collection.

    Raises:
        requests.HTTPError: on non-2xx responses.
    """
    url = f"{BASE_URL}/chain/ethereum/contract/{collection_slug}/nfts/{token_id}"
    resp = requests.get(url, headers=HEADERS, timeout=15)
    resp.raise_for_status()
    nft = resp.json().get("nft", {})
    rarity = nft.get("rarity", {})
    owners = nft.get("owners")
    return {
        "token_id": nft.get("identifier"),
        "name": nft.get("name"),
        "description": nft.get("description"),
        "image_url": nft.get("image_url"),
        "traits": [
            {"type": t.get("trait_type"), "value": t.get("value"), "rarity": t.get("trait_count")}
            # `or []` guards a null traits field, not just a missing one
            for t in nft.get("traits") or []
        ],
        "owner": owners[0].get("address") if owners else None,
        "rarity_rank": rarity.get("rank"),
        "rarity_score": rarity.get("score"),
    }
Scraping Trait Rarity Data
The API gives you traits per NFT, but for collection-wide rarity breakdowns, scraping the collection page is faster:
import random
from bs4 import BeautifulSoup
import json
def get_trait_counts(collection_slug: str, proxy: str = None) -> dict:
    """Scrape trait count data from the OpenSea collection page.

    Args:
        collection_slug: OpenSea collection slug.
        proxy: optional proxy URL, e.g. 'http://USER:[email protected]:9000'

    Returns:
        Dict with name/slug/trait_data, or {} when the embedded JSON payload
        is absent (e.g. Cloudflare served a challenge page instead).
    """
    url = f"https://opensea.io/collection/{collection_slug}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "en-US,en;q=0.9",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
    }
    # Route both schemes through the proxy; the original only mapped "https",
    # which would leave any plain-http hop unproxied.
    proxies = {"http": proxy, "https": proxy} if proxy else None
    resp = requests.get(url, headers=headers, proxies=proxies, timeout=20)
    # OpenSea (Next.js) embeds page state in a __NEXT_DATA__ JSON script tag.
    soup = BeautifulSoup(resp.text, "html.parser")
    script = soup.find("script", id="__NEXT_DATA__")
    if not script:
        return {}
    data = json.loads(script.string)
    # Navigate the nested Next.js props to the collection payload.
    props = data.get("props", {}).get("pageProps", {})
    collection_data = props.get("collection", {})
    return {
        "name": collection_data.get("name"),
        "slug": collection_slug,
        "trait_data": collection_data.get("traitData", {}),
    }
Bulk Collection Monitoring with Proxies
For tracking multiple collections continuously, you need to rotate IPs to avoid hitting rate limits:
def monitor_floor_prices(slugs: list, proxy_url: str, interval: int = 300):
    """Monitor floor prices for multiple collections indefinitely.

    Appends one JSON object per polling round to floor_prices.jsonl.
    Runs until interrupted (infinite loop).

    Args:
        slugs: collection slugs to track.
        proxy_url: rotating residential proxy endpoint (used for both schemes).
        interval: seconds to sleep between polling rounds.
    """
    import json
    from datetime import datetime
    proxies = {"https": proxy_url, "http": proxy_url}
    while True:
        snapshot = {"timestamp": datetime.utcnow().isoformat(), "collections": {}}
        for slug in slugs:
            try:
                resp = requests.get(
                    f"{BASE_URL}/collections/{slug}/stats",
                    headers=HEADERS,
                    proxies=proxies,
                    timeout=15,
                )
                data = resp.json()
                floor = data.get("total", {}).get("floor_price")
                volume_24h = None
                if data.get("intervals"):
                    volume_24h = data["intervals"][0].get("volume")
                snapshot["collections"][slug] = {
                    "floor": float(floor) if floor else None,
                    "volume_24h": float(volume_24h) if volume_24h else None,
                }
            except Exception as e:
                # Record the failure but keep polling the remaining slugs.
                snapshot["collections"][slug] = {"error": str(e)}
            time.sleep(random.uniform(0.3, 0.8))  # jitter between calls
        with open("floor_prices.jsonl", "a") as f:
            # BUG FIX: the original appended a literal backslash-n ("\\n"),
            # producing one giant line instead of JSON Lines.
            f.write(json.dumps(snapshot) + "\n")
        print(f"[{snapshot['timestamp']}] Tracked {len(slugs)} collections")
        time.sleep(interval)
# Track top collections every 5 minutes
PROXY = "http://USER:[email protected]:9000"  # rotating residential proxy (placeholder credentials)
collections = ["boredapeyachtclub", "mutant-ape-yacht-club", "azuki", "pudgypenguins", "doodles-official"]
# monitor_floor_prices(collections, PROXY)  # uncomment to start the infinite monitoring loop
Storing Data in SQLite
For time-series floor price analysis, SQLite is the right choice:
import sqlite3
from datetime import datetime
def init_nft_db(db_path: str = "nft_data.db") -> sqlite3.Connection:
    """Create (if needed) the SQLite schema for NFT tracking and return a connection.

    Tables:
        floor_snapshots -- time-series floor/volume stats, keyed (slug, timestamp).
        sales_history   -- one row per sale, keyed by transaction hash.
        nft_metadata    -- per-token rarity/trait cache, keyed (slug, token_id).
    """
    conn = sqlite3.connect(db_path)
    conn.execute("""
    CREATE TABLE IF NOT EXISTS floor_snapshots (
        collection_slug TEXT,
        timestamp TEXT,
        floor_price REAL,
        floor_price_symbol TEXT,
        total_volume REAL,
        num_owners INTEGER,
        total_supply INTEGER,
        volume_24h REAL,
        sales_24h INTEGER,
        PRIMARY KEY (collection_slug, timestamp)
    )
    """)
    conn.execute("""
    CREATE TABLE IF NOT EXISTS sales_history (
        transaction_hash TEXT PRIMARY KEY,
        collection_slug TEXT,
        token_id TEXT,
        name TEXT,
        price_eth REAL,
        currency TEXT,
        seller TEXT,
        buyer TEXT,
        event_timestamp TEXT,
        scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
    """)
    conn.execute("""
    CREATE TABLE IF NOT EXISTS nft_metadata (
        collection_slug TEXT,
        token_id TEXT,
        name TEXT,
        rarity_rank INTEGER,
        rarity_score REAL,
        traits TEXT,
        last_updated TEXT,
        PRIMARY KEY (collection_slug, token_id)
    )
    """)
    conn.commit()
    return conn
def save_floor_snapshot(conn: sqlite3.Connection, slug: str, stats: dict):
    """Upsert one floor-price snapshot row for a collection.

    Pulls the expected stat fields out of `stats` (missing keys become NULL)
    and stamps the row with the current UTC time.
    """
    stat_fields = (
        "floor_price",
        "floor_price_symbol",
        "total_volume",
        "num_owners",
        "total_supply",
        "volume_24h",
        "sales_24h",
    )
    row = (slug, datetime.utcnow().isoformat(), *(stats.get(field) for field in stat_fields))
    conn.execute(
        "INSERT OR REPLACE INTO floor_snapshots VALUES (?,?,?,?,?,?,?,?,?)",
        row,
    )
    conn.commit()
def get_floor_trend(conn: sqlite3.Connection, slug: str, days: int = 30) -> list[dict]:
    """Get floor price history for a collection over the last `days` days.

    NOTE(review): stored timestamps are ISO-8601 with a 'T' separator while
    sqlite's datetime() emits a space separator; the lexicographic comparison
    is correct at day granularity but fuzzy at the exact cutoff day -- confirm
    this is acceptable for the analysis.
    """
    rows = conn.execute("""
    SELECT timestamp, floor_price, volume_24h, sales_24h
    FROM floor_snapshots
    WHERE collection_slug = ?
    AND timestamp >= datetime('now', '-' || ? || ' days')
    ORDER BY timestamp
    """, (slug, days)).fetchall()
    return [
        {"timestamp": r[0], "floor": r[1], "volume_24h": r[2], "sales_24h": r[3]}
        for r in rows
    ]
Identifying Floor Price Anomalies
Once you have historical data, you can detect unusual price movements:
import statistics
def detect_floor_anomalies(conn: sqlite3.Connection, slug: str, z_score_threshold: float = 2.0) -> list[dict]:
    """Find floor price snapshots that are statistical outliers.

    Args:
        conn: open SQLite connection with a floor_snapshots table.
        slug: collection to analyze.
        z_score_threshold: minimum |z| for a snapshot to count as an anomaly.

    Returns:
        List of anomaly dicts (timestamp, price, mean, z_score, direction),
        in timestamp order; empty if fewer than 5 usable snapshots exist.
    """
    rows = conn.execute(
        "SELECT timestamp, floor_price FROM floor_snapshots WHERE collection_slug = ? AND floor_price IS NOT NULL ORDER BY timestamp",
        (slug,)
    ).fetchall()
    if len(rows) < 5:
        return []  # too few points for a meaningful mean/stdev
    prices = [r[1] for r in rows]
    mean_price = statistics.mean(prices)
    stdev = statistics.stdev(prices)
    if stdev == 0:
        return []  # all prices identical: no outliers possible
    anomalies = []
    for timestamp, price in rows:
        z_score = abs(price - mean_price) / stdev
        if z_score >= z_score_threshold:
            anomalies.append({
                "timestamp": timestamp,
                "price": price,
                "mean": round(mean_price, 4),
                "z_score": round(z_score, 2),
                "direction": "spike" if price > mean_price else "crash",
            })
    return anomalies
Legal Considerations
OpenSea's Terms of Service prohibit scraping, but their public API is explicitly designed for developer access. Blockchain data itself is public — anyone can query Ethereum for the same transaction data. Use the official API where possible, avoid scraping at volumes that degrade service, and don't republish their UI or proprietary ranking data. Building analytical tools on top of publicly available blockchain data is generally accepted — tools like NFTScan, Dune Analytics, and Reservoir all operate in this space.
Key Takeaways
- Start with OpenSea's v2 API — collection stats, listings, and sales events are all available with a free key.
- Respect the 4 requests/second rate limit. Add 0.3s delays between calls and implement exponential backoff on 429s.
- For web scraping (trait rarity, UI-only data), residential proxies are mandatory. ThorData's rotating residential IPs avoid the IP reputation blocks that kill datacenter proxies on OpenSea.
- Store floor price snapshots in SQLite for time-series analysis and anomaly detection.
- Blockchain transaction data is public — for historical sales, consider querying Ethereum directly or using indexers like Reservoir alongside OpenSea.
- The rarity_rank field from the NFT endpoint lets you quickly identify if a listed item is underpriced relative to its trait rarity.
Querying On-Chain Data Directly
For historical transaction data that OpenSea's API doesn't expose, query Ethereum directly:
def get_onchain_transfers(
    contract_address: str,
    from_block: int = 0,
    rpc_url: str = "https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
) -> list[dict]:
    """Query ERC-721 Transfer events directly from an Ethereum JSON-RPC node.

    Args:
        contract_address: NFT contract address (0x...).
        from_block: first block to scan (inclusive).
        rpc_url: JSON-RPC endpoint (generalized from the hard-coded Alchemy
            URL; any Alchemy/Infura/self-hosted node works).

    Returns:
        List of transfer dicts (block, tx_hash, from/to addresses, token_id).

    Raises:
        requests.HTTPError: on non-2xx responses from the RPC endpoint.
    """
    # keccak256("Transfer(address,address,uint256)") -- the standard topic for
    # both ERC-721 and ERC-20; the topics-length check below keeps only the
    # 4-topic (indexed tokenId) ERC-721 form.
    transfer_topic = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
    payload = {
        "jsonrpc": "2.0",
        "method": "eth_getLogs",
        "params": [{
            "fromBlock": hex(from_block),
            "toBlock": "latest",
            "address": contract_address,
            "topics": [transfer_topic],
        }],
        "id": 1,
    }
    resp = requests.post(rpc_url, json=payload, timeout=30)
    resp.raise_for_status()  # surface HTTP failures instead of returning []
    logs = resp.json().get("result", [])
    transfers = []
    for log in logs:
        token_id = int(log["topics"][3], 16) if len(log["topics"]) > 3 else None
        transfers.append({
            "block": int(log["blockNumber"], 16),
            "tx_hash": log["transactionHash"],
            # Indexed addresses are 32-byte topics; the address is the low 20 bytes.
            "from_addr": "0x" + log["topics"][1][-40:],
            "to_addr": "0x" + log["topics"][2][-40:],
            "token_id": token_id,
        })
    return transfers
Tracking Collection Velocity
Collection velocity -- how quickly new listings appear and sell -- is a leading indicator of market heat:
def track_listing_velocity(collection_slug: str, db_path: str = "nft_data.db"):
    """Track how fast new listings are appearing and selling.

    Returns a metrics dict: active listing count, recent sale count, average
    listing age in hours, and the number of listings under 24h old.
    """
    # Ensure the SQLite schema exists, then release the handle immediately:
    # the original left this connection open (resource leak) and never used it.
    conn = init_nft_db(db_path)
    conn.close()
    # Get current listings
    current_listings = get_listings(collection_slug, limit=200)
    # Get recent sales
    recent_sales = get_sales_history(collection_slug, limit=50)
    from datetime import datetime, timezone
    now = datetime.now(timezone.utc)
    # Age (hours) of each active listing, derived from its startTime unix timestamp.
    ages_hours = []
    for listing in current_listings:
        listing_time = listing.get("listing_date")
        if listing_time:
            try:
                lt = datetime.fromtimestamp(int(listing_time), tz=timezone.utc)
                age_hours = (now - lt).total_seconds() / 3600
                ages_hours.append(age_hours)
            except (ValueError, TypeError):
                pass  # malformed timestamp: skip rather than abort
    metrics = {
        "collection": collection_slug,
        "timestamp": now.isoformat(),
        "active_listings": len(current_listings),
        "recent_sales_count": len(recent_sales),
        "avg_listing_age_hours": round(sum(ages_hours) / len(ages_hours), 1) if ages_hours else None,
        "fresh_listings_24h": sum(1 for a in ages_hours if a <= 24),
    }
    return metrics
# Monitor several collections
# NOTE: runs at import time; each iteration performs multiple live API calls.
for slug in ["boredapeyachtclub", "azuki", "pudgypenguins"]:
    velocity = track_listing_velocity(slug)
    print(f"{slug}: {velocity['active_listings']} listings, {velocity['recent_sales_count']} recent sales, {velocity['fresh_listings_24h']} new in 24h")
    time.sleep(0.5)
Comparing Rarity Tiers
NFT collections have rarity tiers (common/uncommon/rare/legendary). Mapping floor prices per tier reveals pricing inefficiencies:
def analyze_rarity_pricing(collection_slug: str, max_items: int = 200, total_supply: int = 10000) -> dict:
    """Analyze listed floor prices across rarity tiers.

    Args:
        collection_slug: OpenSea collection slug.
        max_items: max listings to pull from the listings endpoint.
        total_supply: collection size used to convert rarity rank into a
            percentile (generalized from the hard-coded 10000).

    Returns:
        {tier: {"count": n, "avg_floor": eth or None}} for the four tiers.
    """
    from statistics import mean
    listings = get_listings(collection_slug, limit=max_items)
    # Fetch rarity for each listed token
    rarity_buckets = {"legendary": [], "rare": [], "uncommon": [], "common": []}
    for listing in listings[:50]:  # Sample to avoid rate limits
        token_id = listing.get("token_id")
        if not token_id:
            continue
        try:
            nft = get_nft_details(collection_slug, token_id)
            rank = nft.get("rarity_rank")
            price_eth = listing.get("price_eth", 0)
            if rank and price_eth:
                # Tier by rank percentile: top 1% legendary, 5% rare, 20% uncommon.
                pct = rank / total_supply * 100
                if pct <= 1:
                    bucket = "legendary"
                elif pct <= 5:
                    bucket = "rare"
                elif pct <= 20:
                    bucket = "uncommon"
                else:
                    bucket = "common"
                rarity_buckets[bucket].append(price_eth)
        except Exception:
            pass  # best-effort sampling: skip tokens whose detail fetch fails
        # Throttle outside the try so a failed fetch still pauses (the original
        # skipped the sleep on exceptions, hammering the API during errors).
        time.sleep(0.3)
    return {
        tier: {"count": len(prices), "avg_floor": round(mean(prices), 4) if prices else None}
        for tier, prices in rarity_buckets.items()
    }
Using Reservoir as a Data Supplement
Reservoir is an open-source NFT data aggregator that often has better historical data than OpenSea's API:
def get_reservoir_floor_history(collection: str, days: int = 30) -> list[dict]:
    """Get floor price history from the Reservoir API (free, no key required).

    Returns one dict per day with date, floor sale price, volume, and sale
    count, as reported by the daily-volumes endpoint.
    """
    resp = requests.get(
        "https://api.reservoir.tools/collections/daily-volumes/v1",
        params={"id": collection, "limit": days},
        headers={"accept": "*/*"},
        timeout=15,
    )
    resp.raise_for_status()
    payload = resp.json()
    return [
        {
            "date": day.get("timestamp"),
            "floor_price_eth": day.get("floorSalePrice"),
            "volume_eth": day.get("volume"),
            "sales_count": day.get("salesCount"),
        }
        for day in payload.get("collections", [])
    ]
Automated Portfolio Tracker
Build a wallet-based portfolio tracker that monitors your NFT holdings:
def track_wallet_portfolio(wallet_address: str) -> dict:
    """Get all NFTs owned by a wallet with current floor prices.

    Pages through the wallet's NFTs, fetches one floor price per distinct
    collection, annotates every NFT with its collection floor, and sums a
    floor-based portfolio value.
    """
    url = f"{BASE_URL}/chain/ethereum/account/{wallet_address}/nfts"
    params = {"limit": 200}
    holdings = []
    cursor = None
    # Follow the cursor until the API stops returning one.
    while True:
        if cursor:
            params["next"] = cursor
        resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
        resp.raise_for_status()
        page = resp.json()
        for item in page.get("nfts", []):
            coll = item.get("collection", {})
            holdings.append({
                "collection": coll.get("name"),
                "collection_slug": coll.get("slug"),
                "token_id": item.get("identifier"),
                "name": item.get("name"),
                "image_url": item.get("image_url"),
            })
        cursor = page.get("next")
        if not cursor:
            break
        time.sleep(0.3)
    # One floor-price lookup per distinct collection held.
    unique_slugs = list({h["collection_slug"] for h in holdings if h["collection_slug"]})
    floors = {}
    for slug in unique_slugs:
        try:
            floors[slug] = get_collection_stats(slug).get("floor_price", 0)
            time.sleep(0.3)
        except Exception:
            floors[slug] = None
    # Annotate each NFT and total up the floor-based value.
    total_value = 0
    for holding in holdings:
        floor = floors.get(holding.get("collection_slug"))
        holding["floor_price_eth"] = floor
        if floor:
            total_value += floor
    return {
        "wallet": wallet_address,
        "total_nfts": len(holdings),
        "portfolio_value_eth": round(total_value, 4),
        "collections": len(unique_slugs),
        "nfts": holdings,
    }
Performance Tips for High-Volume Scraping
When tracking 100+ collections continuously, these optimizations matter:
import asyncio
import httpx
async def fetch_stats_async(session: httpx.AsyncClient, slug: str) -> dict:
    """Asynchronously fetch headline stats (floor, 24h volume) for one collection."""
    resp = await session.get(f"https://api.opensea.io/api/v2/collections/{slug}/stats")
    resp.raise_for_status()
    payload = resp.json()
    intervals = payload.get("intervals")
    return {
        "slug": slug,
        "floor_price": payload.get("total", {}).get("floor_price"),
        "volume_24h": intervals[0].get("volume") if intervals else None,
    }
async def bulk_fetch_stats(slugs: list[str], api_key: str) -> list[dict]:
    """Fetch stats for many collections concurrently.

    Failed fetches are silently dropped from the result (see filter below).
    """
    headers = {"accept": "application/json", "x-api-key": api_key}
    # NOTE(review): httpx.Limits caps CONCURRENT CONNECTIONS at 4, not requests
    # per second -- it does not actually enforce OpenSea's 4 req/s limit. A real
    # rate limiter (token bucket / semaphore + sleep) is needed; confirm.
    limits = httpx.Limits(max_connections=4, max_keepalive_connections=4)
    async with httpx.AsyncClient(headers=headers, limits=limits) as session:
        tasks = [fetch_stats_async(session, slug) for slug in slugs]
        # return_exceptions=True keeps one failing slug from cancelling the rest;
        # exception objects are then filtered out by the isinstance check.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if isinstance(r, dict)]
# Run async bulk fetch
# NOTE: executes at import time and performs live network calls.
results = asyncio.run(bulk_fetch_stats(["boredapeyachtclub", "azuki", "pudgypenguins"], OPENSEA_API_KEY))
for r in results:
    print(f"{r['slug']}: floor {r['floor_price']} ETH")