
Scraping Amazon Product Data Without Getting Blocked (2026)


Amazon runs one of the most aggressive anti-bot systems on the web. Send a few requests with a default Python user-agent and you'll hit a CAPTCHA wall within minutes. Scale that up and your IP gets blacklisted for hours.

But people still need Amazon data — price monitoring, competitor analysis, review tracking. Here's what actually works in 2026 without getting your infrastructure burned.

Why Amazon Is So Hard to Scrape

Amazon uses a layered defense system: IP reputation scoring that blocklists datacenter ranges outright, TLS and header fingerprinting (a stale User-Agent alone is enough to stand out), behavioral analysis of request timing, and CAPTCHA challenges that escalate to temporary IP bans.

The key insight: Amazon product pages are more accessible than search results. Search result pages have the tightest bot detection. Individual product pages (accessed via direct ASIN URLs) are comparatively easier to scrape.
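The URL pattern makes this easy to exploit: every product page is addressable directly by its ASIN, so you never have to touch a search results page. A minimal sketch:

```python
# Build a direct product-page URL from an ASIN (no search results involved)
def product_url(asin: str, domain: str = "amazon.com") -> str:
    return f"https://www.{domain}/dp/{asin}"

print(product_url("B0BSHF7WHW"))                  # https://www.amazon.com/dp/B0BSHF7WHW
print(product_url("B0BSHF7WHW", "amazon.co.uk"))  # https://www.amazon.co.uk/dp/B0BSHF7WHW
```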

What You Can Actually Get From Public Pages

Without logging in, you can reliably extract: product title, current and list price, star rating, review count, availability, brand, feature bullets, product description, Best Sellers Rank, and image URLs.

What you can't get without authentication: detailed seller analytics, your purchase history, subscriber discounts, full review text beyond the first page.

The Proxy Situation: Residential Is Non-Negotiable

For Amazon specifically, datacenter proxies are nearly useless. Amazon maintains blocklists of major datacenter IP ranges. Even rotating through thousands of datacenter IPs, you'll see CAPTCHA rates above 60%.

Residential proxies are the baseline requirement. These route through real ISP connections, making requests look like normal household traffic.

ThorData's residential proxy network works well for Amazon scraping — their pool covers IPs across multiple regions, which helps when Amazon serves different content based on location. The geo-targeting is useful for price comparison across markets.

Budget reality: expect to pay $3-8 per GB of residential proxy traffic. Amazon product pages average 200-400KB each, so you're looking at roughly 2,500-5,000 pages per GB.
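The arithmetic behind those numbers, if you want to budget a specific crawl:

```python
# Cost model using the figures above (assumed averages, not provider quotes)
PAGE_KB_LOW, PAGE_KB_HIGH = 200, 400        # typical product-page size in KB
COST_GB_LOW, COST_GB_HIGH = 3, 8            # USD per GB of residential traffic

pages_per_gb_low = 1_000_000 // PAGE_KB_HIGH    # big pages: fewer per GB
pages_per_gb_high = 1_000_000 // PAGE_KB_LOW    # small pages: more per GB
print(f"Pages per GB: {pages_per_gb_low:,}-{pages_per_gb_high:,}")  # 2,500-5,000

# Midpoint cost for a 10,000-page crawl
avg_page_gb = ((PAGE_KB_LOW + PAGE_KB_HIGH) / 2) / 1_000_000   # 300 KB as GB
avg_cost_gb = (COST_GB_LOW + COST_GB_HIGH) / 2                 # $5.50/GB
print(f"~${10_000 * avg_page_gb * avg_cost_gb:.2f} for 10,000 pages")  # ~$16.50
```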

Complete Amazon Product Scraper

Here's a full working scraper with proxy rotation, CAPTCHA detection, retry logic, and structured data extraction:

#!/usr/bin/env python3
"""
Amazon Product Scraper — Residential Proxy + Anti-Detection

Scrapes product data from Amazon product pages using direct ASIN URLs.
Handles CAPTCHA detection, automatic retry with backoff, proxy rotation,
and exports to JSON or CSV.

Usage:
    python amazon_scraper.py B0BSHF7WHW B0D1XD1ZV3
    python amazon_scraper.py --file asins.txt --format csv --output products.csv
    python amazon_scraper.py B0BSHF7WHW --domain amazon.co.uk

Requirements:
    pip install httpx selectolax
"""

import httpx
import json
import csv
import time
import random
import re
import argparse
import sys
from datetime import datetime, timezone
from pathlib import Path
from selectolax.parser import HTMLParser


# Browser User-Agent pool (keep updated — stale UAs are a detection signal)
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 "
    "Firefox/132.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 "
    "(KHTML, like Gecko) Version/18.1 Safari/605.1.15",
]

# Accept-Language variations to match different browser profiles
ACCEPT_LANGUAGES = [
    "en-US,en;q=0.9",
    "en-US,en;q=0.9,es;q=0.8",
    "en-GB,en;q=0.9,en-US;q=0.8",
    "en-US,en;q=0.5",
]


class AmazonScraper:
    def __init__(self, proxy_url: str | None = None, domain: str = "amazon.com",
                 delay_range: tuple[float, float] = (5.0, 12.0)):
        """
        Args:
            proxy_url: Residential proxy URL (http://user:pass@host:port)
            domain: Amazon domain (amazon.com, amazon.co.uk, amazon.de, etc.)
            delay_range: Random delay between requests in seconds
        """
        self.domain = domain
        self.base_url = f"https://www.{domain}"
        self.delay_range = delay_range
        self.request_count = 0
        self.captcha_count = 0
        self.success_count = 0

        self.client_kwargs = {
            "timeout": 30,
            "follow_redirects": True,
            "http2": True,
        }
        if proxy_url:
            self.client_kwargs["proxy"] = proxy_url

    def _get_headers(self) -> dict:
        """Generate realistic browser headers with randomization."""
        ua = random.choice(USER_AGENTS)
        return {
            "User-Agent": ua,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,"
                      "image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": random.choice(ACCEPT_LANGUAGES),
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }

    def _is_captcha(self, html: str) -> bool:
        """Detect CAPTCHA / bot challenge pages."""
        captcha_signals = [
            "captcha", "robot check", "automated access",
            "enter the characters", "type the characters",
            "sorry, we just need to make sure",
            "api-services-support@amazon.com",  # shown on Amazon's robot-check page
        ]
        html_lower = html.lower()
        return any(signal in html_lower for signal in captcha_signals)

    def _is_dog_page(self, html: str) -> bool:
        """Detect the Amazon 'sorry' dog page (soft block)."""
        return "sorry" in html.lower() and "dogs of amazon" in html.lower()

    def _rate_limit(self):
        """Apply adaptive rate limiting."""
        self.request_count += 1

        # Longer pause every 20 requests
        if self.request_count % 20 == 0:
            pause = random.uniform(20, 40)
            print(f"  Cooling down {pause:.0f}s after {self.request_count} requests "
                  f"({self.captcha_count} CAPTCHAs so far)...")
            time.sleep(pause)
        else:
            time.sleep(random.uniform(*self.delay_range))

    def fetch_product_page(self, asin: str, max_retries: int = 3) -> str | None:
        """
        Fetch raw HTML for a product page with retry logic.
        Returns HTML string or None on failure.
        """
        url = f"{self.base_url}/dp/{asin}"

        for attempt in range(max_retries):
            try:
                headers = self._get_headers()
                with httpx.Client(**self.client_kwargs) as client:
                    resp = client.get(url, headers=headers)

                if resp.status_code == 503 or self._is_captcha(resp.text):
                    self.captcha_count += 1
                    wait = (2 ** attempt) * 10 + random.uniform(5, 15)
                    print(f"  CAPTCHA on {asin} (attempt {attempt+1}). "
                          f"Backing off {wait:.0f}s...")
                    time.sleep(wait)
                    continue

                if self._is_dog_page(resp.text):
                    print(f"  Dog page on {asin} — IP may be flagged. "
                          f"Waiting 60s...")
                    time.sleep(60)
                    continue

                if resp.status_code == 404:
                    print(f"  {asin}: product not found (404)")
                    return None

                if resp.status_code == 200:
                    self.success_count += 1
                    return resp.text

                print(f"  {asin}: HTTP {resp.status_code} on attempt {attempt+1}")
                time.sleep(5)

            except httpx.RequestError as e:
                print(f"  {asin}: connection error - {e}")
                time.sleep(5)

        print(f"  {asin}: all {max_retries} attempts failed")
        return None

    def parse_product(self, html: str, asin: str) -> dict:
        """Extract structured product data from HTML."""
        tree = HTMLParser(html)
        product = {"asin": asin, "url": f"{self.base_url}/dp/{asin}"}

        # Title
        title_el = tree.css_first("#productTitle")
        product["title"] = title_el.text(strip=True) if title_el else None

        # Price — Amazon uses multiple price containers
        price = None
        for selector in [
            ".a-price .a-offscreen",
            "#priceblock_ourprice",
            "#priceblock_dealprice",
            "span.a-price span.a-offscreen",
            "#corePrice_feature_div .a-offscreen",
        ]:
            el = tree.css_first(selector)
            if el:
                price = el.text(strip=True)
                break
        product["price"] = price

        # Original price (for deals)
        orig_price_el = tree.css_first(
            ".a-price.a-text-price .a-offscreen, "
            "#listPrice, .basisPrice .a-offscreen"
        )
        product["original_price"] = (
            orig_price_el.text(strip=True) if orig_price_el else None
        )

        # Rating
        rating_el = tree.css_first("#acrPopover span.a-size-base, #acrPopover .a-icon-alt")
        if rating_el:
            rating_text = rating_el.text(strip=True)
            match = re.search(r'(\d+\.?\d*)', rating_text)
            product["rating"] = float(match.group(1)) if match else None
        else:
            product["rating"] = None

        # Review count
        review_el = tree.css_first("#acrCustomerReviewText")
        if review_el:
            review_text = review_el.text(strip=True).replace(",", "")
            match = re.search(r'(\d+)', review_text)
            product["review_count"] = int(match.group(1)) if match else 0
        else:
            product["review_count"] = 0

        # Availability
        avail_el = tree.css_first("#availability span, #availability")
        product["availability"] = (
            avail_el.text(strip=True) if avail_el else "Unknown"
        )

        # Brand
        brand_el = tree.css_first("#bylineInfo, a#brand")
        if brand_el:
            brand_text = brand_el.text(strip=True)
            brand_text = re.sub(r'^(Visit the |Brand: )', '', brand_text)
            brand_text = brand_text.replace(" Store", "")
            product["brand"] = brand_text
        else:
            product["brand"] = None

        # Bullet points / feature list
        bullets = []
        for li in tree.css("#feature-bullets ul li span.a-list-item"):
            text = li.text(strip=True)
            if text and "see more product details" not in text.lower():
                bullets.append(text)
        product["features"] = bullets

        # Product description
        desc_el = tree.css_first("#productDescription p, #productDescription span")
        product["description"] = desc_el.text(strip=True) if desc_el else None

        # Best Sellers Rank: #SalesRank on older layouts, a details-table row
        # on newer ones. selectolax has no :contains() pseudo-class, so match
        # the row text manually.
        bsr_text = None
        legacy = tree.css_first("#SalesRank")
        if legacy:
            bsr_text = legacy.text(strip=True)
        else:
            for row in tree.css("#productDetails_detailBullets_sections1 tr, "
                                "#detailBullets_feature_div li"):
                text = row.text(strip=True)
                if "Best Sellers Rank" in text:
                    bsr_text = text
                    break
        if bsr_text:
            match = re.search(r'#([\d,]+)', bsr_text)
            product["bsr"] = (
                int(match.group(1).replace(",", "")) if match else None
            )
            # Extract category
            cat_match = re.search(r'in\s+(.+?)(?:\(|$)', bsr_text)
            product["bsr_category"] = (
                cat_match.group(1).strip() if cat_match else None
            )
        else:
            product["bsr"] = None
            product["bsr_category"] = None

        # Images (high-res URLs from the page source)
        images = []
        img_matches = re.findall(
            r'"hiRes"\s*:\s*"(https://[^"]+)"', html
        )
        images.extend(img_matches)
        if not images:
            # Fallback to main image
            main_img = tree.css_first("#landingImage, #imgBlkFront")
            if main_img:
                src = main_img.attributes.get("src", "")
                if src:
                    images.append(src)
        product["images"] = images[:10]  # cap at 10

        # ASIN confirmation from the hidden form input on the page
        asin_input = tree.css_first("input#ASIN")
        if asin_input:
            product["asin_confirmed"] = asin_input.attributes.get("value", "")

        product["scraped_at"] = datetime.now(timezone.utc).isoformat()
        product["domain"] = self.domain

        return product

    def scrape_product(self, asin: str) -> dict | None:
        """Fetch and parse a single product."""
        html = self.fetch_product_page(asin)
        if not html:
            return None
        return self.parse_product(html, asin)

    def scrape_batch(self, asins: list[str]) -> list[dict]:
        """Scrape multiple products with rate limiting."""
        results = []
        total = len(asins)

        for i, asin in enumerate(asins):
            asin = asin.strip()
            if not asin:
                continue

            print(f"[{i+1}/{total}] Scraping {asin}...")
            product = self.scrape_product(asin)

            if product:
                results.append(product)
                title = (product["title"] or "No title")[:60]
                print(f"  {title}")
                print(f"  Price: {product['price']} | "
                      f"Rating: {product['rating']} | "
                      f"Reviews: {product['review_count']:,}")

            if i < total - 1:
                self._rate_limit()

        print(f"\nDone. Scraped {len(results)}/{total} products.")
        print(f"Success rate: {len(results)/total*100:.0f}% "
              f"| CAPTCHAs hit: {self.captcha_count}")
        return results


def export_json(products: list[dict], filename: str):
    """Export products to JSON."""
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(products, f, indent=2, ensure_ascii=False)
    print(f"Exported {len(products)} products to {filename}")


def export_csv(products: list[dict], filename: str):
    """Export products to CSV (features flattened to semicolons)."""
    if not products:
        return

    flat = []
    for p in products:
        row = {k: v for k, v in p.items() if k not in ("features", "images")}
        row["features"] = "; ".join(p.get("features", []))
        row["image_count"] = len(p.get("images", []))
        row["main_image"] = p["images"][0] if p.get("images") else ""
        flat.append(row)

    fieldnames = list(flat[0].keys())
    with open(filename, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(flat)
    print(f"Exported {len(flat)} products to {filename}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Scrape Amazon product data via ASIN"
    )
    parser.add_argument("asins", nargs="*", help="Amazon ASINs to scrape")
    parser.add_argument("--file", "-f", help="File with ASINs (one per line)")
    parser.add_argument("--format", choices=["json", "csv"], default="json")
    parser.add_argument("--output", "-o", help="Output filename")
    parser.add_argument("--domain", default="amazon.com",
                        help="Amazon domain (amazon.com, amazon.co.uk, etc.)")
    parser.add_argument("--proxy", help="Proxy URL (http://user:pass@host:port)")
    args = parser.parse_args()

    asins = list(args.asins)
    if args.file:
        asins.extend(Path(args.file).read_text().strip().splitlines())

    if not asins:
        print("No ASINs provided. Usage: python amazon_scraper.py B0BSHF7WHW")
        sys.exit(1)

    scraper = AmazonScraper(proxy_url=args.proxy, domain=args.domain)
    products = scraper.scrape_batch(asins)

    out_file = args.output or f"amazon_products.{args.format}"
    if args.format == "csv":
        export_csv(products, out_file)
    else:
        export_json(products, out_file)

Expected output

Running python amazon_scraper.py B0BSHF7WHW B0D1XD1ZV3 --format json:

[
  {
    "asin": "B0BSHF7WHW",
    "url": "https://www.amazon.com/dp/B0BSHF7WHW",
    "title": "Apple AirPods Pro (2nd Generation) Wireless Ear Buds",
    "price": "$189.99",
    "original_price": "$249.00",
    "rating": 4.7,
    "review_count": 87432,
    "availability": "In Stock",
    "brand": "Apple",
    "features": [
      "PIONEERING HEARING — AirPods Pro 2 unlock world-class hearing aid...",
      "INTELLIGENT NOISE CONTROL — Active Noise Cancellation removes...",
      "IMPROVED SOUND AND CALL QUALITY — A custom Apple-designed chip..."
    ],
    "bsr": 3,
    "bsr_category": "Electronics",
    "images": [
      "https://m.media-amazon.com/images/I/61f1YfTkTDL._AC_SL1500_.jpg"
    ],
    "domain": "amazon.com",
    "scraped_at": "2026-03-30T14:22:00+00:00"
  }
]

CSV output:

asin,url,title,price,original_price,rating,review_count,availability,brand,bsr,bsr_category,features,image_count,main_image
B0BSHF7WHW,https://www.amazon.com/dp/B0BSHF7WHW,"Apple AirPods Pro...",$189.99,$249.00,4.7,87432,In Stock,Apple,3,Electronics,"PIONEERING HEARING...;INTELLIGENT NOISE...",6,https://m.media-amazon.com/...

Use Case 1: Price Monitoring Dashboard

Track price changes over time — useful for deal alerts, competitor pricing, or purchase timing:

"""
Amazon Price Monitor
Runs on a schedule to track price history for a watchlist of ASINs.
Appends to a CSV log for time-series analysis.
"""

def monitor_prices(asins: list[str], log_file: str = "price_history.csv",
                   proxy_url: str | None = None):
    """Scrape current prices and append to history log."""
    scraper = AmazonScraper(proxy_url=proxy_url)
    products = scraper.scrape_batch(asins)

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
    file_exists = Path(log_file).exists()

    with open(log_file, "a", newline="", encoding="utf-8") as f:
        fieldnames = [
            "timestamp", "asin", "title", "price", "original_price",
            "rating", "review_count", "availability", "bsr"
        ]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()

        for p in products:
            writer.writerow({
                "timestamp": today,
                "asin": p["asin"],
                "title": (p["title"] or "")[:80],
                "price": p["price"],
                "original_price": p["original_price"],
                "rating": p["rating"],
                "review_count": p["review_count"],
                "availability": p["availability"],
                "bsr": p["bsr"],
            })

    # Print price summary
    print(f"\n{'ASIN':<14} {'Price':>10} {'Was':>10} {'Rating':>7} {'Reviews':>8}")
    print("-" * 55)
    for p in products:
        print(f"{p['asin']:<14} {p['price'] or 'N/A':>10} "
              f"{p['original_price'] or '-':>10} "
              f"{p['rating'] or '-':>7} {p['review_count']:>8,}")

    return products


# Run daily via cron (assuming this function lives in price_monitor.py
# with a small __main__ entry point):
# 0 9 * * * cd /path/to && python price_monitor.py

Use Case 2: Cross-Market Price Comparison

Compare the same product across different Amazon marketplaces to find price differences:

"""
Cross-market Amazon price comparison.
Checks the same ASIN across multiple Amazon domains.
"""

AMAZON_DOMAINS = [
    "amazon.com",      # US
    "amazon.co.uk",    # UK
    "amazon.de",       # Germany
    "amazon.fr",       # France
    "amazon.co.jp",    # Japan
    "amazon.ca",       # Canada
]

def compare_markets(asin: str, domains: list[str] | None = None,
                    proxy_url: str | None = None):
    """Compare prices for an ASIN across Amazon marketplaces."""
    domains = domains or AMAZON_DOMAINS

    print(f"\nCross-market price comparison for {asin}")
    print("=" * 60)

    results = []
    for domain in domains:
        print(f"\n  Checking {domain}...")
        scraper = AmazonScraper(proxy_url=proxy_url, domain=domain)
        product = scraper.scrape_product(asin)

        if product and product["price"]:
            results.append({
                "domain": domain,
                "price": product["price"],
                "availability": product["availability"],
                "rating": product["rating"],
                "review_count": product["review_count"],
            })
            print(f"    Price: {product['price']} | {product['availability']}")
        else:
            print("    Not available or blocked")

        time.sleep(random.uniform(5, 10))

    if results:
        print(f"\n{'Domain':<20} {'Price':>12} {'Rating':>8} {'Reviews':>10}")
        print("-" * 52)
        for r in results:
            print(f"{r['domain']:<20} {r['price']:>12} "
                  f"{r['rating'] or '-':>8} {r['review_count']:>10,}")

    return results

Use Case 3: Review Trend Tracker

Monitor how review counts and ratings change over time — useful for detecting review manipulation or tracking product reception:

"""
Amazon Review Trend Tracker
Tracks review count and rating changes daily.
Flags anomalies like sudden review spikes (possible fake reviews).
"""

def track_review_trends(asins: list[str], history_file: str = "review_trends.csv",
                        proxy_url: str | None = None):
    """Track review metrics and flag anomalies."""
    scraper = AmazonScraper(proxy_url=proxy_url)
    products = scraper.scrape_batch(asins)

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    history = Path(history_file)

    # Load previous data for comparison
    previous = {}
    if history.exists():
        with open(history, "r") as f:
            reader = csv.DictReader(f)
            for row in reader:
                key = row["asin"]
                if key not in previous or row["date"] > previous[key]["date"]:
                    previous[key] = row

    # Append new data
    file_exists = history.exists()
    with open(history, "a", newline="", encoding="utf-8") as f:
        fieldnames = ["date", "asin", "title", "rating", "review_count", "bsr", "flag"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()

        for p in products:
            flag = ""
            prev = previous.get(p["asin"])
            if prev:
                old_reviews = int(prev.get("review_count") or 0)
                new_reviews = p["review_count"]
                daily_increase = new_reviews - old_reviews

                # Flag if review count jumped by more than 5% in one day
                if old_reviews > 0 and daily_increase > old_reviews * 0.05:
                    flag = f"SPIKE: +{daily_increase} reviews in 1 day"
                    print(f"  WARNING: {p['asin']} - {flag}")

                # Flag if rating dropped significantly
                old_rating = float(prev.get("rating") or 0)
                if old_rating > 0 and p["rating"] and p["rating"] < old_rating - 0.2:
                    flag += f" RATING_DROP: {old_rating} -> {p['rating']}"

            writer.writerow({
                "date": today,
                "asin": p["asin"],
                "title": (p["title"] or "")[:60],
                "rating": p["rating"],
                "review_count": p["review_count"],
                "bsr": p["bsr"],
                "flag": flag,
            })

    return products

Rate Limiting: The Single Most Important Thing

The single biggest mistake is going too fast. Even with perfect proxies, firing 10 requests per second in any pattern will trigger detection.

What works in practice

| Setting | Value | Why |
|---|---|---|
| Delay between requests | 5-12 seconds, randomized | Fixed intervals are a bot signature |
| Cooldown every 20 requests | 20-40 seconds | Prevents cumulative detection |
| CAPTCHA backoff | Exponential (10s, 20s, 40s) | Don't retry immediately |
| Max requests per IP per hour | ~50-80 | Beyond this, blocks increase sharply |
| Session duration | Fresh client per 50 requests | Prevents cookie tracking |

Signals that get you blocked: fixed, machine-regular request intervals; stale or default User-Agent strings; datacenter IP addresses; missing browser headers (Accept-Language, Sec-Fetch-*); and sustained volume beyond roughly 80 requests per hour from a single IP.

What Breaks and When to Expect It

Amazon updates their anti-bot measures roughly every 2-4 weeks. Common breakage points: CSS selectors for price and availability get renamed, new page layouts appear in A/B tests, CAPTCHA pages change their markup, and previously clean proxy ranges get flagged.

Plan for maintenance. Any Amazon scraper that works today will need updates within a month.
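One cheap way to catch selector rot early is a scheduled smoke test against a product you know is live. The canary ASIN, field list, and alert hook here are illustrative:

```python
# If core fields parse as empty for a known-good product, selectors broke.
CANARY_ASIN = "B0BSHF7WHW"                       # any stable, popular product
REQUIRED_FIELDS = ["title", "price", "rating"]   # fields that should never be empty

def selector_healthcheck(scraper) -> list[str]:
    """Return names of required fields that failed to parse (empty list = healthy)."""
    product = scraper.scrape_product(CANARY_ASIN)
    if product is None:
        return ["<fetch failed>"]
    return [f for f in REQUIRED_FIELDS if not product.get(f)]

# Wire into cron alongside your scrape jobs, e.g.:
# broken = selector_healthcheck(AmazonScraper(proxy_url=PROXY))
# if broken: notify(f"Amazon selectors broken: {broken}")  # notify() is yours to define
```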

The Easier Alternative: Pre-Built Actors

If you don't want to maintain proxy infrastructure and fight Amazon's constantly changing selectors, Apify's Amazon scraper actors handle the anti-bot layer for you. They maintain the selectors, rotate proxies internally, and handle CAPTCHAs. You pay per result instead of managing infrastructure.

This makes more sense when you need data reliably at scale rather than building scraping as a core competency.

Summary

The formula for scraping Amazon without blocks in 2026: residential proxies + slow request rates + direct product page URLs + realistic browser headers + adaptive backoff. Skip any of these and you'll hit CAPTCHAs immediately.

The complete scraper above handles CAPTCHA detection, exponential backoff, dog-page detection, multi-domain support, and clean CSV/JSON export. For production use, combine it with the price monitoring or review tracking pipelines to build ongoing data collection that survives Amazon's regular anti-bot updates.