Track Amazon Bestseller Ranks with Python: BSR Scraping Guide (2026)
Amazon Bestseller Rank (BSR) is one of the most useful signals in ecommerce research. It tells you how well a product sells relative to every other product in its category, updated hourly. If you can track BSR over time, you can spot trending products before they go mainstream, watch competitors respond to launches, and identify thin niches that are suddenly heating up.
This guide covers building a working BSR tracker in Python: scraping product pages, pulling category bestseller lists, storing data in SQLite, and calculating rank velocity.
What BSR Data Looks Like
On a product page, BSR appears in the product details section — something like "#1,203 in Kitchen & Dining" with sub-category ranks below it. A single product can have ranks in multiple categories simultaneously. On a category bestseller page (amazon.com/best-sellers-books/zgbs/books/), you get the top 100 products ranked by sales, paginated across multiple pages.
Both sources are useful for different purposes. Product page scraping lets you track specific ASINs over time. Category list scraping gives you the full competitive landscape.
Dependencies
pip install httpx beautifulsoup4 lxml curl-cffi
Extracting BSR from Product Pages
The BSR block sits inside a <div id="detailBulletsWrapper_feature_div"> or the older <div id="productDetails_detailBullets_sections1"> depending on page template. Amazon changes this structure periodically, so the selector needs fallbacks.
import httpx
import re
import json
import sqlite3
import random
import time
from pathlib import Path
from datetime import datetime
from bs4 import BeautifulSoup
from curl_cffi import requests as curl_requests
# Full browser-like headers — Amazon checks these
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Referer": "https://www.amazon.com/",
    "DNT": "1",
    # Sec-Fetch-* / client-hint values below match a Chrome 124 top-level
    # navigation; get_headers() later overrides only the User-Agent.
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
    "sec-ch-ua": '"Chromium";v="124", "Google Chrome";v="124"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"macOS"',
}

# Desktop UA pool used by get_headers() for per-request rotation.
# NOTE(review): the Firefox UA is sent alongside the Chrome sec-ch-ua client
# hints from HEADERS above — verify this inconsistency is intended.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
]
def make_client(proxy: str | None = None) -> curl_requests.Session:
    """Create a curl_cffi session that impersonates Chrome's TLS fingerprint.

    Args:
        proxy: Optional proxy URL ("http://user:pass@host:port").

    Returns:
        A curl_cffi Session, optionally routed through the proxy.
    """
    session = curl_requests.Session()
    if proxy:
        # Cover both schemes: the original only set "https", so any plain
        # http request (or http redirect hop) bypassed the proxy and
        # leaked the real IP.
        session.proxies = {"http": proxy, "https": proxy}
    return session
def get_headers() -> dict:
    """Build a fresh header dict with a randomly chosen User-Agent."""
    headers = dict(HEADERS)
    headers["User-Agent"] = random.choice(USER_AGENTS)
    return headers
def fetch_page(
    url: str,
    session: curl_requests.Session,
    retries: int = 3,
) -> str | None:
    """Fetch a page with TLS impersonation and retry logic.

    Retries on CAPTCHA interstitials and throttling responses (503 and,
    newly, 429) with exponential backoff; hard failures (404, 410, ...)
    return immediately.

    Returns:
        The HTML body, or None when all attempts fail.
    """
    for attempt in range(retries):
        try:
            resp = session.get(
                url,
                impersonate="chrome124",
                headers=get_headers(),
                timeout=20,
                allow_redirects=True,
            )
            if resp.status_code == 200:
                # Amazon serves CAPTCHA pages with a 200 status — sniff the body.
                if "Type the characters you see" in resp.text or "Robot Check" in resp.text:
                    print(f" CAPTCHA detected on attempt {attempt+1}")
                    time.sleep(random.uniform(10, 20))
                    continue
                return resp.text
            elif resp.status_code in (503, 429):
                # Throttling: exponential backoff with jitter.
                # (429 was previously treated as a hard failure.)
                wait = (2 ** attempt) * random.uniform(1, 2)
                print(f" {resp.status_code} on attempt {attempt+1}, waiting {wait:.1f}s")
                time.sleep(wait)
            else:
                # Non-retryable status: retrying won't help.
                print(f" Status {resp.status_code} on attempt {attempt+1}")
                return None
        except Exception as e:
            print(f" Request error on attempt {attempt+1}: {e}")
            time.sleep(random.uniform(2, 5))
    return None
def parse_bsr(html: str) -> list[dict]:
    """
    Extract Best Sellers Rank from an Amazon product page.
    Handles multiple page template variants.

    Returns:
        A deduplicated list of {"rank": int, "category": str} dicts,
        one per category the product is ranked in.
    """
    soup = BeautifulSoup(html, "lxml")
    ranks = []
    # Matches "#1,203 in Kitchen & Dining". The category capture stops at
    # "(", "#", or newline, so the "(See Top 100 ...)" tail never gets in.
    pattern = r"#([\d,]+)\s+in\s+([^\n#(]+)"
    # Template 1: Detail bullets (most common in 2026)
    detail_div = soup.find("div", {"id": "detailBulletsWrapper_feature_div"})
    if detail_div:
        text = detail_div.get_text()
        for rank_str, category in re.findall(pattern, text):
            rank = int(rank_str.replace(",", ""))
            # BUGFIX: the previous `.rstrip("(see top 100")` stripped a
            # character *set*, corrupting names like "Books" -> "Book".
            # The regex already excludes "(", so strip() alone is correct.
            category = category.strip()
            if rank > 0:
                ranks.append({"rank": rank, "category": category})
    # Template 2: Product details table
    if not ranks:
        table = soup.find("table", {"id": "productDetails_detailBullets_sections1"})
        if table:
            for th in table.select("th"):
                if "Best Sellers Rank" in th.get_text():
                    td = th.find_next_sibling("td")
                    if td:
                        for rank_str, category in re.findall(pattern, td.get_text()):
                            ranks.append({
                                "rank": int(rank_str.replace(",", "")),
                                "category": category.strip(),
                            })
    # Template 3: Alternative product details
    if not ranks:
        for span in soup.select("span.a-text-bold"):
            if "Best Sellers Rank" in span.get_text():
                parent = span.parent
                if parent:
                    for rank_str, category in re.findall(pattern, parent.get_text()):
                        ranks.append({
                            "rank": int(rank_str.replace(",", "")),
                            "category": category.strip(),
                        })
    # Deduplicate on (rank, truncated category); truncation tolerates minor
    # suffix differences between templates.
    seen = set()
    unique_ranks = []
    for r in ranks:
        key = (r["rank"], r["category"][:30])
        if key not in seen:
            seen.add(key)
            unique_ranks.append(r)
    return unique_ranks
def parse_product_metadata(html: str, asin: str) -> dict:
    """Extract full product metadata from a product page."""
    soup = BeautifulSoup(html, "lxml")

    def _text(css: str) -> str | None:
        # Stripped text of the first selector match, or None on a miss.
        el = soup.select_one(css)
        return el.get_text(strip=True) if el else None

    result = {"asin": asin}
    # Title
    result["title"] = _text("#productTitle")
    # Price (several template variants)
    result["price"] = _text(
        ".a-price .a-offscreen, #priceblock_ourprice, #priceblock_dealprice, "
        "[class*='priceToPay'] .a-offscreen"
    )
    # Star rating lives in the title attribute of the popover element
    rating_el = soup.select_one("#acrPopover, [data-hook='rating-out-of-text']")
    result["rating"] = rating_el.get("title", "").split(" out")[0] if rating_el else None
    # Review count
    result["review_count"] = _text("#acrCustomerReviewText, [data-hook='total-review-count']")
    # Brand
    result["brand"] = _text("#bylineInfo, #brand, .a-brand")
    # BSR
    result["bsr"] = parse_bsr(html)
    return result
Scraping Category Bestseller Lists
Category pages list the top 100 products, split across two pages of 50 in the newer layout:
def scrape_category_bestsellers(
    category_url: str,
    session: curl_requests.Session,
    max_items: int = 100,
) -> list[dict]:
    """
    Scrape Amazon bestseller list for a category.

    Args:
        category_url: URL like "https://www.amazon.com/Best-Sellers-Books/zgbs/books/"
        session: curl_cffi session
        max_items: Max number of items to collect (default 100 = full list)

    Returns:
        List of product dicts with keys: asin, rank, title, author, price,
        rating, review_count, image, category_url, page, scraped_at.
    """
    products = []
    page = 1
    while len(products) < max_items:
        if page == 1:
            url = category_url
        else:
            # Page 2 uses a different URL pattern
            base = category_url.rstrip("/")
            url = f"{base}/ref=zg_bs_pg_{page}?pg={page}"
        html = fetch_page(url, session)
        if not html:
            # Fetch failed after retries — return what we have so far.
            break
        soup = BeautifulSoup(html, "lxml")
        # Main list items (grid layout first, then any data-asin container)
        items = soup.select("#gridItemRoot, div[data-asin]")
        if not items:
            # Fallback selector for the older "immersion" layout
            items = soup.select(".zg-item-immersion")
        page_items = 0
        for item in items:
            asin = item.get("data-asin", "").strip()
            # ASINs are always 10 characters; skip layout divs with an
            # empty or malformed data-asin attribute.
            if not asin or len(asin) != 10:
                continue
            # Rank badge, e.g. "#3" — digits only after stripping commas.
            rank_el = item.select_one("span.zg-bdg-text, .zg-badge-wrapper span")
            rank_text = rank_el.get_text(strip=True) if rank_el else ""
            rank_match = re.search(r"(\d+)", rank_text.replace(",", ""))
            rank = int(rank_match.group(1)) if rank_match else None
            # Title: obfuscated class names first, generic fallbacks last.
            title_el = item.select_one(
                "._cDEzb_p13n-sc-css-line-clamp-1, "
                "._cDEzb_p13n-sc-css-line-clamp-2, "
                ".a-link-normal span, .p13n-sc-truncated"
            )
            title = title_el.get_text(strip=True) if title_el else None
            author_el = item.select_one(".a-size-small .a-link-child, .a-color-secondary")
            author = author_el.get_text(strip=True) if author_el else None
            price_el = item.select_one("._cDEzb_p13n-sc-price, .a-color-price")
            price = price_el.get_text(strip=True) if price_el else None
            rating_el = item.select_one(".a-icon-alt")
            rating = rating_el.get_text(strip=True) if rating_el else None
            review_count_el = item.select_one("span.a-size-small:last-child, .a-icon-row a")
            review_count_raw = review_count_el.get_text(strip=True) if review_count_el else None
            img_el = item.select_one("img")
            img = img_el.get("src") if img_el else None
            product = {
                "asin": asin,
                "rank": rank,
                "title": title,
                "author": author,
                "price": price,
                "rating": rating,
                "review_count": review_count_raw,
                "image": img,
                "category_url": category_url,
                "page": page,
                # NOTE(review): datetime.utcnow() is deprecated since
                # Python 3.12; naive-UTC ISO strings are assumed throughout.
                "scraped_at": datetime.utcnow().isoformat(),
            }
            # Only keep rows where both identity fields parsed.
            if product["asin"] and product["title"]:
                products.append(product)
                page_items += 1
        print(f" Page {page}: {page_items} products (total {len(products)})")
        # Stop on an empty page (end of list / block page) or once full.
        if page_items == 0 or len(products) >= max_items:
            break
        page += 1
        # Polite jittered delay between paginated requests.
        time.sleep(random.uniform(4, 8))
    return products[:max_items]
Amazon's Anti-Bot Stack
Amazon is one of the hardest sites to scrape at volume. Here's what you're dealing with:
CAPTCHA walls. Amazon serves CAPTCHAs aggressively, especially on first request from a new IP. Check soup.title.text for "Robot Check" or "Sorry!" as an early exit signal. When you hit one, wait 10-20 seconds and rotate your proxy before retrying.
TLS fingerprinting. Amazon's edge infrastructure checks the TLS handshake signature. curl_cffi with impersonate="chrome124" reproduces Chrome's exact TLS fingerprint, which bypasses most TLS-based detection. This is why requests and plain httpx don't work as well.
Session fingerprinting. Amazon tracks your browsing session across requests. Don't reuse sessions across many product pages — rotate to a fresh session every 20-30 requests.
Request pattern analysis. Scraping pages in sequential order (rank 1, 2, 3...) is obvious bot behavior. Add randomized delays and mix category browsing with product detail requests.
Account-based blocking. Scrape without cookies/login. If Amazon sets cookies, discard them before the next session.
Proxy Configuration
For Amazon, residential proxies are strongly recommended. ThorData provides rotating residential IPs with sticky session support — essential when you need to paginate a category list without rotating mid-session:
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000
def get_proxy(sticky: bool = False, session_id: str = None) -> str:
"""
Build ThorData proxy URL.
Args:
sticky: Use a sticky session (same IP across requests)
session_id: Session identifier for sticky sessions
"""
user = THORDATA_USER
if sticky and session_id:
user += f"_session-{session_id}"
return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"
def make_client_for_category(category_id: str) -> curl_requests.Session:
    """
    Create a client with a sticky session for paginating a category list.
    Same IP across all pages of the category avoids session mismatch.
    """
    proxy_url = get_proxy(sticky=True, session_id=f"cat_{category_id}")
    client = curl_requests.Session()
    client.proxies = {"https": proxy_url}
    return client
Storing BSR in SQLite
# Default on-disk location for the tracker database.
DB_PATH = Path("bsr_tracker.db")


def init_db(path: Path = DB_PATH) -> sqlite3.Connection:
    """Initialize the BSR tracking database and return an open connection.

    Creates the products / bsr_snapshots / category_snapshots tables and
    their indexes if they don't already exist.

    BUGFIX: the previous version closed the connection after the DDL and
    opened a second one — wasteful for file databases and fatal for
    ":memory:" databases, whose schema vanished with the close. The same
    connection that ran the DDL is now returned.
    """
    conn = sqlite3.connect(path)
    conn.executescript("""
    CREATE TABLE IF NOT EXISTS products (
        asin TEXT PRIMARY KEY,
        title TEXT,
        brand TEXT,
        price TEXT,
        rating TEXT,
        review_count TEXT,
        updated_at TEXT DEFAULT (datetime('now'))
    );
    CREATE TABLE IF NOT EXISTS bsr_snapshots (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        asin TEXT NOT NULL,
        category TEXT NOT NULL,
        rank INTEGER NOT NULL,
        scraped_at TEXT NOT NULL
    );
    CREATE TABLE IF NOT EXISTS category_snapshots (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        asin TEXT NOT NULL,
        category_url TEXT NOT NULL,
        rank INTEGER,
        title TEXT,
        author TEXT,
        price TEXT,
        rating TEXT,
        review_count TEXT,
        scraped_at TEXT DEFAULT (datetime('now'))
    );
    CREATE INDEX IF NOT EXISTS idx_bsr_asin_time
        ON bsr_snapshots(asin, scraped_at);
    CREATE INDEX IF NOT EXISTS idx_bsr_category
        ON bsr_snapshots(category, scraped_at);
    CREATE INDEX IF NOT EXISTS idx_cat_snapshots_url
        ON category_snapshots(category_url, scraped_at);
    """)
    conn.commit()
    return conn
def save_product_bsr(conn: sqlite3.Connection, asin: str, ranks: list[dict], metadata: dict = None):
"""Save BSR snapshot and optionally product metadata."""
now = datetime.utcnow().isoformat()
rows = [(asin, r["category"], r["rank"], now) for r in ranks]
conn.executemany(
"INSERT INTO bsr_snapshots (asin, category, rank, scraped_at) VALUES (?, ?, ?, ?)",
rows,
)
if metadata:
conn.execute("""
INSERT OR REPLACE INTO products (asin, title, brand, price, rating, review_count)
VALUES (?, ?, ?, ?, ?, ?)
""", (
asin,
metadata.get("title"), metadata.get("brand"),
metadata.get("price"), metadata.get("rating"),
metadata.get("review_count"),
))
conn.commit()
def save_category_list(conn: sqlite3.Connection, products: list[dict], category_url: str):
    """Save a full category bestseller snapshot.

    All rows share a single scraped_at timestamp so the snapshot can be
    selected back out as a unit.

    Args:
        conn: Open connection with the bsr_tracker schema.
        products: Product dicts from scrape_category_bestsellers().
        category_url: The category list these products came from.
    """
    now = datetime.utcnow().isoformat()
    rows = [
        (
            p.get("asin"), category_url, p.get("rank"),
            p.get("title"), p.get("author"), p.get("price"),
            p.get("rating"), p.get("review_count"), now,
        )
        for p in products
    ]
    # executemany: one prepared statement instead of len(products) executes.
    conn.executemany("""
        INSERT INTO category_snapshots
        (asin, category_url, rank, title, author, price, rating, review_count, scraped_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, rows)
    conn.commit()
Rank Velocity Calculation
Velocity tells you how fast a product is climbing or dropping — the most actionable signal from BSR data:
def calculate_velocity(
conn: sqlite3.Connection,
asin: str,
category: str,
hours: int = 24,
) -> dict | None:
"""
Calculate rank velocity: rank points per hour over the last N hours.
Positive = climbing (rank number decreasing = better)
Negative = dropping
Returns None if insufficient data.
"""
rows = conn.execute("""
SELECT rank, scraped_at FROM bsr_snapshots
WHERE asin = ? AND category LIKE ?
ORDER BY scraped_at DESC
LIMIT ?
""", (asin, f"%{category[:30]}%", hours + 5)).fetchall()
if len(rows) < 2:
return None
newest_rank, newest_time = rows[0]
oldest_rank, oldest_time = rows[-1]
dt_hours = (
datetime.fromisoformat(newest_time) - datetime.fromisoformat(oldest_time)
).total_seconds() / 3600
if dt_hours < 0.1:
return None
# Positive velocity = climbing (rank number decreasing)
velocity = (oldest_rank - newest_rank) / dt_hours
# Calculate acceleration (is it speeding up or slowing down?)
mid_idx = len(rows) // 2
mid_rank, mid_time = rows[mid_idx]
first_half_velocity = (oldest_rank - mid_rank) / (dt_hours / 2) if dt_hours > 0 else 0
second_half_velocity = (mid_rank - newest_rank) / (dt_hours / 2) if dt_hours > 0 else 0
acceleration = second_half_velocity - first_half_velocity
return {
"asin": asin,
"category": category,
"current_rank": newest_rank,
"previous_rank": oldest_rank,
"velocity": round(velocity, 2),
"acceleration": round(acceleration, 2),
"direction": "climbing" if velocity > 0 else "dropping",
"period_hours": round(dt_hours, 1),
"data_points": len(rows),
}
def find_trending_products(
    conn: sqlite3.Connection,
    category: str,
    min_velocity: float = 50.0,
    hours: int = 24,
) -> list[dict]:
    """Find products with highest upward velocity in a category."""
    # Candidate ASINs: anything seen in this category within the window.
    candidates = conn.execute("""
        SELECT DISTINCT asin FROM bsr_snapshots
        WHERE category LIKE ? AND scraped_at > datetime('now', ?)
    """, (f"%{category[:30]}%", f"-{hours+1} hours")).fetchall()
    results = []
    for row in candidates:
        asin = row[0]
        stats = calculate_velocity(conn, asin, category, hours)
        if stats is None or stats["velocity"] < min_velocity:
            continue
        # Attach the product title when we have one on record.
        title_row = conn.execute(
            "SELECT title FROM products WHERE asin = ?", (asin,)
        ).fetchone()
        stats["title"] = title_row[0] if title_row else None
        results.append(stats)
    results.sort(key=lambda s: s["velocity"], reverse=True)
    return results
Detecting New Category Entries
Products that appear in the top 100 for the first time signal real momentum:
def detect_new_entries(
    conn: sqlite3.Connection,
    category_url: str,
    hours_back: int = 25,
) -> list[dict]:
    """
    Find ASINs that newly entered the bestseller list in the last N hours
    but were NOT present in any earlier snapshot.

    Returns one entry per ASIN (the most recent rank in the window wins),
    sorted by rank ascending.
    """
    # All rows in the recent window, newest first. An ASIN scraped several
    # times in the window appears once per snapshot here.
    current = conn.execute("""
        SELECT asin, rank, title
        FROM category_snapshots
        WHERE category_url = ? AND scraped_at > datetime('now', ?)
        ORDER BY scraped_at DESC
    """, (category_url, f"-{hours_back} hours")).fetchall()
    # ASINs already seen before the window — i.e. not new.
    previous = conn.execute("""
        SELECT DISTINCT asin
        FROM category_snapshots
        WHERE category_url = ? AND scraped_at < datetime('now', ?)
    """, (category_url, f"-{hours_back} hours")).fetchall()
    previous_asins = {r[0] for r in previous}
    new_entries = []
    # BUGFIX: dedupe across snapshots — previously an ASIN present in
    # several recent snapshots was reported once per row.
    emitted = set()
    for asin, rank, title in current:
        if asin in previous_asins or asin in emitted:
            continue
        emitted.add(asin)
        new_entries.append({
            "asin": asin,
            "rank": rank,
            "title": title,
            "status": "new_entry",
        })
    # NULL ranks fall back to 999 so they sort toward the end.
    return sorted(new_entries, key=lambda x: x["rank"] or 999)
def detect_rank_drops(
    conn: sqlite3.Connection,
    category_url: str,
    min_drop: int = 20,
) -> list[dict]:
    """Find products that dropped significantly in rank in the last 24 hours."""
    # Ranks from the most recent snapshot of this category.
    latest_rows = conn.execute("""
        SELECT asin, rank FROM category_snapshots
        WHERE category_url = ?
        AND scraped_at = (
            SELECT MAX(scraped_at) FROM category_snapshots
            WHERE category_url = ?
        )
    """, (category_url, category_url)).fetchall()
    # Baseline ranks from roughly 24 hours ago (22-26h window).
    previous_rows = conn.execute("""
        SELECT asin, rank FROM category_snapshots
        WHERE category_url = ?
        AND scraped_at BETWEEN datetime('now', '-26 hours') AND datetime('now', '-22 hours')
    """, (category_url,)).fetchall()
    baseline = {asin: rank for asin, rank in previous_rows}
    drops = []
    for asin, rank_now in latest_rows:
        rank_then = baseline.get(asin)
        if not rank_then or not rank_now:
            continue
        delta = rank_now - rank_then
        if delta >= min_drop:
            drops.append({
                "asin": asin,
                "current_rank": rank_now,
                "previous_rank": rank_then,
                "drop": delta,
            })
    drops.sort(key=lambda d: d["drop"], reverse=True)
    return drops
Full BSR Tracking Pipeline
# ASINs whose product pages are scraped each run (Step 1 of the pipeline).
WATCHLIST_ASINS = [
    "B0D5CSL2FN",  # Example ASINs
    "B0CRMZD9MH",
    "B0C4CJ5P5D",
]

# Category bestseller lists scraped each run (Step 2 of the pipeline).
CATEGORY_WATCHLIST = [
    "https://www.amazon.com/Best-Sellers-Books/zgbs/books/",
    "https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics/",
    "https://www.amazon.com/best-sellers-kitchen/zgbs/kitchen/",
]
def run_bsr_pipeline(
    proxy_url: str | None = None,
    db_path: str = "bsr_tracker.db",
):
    """
    Full BSR tracking run:
    1. Scrape BSR from product pages for watchlist ASINs
    2. Scrape category bestseller lists
    3. Calculate velocities
    4. Print summary

    Args:
        proxy_url: Optional proxy for the product-page session; category
            scraping builds its own sticky-session clients regardless.
        db_path: SQLite database file path.
    """
    conn = init_db(Path(db_path))
    session = make_client(proxy_url)
    print("=== BSR Tracking Run ===")
    print(f"Time: {datetime.utcnow().isoformat()}")
    # Step 1: Product page BSR
    print(f"\n--- Scraping {len(WATCHLIST_ASINS)} product pages ---")
    for asin in WATCHLIST_ASINS:
        url = f"https://www.amazon.com/dp/{asin}"
        html = fetch_page(url, session)
        if html:
            ranks = parse_bsr(html)
            metadata = parse_product_metadata(html, asin)
            if ranks:
                save_product_bsr(conn, asin, ranks, metadata)
                print(f" {asin}: {ranks[0]['rank']} in {ranks[0]['category'][:40]}")
        # Long jittered delay between product pages to avoid pattern detection.
        time.sleep(random.uniform(8, 15))
    # Step 2: Category lists
    print(f"\n--- Scraping {len(CATEGORY_WATCHLIST)} category lists ---")
    for cat_url in CATEGORY_WATCHLIST:
        # NOTE(review): split("/")[4] is "zgbs" for every URL in
        # CATEGORY_WATCHLIST — index 3 (the category slug) looks intended.
        # Verify before relying on cat_name for sticky-session keys.
        cat_name = cat_url.split("/")[4][:30]
        # Fresh sticky-session client per category so all pages of one
        # list come from the same IP.
        cat_session = make_client_for_category(cat_name)
        products = scrape_category_bestsellers(cat_url, cat_session)
        save_category_list(conn, products, cat_url)
        print(f" {cat_name}: {len(products)} products")
        time.sleep(random.uniform(10, 20))
    # Step 3: Velocity report
    print("\n--- Velocity Report (last 24h) ---")
    for asin in WATCHLIST_ASINS:
        # Pick one category this ASIN is ranked in (arbitrary: no ORDER BY).
        rows = conn.execute("""
            SELECT DISTINCT category FROM bsr_snapshots WHERE asin = ? LIMIT 1
        """, (asin,)).fetchone()
        if rows:
            v = calculate_velocity(conn, asin, rows[0])
            if v:
                product = conn.execute(
                    "SELECT title FROM products WHERE asin = ?", (asin,)
                ).fetchone()
                title = (product[0] or "")[:50] if product else asin
                print(f" {title}")
                print(f" Rank: {v['current_rank']} | Velocity: {v['velocity']:+.1f}/hr ({v['direction']})")
    # Step 4: New entries
    print("\n--- New Category Entries ---")
    for cat_url in CATEGORY_WATCHLIST:
        new = detect_new_entries(conn, cat_url)
        if new:
            # NOTE(review): same index-4 concern as in Step 2.
            cat_name = cat_url.split("/")[4][:30]
            print(f" {cat_name}: {len(new)} new entries")
            for entry in new[:3]:
                print(f" #{entry['rank']} {entry.get('title', entry['asin'])[:50]}")
    conn.close()
    print("\nRun complete.")
Use Cases
Product research. Track BSR across a niche over 30 days. Products that consistently hold a rank below 5,000 in a specific sub-category are selling daily. That is your demand signal before you commit to sourcing.
Competitor monitoring. Watch the ASINs of your direct competitors. If their BSR drops sharply after you launch a promotion, you can quantify the impact. If their rank improves, check if they changed their listing or dropped price.
Niche analysis. Scrape a full top-100 category list weekly. Track how often the list turns over. High turnover means the niche is volatile. Low turnover means incumbents are entrenched. Both are useful to know before entering.
Launch tracking. New product launches often show a sharp velocity spike in the first 72 hours from launch promotions. If you catch a product at rank 200 with a velocity of +400/hour, it is worth investigating what drove the spike.
Pricing intelligence. Correlate BSR changes with price changes. A product that climbs from rank 500 to rank 50 the same day a competitor goes out of stock is a pricing signal you can act on immediately.
BSR data is public and updated hourly. The infrastructure to collect it is straightforward. The value comes from consistency — running the scraper on a schedule and building a historical dataset over weeks and months, not one-off lookups.