
Scraping Amazon Product Data in 2026: ASIN, Price History, Reviews, and Searches


Amazon is the single hardest major website to scrape in 2026. If you've tried it, you already know -- datacenter IPs get blocked within seconds, CAPTCHAs appear out of nowhere, and even well-crafted requests return 503 "Robot Check" pages. I've spent months figuring out what actually works, so here's the honest breakdown.


Table of Contents

  1. Why Amazon Is So Hard to Scrape
  2. Understanding Amazon's Product Structure
  3. Approach 1: Amazon Product Advertising API (Official)
  4. Approach 2: Keepa API for Price History
  5. Approach 3: Playwright with Residential Proxies
  6. Approach 4: Parsing JSON-LD Structured Data
  7. Approach 5: Review Scraping
  8. Approach 6: Search Results and Category Pages
  9. Approach 7: API Services (Rainforest, Oxylabs)
  10. Anti-Detection: Headers, TLS, and Fingerprints
  11. Proxy Strategy for Amazon
  12. Rate Limiting and Request Scheduling
  13. Storing Amazon Data: Schema Design
  14. Price Monitoring Pipeline
  15. Common Errors and Fixes
  16. Which Approach Should You Use?

1. Why Amazon Is So Hard to Scrape {#why-hard}

Amazon's anti-bot infrastructure is arguably the most sophisticated on the public web. Here's what you're up against:

Instant datacenter IP bans. Send a single request from an AWS, GCP, DigitalOcean, or Linode IP and you'll get a CAPTCHA or 503 before your second request fires. Amazon maintains massive blocklists of every major cloud provider's IP ranges.

TLS fingerprinting. Amazon checks your TLS client hello against known browser fingerprints. Python's requests library and httpx have identifiable TLS fingerprints that Amazon blocks immediately. You need either browser automation or a library like curl-cffi that impersonates a real browser's TLS handshake.

Browser fingerprinting. Amazon checks JavaScript execution patterns, WebGL rendering, canvas hashes, and navigator properties. Headless Chrome with default settings is detected within 1-2 page loads.

Behavioral analysis. Request rate, click patterns, and navigation sequences are all analyzed. Jumping straight to product pages without first visiting a search or category page is a classic bot pattern. Human users browse -- they don't teleport directly to product pages.

Dynamic structure. CSS class names rotate, DOM structure shifts between A/B tests, and pages render differently based on geo, login state, and detected bot score. No CSS selector survives more than a few weeks unchanged.

The bottom line: brute-force scraping Amazon at scale without the right tools is not viable in 2026.
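Given all that, the first practical skill is recognizing when Amazon has flagged you. Here's a minimal triage helper based on the block signals described above (the 503 "Robot Check" page and CAPTCHA interstitials); the marker strings are common indicators I've seen, not an exhaustive list:

```python
def classify_amazon_response(status_code: int, html: str) -> str:
    """Rough triage of an Amazon response: 'ok', 'blocked', or 'error'.

    Heuristic markers only -- Amazon's block pages vary, so treat this
    as a starting point, not a complete detector.
    """
    if status_code == 503:
        return "blocked"          # classic "Robot Check" 503
    if status_code != 200:
        return "error"
    lowered = html.lower()
    block_markers = (
        "robot check",            # CAPTCHA interstitial title
        "api-services-support",   # support address shown on block pages
        "validatecaptcha",        # CAPTCHA form action
    )
    if any(marker in lowered for marker in block_markers):
        return "blocked"
    return "ok"

# A 200 response whose body is actually a CAPTCHA page is still a block:
print(classify_amazon_response(200, "<title>Robot Check</title>"))  # blocked
```

Every scraper in this article should route its responses through a check like this before parsing, so blocks trigger backoff instead of silently producing empty data.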


2. Understanding Amazon's Product Structure {#product-structure}

Before writing any code, understand how Amazon organizes products. Every product is identified by an ASIN (Amazon Standard Identification Number), a 10-character alphanumeric code -- for books it's the ISBN-10; most other products get a code starting with B0. The canonical product URL is https://www.amazon.com/dp/{ASIN}/, but the same product appears under many URL formats, so normalize everything down to the ASIN:

def normalize_amazon_url(url_or_asin: str) -> str:
    """Convert any Amazon product URL or ASIN to canonical form."""
    import re
    # Extract ASIN from various URL formats
    asin_patterns = [
        r'/dp/([A-Z0-9]{10})',
        r'/gp/product/([A-Z0-9]{10})',
        r'/product/([A-Z0-9]{10})',
    ]
    for pattern in asin_patterns:
        m = re.search(pattern, url_or_asin)
        if m:
            return f"https://www.amazon.com/dp/{m.group(1)}/"

    # If it's a bare ASIN
    if re.match(r'^[A-Z0-9]{10}$', url_or_asin.upper()):
        return f"https://www.amazon.com/dp/{url_or_asin.upper()}/"

    return url_or_asin

def extract_asin(url: str) -> str | None:
    """Extract ASIN from any Amazon URL."""
    import re
    for pattern in [r'/dp/([A-Z0-9]{10})', r'/gp/product/([A-Z0-9]{10})']:
        m = re.search(pattern, url)
        if m:
            return m.group(1)
    return None

3. Approach 1: Amazon Product Advertising API (Official) {#pa-api}

The cleanest path. You need an Amazon Associates (affiliate) account, which gives you access to the PA-API 5.0. It returns structured JSON with product details, pricing, images, and review summaries.

Requirements:

  - Amazon Associates account (free to create)
  - At least 3 qualifying sales within 180 days to maintain access
  - Access Key ID and Secret Access Key from your Associates dashboard

# Install: pip install paapi5-python-sdk
from paapi5_python_sdk.api.default_api import DefaultApi
from paapi5_python_sdk.models.get_items_request import GetItemsRequest
from paapi5_python_sdk.models.get_items_resource import GetItemsResource
from paapi5_python_sdk.models.search_items_request import SearchItemsRequest
from paapi5_python_sdk.models.search_items_resource import SearchItemsResource
from paapi5_python_sdk.rest import ApiException

ACCESS_KEY = "your_access_key"
SECRET_KEY = "your_secret_key"
PARTNER_TAG = "yourtag-20"
REGION = "us-east-1"
HOST = "webservices.amazon.com"

api = DefaultApi(
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    host=HOST,
    region=REGION
)

def get_products_by_asin(asins: list[str]) -> list[dict]:
    """Fetch product details for up to 10 ASINs at once."""
    request = GetItemsRequest(
        partner_tag=PARTNER_TAG,
        partner_type="Associates",
        item_ids=asins[:10],  # Max 10 per request
        resources=[
            GetItemsResource.ITEMINFO_TITLE,
            GetItemsResource.ITEMINFO_FEATURES,
            GetItemsResource.ITEMINFO_PRODUCTINFO,
            GetItemsResource.OFFERS_LISTINGS_PRICE,
            GetItemsResource.OFFERS_LISTINGS_DELIVERYINFO_ISPRIMEELIGIBLE,
            GetItemsResource.OFFERS_SUMMARIES_HIGHESTPRICE,
            GetItemsResource.OFFERS_SUMMARIES_LOWESTPRICE,
            GetItemsResource.IMAGES_PRIMARY_LARGE,
            GetItemsResource.CUSTOMERRATINGS,
            GetItemsResource.BROWSENODEINFO_BROWSENODES,
        ]
    )

    try:
        response = api.get_items(request)
        if not response.items_result:
            return []

        products = []
        for item in response.items_result.items:
            price = None
            if item.offers and item.offers.listings:
                listing = item.offers.listings[0]
                price = listing.price.display_amount if listing.price else None

            products.append({
                "asin": item.asin,
                "title": item.item_info.title.display_value if item.item_info.title else None,
                "price": price,
                "rating": item.customer_ratings.star_rating.value if item.customer_ratings else None,
                "ratings_count": item.customer_ratings.count.value if item.customer_ratings else None,
                "url": item.detail_page_url,
                "image": (item.images.primary.large.url
                         if item.images and item.images.primary else None),
                "is_prime": (item.offers.listings[0].delivery_info.is_prime_eligible
                            if item.offers and item.offers.listings else None),
                "features": (item.item_info.features.display_values or []
                             if item.item_info and item.item_info.features
                             else []),
            })

        return products

    except ApiException as e:
        print(f"PA-API error: {e}")
        return []

def search_products_pa_api(keywords: str, category: str = None,
                            min_price: float = None,
                            max_price: float = None) -> list[dict]:
    """Search Amazon products via PA-API."""
    request = SearchItemsRequest(
        partner_tag=PARTNER_TAG,
        partner_type="Associates",
        keywords=keywords,
        search_index=category or "All",
        min_price=int(min_price * 100) if min_price else None,
        max_price=int(max_price * 100) if max_price else None,
        resources=[
            SearchItemsResource.ITEMINFO_TITLE,
            SearchItemsResource.OFFERS_LISTINGS_PRICE,
            SearchItemsResource.CUSTOMERRATINGS,
            SearchItemsResource.IMAGES_PRIMARY_MEDIUM,
        ]
    )

    try:
        response = api.search_items(request)
        if not response.search_result:
            return []
        return [{"asin": item.asin, "title": item.item_info.title.display_value}
                for item in response.search_result.items]
    except ApiException as e:
        print(f"Search error: {e}")
        return []

Limits: 1 request per second and 8,640 requests per day at baseline -- both scale with your affiliate revenue. Max 10 ASINs per GetItems call. The API doesn't return full review text -- just aggregate ratings.
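To stay under the 1-request-per-second ceiling, wrap your PA-API calls in a simple spacing helper. A minimal sketch (the batching loop is shown as a comment; `chunks` is a hypothetical helper for splitting ASINs into groups of 10):

```python
import time

class MinIntervalLimiter:
    """Block so that successive calls are at least `interval` seconds apart."""

    def __init__(self, interval: float = 1.0):
        self.interval = interval
        self._last = 0.0  # monotonic timestamp of the previous call

    def wait(self):
        now = time.monotonic()
        sleep_for = self._last + self.interval - now
        if sleep_for > 0:
            time.sleep(sleep_for)
        self._last = time.monotonic()

limiter = MinIntervalLimiter(interval=1.0)  # PA-API baseline: 1 request/sec

# for batch in chunks(asins, 10):   # hypothetical batching helper
#     limiter.wait()
#     get_products_by_asin(batch)
```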


4. Approach 2: Keepa API for Price History {#keepa}

For historical pricing data, Keepa is the answer. They've been tracking Amazon prices since 2011 and their API is the most reliable source for price history, sales rank trends, and deal detection.

import requests

KEEPA_API_KEY = "your_keepa_key"
KEEPA_BASE = "https://api.keepa.com"

def get_keepa_product(asin: str, domain: int = 1,
                       days: int = 90) -> dict:
    """Get product data including price history from Keepa."""
    params = {
        "key": KEEPA_API_KEY,
        "domain": domain,  # 1=amazon.com, 3=amazon.co.uk, 4=amazon.de
        "asin": asin,
        "history": 1,
        "days": days,
        "stats": 1,
        "buybox": 1,
        "offers": 20,
    }
    resp = requests.get(f"{KEEPA_BASE}/product", params=params, timeout=30)
    resp.raise_for_status()
    data = resp.json()

    if not data.get("products"):
        return {}

    product = data["products"][0]
    return _parse_keepa_product(product)

def _parse_keepa_product(product: dict) -> dict:
    """Parse Keepa product data into a clean structure."""
    # Keepa timestamps are minutes since 2011-01-01 00:00:00 UTC
    KEEPA_EPOCH = 1293840000  # Unix timestamp of 2011-01-01

    def keepa_time_to_unix(keepa_minutes: int) -> int:
        return KEEPA_EPOCH + keepa_minutes * 60

    def parse_price_history(csv_data: list) -> list[dict]:
        """Parse Keepa's [timestamp, price, timestamp, price, ...] format."""
        if not csv_data:
            return []
        history = []
        for i in range(0, len(csv_data) - 1, 2):
            ts = csv_data[i]
            price = csv_data[i + 1]
            if ts > 0 and price > 0:
                history.append({
                    "timestamp": keepa_time_to_unix(ts),
                    "price_cents": price,
                    "price": price / 100,
                })
        return history

    csv = product.get("csv", [])
    # csv[0] = Amazon price, csv[1] = Marketplace new, csv[2] = Marketplace used
    # csv[3] = Sales rank, csv[16] = Buy Box price

    stats = product.get("stats", {})

    return {
        "asin": product.get("asin"),
        "title": product.get("title"),
        "brand": product.get("brand"),
        "model": product.get("model"),
        "sales_rank": product.get("salesRankCurrent"),
        "sales_rank_reference": product.get("salesRankReference"),
        "rating": product.get("rating") / 10 if product.get("rating") else None,
        "review_count": product.get("reviewCount"),
        "amazon_price_current": csv[0][-1] / 100 if csv and csv[0] else None,
        "amazon_price_history": parse_price_history(csv[0]) if csv else [],
        "buybox_price_current": (csv[16][-1] / 100
                                  if csv and len(csv) > 16 and csv[16] else None),
        "price_30d_avg": stats.get("avg30", [None, None])[1],
        "price_90d_avg": stats.get("avg90", [None, None])[1],
        "price_all_time_low": stats.get("atl", [None, None])[1],
        "price_all_time_high": stats.get("ath", [None, None])[1],
        "out_of_stock_percentage_30d": stats.get("outOfStockPercentage30", 0),
        "categories": product.get("categories", []),
        "images": [img for img in (product.get("imagesCSV") or "").split(",") if img],
    }

def search_keepa(query: str, domain: int = 1,
                  sort_by: int = 0) -> list[str]:
    """Search for products on Keepa. Returns list of ASINs."""
    params = {
        "key": KEEPA_API_KEY,
        "domain": domain,
        "type": "search",
        "term": query,
        "sortType": sort_by,  # 0=relevance, 1=sales rank, 2=price
    }
    resp = requests.get(f"{KEEPA_BASE}/search", params=params, timeout=15)
    resp.raise_for_status()
    return resp.json().get("asinList", [])

# Usage
product = get_keepa_product("B0DCXZJQ8V")
print(f"Current price: ${product['amazon_price_current']}")
if product.get("price_30d_avg"):
    print(f"30-day average: ${product['price_30d_avg'] / 100:.2f}")
if product.get("price_all_time_low"):
    print(f"All-time low: ${product['price_all_time_low'] / 100:.2f}")

Cost: Keepa charges per "token" -- roughly 1 token per product with history enabled. Plans start around $15/month for 50 tokens/minute. For tracking known ASINs it's excellent value -- far cheaper than building and running your own historical tracking infrastructure.
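Using those numbers, you can estimate how long a full refresh of your tracked catalog takes on a given plan. A rough back-of-the-envelope sketch -- note that real token costs rise if you request offers or extended history:

```python
import math

def keepa_refresh_time_minutes(num_asins: int,
                               tokens_per_minute: int = 50,
                               tokens_per_product: int = 1) -> int:
    """Minutes needed to refresh `num_asins` products at a plan's
    token refill rate (assuming ~1 token per product, per the text)."""
    total_tokens = num_asins * tokens_per_product
    return math.ceil(total_tokens / tokens_per_minute)

# 10,000 tracked ASINs on the entry plan (~50 tokens/minute):
print(keepa_refresh_time_minutes(10_000))  # 200 minutes, i.e. ~3.3 hours
```

In practice that means an entry plan supports a few full daily refreshes of a 10k-ASIN catalog; larger catalogs need a higher token tier.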


5. Approach 3: Playwright with Residential Proxies {#playwright}

When you need data the APIs don't provide -- full review text, Q&A sections, detailed seller info, search results, or category pages -- you'll need browser automation with residential proxies.

import asyncio
import random
from playwright.async_api import async_playwright

PROXY_HOST = "proxy.thordata.com"
PROXY_PORT = 9000
PROXY_USER = "your_user"
PROXY_PASS = "your_pass"

def get_proxy_config(country: str = "US") -> dict:
    user = f"{PROXY_USER}-country-{country.lower()}"
    return {
        "server": f"http://{PROXY_HOST}:{PROXY_PORT}",
        "username": user,
        "password": PROXY_PASS,
    }

async def create_amazon_context(playwright, country: str = "US"):
    """Create a browser context configured for Amazon."""
    browser = await playwright.chromium.launch(
        headless=True,
        proxy=get_proxy_config(country),
        args=[
            "--disable-blink-features=AutomationControlled",
            "--disable-features=IsolateOrigins,site-per-process",
            "--no-sandbox",
        ]
    )
    context = await browser.new_context(
        viewport={"width": 1366, "height": 768},
        locale="en-US",
        timezone_id="America/New_York",
        user_agent=(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
        ),
    )
    # Remove webdriver property
    await context.add_init_script("""
        delete Object.getPrototypeOf(navigator).webdriver;
    """)
    return browser, context

async def scrape_amazon_product(page, asin: str) -> dict:
    """Scrape a single Amazon product page."""
    url = f"https://www.amazon.com/dp/{asin}/"

    # Navigate with a realistic referer
    response = await page.goto(
        url,
        wait_until="domcontentloaded",
        timeout=30000,
    )

    title = await page.title()

    if response.status == 503 or "Robot Check" in title:
        return {"asin": asin, "error": "bot_detected_503"}

    if "/ap/signin" in page.url:
        return {"asin": asin, "error": "redirected_to_login"}

    if response.status == 404:
        return {"asin": asin, "error": "not_found"}

    # Method 1: JSON-LD structured data (most stable)
    ld_json = await page.evaluate("""
        () => {
            const scripts = document.querySelectorAll('script[type="application/ld+json"]');
            for (const s of scripts) {
                try {
                    const data = JSON.parse(s.textContent);
                    if (data['@type'] === 'Product') return data;
                    if (Array.isArray(data)) {
                        const product = data.find(d => d['@type'] === 'Product');
                        if (product) return product;
                    }
                } catch (e) {}
            }
            return null;
        }
    """)

    if ld_json:
        offers = ld_json.get("offers", {})
        if isinstance(offers, list):
            offers = offers[0] if offers else {}
        return {
            "asin": asin,
            "name": ld_json.get("name"),
            "description": ld_json.get("description"),
            "brand": (ld_json.get("brand") or {}).get("name"),
            "rating": (ld_json.get("aggregateRating") or {}).get("ratingValue"),
            "review_count": (ld_json.get("aggregateRating") or {}).get("reviewCount"),
            "price": offers.get("price"),
            "currency": offers.get("priceCurrency"),
            "availability": offers.get("availability", "").split("/")[-1],
            "image": ld_json.get("image"),
            "url": ld_json.get("url", url),
            "source": "json_ld",
        }

    # Method 2: Direct DOM parsing (fallback)
    product = {"asin": asin, "source": "dom_parse"}

    title_el = await page.query_selector("#productTitle")
    if title_el:
        product["name"] = (await title_el.text_content()).strip()

    price_el = await page.query_selector(".a-price .a-offscreen")
    if price_el:
        product["price"] = (await price_el.text_content()).strip()

    rating_el = await page.query_selector("i.a-icon-star span.a-icon-alt")
    if rating_el:
        product["rating"] = (await rating_el.text_content()).strip().split()[0]

    return product

async def scrape_amazon_products_batch(asins: list[str],
                                        country: str = "US") -> list[dict]:
    """Scrape multiple Amazon products with randomized delays."""
    results = []
    async with async_playwright() as p:
        browser, context = await create_amazon_context(p, country)
        page = await context.new_page()

        # Warm up: visit Amazon homepage first
        await page.goto("https://www.amazon.com/", wait_until="domcontentloaded")
        await page.wait_for_timeout(random.randint(2000, 4000))

        for i, asin in enumerate(asins):
            data = await scrape_amazon_product(page, asin)
            results.append(data)

            if data.get("error") == "bot_detected_503":
                print(f"[{i+1}] Bot detected on {asin}, backing off...")
                await page.wait_for_timeout(30000)
                # Create fresh context after detection
                await context.close()
                await browser.close()
                browser, context = await create_amazon_context(p, country)
                page = await context.new_page()
                await page.goto("https://www.amazon.com/")
                await page.wait_for_timeout(3000)
            else:
                # Normal delay between products
                delay = random.randint(4000, 9000)
                await page.wait_for_timeout(delay)

        await browser.close()
    return results

# Usage
results = asyncio.run(scrape_amazon_products_batch(
    ["B0DCXZJQ8V", "B0BN93M8SP", "B0D5BP2BNR"],
    country="US"
))

6. Approach 4: Parsing JSON-LD Structured Data {#json-ld}

The JSON-LD <script type="application/ld+json"> block is the most stable data source on Amazon product pages. While CSS classes and DOM structure shift constantly with A/B tests, the structured data block follows schema.org conventions and changes far less frequently.

from curl_cffi import requests as cffi_requests
import json
import re

def scrape_product_json_ld(asin: str, proxy: str = None) -> dict:
    """Scrape Amazon product using curl-cffi to bypass TLS fingerprinting."""
    url = f"https://www.amazon.com/dp/{asin}/"
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-Mode": "navigate",
    }

    session = cffi_requests.Session()
    kwargs = {
        "headers": headers,
        "impersonate": "chrome131",
        "timeout": 15,
    }
    if proxy:
        kwargs["proxies"] = {"http": proxy, "https": proxy}

    resp = session.get(url, **kwargs)

    if resp.status_code != 200:
        return {"asin": asin, "error": f"status_{resp.status_code}"}

    html = resp.text

    if "Robot Check" in html or "api-services-support" in html:
        return {"asin": asin, "error": "bot_detected"}

    # Extract all JSON-LD blocks
    for match in re.finditer(
        r'<script[^>]+type="application/ld\+json"[^>]*>(.*?)</script>',
        html, re.DOTALL
    ):
        try:
            data = json.loads(match.group(1))
            # Handle both single objects and arrays
            candidates = data if isinstance(data, list) else [data]
            for candidate in candidates:
                if candidate.get("@type") == "Product":
                    return _parse_product_schema(candidate, asin)
        except json.JSONDecodeError:
            continue

    return {"asin": asin, "error": "no_product_schema_found"}

def _parse_product_schema(schema: dict, asin: str) -> dict:
    """Parse schema.org Product JSON-LD into clean product dict."""
    offers = schema.get("offers", {})
    if isinstance(offers, list):
        # Take the lowest-priced offer
        try:
            offers = min(offers, key=lambda o: float(o.get("price", 999999)))
        except (ValueError, TypeError):
            offers = offers[0] if offers else {}

    rating = schema.get("aggregateRating", {})

    return {
        "asin": asin,
        "name": schema.get("name"),
        "description": schema.get("description"),
        "brand": (schema.get("brand") or {}).get("name"),
        "sku": schema.get("sku"),
        "gtin13": schema.get("gtin13"),
        "price": offers.get("price"),
        "currency": offers.get("priceCurrency"),
        "availability": offers.get("availability", "").replace(
            "https://schema.org/", ""
        ),
        "condition": offers.get("itemCondition", "").replace(
            "https://schema.org/", ""
        ),
        "rating": rating.get("ratingValue"),
        "review_count": rating.get("reviewCount"),
        "best_rating": rating.get("bestRating"),
        "image": schema.get("image"),
        "url": offers.get("url"),
        "source": "json_ld",
    }

7. Approach 5: Review Scraping {#reviews}

Amazon reviews live at /product-reviews/{ASIN}/ and can be scraped with careful browser automation:

async def scrape_reviews(page, asin: str,
                          max_pages: int = 5) -> list[dict]:
    """Scrape Amazon product reviews."""
    reviews = []
    base_url = f"https://www.amazon.com/product-reviews/{asin}/"

    for page_num in range(1, max_pages + 1):
        url = f"{base_url}?pageNumber={page_num}"
        await page.goto(url, wait_until="domcontentloaded")
        await page.wait_for_timeout(random.randint(2000, 4000))

        title = await page.title()
        if "Robot Check" in title:
            break

        # Extract reviews
        review_items = await page.query_selector_all(
            "div[data-hook='review']"
        )

        if not review_items:
            break

        for item in review_items:
            try:
                title_el = await item.query_selector(
                    "a[data-hook='review-title'] span:not(.a-letter-space)"
                )
                body_el = await item.query_selector(
                    "span[data-hook='review-body'] span"
                )
                rating_el = await item.query_selector(
                    "i[data-hook='review-star-rating'] span.a-icon-alt"
                )
                author_el = await item.query_selector(
                    "span.a-profile-name"
                )
                date_el = await item.query_selector(
                    "span[data-hook='review-date']"
                )
                verified_el = await item.query_selector(
                    "span[data-hook='avp-badge']"
                )
                helpful_el = await item.query_selector(
                    "span[data-hook='helpful-vote-statement']"
                )

                review_title = (await title_el.text_content()).strip() if title_el else ""
                body = (await body_el.text_content()).strip() if body_el else ""
                rating_text = (await rating_el.text_content()).strip() if rating_el else ""
                rating = float(rating_text.split()[0]) if rating_text else None

                reviews.append({
                    "asin": asin,
                    "title": review_title,
                    "body": body,
                    "rating": rating,
                    "author": (await author_el.text_content()).strip() if author_el else "",
                    "date": (await date_el.text_content()).replace("Reviewed in", "").strip() if date_el else "",
                    "verified_purchase": verified_el is not None,
                    "helpful_votes": (await helpful_el.text_content()).strip() if helpful_el else "",
                    "page_num": page_num,
                })
            except Exception:
                continue

        await page.wait_for_timeout(random.randint(3000, 6000))

    return reviews

def analyze_reviews(reviews: list[dict]) -> dict:
    """Compute summary statistics from scraped reviews."""
    if not reviews:
        return {}

    ratings = [r["rating"] for r in reviews if r.get("rating")]
    verified = [r for r in reviews if r.get("verified_purchase")]

    import statistics
    return {
        "total": len(reviews),
        "verified_purchase_count": len(verified),
        "avg_rating": statistics.mean(ratings) if ratings else 0,
        "rating_distribution": {
            str(i): sum(1 for r in ratings if int(r) == i)
            for i in range(1, 6)
        },
        "verified_purchase_rate": len(verified) / len(reviews) if reviews else 0,
    }

8. Approach 6: Search Results and Category Pages {#search-results}

Search results live at /s?k={query} and paginate with &page={n}. Each result card is a div with data-component-type='s-search-result' and carries the product's ASIN in its data-asin attribute, which makes extraction straightforward:

async def scrape_search_results(page, query: str,
                                 max_pages: int = 3) -> list[dict]:
    """Scrape Amazon search results pages."""
    products = []

    for page_num in range(1, max_pages + 1):
        url = (f"https://www.amazon.com/s?k={query.replace(' ', '+')}"
               f"&page={page_num}")
        await page.goto(url, wait_until="domcontentloaded")
        await page.wait_for_timeout(random.randint(2000, 4000))

        # Extract products from search result cards
        cards = await page.query_selector_all(
            "div[data-component-type='s-search-result']"
        )

        for card in cards:
            asin = await card.get_attribute("data-asin")
            if not asin:
                continue

            title_el = await card.query_selector("h2 a.a-link-normal span")
            price_el = await card.query_selector("span.a-price .a-offscreen")
            rating_el = await card.query_selector("i.a-icon-star-small span.a-icon-alt")
            review_count_el = await card.query_selector("span.a-size-base.s-underline-text")
            img_el = await card.query_selector("img.s-image")
            badge_el = await card.query_selector("span.a-badge-text")
            prime_el = await card.query_selector("i.aok-relative.s-prime")

            products.append({
                "asin": asin,
                "title": (await title_el.text_content()).strip() if title_el else "",
                "price": (await price_el.text_content()).strip() if price_el else "",
                "rating": (await rating_el.text_content()).strip().split()[0] if rating_el else "",
                "review_count": (await review_count_el.text_content()).strip() if review_count_el else "",
                "image": await img_el.get_attribute("src") if img_el else "",
                "badge": (await badge_el.text_content()).strip() if badge_el else "",
                "is_prime": prime_el is not None,
                "page": page_num,
            })

        await page.wait_for_timeout(random.randint(3000, 6000))

    return products

9. Approach 7: API Services {#api-services}

For Amazon search results and high-volume product data, managed scraping APIs handle the proxy rotation and CAPTCHA solving for you:

import requests

def scrape_via_rainforest(asin: str, api_key: str,
                           amazon_domain: str = "amazon.com") -> dict:
    """Get Amazon product data via Rainforest API."""
    params = {
        "api_key": api_key,
        "type": "product",
        "asin": asin,
        "amazon_domain": amazon_domain,
        "include_summarization_attributes": True,
        "include_a_plus_body": True,
    }
    resp = requests.get("https://api.rainforestapi.com/request",
                        params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("product", {})

def search_via_rainforest(query: str, api_key: str,
                           page: int = 1) -> list[dict]:
    """Search Amazon via Rainforest API."""
    params = {
        "api_key": api_key,
        "type": "search",
        "amazon_domain": "amazon.com",
        "search_term": query,
        "page": page,
    }
    resp = requests.get("https://api.rainforestapi.com/request",
                        params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("search_results", [])

These services cost $1-5 per 1,000 requests. Worth it if you need search result data or product scraping at scale without maintaining your own proxy infrastructure.
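At those rates, the monthly bill is easy to estimate before committing. A rough sketch using the $1-5 per 1,000 range quoted above:

```python
def monthly_scrape_cost(requests_per_day: int,
                        usd_per_1000: float) -> float:
    """Estimated monthly bill for a managed scraping API,
    assuming a flat per-request rate and a 30-day month."""
    return requests_per_day * 30 * usd_per_1000 / 1000

# Re-checking 5,000 products daily at $3 per 1,000 requests:
print(f"${monthly_scrape_cost(5_000, 3.0):,.2f}/month")  # $450.00/month
```

Compare that figure against the cost of residential proxy bandwidth plus your own engineering time before deciding to self-host.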


10. Anti-Detection: Headers, TLS, and Fingerprints {#anti-detection}

from curl_cffi import requests as cffi_requests
import random
import time

# Rotate between multiple browser profiles
BROWSER_PROFILES = [
    {
        "impersonate": "chrome131",
        "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
              "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    },
    {
        "impersonate": "chrome130",
        "ua": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
              "(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
    },
    {
        "impersonate": "safari17_0",
        "ua": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) AppleWebKit/605.1.15 "
              "(KHTML, like Gecko) Version/17.0 Safari/605.1.15",
    },
]

def get_amazon_session(proxy: str = None) -> cffi_requests.Session:
    """Create a curl-cffi session mimicking a real browser."""
    profile = random.choice(BROWSER_PROFILES)
    session = cffi_requests.Session()
    session.impersonate = profile["impersonate"]

    # These headers must be present and realistic
    session.headers = {
        "User-Agent": profile["ua"],
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,"
                  "image/avif,image/webp,image/apng,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "DNT": "1",
    }

    if proxy:
        session.proxies = {"http": proxy, "https": proxy}

    return session

def warm_up_session(session: cffi_requests.Session):
    """Visit Amazon homepage before scraping product pages."""
    session.get("https://www.amazon.com/", timeout=15)
    time.sleep(random.uniform(2, 5))
    # Optional: visit a category page too
    session.get("https://www.amazon.com/gp/bestsellers/", timeout=15)
    time.sleep(random.uniform(1, 3))

11. Proxy Strategy for Amazon {#proxies}

Amazon maintains blocklists of virtually every major datacenter IP range. Residential proxies are non-negotiable for any direct HTML scraping.

ThorData provides rotating residential proxy pools with US targeting. Their IPs are genuine residential addresses, which is the minimum bar for Amazon not to instantly block you.

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000

def get_amazon_proxy(state: str | None = None) -> str:
    """Get a US residential proxy, optionally targeting a specific state."""
    if state:
        # State-level targeting helps match Amazon's geo-pricing
        user = f"{THORDATA_USER}-country-us-state-{state.lower()}"
    else:
        user = f"{THORDATA_USER}-country-us"
    return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"

def test_proxy_for_amazon(proxy: str) -> bool:
    """Verify proxy works for Amazon and isn't in a blocked range."""
    session = get_amazon_session(proxy)
    try:
        resp = session.get("https://www.amazon.com/dp/B0DCXZJQ8V/", timeout=10)
        return resp.status_code == 200 and "Robot Check" not in resp.text
    except Exception:
        return False

12. Rate Limiting and Request Scheduling {#rate-limits}

import time
import random
import sqlite3
from collections import deque

class AmazonRateLimiter:
    """Sliding-window rate limiter for Amazon scraping."""

    def __init__(self, requests_per_minute: int = 10,
                 burst_size: int = 3):
        self.requests_per_minute = requests_per_minute
        self.min_gap = 60.0 / requests_per_minute
        self.burst_size = burst_size
        self.tokens = deque()

    def wait(self):
        now = time.time()

        # Drop request timestamps that have left the 60-second window
        while self.tokens and now - self.tokens[0] > 60:
            self.tokens.popleft()

        if len(self.tokens) >= self.requests_per_minute:
            # Window is full: wait until the oldest request expires
            sleep_time = 60 - (now - self.tokens[0]) + random.uniform(1, 3)
            time.sleep(max(sleep_time, self.min_gap))
        elif len(self.tokens) >= self.burst_size:
            # Burst allowance spent: slow down to the average pace
            time.sleep(self.min_gap)

        self.tokens.append(time.time())
        # Always add a random delay even within rate limits
        time.sleep(random.uniform(0.5, 2.0))

rate_limiter = AmazonRateLimiter(requests_per_minute=8, burst_size=2)

def scrape_with_scheduling(asins: list[str],
                           output_db: str = "amazon_products.db"):
    """Scrape ASINs with rate limiting and progress tracking."""
    conn = init_amazon_db(output_db)

    # Check which ASINs are already scraped
    existing = set(row[0] for row in conn.execute("SELECT asin FROM products"))
    pending = [a for a in asins if a not in existing]
    print(f"{len(pending)} ASINs to scrape ({len(existing)} already done)")

    for i, asin in enumerate(pending):
        rate_limiter.wait()
        try:
            product = scrape_product_json_ld(asin)
            if not product.get("error"):
                save_product(conn, product)
                print(f"[{i+1}/{len(pending)}] OK: {asin} - {product.get('name', '')[:50]}")
            else:
                print(f"[{i+1}/{len(pending)}] ERR: {asin} - {product['error']}")
        except Exception as e:
            print(f"[{i+1}/{len(pending)}] FAIL: {asin} - {e}")
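
Rate limiting keeps you under the radar, but once a Robot Check does slip through, the right response is to stop hammering that IP entirely rather than retry at the normal pace. A small backoff helper you could call from the loop above whenever a scrape reports a block (the function name and thresholds here are my own sketch, not part of the pipeline above):

```python
import random

def backoff_delay(consecutive_blocks: int,
                  base: float = 60.0, cap: float = 1800.0) -> float:
    """Exponential backoff with +/-20% jitter after block pages.

    1 block -> roughly 2 min, 2 -> roughly 4 min, capped at 30 min.
    """
    delay = min(base * (2 ** consecutive_blocks), cap)
    # Jitter so retries from multiple workers don't synchronize
    return delay * random.uniform(0.8, 1.2)
```

Reset the counter to zero after the first successful response; combined with an IP rotation, this usually clears a soft block without burning the proxy pool.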

13. Storing Amazon Data: Schema Design {#storage}

import sqlite3
import json
import time

def init_amazon_db(db_path: str = "amazon.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")

    conn.execute("""
        CREATE TABLE IF NOT EXISTS products (
            asin TEXT PRIMARY KEY,
            name TEXT,
            brand TEXT,
            description TEXT,
            price REAL,
            currency TEXT,
            availability TEXT,
            rating REAL,
            review_count INTEGER,
            category_id TEXT,
            image_url TEXT,
            url TEXT,
            source TEXT,
            scraped_at REAL
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_history (
            asin TEXT,
            price REAL,
            currency TEXT,
            recorded_at REAL,
            source TEXT,
            PRIMARY KEY (asin, recorded_at)
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS reviews (
            asin TEXT,
            review_title TEXT,
            body TEXT,
            rating REAL,
            author TEXT,
            review_date TEXT,
            verified_purchase INTEGER,
            helpful_votes TEXT,
            page_num INTEGER,
            scraped_at REAL
        )
    """)

    conn.execute("CREATE INDEX IF NOT EXISTS idx_price_history_asin ON price_history(asin)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_reviews_asin ON reviews(asin)")

    conn.commit()
    return conn

def save_product(conn: sqlite3.Connection, product: dict):
    now = time.time()
    conn.execute("""
        INSERT OR REPLACE INTO products VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, (
        product.get("asin"), product.get("name"), product.get("brand"),
        product.get("description"), product.get("price"), product.get("currency"),
        product.get("availability"), product.get("rating"),
        product.get("review_count"), product.get("category_id"),
        product.get("image"), product.get("url"), product.get("source"), now
    ))
    # Also record in price history
    if product.get("price"):
        conn.execute("""
            INSERT OR IGNORE INTO price_history VALUES (?,?,?,?,?)
        """, (product["asin"], product["price"],
              product.get("currency", "USD"), now, product.get("source", "scrape")))
    conn.commit()
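
With that schema in place, the current price for every tracked ASIN is one correlated subquery against `price_history` -- useful for dashboards without touching Amazon again:

```python
import sqlite3

def latest_prices(conn: sqlite3.Connection) -> dict[str, float]:
    """Return the most recently recorded price per ASIN."""
    rows = conn.execute("""
        SELECT asin, price
        FROM price_history AS p
        WHERE recorded_at = (
            SELECT MAX(recorded_at)
            FROM price_history
            WHERE asin = p.asin
        )
    """).fetchall()
    return dict(rows)
```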

14. Price Monitoring Pipeline {#price-monitoring}

import random
import sqlite3
import time
from datetime import datetime

def build_price_monitor(asins: list[str], db_path: str = "price_monitor.db",
                        check_interval_hours: int = 6):
    """Build a price-check callable; schedule it every check_interval_hours."""
    conn = init_amazon_db(db_path)

    def run_check():
        print(f"\n[{datetime.now():%Y-%m-%d %H:%M}] Price check starting...")
        for asin in asins:
            time.sleep(random.uniform(30, 90))  # Space out checks

            # Try PA-API first (cheaper), fall back to scraping
            products = get_products_by_asin([asin])
            if products:
                product = products[0]
                product["asin"] = asin
                save_product(conn, product)
                print(f"  {asin}: ${product.get('price', 'N/A')}")
            else:
                print(f"  {asin}: PA-API miss, skipping (use scraping for fallback)")

        check_price_alerts(conn, asins)

    def check_price_alerts(conn, asins: list[str]):
        """Check if any prices have dropped significantly."""
        for asin in asins:
            rows = conn.execute("""
                SELECT price, recorded_at FROM price_history
                WHERE asin = ?
                ORDER BY recorded_at DESC
                LIMIT 10
            """, (asin,)).fetchall()

            if len(rows) < 2:
                continue

            current = rows[0][0]
            historical_avg = sum(r[0] for r in rows[1:]) / len(rows[1:])

            if current and historical_avg and current < historical_avg * 0.85:
                print(f"  PRICE DROP ALERT: {asin} is ${current:.2f} "
                      f"(avg was ${historical_avg:.2f}, "
                      f"{(1 - current/historical_avg)*100:.0f}% drop)")

    return run_check

# Build and run
monitor = build_price_monitor(
    asins=["B0DCXZJQ8V", "B0BN93M8SP"],
    check_interval_hours=6
)
monitor()  # Run one check

15. Common Errors and Fixes {#errors}

| Error | Cause | Fix |
|---|---|---|
| 503 "Robot Check" page | Bot detection triggered | Rotate residential IP, increase delays, check TLS fingerprint |
| CAPTCHA (image challenge) | Suspicious request pattern | Switch to residential proxy, reduce request rate |
| Redirect to `/ap/signin` | Session flagged as bot | Clear cookies, new context, new IP |
| Empty price field | Out of stock or geo-mismatch | Check with US residential IP, verify ASIN is active |
| 404 on product URL | ASIN deleted or discontinued | Remove from tracking |
| Prices shown in wrong currency | Non-US proxy IP | Use a US-targeted proxy specifically |
| curl-cffi import error | Not installed | `pip install curl-cffi` |
| PA-API 429 | Rate limit exceeded (1 req/sec) | Add a 1.1s sleep between requests |
| Keepa "TokensLeft: 0" | Quota exhausted | Wait for hourly refresh or upgrade plan |
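
Several of these failures can be caught in code before you waste time parsing. A heuristic block detector (the marker strings are ones commonly seen on Amazon's block and CAPTCHA pages -- not an exhaustive list):

```python
def is_blocked(html: str, status_code: int) -> bool:
    """Detect Amazon block/CAPTCHA responses before parsing."""
    if status_code in (403, 503):
        return True
    markers = (
        "Robot Check",
        "Enter the characters you see below",
        "api-services-support@amazon.com",
    )
    return any(marker in html for marker in markers)
```

Call it on every response; on `True`, rotate the proxy and back off instead of feeding the page to your parser.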

16. Which Approach Should You Use? {#summary}

| Need | Best Approach | Cost |
|---|---|---|
| Product details for known ASINs | PA-API | Free with Associates account |
| Price history and trend data | Keepa API | ~$15-50/month |
| Review text and Q&A | Playwright + residential proxies | Proxy costs |
| Search results at scale | Rainforest/Oxylabs API or Playwright | $1-5 per 1K requests |
| Large-scale category scraping | Playwright + ThorData proxies | Variable |
| Price tracking pipeline | PA-API primary + Keepa for history | Combined above |

The days of scraping Amazon with requests and free proxies are long gone. In 2026, you either use official APIs, pay for quality proxy infrastructure, or accept that your scraper will break every few days.

The practical path for most projects: start with PA-API for basic product data, add Keepa for price history, and use ThorData residential proxies with Playwright only when you need data those APIs don't cover -- full review text, search results, Q&A sections, and competitor analysis at category scale. Pick the approach that matches your data needs and budget, and always respect Amazon's Terms of Service for your specific use case.