Scraping Amazon Product Data in 2026: ASIN, Price History, Reviews, and Searches
Amazon is the single hardest major website to scrape in 2026. If you've tried it, you already know -- datacenter IPs get blocked within seconds, CAPTCHAs appear out of nowhere, and even well-crafted requests return 503 "Robot Check" pages. I've spent months figuring out what actually works, so here's the honest breakdown.
Table of Contents
- Why Amazon Is So Hard to Scrape
- Understanding Amazon's Product Structure
- Approach 1: Amazon Product Advertising API (Official)
- Approach 2: Keepa API for Price History
- Approach 3: Playwright with Residential Proxies
- Approach 4: Parsing JSON-LD Structured Data
- Approach 5: Review Scraping
- Approach 6: Search Results and Category Pages
- Approach 7: API Services (Rainforest, Oxylabs)
- Anti-Detection: Headers, TLS, and Fingerprints
- Proxy Strategy for Amazon
- Rate Limiting and Request Scheduling
- Storing Amazon Data: Schema Design
- Price Monitoring Pipeline
- Common Errors and Fixes
- Which Approach Should You Use?
1. Why Amazon Is So Hard to Scrape {#why-hard}
Amazon's anti-bot infrastructure is arguably the most sophisticated on the public web. Here's what you're up against:
Instant datacenter IP bans. Send a single request from an AWS, GCP, DigitalOcean, or Linode IP and you'll get a CAPTCHA or 503 before your second request fires. Amazon maintains massive blocklists of every major cloud provider's IP ranges.
TLS fingerprinting. Amazon checks your TLS client hello against known browser fingerprints. Python's requests library and httpx have identifiable TLS fingerprints that Amazon blocks immediately. You need either browser automation or a library like curl-cffi that impersonates a real browser's TLS handshake.
Browser fingerprinting. Amazon checks JavaScript execution patterns, WebGL rendering, canvas hashes, and navigator properties. Headless Chrome with default settings is detected within 1-2 page loads.
Behavioral analysis. Request rate, click patterns, and navigation sequences are all analyzed. Loading product pages directly without a search page visit first is a pattern bots exhibit. Human users browse -- they don't teleport directly to product pages.
Dynamic structure. CSS class names rotate, DOM structure shifts between A/B tests, and pages render differently based on geo, login state, and detected bot score. No CSS selector survives more than a few weeks unchanged.
The bottom line: brute-force scraping Amazon at scale without the right tools is not viable in 2026.
2. Understanding Amazon's Product Structure {#product-structure}
Before writing any code, understand how Amazon organizes products:
- ASIN (Amazon Standard Identification Number): The 10-character alphanumeric ID for every product. This is your primary key. Example: B0DCXZJQ8V.
- URL pattern: Product pages live at https://www.amazon.com/dp/{ASIN}/ or https://www.amazon.com/gp/product/{ASIN}/. The slug before /dp/ is cosmetic -- only the ASIN matters.
- Parent vs. Child ASINs: A single product listing can have multiple child ASINs (sizes, colors, styles). The parent ASIN groups them. When you scrape a product page, you're usually looking at a child ASIN.
- Offers listing: The page at /gp/offer-listing/{ASIN}/ shows all third-party sellers for a product, with their prices and conditions.
- Review pages: Reviews live at /product-reviews/{ASIN}/, paginated in sets of 10.
- Category pages: Browse by category from the department tree, paginated with &page=N.
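The URL patterns above are worth wrapping in one small helper so every downstream scraper builds them the same way. This is a sketch (the function name `build_amazon_urls` is mine, not an Amazon convention):

```python
def build_amazon_urls(asin: str, domain: str = "www.amazon.com") -> dict:
    """Return the main Amazon URL variants for a given ASIN."""
    base = f"https://{domain}"
    return {
        "product": f"{base}/dp/{asin}/",
        "offers": f"{base}/gp/offer-listing/{asin}/",
        "reviews": f"{base}/product-reviews/{asin}/",
    }
```

Pass a different `domain` (e.g. `www.amazon.co.uk`) to target other marketplaces with the same ASIN.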
def normalize_amazon_url(url_or_asin: str) -> str:
"""Convert any Amazon product URL or ASIN to canonical form."""
import re
# Extract ASIN from various URL formats
asin_patterns = [
r'/dp/([A-Z0-9]{10})',
r'/gp/product/([A-Z0-9]{10})',
r'/product/([A-Z0-9]{10})',
]
for pattern in asin_patterns:
m = re.search(pattern, url_or_asin)
if m:
return f"https://www.amazon.com/dp/{m.group(1)}/"
# If it's a bare ASIN
if re.match(r'^[A-Z0-9]{10}$', url_or_asin.upper()):
return f"https://www.amazon.com/dp/{url_or_asin.upper()}/"
return url_or_asin
def extract_asin(url: str) -> str | None:
"""Extract ASIN from any Amazon URL."""
import re
for pattern in [r'/dp/([A-Z0-9]{10})', r'/gp/product/([A-Z0-9]{10})']:
m = re.search(pattern, url)
if m:
return m.group(1)
return None
3. Approach 1: Amazon Product Advertising API (Official) {#pa-api}
The cleanest path. You need an Amazon Associates (affiliate) account, which gives you access to the PA-API 5.0. It returns structured JSON with product details, pricing, images, and review summaries.
Requirements: - Amazon Associates account (free to create) - At least 3 qualifying sales within 180 days to maintain access - Access Key ID and Secret Access Key from your Associates dashboard
# Install: pip install paapi5-python-sdk
from paapi5_python_sdk.api.default_api import DefaultApi
from paapi5_python_sdk.models.get_items_request import GetItemsRequest
from paapi5_python_sdk.models.get_items_resource import GetItemsResource
from paapi5_python_sdk.models.search_items_request import SearchItemsRequest
from paapi5_python_sdk.models.search_items_resource import SearchItemsResource
from paapi5_python_sdk.rest import ApiException
ACCESS_KEY = "your_access_key"
SECRET_KEY = "your_secret_key"
PARTNER_TAG = "yourtag-20"
REGION = "us-east-1"
HOST = "webservices.amazon.com"
api = DefaultApi(
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
host=HOST,
region=REGION
)
def get_products_by_asin(asins: list[str]) -> list[dict]:
"""Fetch product details for up to 10 ASINs at once."""
request = GetItemsRequest(
partner_tag=PARTNER_TAG,
partner_type="Associates",
item_ids=asins[:10], # Max 10 per request
resources=[
GetItemsResource.ITEMINFO_TITLE,
GetItemsResource.ITEMINFO_FEATURES,
GetItemsResource.ITEMINFO_PRODUCTINFO,
GetItemsResource.OFFERS_LISTINGS_PRICE,
GetItemsResource.OFFERS_LISTINGS_DELIVERYINFO_ISPRIMEELIGIBLE,
GetItemsResource.OFFERS_SUMMARIES_HIGHESTPRICE,
GetItemsResource.OFFERS_SUMMARIES_LOWESTPRICE,
GetItemsResource.IMAGES_PRIMARY_LARGE,
GetItemsResource.CUSTOMERRATINGS,
GetItemsResource.BROWSENODEINFO_BROWSENODES,
]
)
try:
response = api.get_items(request)
if not response.items_result:
return []
products = []
for item in response.items_result.items:
price = None
if item.offers and item.offers.listings:
listing = item.offers.listings[0]
price = listing.price.display_amount if listing.price else None
products.append({
"asin": item.asin,
"title": item.item_info.title.display_value if item.item_info.title else None,
"price": price,
"rating": item.customer_ratings.star_rating.value if item.customer_ratings else None,
"ratings_count": item.customer_ratings.count.value if item.customer_ratings else None,
"url": item.detail_page_url,
"image": (item.images.primary.large.url
if item.images and item.images.primary else None),
"is_prime": (item.offers.listings[0].delivery_info.is_prime_eligible
if item.offers and item.offers.listings else None),
"features": [f.display_value for f in
(item.item_info.features.display_values or [])
if item.item_info.features] if item.item_info else [],
})
return products
except ApiException as e:
print(f"PA-API error: {e}")
return []
def search_products_pa_api(keywords: str, category: str = None,
min_price: float = None,
max_price: float = None) -> list[dict]:
"""Search Amazon products via PA-API."""
request = SearchItemsRequest(
partner_tag=PARTNER_TAG,
partner_type="Associates",
keywords=keywords,
search_index=category or "All",
min_price=int(min_price * 100) if min_price else None,
max_price=int(max_price * 100) if max_price else None,
resources=[
SearchItemsResource.ITEMINFO_TITLE,
SearchItemsResource.OFFERS_LISTINGS_PRICE,
SearchItemsResource.CUSTOMERRATINGS,
SearchItemsResource.IMAGES_PRIMARY_MEDIUM,
]
)
try:
response = api.search_items(request)
if not response.search_result:
return []
return [{"asin": item.asin, "title": item.item_info.title.display_value}
for item in response.search_result.items]
except ApiException as e:
print(f"Search error: {e}")
return []
Limits: 1 request per second and 8,640 requests per day to start (limits scale with affiliate revenue). Max 10 ASINs per GetItems call. The API doesn't return full review text -- just aggregate ratings.
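Because of the 10-ASIN cap and the 1-request-per-second limit, any real workload needs chunking plus throttling. A minimal sketch (the helper `fetch_in_chunks` is my own name; pass it `get_products_by_asin` from above as `fetch_fn`):

```python
import time
from typing import Callable

def fetch_in_chunks(asins: list[str],
                    fetch_fn: Callable[[list[str]], list[dict]],
                    chunk_size: int = 10,
                    delay: float = 1.1) -> list[dict]:
    """Fetch any number of ASINs in batches, pausing between calls to
    stay under the PA-API's 1-request-per-second limit."""
    results = []
    for i in range(0, len(asins), chunk_size):
        results.extend(fetch_fn(asins[i:i + chunk_size]))
        if i + chunk_size < len(asins):
            time.sleep(delay)  # 1.1s leaves a margin over the 1 req/sec cap
    return results
```

Usage: `fetch_in_chunks(my_asins, get_products_by_asin)` walks the full list in groups of 10.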
4. Approach 2: Keepa API for Price History {#keepa}
For historical pricing data, Keepa is the answer. They've been tracking Amazon prices since 2011 and their API is the most reliable source for price history, sales rank trends, and deal detection.
import requests
KEEPA_API_KEY = "your_keepa_key"
KEEPA_BASE = "https://api.keepa.com"
def get_keepa_product(asin: str, domain: int = 1,
days: int = 90) -> dict:
"""Get product data including price history from Keepa."""
params = {
"key": KEEPA_API_KEY,
"domain": domain, # 1=amazon.com, 3=amazon.co.uk, 4=amazon.de
"asin": asin,
"history": 1,
"days": days,
"stats": 1,
"buybox": 1,
"offers": 20,
}
resp = requests.get(f"{KEEPA_BASE}/product", params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
if not data.get("products"):
return {}
product = data["products"][0]
return _parse_keepa_product(product)
def _parse_keepa_product(product: dict) -> dict:
"""Parse Keepa product data into a clean structure."""
# Keepa timestamps are minutes since 2011-01-01 00:00:00 UTC
KEEPA_EPOCH = 1293840000 # Unix timestamp of 2011-01-01
def keepa_time_to_unix(keepa_minutes: int) -> int:
return KEEPA_EPOCH + keepa_minutes * 60
def parse_price_history(csv_data: list) -> list[dict]:
"""Parse Keepa's [timestamp, price, timestamp, price, ...] format."""
if not csv_data:
return []
history = []
for i in range(0, len(csv_data) - 1, 2):
ts = csv_data[i]
price = csv_data[i + 1]
if ts > 0 and price > 0:
history.append({
"timestamp": keepa_time_to_unix(ts),
"price_cents": price,
"price": price / 100,
})
return history
csv = product.get("csv", [])
# csv[0] = Amazon price, csv[1] = Marketplace new, csv[2] = Marketplace used
# csv[3] = Sales rank, csv[16] = Buy Box price
stats = product.get("stats", {})
return {
"asin": product.get("asin"),
"title": product.get("title"),
"brand": product.get("brand"),
"model": product.get("model"),
"sales_rank": product.get("salesRankCurrent"),
"sales_rank_reference": product.get("salesRankReference"),
"rating": product.get("rating") / 10 if product.get("rating") else None,
"review_count": product.get("reviewCount"),
"amazon_price_current": csv[0][-1] / 100 if csv and csv[0] else None,
"amazon_price_history": parse_price_history(csv[0]) if csv else [],
"buybox_price_current": (csv[16][-1] / 100
if csv and len(csv) > 16 and csv[16] else None),
"price_30d_avg": stats.get("avg30", [None, None])[1],
"price_90d_avg": stats.get("avg90", [None, None])[1],
"price_all_time_low": stats.get("atl", [None, None])[1],
"price_all_time_high": stats.get("ath", [None, None])[1],
"out_of_stock_percentage_30d": stats.get("outOfStockPercentage30", 0),
"categories": product.get("categories", []),
"images": product.get("imagesCSV", "").split(","),
}
def search_keepa(query: str, domain: int = 1,
sort_by: int = 0) -> list[str]:
"""Search for products on Keepa. Returns list of ASINs."""
params = {
"key": KEEPA_API_KEY,
"domain": domain,
"type": "search",
"term": query,
"sortType": sort_by, # 0=relevance, 1=sales rank, 2=price
}
resp = requests.get(f"{KEEPA_BASE}/search", params=params, timeout=15)
resp.raise_for_status()
return resp.json().get("asinList", [])
# Usage
product = get_keepa_product("B0DCXZJQ8V")
print(f"Current price: ${product['amazon_price_current']}")
print(f"30-day average: ${product['price_30d_avg'] / 100:.2f}" if product.get('price_30d_avg') else "No history")
print(f"All-time low: ${product['price_all_time_low'] / 100:.2f}" if product.get('price_all_time_low') else "No ATL data")
Cost: Keepa charges per "token" -- roughly 1 token per product with history enabled. Plans start around $15/month for 50 tokens/minute. For tracking known ASINs it's excellent value; cheaper than building your own historical tracking infrastructure by far.
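Since tokens are the billing unit, it pays to watch the balance rather than blindly hammering the API until you hit zero. Keepa responses carry `tokensLeft` and `refillIn` (milliseconds until the next refill) at the top level, which a small wrapper can use to pause automatically -- a sketch, with `with_token_guard` being my own helper name:

```python
import time

def with_token_guard(fetch_fn, min_tokens: int = 5):
    """Wrap a Keepa call so the scraper pauses when the token balance
    runs low, instead of burning requests against an empty bucket."""
    def wrapped(*args, **kwargs):
        data = fetch_fn(*args, **kwargs)
        if data.get("tokensLeft", min_tokens) < min_tokens:
            # Sleep until the bucket refills (refillIn is in milliseconds)
            time.sleep(data.get("refillIn", 60_000) / 1000)
        return data
    return wrapped
```

Wrap any function that returns the raw Keepa JSON, e.g. a variant of `get_keepa_product` that returns `resp.json()` directly.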
5. Approach 3: Playwright with Residential Proxies {#playwright}
When you need data the APIs don't provide -- full review text, Q&A sections, detailed seller info, search results, or category pages -- you'll need browser automation with residential proxies.
import asyncio
import json
import random
import time
from playwright.async_api import async_playwright
PROXY_HOST = "proxy.thordata.com"
PROXY_PORT = 9000
PROXY_USER = "your_user"
PROXY_PASS = "your_pass"
def get_proxy_config(country: str = "US") -> dict:
user = f"{PROXY_USER}-country-{country.lower()}"
return {
"server": f"http://{PROXY_HOST}:{PROXY_PORT}",
"username": user,
"password": PROXY_PASS,
}
async def create_amazon_context(playwright, country: str = "US"):
"""Create a browser context configured for Amazon."""
browser = await playwright.chromium.launch(
headless=True,
proxy=get_proxy_config(country),
args=[
"--disable-blink-features=AutomationControlled",
"--disable-features=IsolateOrigins,site-per-process",
"--no-sandbox",
]
)
context = await browser.new_context(
viewport={"width": 1366, "height": 768},
locale="en-US",
timezone_id="America/New_York",
user_agent=(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
),
)
# Remove webdriver property
await context.add_init_script("""
delete Object.getPrototypeOf(navigator).webdriver;
""")
return browser, context
async def scrape_amazon_product(page, asin: str) -> dict:
"""Scrape a single Amazon product page."""
url = f"https://www.amazon.com/dp/{asin}/"
# Navigate with a realistic referer
response = await page.goto(
url,
wait_until="domcontentloaded",
timeout=30000,
)
title = await page.title()
if response.status == 503 or "Robot Check" in title:
return {"asin": asin, "error": "bot_detected_503"}
if "/ap/signin" in page.url:
return {"asin": asin, "error": "redirected_to_login"}
if response.status == 404:
return {"asin": asin, "error": "not_found"}
# Method 1: JSON-LD structured data (most stable)
ld_json = await page.evaluate("""
() => {
const scripts = document.querySelectorAll('script[type="application/ld+json"]');
for (const s of scripts) {
try {
const data = JSON.parse(s.textContent);
if (data['@type'] === 'Product') return data;
if (Array.isArray(data)) {
const product = data.find(d => d['@type'] === 'Product');
if (product) return product;
}
} catch (e) {}
}
return null;
}
""")
if ld_json:
offers = ld_json.get("offers", {})
if isinstance(offers, list):
offers = offers[0] if offers else {}
return {
"asin": asin,
"name": ld_json.get("name"),
"description": ld_json.get("description"),
"brand": (ld_json.get("brand") or {}).get("name"),
"rating": (ld_json.get("aggregateRating") or {}).get("ratingValue"),
"review_count": (ld_json.get("aggregateRating") or {}).get("reviewCount"),
"price": offers.get("price"),
"currency": offers.get("priceCurrency"),
"availability": offers.get("availability", "").split("/")[-1],
"image": ld_json.get("image"),
"url": ld_json.get("url", url),
"source": "json_ld",
}
# Method 2: Direct DOM parsing (fallback)
product = {"asin": asin, "source": "dom_parse"}
title_el = await page.query_selector("#productTitle")
if title_el:
product["name"] = (await title_el.text_content()).strip()
price_el = await page.query_selector(".a-price .a-offscreen")
if price_el:
product["price"] = (await price_el.text_content()).strip()
rating_el = await page.query_selector("i.a-icon-star span.a-icon-alt")
if rating_el:
product["rating"] = (await rating_el.text_content()).strip().split()[0]
return product
async def scrape_amazon_products_batch(asins: list[str],
country: str = "US") -> list[dict]:
"""Scrape multiple Amazon products with randomized delays."""
results = []
async with async_playwright() as p:
browser, context = await create_amazon_context(p, country)
page = await context.new_page()
# Warm up: visit Amazon homepage first
await page.goto("https://www.amazon.com/", wait_until="domcontentloaded")
await page.wait_for_timeout(random.randint(2000, 4000))
for i, asin in enumerate(asins):
data = await scrape_amazon_product(page, asin)
results.append(data)
if data.get("error") == "bot_detected_503":
print(f"[{i+1}] Bot detected on {asin}, backing off...")
await page.wait_for_timeout(30000)
# Create fresh context after detection
await context.close()
await browser.close()
browser, context = await create_amazon_context(p, country)
page = await context.new_page()
await page.goto("https://www.amazon.com/")
await page.wait_for_timeout(3000)
else:
# Normal delay between products
delay = random.randint(4000, 9000)
await page.wait_for_timeout(delay)
await browser.close()
return results
# Usage
results = asyncio.run(scrape_amazon_products_batch(
["B0DCXZJQ8V", "B0BN93M8SP", "B0D5BP2BNR"],
country="US"
))
6. Approach 4: Parsing JSON-LD Structured Data {#json-ld}
The JSON-LD <script type="application/ld+json"> block is the most stable data source on Amazon product pages. While CSS classes and DOM structure shift constantly with A/B tests, the structured data block follows schema.org conventions and changes far less frequently.
from curl_cffi import requests as cffi_requests
import json
import re
def scrape_product_json_ld(asin: str, proxy: str = None) -> dict:
"""Scrape Amazon product using curl-cffi to bypass TLS fingerprinting."""
url = f"https://www.amazon.com/dp/{asin}/"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Site": "none",
"Sec-Fetch-Mode": "navigate",
}
session = cffi_requests.Session()
kwargs = {
"headers": headers,
"impersonate": "chrome131",
"timeout": 15,
}
if proxy:
kwargs["proxies"] = {"https": proxy}
resp = session.get(url, **kwargs)
if resp.status_code != 200:
return {"asin": asin, "error": f"status_{resp.status_code}"}
html = resp.text
if "Robot Check" in html or "api-services-support" in html:
return {"asin": asin, "error": "bot_detected"}
# Extract all JSON-LD blocks
for match in re.finditer(
r'<script[^>]+type="application/ld\+json"[^>]*>(.*?)</script>',
html, re.DOTALL
):
try:
data = json.loads(match.group(1))
# Handle both single objects and arrays
candidates = data if isinstance(data, list) else [data]
for candidate in candidates:
if candidate.get("@type") == "Product":
return _parse_product_schema(candidate, asin)
except json.JSONDecodeError:
continue
return {"asin": asin, "error": "no_product_schema_found"}
def _parse_product_schema(schema: dict, asin: str) -> dict:
"""Parse schema.org Product JSON-LD into clean product dict."""
offers = schema.get("offers", {})
if isinstance(offers, list):
# Take the lowest-priced offer
try:
offers = min(offers, key=lambda o: float(o.get("price", 999999)))
except (ValueError, TypeError):
offers = offers[0] if offers else {}
rating = schema.get("aggregateRating", {})
return {
"asin": asin,
"name": schema.get("name"),
"description": schema.get("description"),
"brand": (schema.get("brand") or {}).get("name"),
"sku": schema.get("sku"),
"gtin13": schema.get("gtin13"),
"price": offers.get("price"),
"currency": offers.get("priceCurrency"),
"availability": offers.get("availability", "").replace(
"https://schema.org/", ""
),
"condition": offers.get("itemCondition", "").replace(
"https://schema.org/", ""
),
"rating": rating.get("ratingValue"),
"review_count": rating.get("reviewCount"),
"best_rating": rating.get("bestRating"),
"image": schema.get("image"),
"url": offers.get("url"),
"source": "json_ld",
}
7. Approach 5: Review Scraping {#reviews}
Amazon reviews live at /product-reviews/{ASIN}/ and can be scraped with careful browser automation:
async def scrape_reviews(page, asin: str,
max_pages: int = 5) -> list[dict]:
"""Scrape Amazon product reviews."""
reviews = []
base_url = f"https://www.amazon.com/product-reviews/{asin}/"
for page_num in range(1, max_pages + 1):
url = f"{base_url}?pageNumber={page_num}"
await page.goto(url, wait_until="domcontentloaded")
await page.wait_for_timeout(random.randint(2000, 4000))
title = await page.title()
if "Robot Check" in title:
break
# Extract reviews
review_items = await page.query_selector_all(
"div[data-hook='review']"
)
if not review_items:
break
for item in review_items:
try:
title_el = await item.query_selector(
"a[data-hook='review-title'] span:not(.a-letter-space)"
)
body_el = await item.query_selector(
"span[data-hook='review-body'] span"
)
rating_el = await item.query_selector(
"i[data-hook='review-star-rating'] span.a-icon-alt"
)
author_el = await item.query_selector(
"span.a-profile-name"
)
date_el = await item.query_selector(
"span[data-hook='review-date']"
)
verified_el = await item.query_selector(
"span[data-hook='avp-badge']"
)
helpful_el = await item.query_selector(
"span[data-hook='helpful-vote-statement']"
)
review_title = (await title_el.text_content()).strip() if title_el else ""
body = (await body_el.text_content()).strip() if body_el else ""
rating_text = (await rating_el.text_content()).strip() if rating_el else ""
rating = float(rating_text.split()[0]) if rating_text else None
reviews.append({
"asin": asin,
"title": review_title,
"body": body,
"rating": rating,
"author": (await author_el.text_content()).strip() if author_el else "",
"date": (await date_el.text_content()).replace("Reviewed in", "").strip() if date_el else "",
"verified_purchase": verified_el is not None,
"helpful_votes": (await helpful_el.text_content()).strip() if helpful_el else "",
"page_num": page_num,
})
except Exception:
continue
await page.wait_for_timeout(random.randint(3000, 6000))
return reviews
def analyze_reviews(reviews: list[dict]) -> dict:
"""Compute summary statistics from scraped reviews."""
if not reviews:
return {}
ratings = [r["rating"] for r in reviews if r.get("rating")]
verified = [r for r in reviews if r.get("verified_purchase")]
import statistics
return {
"total": len(reviews),
"verified_purchase_count": len(verified),
"avg_rating": statistics.mean(ratings) if ratings else 0,
"rating_distribution": {
str(i): sum(1 for r in ratings if int(r) == i)
for i in range(1, 6)
},
"verified_purchase_rate": len(verified) / len(reviews) if reviews else 0,
}
8. Approach 6: Search Results and Category Pages {#search}
async def scrape_search_results(page, query: str,
max_pages: int = 3) -> list[dict]:
"""Scrape Amazon search results pages."""
products = []
for page_num in range(1, max_pages + 1):
url = (f"https://www.amazon.com/s?k={query.replace(' ', '+')}"
f"&page={page_num}")
await page.goto(url, wait_until="domcontentloaded")
await page.wait_for_timeout(random.randint(2000, 4000))
# Extract products from search result cards
cards = await page.query_selector_all(
"div[data-component-type='s-search-result']"
)
for card in cards:
asin = await card.get_attribute("data-asin")
if not asin:
continue
title_el = await card.query_selector("h2 a.a-link-normal span")
price_el = await card.query_selector("span.a-price .a-offscreen")
rating_el = await card.query_selector("i.a-icon-star-small span.a-icon-alt")
review_count_el = await card.query_selector("span.a-size-base.s-underline-text")
img_el = await card.query_selector("img.s-image")
badge_el = await card.query_selector("span.a-badge-text")
prime_el = await card.query_selector("i.aok-relative.s-prime")
products.append({
"asin": asin,
"title": (await title_el.text_content()).strip() if title_el else "",
"price": (await price_el.text_content()).strip() if price_el else "",
"rating": (await rating_el.text_content()).strip().split()[0] if rating_el else "",
"review_count": (await review_count_el.text_content()).strip() if review_count_el else "",
"image": await img_el.get_attribute("src") if img_el else "",
"badge": (await badge_el.text_content()).strip() if badge_el else "",
"is_prime": prime_el is not None,
"page": page_num,
})
await page.wait_for_timeout(random.randint(3000, 6000))
return products
9. Approach 7: API Services {#api-services}
For Amazon search results and high-volume product data, managed scraping APIs handle the proxy rotation and CAPTCHA solving for you:
import requests
def scrape_via_rainforest(asin: str, api_key: str,
amazon_domain: str = "amazon.com") -> dict:
"""Get Amazon product data via Rainforest API."""
params = {
"api_key": api_key,
"type": "product",
"asin": asin,
"amazon_domain": amazon_domain,
"include_summarization_attributes": True,
"include_a_plus_body": True,
}
resp = requests.get("https://api.rainforestapi.com/request",
params=params, timeout=30)
resp.raise_for_status()
return resp.json().get("product", {})
def search_via_rainforest(query: str, api_key: str,
page: int = 1) -> list[dict]:
"""Search Amazon via Rainforest API."""
params = {
"api_key": api_key,
"type": "search",
"amazon_domain": "amazon.com",
"search_term": query,
"page": page,
}
resp = requests.get("https://api.rainforestapi.com/request",
params=params, timeout=30)
resp.raise_for_status()
return resp.json().get("search_results", [])
These services cost $1-5 per 1,000 requests. Worth it if you need search result data or product scraping at scale without maintaining your own proxy infrastructure.
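Since these services bill per request, a sensible pattern is to try the cheap sources first (PA-API, then direct scraping) and only fall through to a paid API when they fail. A minimal sketch of that fallback chain (`fetch_with_fallbacks` is my own helper; the fetcher functions are whichever of the approaches above you've wired up):

```python
def fetch_with_fallbacks(asin: str, fetchers: list) -> dict:
    """Try each (name, fn) source in order -- cheapest first -- until
    one returns a result without an 'error' key."""
    for name, fn in fetchers:
        try:
            result = fn(asin)
        except Exception:
            continue  # network/proxy failure: move to the next source
        if result and not result.get("error"):
            result["fetched_via"] = name
            return result
    return {"asin": asin, "error": "all_sources_failed"}
```

Usage might look like `fetch_with_fallbacks(asin, [("pa_api", pa_fetch), ("json_ld", scrape_product_json_ld), ("rainforest", rf_fetch)])`, so paid requests only fire when the free paths come back blocked.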
10. Anti-Detection: Headers, TLS, and Fingerprints {#anti-detection}
from curl_cffi import requests as cffi_requests
import random
import time
# Rotate between multiple browser profiles
BROWSER_PROFILES = [
{
"impersonate": "chrome131",
"ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
},
{
"impersonate": "chrome130",
"ua": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
},
{
"impersonate": "safari17_0",
"ua": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) AppleWebKit/605.1.15 "
"(KHTML, like Gecko) Version/17.0 Safari/605.1.15",
},
]
def get_amazon_session(proxy: str = None) -> cffi_requests.Session:
"""Create a curl-cffi session mimicking a real browser."""
profile = random.choice(BROWSER_PROFILES)
    session = cffi_requests.Session(impersonate=profile["impersonate"])
# These headers must be present and realistic
session.headers = {
"User-Agent": profile["ua"],
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,"
"image/avif,image/webp,image/apng,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"DNT": "1",
}
if proxy:
session.proxies = {"https": proxy}
return session
def warm_up_session(session: cffi_requests.Session):
"""Visit Amazon homepage before scraping product pages."""
session.get("https://www.amazon.com/", timeout=15)
time.sleep(random.uniform(2, 5))
# Optional: visit a category page too
session.get("https://www.amazon.com/gp/bestsellers/", timeout=15)
time.sleep(random.uniform(1, 3))
11. Proxy Strategy for Amazon {#proxies}
Amazon maintains blocklists of virtually every major datacenter IP range. Residential proxies are non-negotiable for any direct HTML scraping.
ThorData provides rotating residential proxy pools with US targeting. Their IPs are genuine residential addresses, which is the minimum bar for Amazon not to instantly block you.
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000
def get_amazon_proxy(state: str = None) -> str:
"""Get US residential proxy, optionally targeting a specific state."""
if state:
# State-level targeting helps match Amazon's geo-pricing
user = f"{THORDATA_USER}-country-us-state-{state.lower()}"
else:
user = f"{THORDATA_USER}-country-us"
return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"
def test_proxy_for_amazon(proxy: str) -> bool:
"""Verify proxy works for Amazon and isn't in a blocked range."""
session = get_amazon_session(proxy)
try:
resp = session.get("https://www.amazon.com/dp/B0DCXZJQ8V/", timeout=10)
return resp.status_code == 200 and "Robot Check" not in resp.text
except Exception:
return False
12. Rate Limiting and Request Scheduling {#rate-limits}
import time
import random
import sqlite3
from collections import deque
class AmazonRateLimiter:
"""Token bucket rate limiter for Amazon scraping."""
def __init__(self, requests_per_minute: int = 10,
burst_size: int = 3):
self.interval = 60.0 / requests_per_minute
self.burst_size = burst_size
self.tokens = deque()
def wait(self):
now = time.time()
# Remove tokens older than the rate window
while self.tokens and now - self.tokens[0] > 60:
self.tokens.popleft()
if len(self.tokens) >= self.burst_size:
# Wait until oldest token expires
sleep_time = 60 - (now - self.tokens[0]) + random.uniform(1, 3)
time.sleep(max(sleep_time, self.interval))
self.tokens.append(time.time())
# Always add a random delay even within rate limits
time.sleep(random.uniform(0.5, 2.0))
rate_limiter = AmazonRateLimiter(requests_per_minute=8, burst_size=2)
def scrape_with_scheduling(asins: list[str],
output_db: str = "amazon_products.db"):
"""Scrape ASINs with rate limiting and progress tracking."""
import asyncio
conn = init_amazon_db(output_db)
# Check which ASINs are already scraped
existing = set(row[0] for row in conn.execute("SELECT asin FROM products"))
pending = [a for a in asins if a not in existing]
print(f"{len(pending)} ASINs to scrape ({len(existing)} already done)")
for i, asin in enumerate(pending):
rate_limiter.wait()
try:
product = scrape_product_json_ld(asin)
if not product.get("error"):
save_product(conn, product)
print(f"[{i+1}/{len(pending)}] OK: {asin} - {product.get('name', '')[:50]}")
else:
print(f"[{i+1}/{len(pending)}] ERR: {asin} - {product['error']}")
except Exception as e:
print(f"[{i+1}/{len(pending)}] FAIL: {asin} - {e}")
13. Storing Amazon Data: Schema Design {#storage}
import sqlite3
import json
import time
def init_amazon_db(db_path: str = "amazon.db") -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS products (
asin TEXT PRIMARY KEY,
name TEXT,
brand TEXT,
description TEXT,
price REAL,
currency TEXT,
availability TEXT,
rating REAL,
review_count INTEGER,
category_id TEXT,
image_url TEXT,
url TEXT,
source TEXT,
scraped_at REAL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS price_history (
asin TEXT,
price REAL,
currency TEXT,
recorded_at REAL,
source TEXT,
PRIMARY KEY (asin, recorded_at)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS reviews (
asin TEXT,
review_title TEXT,
body TEXT,
rating REAL,
author TEXT,
review_date TEXT,
verified_purchase INTEGER,
helpful_votes TEXT,
page_num INTEGER,
scraped_at REAL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_price_history_asin ON price_history(asin)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_reviews_asin ON reviews(asin)")
conn.commit()
return conn
def save_product(conn: sqlite3.Connection, product: dict):
now = time.time()
conn.execute("""
INSERT OR REPLACE INTO products VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
product.get("asin"), product.get("name"), product.get("brand"),
product.get("description"), product.get("price"), product.get("currency"),
product.get("availability"), product.get("rating"),
product.get("review_count"), product.get("category_id"),
product.get("image"), product.get("url"), product.get("source"), now
))
# Also record in price history
if product.get("price"):
conn.execute("""
INSERT OR IGNORE INTO price_history VALUES (?,?,?,?,?)
""", (product["asin"], product["price"],
product.get("currency", "USD"), now, product.get("source", "scrape")))
conn.commit()
14. Price Monitoring Pipeline {#price-monitoring}
import random
import sqlite3
import time
from datetime import datetime
def build_price_monitor(asins: list[str], db_path: str = "price_monitor.db",
check_interval_hours: int = 6):
"""Set up a simple price monitoring pipeline."""
conn = init_amazon_db(db_path)
def run_check():
print(f"\n[{datetime.now():%Y-%m-%d %H:%M}] Price check starting...")
for asin in asins:
time.sleep(random.uniform(30, 90)) # Space out checks
# Try PA-API first (cheaper), fall back to scraping
products = get_products_by_asin([asin])
if products:
product = products[0]
product["asin"] = asin
save_product(conn, product)
print(f" {asin}: ${product.get('price', 'N/A')}")
else:
print(f" {asin}: PA-API miss, skipping (use scraping for fallback)")
check_price_alerts(conn, asins)
def check_price_alerts(conn, asins: list[str]):
"""Check if any prices have dropped significantly."""
for asin in asins:
rows = conn.execute("""
SELECT price, recorded_at FROM price_history
WHERE asin = ?
ORDER BY recorded_at DESC
LIMIT 10
""", (asin,)).fetchall()
if len(rows) < 2:
continue
current = rows[0][0]
historical_avg = sum(r[0] for r in rows[1:]) / len(rows[1:])
if current and historical_avg and current < historical_avg * 0.85:
print(f" PRICE DROP ALERT: {asin} is ${current:.2f} "
f"(avg was ${historical_avg:.2f}, "
f"{(1 - current/historical_avg)*100:.0f}% drop)")
return run_check
# Build and run
monitor = build_price_monitor(
asins=["B0DCXZJQ8V", "B0BN93M8SP"],
check_interval_hours=6
)
monitor() # Run one check
15. Common Errors and Fixes {#errors}
| Error | Cause | Fix |
|---|---|---|
| 503 "Robot Check" page | Bot detection triggered | Rotate residential IP, increase delays, check TLS fingerprint |
| CAPTCHA (image challenge) | Suspicious request pattern | Switch to residential proxy, reduce rate |
| Redirect to /ap/signin | Session flagged as bot | Clear cookies, new context, new IP |
| Empty price field | Out of stock OR geo-mismatch | Check with US residential IP, verify ASIN active |
| 404 on product URL | ASIN deleted or discontinued | Remove from tracking |
| Prices shown in wrong currency | Non-US proxy IP | Use US-targeted proxy specifically |
| curl-cffi import error | Not installed | pip install curl-cffi |
| PA-API 429 | Rate limit exceeded (1 req/sec) | Add 1.1s sleep between requests |
| Keepa "TokensLeft: 0" | Quota exhausted | Wait for hourly refresh or upgrade plan |
16. Which Approach Should You Use? {#summary}
| Need | Best Approach | Cost |
|---|---|---|
| Product details for known ASINs | PA-API | Free with Associates account |
| Price history and trend data | Keepa API | ~$15-50/month |
| Review text and Q&A | Playwright + residential proxies | Proxy costs |
| Search results at scale | Rainforest/Oxylabs API or Playwright | $1-5/1K requests |
| Large-scale category scraping | Playwright + ThorData proxies | Variable |
| Price tracking pipeline | PA-API primary + Keepa for history | Combined above |
The days of scraping Amazon with requests and free proxies are long gone. In 2026, you either use official APIs, pay for quality proxy infrastructure, or accept that your scraper will break every few days.
The practical path for most projects: start with PA-API for basic product data, add Keepa for price history, and use ThorData residential proxies with Playwright only when you need data those APIs don't cover -- full review text, search results, Q&A sections, and competitor analysis at category scale. Pick the approach that matches your data needs and budget, and always respect Amazon's Terms of Service for your specific use case.