← Back to blog

Scrape Amazon Kindle Books: E-Book Metadata & Bestseller Rankings (2026)

Scrape Amazon Kindle Books: E-Book Metadata & Bestseller Rankings (2026)

Amazon's Kindle Store is the largest e-book marketplace. If you're building a book recommendation engine, tracking publishing trends, or doing market research on self-publishing niches, you need access to its data — titles, prices, BSR rankings, reviews, and category placements.

There are two paths: the official Product Advertising API (PA-API 5.0) and direct web scraping. In practice, you'll probably use both — PA-API for ASIN lookups you know, scraping for discovery.

What Data Is Available

From Kindle product pages and category listings, you can collect: title and author, ASIN, Kindle and print prices, star rating and review count, Best Sellers Rank (BSR) per category, series information, publisher and publication date, page count, and Kindle Unlimited availability.

The Official Route: Product Advertising API 5.0

Amazon's PA-API gives you structured access to product data. The catch: you need an Amazon Associates account with qualifying sales in the last 30 days (or the account goes inactive), and rate limits are strict.

import hashlib
import hmac
import json
import time
from datetime import datetime, timezone
from curl_cffi import requests as curl_requests

class AmazonPAAPI:
    """Amazon Product Advertising API 5.0 client."""

    HOST = "webservices.amazon.com"
    REGION = "us-east-1"
    SERVICE = "ProductAdvertisingAPI"
    PATH = "/paapi5/searchitems"

    def __init__(self, access_key: str, secret_key: str, partner_tag: str):
        self.access_key = access_key
        self.secret_key = secret_key
        self.partner_tag = partner_tag

    def _sign(self, payload: str, target: str) -> dict:
        """Generate AWS Signature Version 4 headers."""
        now = datetime.now(timezone.utc)
        datestamp = now.strftime("%Y%m%d")
        amz_date = now.strftime("%Y%m%dT%H%M%SZ")

        headers = {
            "content-type": "application/json; charset=utf-8",
            "host": self.HOST,
            "x-amz-date": amz_date,
            "x-amz-target": f"com.amazon.paapi5.v1.ProductAdvertisingAPIv1.{target}",
            "content-encoding": "amz-1.0",
        }

        signed_headers_str = ";".join(sorted(headers.keys()))
        canonical_headers = "\n".join(f"{k}:{v}" for k, v in sorted(headers.items())) + "\n"
        payload_hash = hashlib.sha256(payload.encode()).hexdigest()
        canonical_request = f"POST\n{self.PATH}\n\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"

        credential_scope = f"{datestamp}/{self.REGION}/{self.SERVICE}/aws4_request"
        string_to_sign = (
            f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n"
            f"{hashlib.sha256(canonical_request.encode()).hexdigest()}"
        )

        def _hmac256(key, msg):
            return hmac.new(key if isinstance(key, bytes) else key.encode(), msg.encode(), hashlib.sha256).digest()

        signing_key = _hmac256(
            _hmac256(_hmac256(_hmac256(f"AWS4{self.secret_key}", datestamp), self.REGION), self.SERVICE),
            "aws4_request"
        )
        signature = hmac.new(signing_key, string_to_sign.encode(), hashlib.sha256).hexdigest()

        headers["authorization"] = (
            f"AWS4-HMAC-SHA256 Credential={self.access_key}/{credential_scope}, "
            f"SignedHeaders={signed_headers_str}, Signature={signature}"
        )
        return headers

    def search_kindle_books(
        self,
        keywords: str,
        browse_node_id: str = None,
        page: int = 1,
        sort_by: str = "Relevance",
    ) -> dict:
        """
        Search Kindle Store.

        Args:
            keywords: Search query
            browse_node_id: Kindle category browse node (see below)
            page: Result page (1-10)
            sort_by: "Relevance", "Featured", "NewestArrivals", "Price:HighToLow", "Price:LowToHigh"

        Common Kindle browse node IDs:
            154606011 = Kindle Store (top level)
            1286228011 = Kindle eBooks
            157028011 = Mystery, Thriller & Suspense
            17659 = Science Fiction & Fantasy
            10129 = Business & Investing
            6013 = Biographies & Memoirs
            2590149011 = Romance
        """
        req_body = {
            "Keywords": keywords,
            "SearchIndex": "KindleStore",
            "ItemPage": page,
            "SortBy": sort_by,
            "PartnerTag": self.partner_tag,
            "PartnerType": "Associates",
            "Marketplace": "www.amazon.com",
            "Resources": [
                "ItemInfo.Title",
                "ItemInfo.ByLineInfo",
                "ItemInfo.ContentInfo",
                "ItemInfo.ProductInfo",
                "ItemInfo.Classifications",
                "Offers.Listings.Price",
                "Offers.Listings.Availability.Type",
                "BrowseNodeInfo.BrowseNodes",
                "BrowseNodeInfo.BrowseNodes.Ancestor",
                "SearchRefinements",
            ],
        }

        if browse_node_id:
            req_body["BrowseNodeId"] = browse_node_id

        payload = json.dumps(req_body)
        headers = self._sign(payload, "SearchItems")

        resp = curl_requests.post(
            f"https://{self.HOST}{self.PATH}",
            content=payload,
            headers=headers,
            impersonate="chrome124",
            timeout=15,
        )
        return resp.json()

    def get_items(self, asins: list[str]) -> dict:
        """Look up specific items by ASIN."""
        payload = json.dumps({
            "ItemIds": asins[:10],  # Max 10 per request
            "PartnerTag": self.partner_tag,
            "PartnerType": "Associates",
            "Marketplace": "www.amazon.com",
            "Resources": [
                "ItemInfo.Title",
                "ItemInfo.ByLineInfo",
                "ItemInfo.ContentInfo",
                "ItemInfo.ProductInfo",
                "ItemInfo.TechnicalInfo",
                "Offers.Listings.Price",
                "BrowseNodeInfo.BrowseNodes",
            ],
        })
        headers = self._sign(payload, "GetItems")
        resp = curl_requests.post(
            f"https://{self.HOST}/paapi5/getitems",
            content=payload,
            headers=headers,
            impersonate="chrome124",
            timeout=15,
        )
        return resp.json()

Direct Scraping: Kindle Bestseller Lists

For category discovery and trend tracking without a PA-API account, scrape the bestseller pages directly:

import re
import sqlite3
import random
import time
from datetime import datetime
from bs4 import BeautifulSoup
from curl_cffi import requests as curl_requests
from pathlib import Path

# Baseline browser-like request headers. The User-Agent here is a default;
# fetch_kindle_page overrides it per request with a random pick from
# USER_AGENTS below.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Accept-Encoding": "gzip, deflate, br",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
}

# Desktop Chrome/Firefox User-Agent strings rotated across requests to vary
# the browser fingerprint.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
]

def create_session(proxy_url: str = None) -> curl_requests.Session:
    """Create a curl_cffi session with browser TLS impersonation.

    Args:
        proxy_url: Optional proxy URL. When set, BOTH http and https traffic
            are routed through it — the original only set the "https" key,
            so plain-http requests (and http redirects) bypassed the proxy
            and exposed the real IP.
    """
    session = curl_requests.Session()
    if proxy_url:
        # Cover both schemes so no request leaks around the proxy.
        session.proxies = {"http": proxy_url, "https": proxy_url}
    return session

def fetch_kindle_page(
    url: str,
    session: curl_requests.Session,
    retries: int = 3,
) -> str | None:
    """Fetch one Kindle page through *session*, retrying on failures.

    Retries on non-200 responses, detected CAPTCHA interstitials, and
    transport errors, sleeping a randomized backoff between attempts.
    Returns the HTML text, or None once all attempts are exhausted.
    """
    request_headers = dict(HEADERS)
    request_headers["User-Agent"] = random.choice(USER_AGENTS)

    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            resp = session.get(
                url,
                impersonate="chrome124",
                headers=request_headers,
                timeout=20,
                allow_redirects=True,
            )

            if resp.status_code != 200:
                time.sleep(random.uniform(3, 7))
                continue

            # Amazon's block pages carry one of these markers.
            blocked = any(
                marker in resp.text
                for marker in ("Robot Check", "Type the characters", "Sorry!")
            )
            if blocked:
                print(f"  CAPTCHA on attempt {attempt}, waiting...")
                time.sleep(random.uniform(15, 30))
                continue

            return resp.text

        except Exception as exc:
            print(f"  Fetch error attempt {attempt}: {exc}")
            time.sleep(random.uniform(2, 5))

    return None

def scrape_kindle_bestsellers(
    category_url: str,
    session: curl_requests.Session,
) -> list[dict]:
    """
    Scrape a Kindle bestseller list page (two pages of 50 = top 100).

    Common Kindle category URLs:
    - https://www.amazon.com/Best-Sellers-Kindle-Store/zgbs/digital-text/
    - https://www.amazon.com/Best-Sellers-Kindle-eBooks-Mystery/zgbs/digital-text/18580176011/
    - https://www.amazon.com/Best-Sellers-Kindle-Store-Romance/zgbs/digital-text/6057053011/
    """
    collected: list[dict] = []

    for page_num in (1, 2):
        # Page 1 is the bare category URL; later pages use the pg parameter.
        if page_num == 1:
            url = category_url
        else:
            url = f"{category_url.rstrip('/')}?pg={page_num}"

        html = fetch_kindle_page(url, session)
        if not html:
            break

        soup = BeautifulSoup(html, "lxml")
        # Primary grid selector, with a data-asin fallback for layout variants.
        cards = soup.select("#gridItemRoot") or soup.select("div[data-asin]")

        count_before = len(collected)
        for card in cards:
            parsed = parse_kindle_card(card)
            if parsed and parsed.get("asin"):
                collected.append(parsed)

        print(f"  Page {page_num}: {len(collected) - count_before} books")
        time.sleep(random.uniform(4, 8))

    return collected

def parse_kindle_card(item) -> dict:
    """Extract one book's fields from a bestseller grid card.

    Returns {} when the card carries no ASIN; otherwise a dict with keys
    asin, rank, title, author, price, rating, review_count,
    is_kindle_unlimited, and cover_url (missing fields are None).
    """
    asin = item.get("data-asin", "").strip()
    if not asin:
        return {}

    def first_text(selector: str):
        # Stripped text of the first matching element, or None if absent.
        node = item.select_one(selector)
        return node.get_text(strip=True) if node else None

    def first_int(text):
        # First integer in `text` (commas removed), or None.
        digits = re.search(r"(\d+)", (text or "").replace(",", ""))
        return int(digits.group(1)) if digits else None

    rank = first_int(first_text("span.zg-bdg-text, .zg-badge-wrapper span"))

    title = first_text(
        "._cDEzb_p13n-sc-css-line-clamp-1, "
        "._cDEzb_p13n-sc-css-line-clamp-2, "
        ".p13n-sc-truncated, .a-link-normal > span"
    )
    author = first_text(".a-size-small .a-link-child, span.a-color-secondary")
    price = first_text("._cDEzb_p13n-sc-price, .a-color-price, .a-price .a-offscreen")

    rating = None
    rating_text = first_text(".a-icon-alt")
    if rating_text is not None:
        decimal = re.search(r"([\d.]+)", rating_text)
        rating = float(decimal.group(1)) if decimal else None

    review_count = None
    reviews_text = first_text("span.a-size-small:last-child, .a-size-small a")
    if reviews_text is not None:
        review_count = first_int(reviews_text)

    badge = item.select_one(".a-badge-label")
    is_ku = bool(badge and "Kindle Unlimited" in badge.get_text())

    image = item.select_one("img")

    return {
        "asin": asin,
        "rank": rank,
        "title": title,
        "author": author,
        "price": price,
        "rating": rating,
        "review_count": review_count,
        "is_kindle_unlimited": is_ku,
        "cover_url": image.get("src") if image else None,
    }

Scraping Detailed Book Pages

Once you have ASINs, fetch full metadata from individual product pages:

def scrape_kindle_book_details(
    asin: str,
    session: curl_requests.Session,
) -> dict:
    """Fetch and parse full metadata for one Kindle book by ASIN.

    Args:
        asin: Amazon ASIN of the Kindle edition.
        session: curl_cffi session (ideally proxied) used for the fetch.

    Returns:
        Dict of parsed fields keyed by "asin". On fetch failure, returns
        {"asin": ..., "error": "fetch failed"}. Fields that fail to parse
        come back as None/empty rather than raising.
    """
    url = f"https://www.amazon.com/dp/{asin}"
    html = fetch_kindle_page(url, session)
    if not html:
        return {"asin": asin, "error": "fetch failed"}

    soup = BeautifulSoup(html, "lxml")
    book = {"asin": asin}

    # Title
    title_el = soup.select_one("#productTitle")
    book["title"] = title_el.get_text(strip=True) if title_el else None

    # Author (first byline link only)
    author_el = soup.select_one(".author .a-link-normal, #bylineInfo .a-link-normal")
    book["author"] = author_el.get_text(strip=True) if author_el else None

    # Kindle price
    kindle_price_el = soup.select_one(
        "#kindle-price, #tp-price-block-kindle, "
        "[data-action='show-all-offers-display'] .a-color-price, "
        ".kindle-price .a-color-price"
    )
    book["kindle_price"] = kindle_price_el.get_text(strip=True) if kindle_price_el else None

    # Paperback price (for comparison)
    paperback_el = soup.select_one("#paperback_price, #price, .print-list-price")
    book["paperback_price"] = paperback_el.get_text(strip=True) if paperback_el else None

    # Description
    # BUG FIX: the two selectors were passed as separate positional args, so
    # the second string landed in select_one's `namespaces` parameter. They
    # must form a single comma-separated CSS selector group.
    desc_el = soup.select_one("#bookDescription_feature_div, #productDescription")
    book["description"] = desc_el.get_text(strip=True)[:3000] if desc_el else None

    # Rating (e.g. "4.6 out of 5 stars" in the popover title attribute)
    rating_el = soup.select_one("#acrPopover")
    if rating_el:
        r_text = rating_el.get("title", "")
        r_match = re.search(r"([\d.]+)", r_text)
        book["rating"] = float(r_match.group(1)) if r_match else None

    # Review count
    reviews_el = soup.select_one("#acrCustomerReviewText")
    if reviews_el:
        r_text = reviews_el.get_text(strip=True).replace(",", "")
        r_match = re.search(r"(\d+)", r_text)
        book["review_count"] = int(r_match.group(1)) if r_match else None

    # Product details table (bullet list or tech-spec table layout)
    details = {}
    for li in soup.select("#detailBullets_feature_div li, #productDetails_techSpec_section_1 tr"):
        text = li.get_text(separator=": ", strip=True)
        if ":" in text:
            key, _, val = text.partition(":")
            # NOTE(review): detail-bullet keys sometimes carry invisible
            # RTL/LTR marks on live pages — TODO confirm and strip if seen.
            details[key.strip()] = val.strip()

    book["print_length"] = details.get("Print length", details.get("Print Length"))
    book["language"] = details.get("Language")
    book["publisher"] = details.get("Publisher")
    book["publication_date"] = details.get("Publication date", details.get("Publication Date"))
    book["word_wise"] = details.get("Word Wise")
    book["enhanced_typesetting"] = details.get("Enhanced typesetting")
    book["x_ray"] = details.get("X-Ray")

    # Best Sellers Rank — one entry per "#N in Category" fragment
    bsr_ranks = []
    for li in soup.select("#detailBullets_feature_div li, #SalesRank"):
        li_text = li.get_text()
        if "Best Sellers Rank" in li_text or "Amazon Best Sellers Rank" in li_text:
            matches = re.findall(r"#([\d,]+)\s+in\s+([^(#\n]+)", li_text)
            for rank_str, category in matches:
                bsr_ranks.append({
                    "rank": int(rank_str.replace(",", "")),
                    "category": category.strip().rstrip(";").strip(),
                })
    book["bsr"] = bsr_ranks

    # Categories / breadcrumbs
    breadcrumbs = soup.select("#wayfinding-breadcrumbs_feature_div a, .a-breadcrumb a")
    book["categories"] = [b.get_text(strip=True) for b in breadcrumbs if b.get_text(strip=True)]

    # Series info
    series_el = soup.select_one("[id*='series'], .series-childAsin-item")
    book["series"] = series_el.get_text(strip=True) if series_el else None

    # Kindle Unlimited eligibility (presence of the lending widget)
    ku_el = soup.select_one("[data-feature-name='lendingEligibility'], .kindle-unlimited")
    book["kindle_unlimited"] = bool(ku_el)

    # Cover image
    img_el = soup.select_one("#imgBlkFront, #landingImage")
    book["cover_url"] = img_el.get("src") if img_el else None

    return book

Proxy Setup for Amazon

Amazon aggressively blocks datacenter IPs. Use ThorData's residential proxy pool for reliable access:

# ThorData residential proxy credentials (placeholders — fill in your own).
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000

def get_proxy(sticky: bool = False, session_id: str = None) -> str:
    """Build a ThorData proxy URL, optionally pinned to a sticky session.

    A sticky session (sticky=True plus a session_id) keeps requests on one
    exit IP by encoding the session into the proxy username.
    """
    username = (
        f"{THORDATA_USER}_session-{session_id}"
        if sticky and session_id
        else THORDATA_USER
    )
    return f"http://{username}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"

def make_kindle_scraper(sticky_id: str = None) -> curl_requests.Session:
    """Build a proxied scraper session; a sticky_id pins it to one exit IP."""
    return create_session(
        proxy_url=get_proxy(sticky=bool(sticky_id), session_id=sticky_id)
    )

SQLite Storage

DB_PATH = Path("kindle_data.db")

def init_db() -> sqlite3.Connection:
    """Initialize Kindle book tracking database."""
    conn = sqlite3.connect(DB_PATH)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS books (
            asin TEXT PRIMARY KEY,
            title TEXT,
            author TEXT,
            series TEXT,
            publisher TEXT,
            publication_date TEXT,
            kindle_price REAL,
            paperback_price REAL,
            rating REAL,
            review_count INTEGER,
            print_length TEXT,
            language TEXT,
            kindle_unlimited INTEGER DEFAULT 0,
            description TEXT,
            cover_url TEXT,
            categories TEXT,
            updated_at TEXT DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS bsr_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asin TEXT NOT NULL,
            category TEXT NOT NULL,
            rank INTEGER NOT NULL,
            captured_at TEXT DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS category_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            category_url TEXT NOT NULL,
            asin TEXT NOT NULL,
            rank INTEGER,
            price REAL,
            rating REAL,
            review_count INTEGER,
            is_kindle_unlimited INTEGER DEFAULT 0,
            captured_at TEXT DEFAULT (datetime('now'))
        );

        CREATE INDEX IF NOT EXISTS idx_bsr_asin ON bsr_snapshots(asin, captured_at);
        CREATE INDEX IF NOT EXISTS idx_cat_url ON category_snapshots(category_url, captured_at);
    """)
    conn.commit()
    return conn

def parse_price_to_float(price_str: str | None) -> float | None:
    """Convert price string to float."""
    if not price_str:
        return None
    match = re.search(r"[\d.]+", price_str.replace(",", ""))
    if match:
        return float(match.group())
    return None

def save_book(conn: sqlite3.Connection, book: dict):
    """Upsert one book's metadata and append its BSR history rows.

    Args:
        conn: Open connection with the schema created by init_db().
        book: Merged card/detail dict; missing keys are stored as NULL.
    """
    # INSERT OR REPLACE keys on books.asin (the primary key), so re-scraping
    # a book overwrites its row; updated_at is omitted from the column list
    # and therefore re-filled by its datetime('now') column default.
    conn.execute("""
        INSERT OR REPLACE INTO books
        (asin, title, author, series, publisher, publication_date,
         kindle_price, paperback_price, rating, review_count,
         print_length, language, kindle_unlimited, description, cover_url, categories)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        book.get("asin"), book.get("title"), book.get("author"),
        book.get("series"), book.get("publisher"), book.get("publication_date"),
        parse_price_to_float(book.get("kindle_price")),
        parse_price_to_float(book.get("paperback_price")),
        book.get("rating"), book.get("review_count"),
        book.get("print_length"), book.get("language"),
        1 if book.get("kindle_unlimited") else 0,
        book.get("description"), book.get("cover_url"),
        json.dumps(book.get("categories", [])),
    ))

    # BSR rows are append-only history (one per category per scrape), so the
    # per-book rank trajectory can be reconstructed later.
    for rank_data in book.get("bsr", []):
        conn.execute(
            "INSERT INTO bsr_snapshots (asin, category, rank) VALUES (?, ?, ?)",
            (book.get("asin"), rank_data["category"], rank_data["rank"]),
        )

    conn.commit()

Trend Analysis

import json
import csv

def find_trending_books(
    conn: sqlite3.Connection,
    category_url: str,
    days: int = 7,
    min_improvement: int = 10,
) -> list[dict]:
    """Find books whose category rank improved most over the past *days*.

    Compares each ASIN's earliest and latest snapshot in the window and
    returns up to 20 climbers, best improvement first.

    Args:
        conn: Open connection with the init_db() schema.
        category_url: Category whose snapshots to examine.
        days: Look-back window in days.
        min_improvement: Minimum rank gain (start_rank - current_rank).

    Returns:
        Dicts with asin, title, author, start_rank, current_rank, improvement.
    """
    # Single query. Window functions pick each ASIN's first/last snapshot in
    # the window, and `books` is LEFT JOINed here for title/author — the
    # original selected a bogus `b.rank AS title` column it then discarded,
    # and issued one follow-up SELECT per result row (N+1).
    rows = conn.execute("""
        WITH dated AS (
            SELECT asin, rank, captured_at,
                   ROW_NUMBER() OVER (PARTITION BY asin ORDER BY captured_at ASC) AS rn_first,
                   ROW_NUMBER() OVER (PARTITION BY asin ORDER BY captured_at DESC) AS rn_last
            FROM category_snapshots
            WHERE category_url = ?
              AND captured_at > datetime('now', ?)
        )
        SELECT a.asin, bk.title, bk.author,
               a.rank AS start_rank, b.rank AS end_rank,
               a.rank - b.rank AS improvement
        FROM dated a
        JOIN dated b ON a.asin = b.asin
        LEFT JOIN books bk ON bk.asin = a.asin
        WHERE a.rn_first = 1 AND b.rn_last = 1
          AND a.rank > b.rank
          AND (a.rank - b.rank) >= ?
        ORDER BY improvement DESC
        LIMIT 20
    """, (category_url, f"-{days} days", min_improvement)).fetchall()

    return [
        {
            "asin": asin,
            "title": title,
            "author": author,
            "start_rank": start_rank,
            "current_rank": end_rank,
            "improvement": improvement,
        }
        for asin, title, author, start_rank, end_rank, improvement in rows
    ]

def analyze_niche(
    conn: sqlite3.Connection,
    category_url: str,
) -> dict:
    """Summarize the latest snapshot of a Kindle category.

    Computes pricing, rating, review, and Kindle Unlimited statistics over
    the most recent capture of *category_url*, plus the current top 10.
    Returns {} when the category has no snapshots.
    """
    snapshot = conn.execute("""
        SELECT cs.asin, cs.rank, cs.price, cs.rating, cs.review_count, cs.is_kindle_unlimited,
               b.title, b.author
        FROM category_snapshots cs
        LEFT JOIN books b ON cs.asin = b.asin
        WHERE cs.category_url = ?
          AND cs.captured_at = (
              SELECT MAX(captured_at) FROM category_snapshots WHERE category_url = ?
          )
        ORDER BY cs.rank ASC
    """, (category_url, category_url)).fetchall()

    if not snapshot:
        return {}

    def _avg(values):
        # Mean, or 0 for an empty sample (matches the original's fallback).
        return sum(values) / len(values) if values else 0

    # Zero/NULL prices (Kindle Unlimited placeholders) are excluded.
    prices = [row[2] for row in snapshot if row[2] and row[2] > 0]
    ratings = [row[3] for row in snapshot if row[3]]
    reviews = [row[4] for row in snapshot if row[4]]
    ku_total = sum(1 for row in snapshot if row[5])

    return {
        "category": category_url.split("/")[-2],
        "total_books": len(snapshot),
        "price_avg": _avg(prices),
        "price_min": min(prices) if prices else 0,
        "price_max": max(prices) if prices else 0,
        "rating_avg": _avg(ratings),
        "reviews_avg": _avg(reviews),
        "reviews_median": sorted(reviews)[len(reviews) // 2] if reviews else 0,
        "kindle_unlimited_pct": ku_total / len(snapshot) * 100,
        "top_10": [
            {"rank": row[1], "title": row[6], "author": row[7], "price": row[2]}
            for row in snapshot[:10]
        ],
    }

def export_category_to_csv(
    conn: sqlite3.Connection,
    category_url: str,
    output_path: str = "kindle_category.csv",
):
    """Write the most recent snapshot of one category to a CSV file."""
    rows = conn.execute("""
        SELECT cs.asin, cs.rank, b.title, b.author, cs.price,
               cs.rating, cs.review_count, cs.is_kindle_unlimited,
               b.publisher, b.publication_date, b.series
        FROM category_snapshots cs
        LEFT JOIN books b ON cs.asin = b.asin
        WHERE cs.category_url = ?
          AND cs.captured_at = (
              SELECT MAX(captured_at) FROM category_snapshots WHERE category_url = ?
          )
        ORDER BY cs.rank ASC
    """, (category_url, category_url)).fetchall()

    columns = [
        "rank", "asin", "title", "author", "price", "rating",
        "review_count", "kindle_unlimited", "publisher", "pub_date", "series"
    ]
    with open(output_path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(columns)
        # Swap the first two query columns so rank leads, matching the header.
        writer.writerows([row[1], row[0], *row[2:]] for row in rows)

    print(f"Exported {len(rows)} books to {output_path}")

Full Pipeline

# Bestseller list URLs tracked by the pipeline: the top-level Kindle Store
# plus the Mystery and Romance e-book categories.
KINDLE_CATEGORIES = [
    "https://www.amazon.com/Best-Sellers-Kindle-Store/zgbs/digital-text/",
    "https://www.amazon.com/Best-Sellers-Kindle-eBooks-Mystery/zgbs/digital-text/18580176011/",
    "https://www.amazon.com/Best-Sellers-Kindle-eBooks-Romance/zgbs/digital-text/6057053011/",
]

def run_kindle_pipeline(
    proxy_url: str = None,
    scrape_details: bool = True,
    detail_limit: int = 20,
):
    """Full Kindle bestseller tracking pipeline.

    For each category in KINDLE_CATEGORIES: scrape the bestseller list,
    record a category snapshot, optionally fetch detail pages for the top
    books, then print a per-category niche analysis.

    Args:
        proxy_url: Optional proxy for the listing session.
        scrape_details: When True, also fetch each top book's product page.
        detail_limit: How many top-ranked books per category get a detail
            fetch (previously hard-coded to 20; default preserves that).
    """
    conn = init_db()
    session = create_session(proxy_url)

    try:
        for cat_url in KINDLE_CATEGORIES:
            cat_name = cat_url.split("/")[-2][:30]
            print(f"\n--- Category: {cat_name} ---")

            # Scrape bestseller list
            books = scrape_kindle_bestsellers(cat_url, session)
            print(f"Found {len(books)} books")

            # Save to category snapshots (append-only history)
            for book in books:
                price = parse_price_to_float(book.get("price"))
                conn.execute("""
                    INSERT INTO category_snapshots
                    (category_url, asin, rank, price, rating, review_count, is_kindle_unlimited)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    cat_url, book["asin"], book.get("rank"),
                    price, book.get("rating"), book.get("review_count"),
                    1 if book.get("is_kindle_unlimited") else 0,
                ))
            conn.commit()

            # Optionally fetch full details for the top books.
            if scrape_details:
                # Sticky session keeps detail requests on one residential exit IP.
                detail_session = make_kindle_scraper(sticky_id=f"details_{cat_name}")
                for i, book in enumerate(books[:detail_limit]):
                    if not book.get("asin"):
                        continue

                    print(f"  [{i+1}/{detail_limit}] Getting details for {book['asin']}")
                    detail = scrape_kindle_book_details(book["asin"], detail_session)
                    save_book(conn, {**book, **detail})
                    time.sleep(random.uniform(6, 12))

            time.sleep(random.uniform(10, 20))

        # Print niche analysis
        print("\n=== Niche Analysis ===")
        for cat_url in KINDLE_CATEGORIES:
            analysis = analyze_niche(conn, cat_url)
            if analysis:
                print(f"\n{analysis['category']}:")
                print(f"  Books tracked: {analysis['total_books']}")
                print(f"  Avg price: ${analysis['price_avg']:.2f} (range ${analysis['price_min']:.2f}-${analysis['price_max']:.2f})")
                print(f"  Avg rating: {analysis['rating_avg']:.1f}")
                print(f"  Kindle Unlimited: {analysis['kindle_unlimited_pct']:.0f}%")
    finally:
        # Close the DB even if a scrape raises mid-run.
        conn.close()

# Run the pipeline only when executed as a script. The original called
# run_kindle_pipeline() unconditionally at module level, which kicks off a
# long-running scrape (with placeholder proxy credentials) on mere import.
if __name__ == "__main__":
    run_kindle_pipeline(proxy_url="http://user:[email protected]:9000")

Practical Considerations

Respect robots.txt. Amazon's robots.txt disallows many paths. Bestseller pages (/zgbs/) are allowed, but product search results are restricted. Know the difference.

Combine API and scraping. Use PA-API for lookups where you know the ASIN, and scraping for discovery (bestseller lists, category browsing). This reduces your scraping footprint.

Rotate everything. Rotate user agents, vary request timing, mix product detail requests with category browsing. With ThorData's residential proxy pool, IP rotation is automatic.

Handle Kindle Unlimited. Many books show "$0.00" because they're in Kindle Unlimited. The actual purchase price is separate. Track both kindle_price and is_kindle_unlimited for accurate pricing data.

Watch for A/B testing. Amazon constantly A/B tests page layouts. Build parsers with multiple fallback selectors and log parse failures so you know when to update them.

Refresh frequency. Kindle bestseller lists update hourly. For serious trend tracking, run the pipeline every 2-4 hours and store the full history. A daily snapshot is the minimum useful granularity.