Scrape Amazon Kindle Books: E-Book Metadata & Bestseller Rankings (2026)
Amazon's Kindle Store is the largest e-book marketplace. If you're building a book recommendation engine, tracking publishing trends, or doing market research on self-publishing niches, you need access to its data — titles, prices, BSR rankings, reviews, and category placements.
There are two paths: the official Product Advertising API (PA-API 5.0) and direct web scraping. In practice, you'll probably use both — PA-API for ASIN lookups you know, scraping for discovery.
What Data Is Available
From Kindle product pages and category listings, you can collect:
- Title, subtitle, series name
- Author(s) — including pen names and co-authors
- Price — Kindle price, paperback price, audiobook price if available
- ASIN — Amazon's unique identifier
- Publisher and publication date
- File size and page count (print length)
- Language and reading level
- Categories — Kindle Store category path (up to 3 levels deep)
- Best Sellers Rank — category and sub-category ranks
- Star rating and review count
- Kindle Unlimited eligibility
- Simultaneous device usage
- X-Ray, Whispersync, Word Wise availability
The Official Route: Product Advertising API 5.0
Amazon's PA-API gives you structured access to product data. The catch: you need an Amazon Associates account with qualifying sales in the last 30 days (or the account goes inactive), and rate limits are strict.
import hashlib
import hmac
import json
import time
from datetime import datetime, timezone
from curl_cffi import requests as curl_requests
class AmazonPAAPI:
    """Amazon Product Advertising API 5.0 client.

    Signs requests with AWS Signature Version 4 (SigV4) and sends them
    with curl_cffi browser TLS impersonation.
    """

    HOST = "webservices.amazon.com"
    REGION = "us-east-1"
    SERVICE = "ProductAdvertisingAPI"
    PATH = "/paapi5/searchitems"        # SearchItems endpoint path
    GETITEMS_PATH = "/paapi5/getitems"  # GetItems endpoint path

    def __init__(self, access_key: str, secret_key: str, partner_tag: str):
        """
        Args:
            access_key: PA-API access key.
            secret_key: PA-API secret key (SigV4 signing secret).
            partner_tag: Amazon Associates partner tag.
        """
        self.access_key = access_key
        self.secret_key = secret_key
        self.partner_tag = partner_tag

    def _sign(self, payload: str, target: str, path: str = None) -> dict:
        """Generate AWS Signature Version 4 headers for a PA-API POST.

        Args:
            payload: Serialized JSON request body.
            target: PA-API operation name, e.g. "SearchItems" or "GetItems".
            path: Request path to include in the canonical request.
                Defaults to PATH. BUG FIX: the signature previously always
                used PATH, so GetItems requests (POSTed to /paapi5/getitems)
                were signed for the wrong URI and rejected by Amazon.

        Returns:
            Header dict including the computed "authorization" header.
        """
        if path is None:
            path = self.PATH
        now = datetime.now(timezone.utc)
        datestamp = now.strftime("%Y%m%d")
        amz_date = now.strftime("%Y%m%dT%H%M%SZ")
        headers = {
            "content-type": "application/json; charset=utf-8",
            "host": self.HOST,
            "x-amz-date": amz_date,
            "x-amz-target": f"com.amazon.paapi5.v1.ProductAdvertisingAPIv1.{target}",
            "content-encoding": "amz-1.0",
        }
        signed_headers_str = ";".join(sorted(headers.keys()))
        canonical_headers = "\n".join(f"{k}:{v}" for k, v in sorted(headers.items())) + "\n"
        payload_hash = hashlib.sha256(payload.encode()).hexdigest()
        # Canonical request layout per the SigV4 spec: method, URI,
        # (empty) query string, canonical headers, signed-header list,
        # payload hash.
        canonical_request = f"POST\n{path}\n\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
        credential_scope = f"{datestamp}/{self.REGION}/{self.SERVICE}/aws4_request"
        string_to_sign = (
            f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n"
            f"{hashlib.sha256(canonical_request.encode()).hexdigest()}"
        )

        def _hmac256(key, msg):
            # Accepts str or bytes keys so the derivation chain reads cleanly.
            return hmac.new(key if isinstance(key, bytes) else key.encode(), msg.encode(), hashlib.sha256).digest()

        # Standard SigV4 key derivation: secret -> date -> region -> service -> request.
        signing_key = _hmac256(
            _hmac256(_hmac256(_hmac256(f"AWS4{self.secret_key}", datestamp), self.REGION), self.SERVICE),
            "aws4_request"
        )
        signature = hmac.new(signing_key, string_to_sign.encode(), hashlib.sha256).hexdigest()
        headers["authorization"] = (
            f"AWS4-HMAC-SHA256 Credential={self.access_key}/{credential_scope}, "
            f"SignedHeaders={signed_headers_str}, Signature={signature}"
        )
        return headers

    def search_kindle_books(
        self,
        keywords: str,
        browse_node_id: str = None,
        page: int = 1,
        sort_by: str = "Relevance",
    ) -> dict:
        """
        Search Kindle Store.
        Args:
            keywords: Search query
            browse_node_id: Kindle category browse node (see below)
            page: Result page (1-10)
            sort_by: "Relevance", "Featured", "NewestArrivals", "Price:HighToLow", "Price:LowToHigh"
        Common Kindle browse node IDs:
            154606011 = Kindle Store (top level)
            1286228011 = Kindle eBooks
            157028011 = Mystery, Thriller & Suspense
            17659 = Science Fiction & Fantasy
            10129 = Business & Investing
            6013 = Biographies & Memoirs
            2590149011 = Romance
        """
        req_body = {
            "Keywords": keywords,
            "SearchIndex": "KindleStore",
            "ItemPage": page,
            "SortBy": sort_by,
            "PartnerTag": self.partner_tag,
            "PartnerType": "Associates",
            "Marketplace": "www.amazon.com",
            "Resources": [
                "ItemInfo.Title",
                "ItemInfo.ByLineInfo",
                "ItemInfo.ContentInfo",
                "ItemInfo.ProductInfo",
                "ItemInfo.Classifications",
                "Offers.Listings.Price",
                "Offers.Listings.Availability.Type",
                "BrowseNodeInfo.BrowseNodes",
                "BrowseNodeInfo.BrowseNodes.Ancestor",
                "SearchRefinements",
            ],
        }
        if browse_node_id:
            req_body["BrowseNodeId"] = browse_node_id
        payload = json.dumps(req_body)
        headers = self._sign(payload, "SearchItems")
        # BUG FIX: curl_cffi uses the requests-style `data=` kwarg for a raw
        # body; `content=` is httpx's API and is not accepted here.
        resp = curl_requests.post(
            f"https://{self.HOST}{self.PATH}",
            data=payload,
            headers=headers,
            impersonate="chrome124",
            timeout=15,
        )
        return resp.json()

    def get_items(self, asins: list[str]) -> dict:
        """Look up specific items by ASIN (PA-API caps this at 10 per request)."""
        payload = json.dumps({
            "ItemIds": asins[:10],  # Max 10 per request
            "PartnerTag": self.partner_tag,
            "PartnerType": "Associates",
            "Marketplace": "www.amazon.com",
            "Resources": [
                "ItemInfo.Title",
                "ItemInfo.ByLineInfo",
                "ItemInfo.ContentInfo",
                "ItemInfo.ProductInfo",
                "ItemInfo.TechnicalInfo",
                "Offers.Listings.Price",
                "BrowseNodeInfo.BrowseNodes",
            ],
        })
        # Sign with the GetItems path so the canonical URI matches the URL POSTed to.
        headers = self._sign(payload, "GetItems", path=self.GETITEMS_PATH)
        resp = curl_requests.post(
            f"https://{self.HOST}{self.GETITEMS_PATH}",
            data=payload,
            headers=headers,
            impersonate="chrome124",
            timeout=15,
        )
        return resp.json()
Direct Scraping: Kindle Bestseller Lists
For category discovery and trend tracking without a PA-API account, scrape the bestseller pages directly:
import re
import sqlite3
import random
import time
from datetime import datetime
from bs4 import BeautifulSoup
from curl_cffi import requests as curl_requests
from pathlib import Path
# Baseline browser-like headers sent with every scrape request. The
# User-Agent entry here is a default; fetch_kindle_page overrides it
# per-request with a random pick from USER_AGENTS below.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate, br",
    # Sec-Fetch-* values mimic a top-level browser navigation.
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
}

# Pool of desktop UA strings rotated per request to vary the fingerprint.
# NOTE(review): UA strings should roughly match the TLS impersonation
# target ("chrome124") — the Firefox entry diverges from the TLS profile.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
]
def create_session(proxy_url: str = None) -> curl_requests.Session:
    """Create a curl_cffi session with browser TLS impersonation.

    Args:
        proxy_url: Optional proxy URL (http://user:pass@host:port).

    Returns:
        A curl_cffi Session, routed through the proxy when one is given.
    """
    session = curl_requests.Session()
    if proxy_url:
        # BUG FIX: cover both schemes — the previous {"https": ...} mapping
        # let plain-http requests (and http redirects) bypass the proxy and
        # leak the real IP.
        session.proxies = {"http": proxy_url, "https": proxy_url}
    return session
def fetch_kindle_page(
    url: str,
    session: curl_requests.Session,
    retries: int = 3,
) -> str | None:
    """Fetch a Kindle page with TLS impersonation and CAPTCHA detection.

    Returns the HTML body on success, or None after `retries` attempts
    fail (non-200 status, CAPTCHA interstitial, or a network error).
    """
    request_headers = dict(HEADERS)
    request_headers["User-Agent"] = random.choice(USER_AGENTS)
    blocked_markers = ("Robot Check", "Type the characters", "Sorry!")
    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            response = session.get(
                url,
                impersonate="chrome124",
                headers=request_headers,
                timeout=20,
                allow_redirects=True,
            )
        except Exception as exc:
            print(f" Fetch error attempt {attempt}: {exc}")
            time.sleep(random.uniform(2, 5))
            continue
        if response.status_code != 200:
            time.sleep(random.uniform(3, 7))
            continue
        # Amazon serves a CAPTCHA interstitial instead of an error status.
        if any(marker in response.text for marker in blocked_markers):
            print(f" CAPTCHA on attempt {attempt}, waiting...")
            time.sleep(random.uniform(15, 30))
            continue
        return response.text
    return None
def scrape_kindle_bestsellers(
    category_url: str,
    session: curl_requests.Session,
) -> list[dict]:
    """
    Scrape a Kindle bestseller list page.
    Common Kindle category URLs:
    - https://www.amazon.com/Best-Sellers-Kindle-Store/zgbs/digital-text/
    - https://www.amazon.com/Best-Sellers-Kindle-eBooks-Mystery/zgbs/digital-text/18580176011/
    - https://www.amazon.com/Best-Sellers-Kindle-Store-Romance/zgbs/digital-text/6057053011/
    """
    collected: list[dict] = []
    # Each bestseller page lists 50 items; two pages cover the top 100.
    for page_num in (1, 2):
        if page_num == 1:
            url = category_url
        else:
            url = f"{category_url.rstrip('/')}?pg={page_num}"
        html = fetch_kindle_page(url, session)
        if not html:
            break
        soup = BeautifulSoup(html, "lxml")
        # Primary grid layout, with a data-asin fallback for the older markup.
        cards = soup.select("#gridItemRoot") or soup.select("div[data-asin]")
        found_on_page = 0
        for card in cards:
            parsed = parse_kindle_card(card)
            if parsed and parsed.get("asin"):
                collected.append(parsed)
                found_on_page += 1
        print(f" Page {page_num}: {found_on_page} books")
        time.sleep(random.uniform(4, 8))
    return collected
def parse_kindle_card(item) -> dict:
    """Parse one Kindle bestseller grid card into a flat dict.

    Returns {} when the card carries no data-asin attribute.
    """

    def _text(selector: str):
        # First match's stripped text, or None when nothing matches.
        el = item.select_one(selector)
        return el.get_text(strip=True) if el else None

    asin = item.get("data-asin", "").strip()
    if not asin:
        return {}

    # Rank badge, e.g. "#3"
    rank = None
    badge = _text("span.zg-bdg-text, .zg-badge-wrapper span") or ""
    found = re.search(r"(\d+)", badge.replace(",", ""))
    if found:
        rank = int(found.group(1))

    title = _text(
        "._cDEzb_p13n-sc-css-line-clamp-1, "
        "._cDEzb_p13n-sc-css-line-clamp-2, "
        ".p13n-sc-truncated, .a-link-normal > span"
    )
    author = _text(".a-size-small .a-link-child, span.a-color-secondary")
    price = _text("._cDEzb_p13n-sc-price, .a-color-price, .a-price .a-offscreen")

    # Star rating, from alt text like "4.5 out of 5 stars"
    rating = None
    rating_text = _text(".a-icon-alt")
    if rating_text:
        found = re.search(r"([\d.]+)", rating_text)
        rating = float(found.group(1)) if found else None

    # Review count
    review_count = None
    reviews_text = _text("span.a-size-small:last-child, .a-size-small a")
    if reviews_text:
        found = re.search(r"(\d+)", reviews_text.replace(",", ""))
        review_count = int(found.group(1)) if found else None

    # Kindle Unlimited badge
    ku_el = item.select_one(".a-badge-label")
    is_ku = bool(ku_el and "Kindle Unlimited" in ku_el.get_text())

    # Cover image
    img_el = item.select_one("img")

    return {
        "asin": asin,
        "rank": rank,
        "title": title,
        "author": author,
        "price": price,
        "rating": rating,
        "review_count": review_count,
        "is_kindle_unlimited": is_ku,
        "cover_url": img_el.get("src") if img_el else None,
    }
Scraping Detailed Book Pages
Once you have ASINs, fetch full metadata from individual product pages:
def scrape_kindle_book_details(
    asin: str,
    session: curl_requests.Session,
) -> dict:
    """Fetch detailed metadata for a Kindle book by ASIN.

    Args:
        asin: Amazon product identifier.
        session: curl_cffi session (proxy/TLS already configured).

    Returns:
        Dict that always contains "asin"; on fetch failure it contains
        only {"asin": ..., "error": "fetch failed"}.
    """
    url = f"https://www.amazon.com/dp/{asin}"
    html = fetch_kindle_page(url, session)
    if not html:
        return {"asin": asin, "error": "fetch failed"}
    soup = BeautifulSoup(html, "lxml")
    book = {"asin": asin}
    # Title
    title_el = soup.select_one("#productTitle")
    book["title"] = title_el.get_text(strip=True) if title_el else None
    # Author (first byline link)
    author_el = soup.select_one(".author .a-link-normal, #bylineInfo .a-link-normal")
    book["author"] = author_el.get_text(strip=True) if author_el else None
    # Kindle price (several layout variants)
    kindle_price_el = soup.select_one(
        "#kindle-price, #tp-price-block-kindle, "
        "[data-action='show-all-offers-display'] .a-color-price, "
        ".kindle-price .a-color-price"
    )
    book["kindle_price"] = kindle_price_el.get_text(strip=True) if kindle_price_el else None
    # Paperback price (for comparison)
    paperback_el = soup.select_one("#paperback_price, #price, .print-list-price")
    book["paperback_price"] = paperback_el.get_text(strip=True) if paperback_el else None
    # Description — BUG FIX: select_one() takes ONE selector string; the old
    # two-argument call passed "#productDescription" as the `namespaces`
    # parameter, so the fallback selector was never applied. Both selectors
    # belong in a single comma-separated string.
    desc_el = soup.select_one("#bookDescription_feature_div, #productDescription")
    book["description"] = desc_el.get_text(strip=True)[:3000] if desc_el else None
    # Rating, from the popover title e.g. "4.6 out of 5 stars"
    rating_el = soup.select_one("#acrPopover")
    if rating_el:
        r_text = rating_el.get("title", "")
        r_match = re.search(r"([\d.]+)", r_text)
        book["rating"] = float(r_match.group(1)) if r_match else None
    # Review count
    reviews_el = soup.select_one("#acrCustomerReviewText")
    if reviews_el:
        r_text = reviews_el.get_text(strip=True).replace(",", "")
        r_match = re.search(r"(\d+)", r_text)
        book["review_count"] = int(r_match.group(1)) if r_match else None
    # Product details table: "key: value" rows in either bullet or table layout.
    details = {}
    for li in soup.select("#detailBullets_feature_div li, #productDetails_techSpec_section_1 tr"):
        text = li.get_text(separator=": ", strip=True)
        if ":" in text:
            key, _, val = text.partition(":")
            details[key.strip()] = val.strip()
    book["print_length"] = details.get("Print length", details.get("Print Length"))
    book["language"] = details.get("Language")
    book["publisher"] = details.get("Publisher")
    book["publication_date"] = details.get("Publication date", details.get("Publication Date"))
    book["word_wise"] = details.get("Word Wise")
    book["enhanced_typesetting"] = details.get("Enhanced typesetting")
    book["x_ray"] = details.get("X-Ray")
    # Best Sellers Rank: every "#N in Category" pair in the BSR bullet.
    bsr_ranks = []
    for li in soup.select("#detailBullets_feature_div li, #SalesRank"):
        li_text = li.get_text()
        if "Best Sellers Rank" in li_text or "Amazon Best Sellers Rank" in li_text:
            matches = re.findall(r"#([\d,]+)\s+in\s+([^(#\n]+)", li_text)
            for rank_str, category in matches:
                bsr_ranks.append({
                    "rank": int(rank_str.replace(",", "")),
                    "category": category.strip().rstrip(";").strip(),
                })
    book["bsr"] = bsr_ranks
    # Categories / breadcrumbs
    breadcrumbs = soup.select("#wayfinding-breadcrumbs_feature_div a, .a-breadcrumb a")
    book["categories"] = [b.get_text(strip=True) for b in breadcrumbs if b.get_text(strip=True)]
    # Series info
    series_el = soup.select_one("[id*='series'], .series-childAsin-item")
    book["series"] = series_el.get_text(strip=True) if series_el else None
    # Kindle Unlimited (presence of the lending-eligibility widget)
    ku_el = soup.select_one("[data-feature-name='lendingEligibility'], .kindle-unlimited")
    book["kindle_unlimited"] = bool(ku_el)
    # Cover image
    img_el = soup.select_one("#imgBlkFront, #landingImage")
    book["cover_url"] = img_el.get("src") if img_el else None
    return book
Proxy Setup for Amazon
Amazon aggressively blocks datacenter IPs. Use ThorData's residential proxy pool for reliable access:
# ThorData residential proxy credentials — replace with real values.
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000


def get_proxy(sticky: bool = False, session_id: str = None) -> str:
    """Build a ThorData proxy URL.

    When `sticky` is True and a `session_id` is supplied, the session id is
    encoded into the username so the provider pins the same exit IP.
    """
    username = THORDATA_USER
    if sticky and session_id:
        username = f"{THORDATA_USER}_session-{session_id}"
    return f"http://{username}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"
def make_kindle_scraper(sticky_id: str = None) -> curl_requests.Session:
    """Create a scraper session, optionally pinned to a sticky proxy IP."""
    if sticky_id:
        proxy_url = get_proxy(sticky=True, session_id=sticky_id)
    else:
        proxy_url = get_proxy(sticky=False, session_id=sticky_id)
    return create_session(proxy_url=proxy_url)
SQLite Storage
# Database file is created in the current working directory.
DB_PATH = Path("kindle_data.db")


def init_db() -> sqlite3.Connection:
    """Initialize Kindle book tracking database.

    Creates three tables (if missing):
      - books: one row per ASIN with the latest known metadata;
        save_book overwrites rows via INSERT OR REPLACE.
      - bsr_snapshots: append-only Best Sellers Rank history per ASIN.
      - category_snapshots: append-only bestseller-list placements per
        category page, one batch per pipeline run.

    Returns:
        An open sqlite3 connection to DB_PATH.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS books (
            asin TEXT PRIMARY KEY,
            title TEXT,
            author TEXT,
            series TEXT,
            publisher TEXT,
            publication_date TEXT,
            kindle_price REAL,
            paperback_price REAL,
            rating REAL,
            review_count INTEGER,
            print_length TEXT,
            language TEXT,
            kindle_unlimited INTEGER DEFAULT 0,
            description TEXT,
            cover_url TEXT,
            categories TEXT,
            updated_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS bsr_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asin TEXT NOT NULL,
            category TEXT NOT NULL,
            rank INTEGER NOT NULL,
            captured_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS category_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            category_url TEXT NOT NULL,
            asin TEXT NOT NULL,
            rank INTEGER,
            price REAL,
            rating REAL,
            review_count INTEGER,
            is_kindle_unlimited INTEGER DEFAULT 0,
            captured_at TEXT DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS idx_bsr_asin ON bsr_snapshots(asin, captured_at);
        CREATE INDEX IF NOT EXISTS idx_cat_url ON category_snapshots(category_url, captured_at);
    """)
    conn.commit()
    return conn
def parse_price_to_float(price_str: str | None) -> float | None:
"""Convert price string to float."""
if not price_str:
return None
match = re.search(r"[\d.]+", price_str.replace(",", ""))
if match:
return float(match.group())
return None
def save_book(conn: sqlite3.Connection, book: dict):
    """Upsert one book's metadata and append its BSR history rows."""
    # Column order must match the INSERT column list below.
    row = (
        book.get("asin"),
        book.get("title"),
        book.get("author"),
        book.get("series"),
        book.get("publisher"),
        book.get("publication_date"),
        parse_price_to_float(book.get("kindle_price")),
        parse_price_to_float(book.get("paperback_price")),
        book.get("rating"),
        book.get("review_count"),
        book.get("print_length"),
        book.get("language"),
        1 if book.get("kindle_unlimited") else 0,
        book.get("description"),
        book.get("cover_url"),
        json.dumps(book.get("categories", [])),
    )
    conn.execute("""
        INSERT OR REPLACE INTO books
        (asin, title, author, series, publisher, publication_date,
         kindle_price, paperback_price, rating, review_count,
         print_length, language, kindle_unlimited, description, cover_url, categories)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, row)
    # One append-only history row per (rank, category) pair parsed from the page.
    asin = book.get("asin")
    for entry in book.get("bsr", []):
        conn.execute(
            "INSERT INTO bsr_snapshots (asin, category, rank) VALUES (?, ?, ?)",
            (asin, entry["category"], entry["rank"]),
        )
    conn.commit()
Trend Analysis
import json
import csv
def find_trending_books(
    conn: sqlite3.Connection,
    category_url: str,
    days: int = 7,
    min_improvement: int = 10,
) -> list[dict]:
    """Find books whose rank improved most over the past N days.

    Compares each book's earliest and latest snapshot inside the window;
    "improvement" is positive when the rank number dropped (book moved up).

    Args:
        conn: Open SQLite connection.
        category_url: Category whose snapshots to analyze.
        days: Look-back window in days.
        min_improvement: Minimum rank positions gained to be included.

    Returns:
        Up to 20 dicts with asin, title, author, start_rank, current_rank
        and improvement, sorted by improvement descending.
    """
    # BUG FIX: the old query selected a bogus "b.rank AS title" column that
    # was discarded during unpacking; titles come from the books table below.
    rows = conn.execute("""
        WITH dated AS (
            SELECT asin, rank, captured_at,
                   ROW_NUMBER() OVER (PARTITION BY asin ORDER BY captured_at ASC) AS rn_first,
                   ROW_NUMBER() OVER (PARTITION BY asin ORDER BY captured_at DESC) AS rn_last
            FROM category_snapshots
            WHERE category_url = ?
              AND captured_at > datetime('now', ?)
        )
        SELECT a.asin,
               a.rank AS start_rank, b.rank AS end_rank,
               a.rank - b.rank AS improvement
        FROM dated a
        JOIN dated b ON a.asin = b.asin
        WHERE a.rn_first = 1 AND b.rn_last = 1
          AND a.rank > b.rank
          AND (a.rank - b.rank) >= ?
        ORDER BY improvement DESC
        LIMIT 20
    """, (category_url, f"-{days} days", min_improvement)).fetchall()
    # Enrich with title/author from the books table.
    results = []
    for asin, start_rank, end_rank, improvement in rows:
        book = conn.execute(
            "SELECT title, author FROM books WHERE asin = ?", (asin,)
        ).fetchone()
        results.append({
            "asin": asin,
            "title": book[0] if book else None,
            "author": book[1] if book else None,
            "start_rank": start_rank,
            "current_rank": end_rank,
            "improvement": improvement,
        })
    return results
def analyze_niche(
    conn: sqlite3.Connection,
    category_url: str,
) -> dict:
    """Summarize the most recent snapshot of a Kindle niche category.

    Returns {} when no snapshots exist for the category.
    """
    snapshot = conn.execute("""
        SELECT cs.asin, cs.rank, cs.price, cs.rating, cs.review_count, cs.is_kindle_unlimited,
               b.title, b.author
        FROM category_snapshots cs
        LEFT JOIN books b ON cs.asin = b.asin
        WHERE cs.category_url = ?
          AND cs.captured_at = (
              SELECT MAX(captured_at) FROM category_snapshots WHERE category_url = ?
          )
        ORDER BY cs.rank ASC
    """, (category_url, category_url)).fetchall()
    if not snapshot:
        return {}

    def _avg(values):
        return sum(values) / len(values) if values else 0

    # Column order: asin, rank, price, rating, review_count, is_KU, title, author
    prices = [row[2] for row in snapshot if row[2] and row[2] > 0]
    ratings = [row[3] for row in snapshot if row[3]]
    reviews = [row[4] for row in snapshot if row[4]]
    ku_total = sum(1 for row in snapshot if row[5])

    summary = {
        "category": category_url.split("/")[-2],
        "total_books": len(snapshot),
        "price_avg": _avg(prices),
        "price_min": min(prices) if prices else 0,
        "price_max": max(prices) if prices else 0,
        "rating_avg": _avg(ratings),
        "reviews_avg": _avg(reviews),
        "reviews_median": sorted(reviews)[len(reviews) // 2] if reviews else 0,
        "kindle_unlimited_pct": ku_total / len(snapshot) * 100,
        "top_10": [
            {"rank": row[1], "title": row[6], "author": row[7], "price": row[2]}
            for row in snapshot[:10]
        ],
    }
    return summary
def export_category_to_csv(
    conn: sqlite3.Connection,
    category_url: str,
    output_path: str = "kindle_category.csv",
):
    """Export the latest snapshot of a category to a CSV file."""
    rows = conn.execute("""
        SELECT cs.asin, cs.rank, b.title, b.author, cs.price,
               cs.rating, cs.review_count, cs.is_kindle_unlimited,
               b.publisher, b.publication_date, b.series
        FROM category_snapshots cs
        LEFT JOIN books b ON cs.asin = b.asin
        WHERE cs.category_url = ?
          AND cs.captured_at = (
              SELECT MAX(captured_at) FROM category_snapshots WHERE category_url = ?
          )
        ORDER BY cs.rank ASC
    """, (category_url, category_url)).fetchall()
    header = [
        "rank", "asin", "title", "author", "price", "rating",
        "review_count", "kindle_unlimited", "publisher", "pub_date", "series",
    ]
    with open(output_path, "w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        # Swap the first two query columns so rank leads each CSV row.
        writer.writerows([row[1], row[0], *row[2:]] for row in rows)
    print(f"Exported {len(rows)} books to {output_path}")
Full Pipeline
# Bestseller (/zgbs/) pages tracked by the pipeline: the overall Kindle
# Store plus the Mystery and Romance e-book sub-categories. Add more
# category URLs here to widen coverage.
KINDLE_CATEGORIES = [
    "https://www.amazon.com/Best-Sellers-Kindle-Store/zgbs/digital-text/",
    "https://www.amazon.com/Best-Sellers-Kindle-eBooks-Mystery/zgbs/digital-text/18580176011/",
    "https://www.amazon.com/Best-Sellers-Kindle-eBooks-Romance/zgbs/digital-text/6057053011/",
]
def run_kindle_pipeline(
    proxy_url: str = None,
    scrape_details: bool = True,
):
    """Full Kindle bestseller tracking pipeline.

    For each configured category: scrape the bestseller list, append a
    category snapshot, optionally fetch per-book details for the top 20,
    then print a niche summary for every category.
    """
    conn = init_db()
    session = create_session(proxy_url)

    def _record_snapshot(cat_url, book):
        # One row per (category, book) per run; history accumulates across runs.
        conn.execute("""
            INSERT INTO category_snapshots
            (category_url, asin, rank, price, rating, review_count, is_kindle_unlimited)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            cat_url, book["asin"], book.get("rank"),
            parse_price_to_float(book.get("price")),
            book.get("rating"), book.get("review_count"),
            1 if book.get("is_kindle_unlimited") else 0,
        ))

    for cat_url in KINDLE_CATEGORIES:
        cat_name = cat_url.split("/")[-2][:30]
        print(f"\n--- Category: {cat_name} ---")
        books = scrape_kindle_bestsellers(cat_url, session)
        print(f"Found {len(books)} books")
        for book in books:
            _record_snapshot(cat_url, book)
        conn.commit()
        if scrape_details:
            # Sticky IP keeps all detail-page requests on the same exit node.
            detail_session = make_kindle_scraper(sticky_id=f"details_{cat_name}")
            # Top 20 only to keep request volume manageable.
            for i, book in enumerate(books[:20]):
                if not book.get("asin"):
                    continue
                print(f" [{i+1}/20] Getting details for {book['asin']}")
                detail = scrape_kindle_book_details(book["asin"], detail_session)
                save_book(conn, {**book, **detail})
                time.sleep(random.uniform(6, 12))
        time.sleep(random.uniform(10, 20))

    print("\n=== Niche Analysis ===")
    for cat_url in KINDLE_CATEGORIES:
        analysis = analyze_niche(conn, cat_url)
        if not analysis:
            continue
        print(f"\n{analysis['category']}:")
        print(f" Books tracked: {analysis['total_books']}")
        print(f" Avg price: ${analysis['price_avg']:.2f} (range ${analysis['price_min']:.2f}-${analysis['price_max']:.2f})")
        print(f" Avg rating: {analysis['rating_avg']:.1f}")
        print(f" Kindle Unlimited: {analysis['kindle_unlimited_pct']:.0f}%")
    conn.close()
# Run it — guarded so importing this module does not kick off a full
# network-hitting scrape as a side effect.
if __name__ == "__main__":
    run_kindle_pipeline(proxy_url="http://user:[email protected]:9000")
Practical Considerations
Respect robots.txt. Amazon's robots.txt disallows many paths. Bestseller pages (/zgbs/) are allowed, but product search results are restricted. Know the difference.
Combine API and scraping. Use PA-API for lookups where you know the ASIN, and scraping for discovery (bestseller lists, category browsing). This reduces your scraping footprint.
Rotate everything. Rotate user agents, vary request timing, mix product detail requests with category browsing. With ThorData's residential proxy pool, IP rotation is automatic.
Handle Kindle Unlimited. Many books show "$0.00" because they're in Kindle Unlimited. The actual purchase price is separate. Track both kindle_price and is_kindle_unlimited for accurate pricing data.
Watch for A/B testing. Amazon constantly A/B tests page layouts. Build parsers with multiple fallback selectors and log parse failures so you know when to update them.
Refresh frequency. Kindle bestseller lists update hourly. For serious trend tracking, run the pipeline every 2-4 hours and store the full history; a daily snapshot is the minimum useful granularity.