← Back to blog

Scraping Best Buy Product Data with Python (2026)

Scraping Best Buy Product Data with Python (2026)

Best Buy is the largest electronics retailer in the US, which makes it a primary source for product pricing, specs, and customer reviews on laptops, TVs, headphones, and everything else with a circuit board. They actually have a public API — but it's limited. For the full picture, you need to supplement it with web scraping.

Here's how to get comprehensive electronics data from Best Buy using both approaches, with production-ready code for price monitoring, spec extraction, and review analysis.

Why Scrape Best Buy?

Best Buy's product catalog is one of the most comprehensive sources for electronics data in North America, covering current pricing and sale status, stock availability (online and in-store), full specification sheets, and aggregate customer ratings.

The Best Buy Products API

Best Buy offers a free Products API at developer.bestbuy.com. You get an API key instantly — no approval process, 50,000 requests per day. That's generous enough for most use cases.

import httpx
import sqlite3
import json
import time
import random
import re
from typing import Optional, Dict, List, Any
from datetime import datetime, timedelta

class BestBuyAPI:
    """Client for the Best Buy Products API.

    Adds client-side rate limiting and retry-with-backoff around the v1
    REST endpoints. The instance owns an httpx connection pool; call
    close() (or use the instance as a context manager) to release it.
    """

    BASE_URL = "https://api.bestbuy.com/v1"

    def __init__(self, api_key: str, rate_limit_rps: float = 5.0):
        """
        Args:
            api_key: Best Buy developer API key.
            rate_limit_rps: Maximum request rate to enforce client-side.
        """
        self.api_key = api_key
        self.min_delay = 1.0 / rate_limit_rps
        self.client = httpx.Client(
            base_url=self.BASE_URL,
            timeout=20,
        )
        self._last_request = 0.0

    def close(self):
        """Release the underlying HTTP connection pool."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def _rate_limit(self):
        """Sleep just long enough to respect the configured request rate."""
        elapsed = time.time() - self._last_request
        if elapsed < self.min_delay:
            # Small jitter so request timing doesn't look machine-regular.
            time.sleep(self.min_delay - elapsed + random.uniform(0, 0.1))
        self._last_request = time.time()

    def _request(self, path: str, params: Dict) -> Optional[Dict]:
        """GET `path` with auth/format params added; retry transient errors.

        Returns:
            Decoded JSON body, or None on 403/404, hard HTTP errors, or
            after exhausting 4 attempts (429 and other statuses back off).
        """
        self._rate_limit()
        # Work on a copy so the caller's dict is never mutated (the original
        # wrote the API key and format back into caller-owned state).
        params = {**params, "apiKey": self.api_key, "format": "json"}

        for attempt in range(4):
            try:
                r = self.client.get(path, params=params)

                if r.status_code == 200:
                    return r.json()
                elif r.status_code == 429:
                    # Linear backoff on rate-limit responses.
                    wait = 30 * (attempt + 1)
                    print(f"[429] Rate limited, waiting {wait}s")
                    time.sleep(wait)
                elif r.status_code == 403:
                    print(f"[403] API key issue or daily limit reached")
                    return None
                elif r.status_code == 404:
                    return None
                else:
                    print(f"[ERROR] HTTP {r.status_code}")
                    if attempt < 3:
                        time.sleep(5 * (attempt + 1))

            except httpx.TimeoutException:
                print(f"[TIMEOUT] Attempt {attempt + 1}")
                time.sleep(5 * (attempt + 1))
            except httpx.HTTPError as e:
                print(f"[ERROR] {e}")
                return None

        return None

    def search_products(
        self,
        query: str,
        page: int = 1,
        page_size: int = 50,
        sort: str = "relevance",
        category_id: Optional[str] = None,
    ) -> Dict:
        """Search products with optional category filtering.

        Returns:
            Raw API response dict, or {} when the request failed.
        """
        filter_expr = f"(search={query})"
        if category_id:
            filter_expr = f"(categoryPath.id={category_id}&search={query})"

        params = {
            "page": page,
            "pageSize": min(page_size, 100),  # API caps pageSize at 100
            # NOTE(review): sort syntax is "<attribute>.asc"; confirm the
            # endpoint accepts "relevance.asc" for the default sort.
            "sort": f"{sort}.asc",
            "show": (
                "sku,name,salePrice,regularPrice,onSale,"
                "categoryPath.name,manufacturer,modelNumber,"
                "shortDescription,customerReviewAverage,"
                "customerReviewCount,url,image,upc,"
                "freeShipping,inStoreAvailability,onlineAvailability,"
                "startDate,new,active"
            ),
        }

        return self._request(f"/products{filter_expr}", params) or {}

    def get_product_by_sku(self, sku: int) -> Optional[Dict]:
        """Get full product details (specs, features, dimensions) by SKU."""
        params = {
            "show": (
                "sku,name,salePrice,regularPrice,onSale,"
                "categoryPath.name,manufacturer,modelNumber,"
                "longDescription,shortDescription,"
                "customerReviewAverage,customerReviewCount,"
                "details.name,details.value,features.feature,"
                "url,image,upc,weight,shippingWeight,"
                "depth,height,width,color,"
                "freeShipping,inStoreAvailability,onlineAvailability,"
                "startDate,new,active,type"
            ),
        }
        return self._request(f"/products/{sku}.json", params)

    def search_paginated(
        self,
        query: str,
        max_pages: int = 10,
        page_size: int = 100,
    ) -> List[Dict]:
        """Collect up to `max_pages` pages of search results into one list.

        Stops early when a request fails, a page comes back empty, or the
        API reports no further pages.
        """
        all_products = []

        for page in range(1, max_pages + 1):
            data = self.search_products(query, page=page, page_size=page_size)

            if not data:
                break

            products = data.get("products", [])
            if not products:
                break

            all_products.extend(products)
            total_pages = data.get("totalPages", 1)

            print(f"  Page {page}/{min(max_pages, total_pages)}: {len(products)} products")

            if page >= total_pages:
                break

        return all_products

Getting Detailed Specifications

The API returns specifications in the details field — key-value pairs with processor type, RAM, screen size, battery life, and everything else on the spec sheet.

def get_product_specs(api: "BestBuyAPI", sku: int) -> Optional[Dict]:
    """Fetch a product by SKU and normalize it into a nested record.

    Args:
        api: Configured BestBuyAPI client.
        sku: Best Buy product SKU.

    Returns:
        Nested dict (price, availability, dimensions, ratings, specs, ...)
        or None when the SKU cannot be fetched.
    """
    product = api.get_product_by_sku(sku)
    if not product:
        return None

    # Build clean specs dict from the details array of {name, value} pairs.
    specs = {}
    for detail in product.get("details", []):
        if detail.get("name") and detail.get("value"):
            specs[detail["name"]] = detail["value"]

    # Features as a flat list of strings
    features = [f["feature"] for f in product.get("features", []) if f.get("feature")]

    # Category path as list of names, root first
    category_path = [c.get("name", "") for c in product.get("categoryPath", [])]

    return {
        "sku": product["sku"],
        "name": product["name"],
        "manufacturer": product.get("manufacturer"),
        "model": product.get("modelNumber"),
        "upc": product.get("upc"),
        "type": product.get("type"),
        "price": {
            "sale": product.get("salePrice"),
            "regular": product.get("regularPrice"),
            "on_sale": product.get("onSale", False),
            "discount_pct": _calc_discount(
                product.get("salePrice"), product.get("regularPrice")
            ),
        },
        "availability": {
            "in_store": product.get("inStoreAvailability", False),
            "online": product.get("onlineAvailability", False),
        },
        "shipping": {
            "free": product.get("freeShipping", False),
            "weight_lbs": product.get("shippingWeight"),
        },
        "dimensions": {
            "weight": product.get("weight"),
            "depth": product.get("depth"),
            "height": product.get("height"),
            "width": product.get("width"),
        },
        "ratings": {
            "average": product.get("customerReviewAverage"),
            "count": product.get("customerReviewCount"),
        },
        "category_path": category_path,
        # `or ""` guards against keys present with an explicit null value:
        # dict.get's default applies only to *absent* keys, so a None
        # description would otherwise crash the slice with a TypeError.
        "description_short": (product.get("shortDescription") or "")[:500],
        "description_long": (product.get("longDescription") or "")[:1000],
        "specs": specs,
        "features": features[:20],
        "url": product.get("url"),
        "image": product.get("image"),
        "scraped_at": datetime.utcnow().isoformat(),
    }


def _calc_discount(sale: Optional[float], regular: Optional[float]) -> Optional[float]:
    """Calculate discount percentage."""
    if sale and regular and regular > 0 and sale < regular:
        return round((1 - sale / regular) * 100, 1)
    return None

Scraping Customer Reviews

The API gives you average rating and count, but not individual review text. For review content, you need to scrape the website. Reviews are loaded via a separate endpoint.

def scrape_reviews(
    sku: int,
    max_pages: int = 10,
    proxy: Optional[str] = None,
) -> List[Dict]:
    """Scrape individual customer reviews for a product.

    The Products API only exposes aggregate rating data, so review text
    has to come from the website itself.

    Args:
        sku: Best Buy product SKU.
        max_pages: Maximum number of review pages to fetch.
        proxy: Optional proxy URL applied to both http and https traffic.

    Returns:
        List of review dicts: page, title, body, rating, date, author,
        verified_purchase, sku.
    """
    from selectolax.parser import HTMLParser

    reviews = []
    user_agents = [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
    ]

    client_kwargs = {"timeout": 25, "follow_redirects": True}
    if proxy:
        client_kwargs["proxies"] = {"http://": proxy, "https://": proxy}

    # One client for the whole crawl. The original built a fresh client per
    # page, paying a new TCP/TLS handshake every request; reusing the pooled
    # connection is faster and closer to real-browser behavior.
    with httpx.Client(**client_kwargs) as client:
        for page in range(1, max_pages + 1):
            # Rotate the User-Agent per page.
            headers = {
                "User-Agent": random.choice(user_agents),
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Accept-Encoding": "gzip, deflate, br",
                "Referer": f"https://www.bestbuy.com/site/{sku}.p",
                "DNT": "1",
            }

            url = f"https://www.bestbuy.com/site/reviews/{sku}?page={page}&pageSize=20"

            try:
                r = client.get(url, headers=headers)
            except httpx.HTTPError as e:
                print(f"[ERROR] Review page {page}: {e}")
                break

            if r.status_code != 200:
                print(f"[ERROR] HTTP {r.status_code} on review page {page}")
                break

            tree = HTMLParser(r.text)
            review_nodes = tree.css("[data-testid='customer-review']")

            if not review_nodes:
                # Try alternate selectors (markup varies across page versions)
                review_nodes = tree.css(".review-item") or tree.css("[class*='ReviewItem']")

            if not review_nodes:
                print(f"  No reviews found on page {page}")
                break

            for node in review_nodes:
                title_el = node.css_first(".review-title, [class*='ReviewTitle']")
                body_el = node.css_first(".review-body, [class*='ReviewBody']")
                rating_el = node.css_first("[aria-label*='Rating'], [data-testid='review-rating']")
                date_el = node.css_first(".review-date, [class*='ReviewDate'], time")
                author_el = node.css_first(".review-author, [class*='ReviewAuthor']")
                verified_el = node.css_first("[data-testid='verified-purchaser'], [class*='VerifiedPurchaser']")

                # Take the first integer token in the aria-label as the rating.
                rating = None
                if rating_el:
                    aria = rating_el.attributes.get("aria-label", "") or ""
                    for word in aria.split():
                        try:
                            rating = int(word)
                            break
                        except ValueError:
                            continue

                reviews.append({
                    "page": page,
                    "title": title_el.text(strip=True) if title_el else "",
                    "body": body_el.text(strip=True) if body_el else "",
                    "rating": rating,
                    "date": date_el.text(strip=True) if date_el else "",
                    "author": author_el.text(strip=True) if author_el else "",
                    "verified_purchase": verified_el is not None,
                    "sku": sku,
                })

            print(f"  Review page {page}: {len(review_nodes)} reviews")
            # Polite, human-ish delay between pages.
            time.sleep(random.uniform(2.0, 4.0))

    return reviews

Price Monitoring Over Time

Best Buy runs frequent sales. Tracking prices systematically lets you identify sale patterns and build price drop alerts.

class PriceMonitor:
    """Track Best Buy product prices over time in a local SQLite database.

    Owns a sqlite3 connection; call close() (or use the instance as a
    context manager) when done.
    """

    def __init__(self, db_path: str, api: "BestBuyAPI"):
        """
        Args:
            db_path: Path to the SQLite database file (created if missing).
            api: Configured BestBuyAPI client used for price lookups.
        """
        self.api = api
        self.conn = sqlite3.connect(db_path)
        # WAL lets readers coexist with the writer during long monitor runs.
        self.conn.execute("PRAGMA journal_mode=WAL")
        self._init_tables()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def _init_tables(self):
        """Create the history/alerts tables and indexes if absent."""
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sku INTEGER,
                name TEXT,
                sale_price REAL,
                regular_price REAL,
                on_sale INTEGER,
                discount_pct REAL,
                online_available INTEGER,
                store_available INTEGER,
                checked_at TEXT
            );

            CREATE TABLE IF NOT EXISTS price_alerts (
                sku INTEGER PRIMARY KEY,
                name TEXT,
                target_price REAL,
                current_price REAL,
                alert_sent INTEGER DEFAULT 0,
                added_at TEXT
            );

            CREATE INDEX IF NOT EXISTS idx_price_sku ON price_history(sku, checked_at DESC);
        """)
        self.conn.commit()

    def check_prices(self, skus: List[int]) -> List[Dict]:
        """Fetch current prices via the API and append them to history.

        Args:
            skus: SKUs to check; unknown SKUs are skipped with a warning.

        Returns:
            One summary dict per successfully checked SKU.
        """
        results = []
        now = datetime.utcnow().isoformat()

        for sku in skus:
            product = self.api.get_product_by_sku(sku)
            if not product:
                print(f"  [WARN] SKU {sku} not found")
                continue

            sale_price = product.get("salePrice")
            regular_price = product.get("regularPrice")
            discount = _calc_discount(sale_price, regular_price)

            self.conn.execute(
                """INSERT INTO price_history
                   (sku, name, sale_price, regular_price, on_sale, discount_pct,
                    online_available, store_available, checked_at)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                (
                    sku,
                    product.get("name", "")[:200],
                    sale_price,
                    regular_price,
                    int(product.get("onSale", False)),
                    discount,
                    int(product.get("onlineAvailability", False)),
                    int(product.get("inStoreAvailability", False)),
                    now,
                )
            )
            results.append({
                "sku": sku,
                "name": product.get("name"),
                "price": sale_price,
                "discount": discount,
            })

        # Single commit after the batch keeps the loop fast.
        self.conn.commit()
        return results

    def get_price_history(self, sku: int, days: int = 90) -> List[Dict]:
        """Return the recorded price points for a SKU over the last N days,
        oldest first."""
        cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
        rows = self.conn.execute(
            """SELECT sale_price, regular_price, on_sale, discount_pct, checked_at
               FROM price_history
               WHERE sku = ? AND checked_at >= ?
               ORDER BY checked_at ASC""",
            (sku, cutoff),
        ).fetchall()

        return [
            {
                "price": r[0],
                "regular": r[1],
                "on_sale": bool(r[2]),
                "discount_pct": r[3],
                "checked_at": r[4],
            }
            for r in rows
        ]

    def get_best_deals(self, min_discount_pct: float = 15.0) -> List[Dict]:
        """Return on-sale products whose *latest* recorded discount meets
        the threshold, best discount first."""
        # Join each SKU's newest history row, then filter by discount.
        rows = self.conn.execute(
            """SELECT ph.sku, ph.name, ph.sale_price, ph.regular_price, ph.discount_pct
               FROM price_history ph
               INNER JOIN (
                   SELECT sku, MAX(checked_at) as max_checked
                   FROM price_history
                   GROUP BY sku
               ) latest ON ph.sku = latest.sku AND ph.checked_at = latest.max_checked
               WHERE ph.on_sale = 1
                 AND ph.discount_pct >= ?
               ORDER BY ph.discount_pct DESC""",
            (min_discount_pct,),
        ).fetchall()

        return [
            {
                "sku": r[0],
                "name": r[1],
                "sale_price": r[2],
                "regular_price": r[3],
                "discount_pct": r[4],
            }
            for r in rows
        ]

    def add_alert(self, sku: int, target_price: float):
        """Register (or refresh) a price alert for a SKU.

        NOTE: re-adding an existing SKU replaces the row, which resets
        alert_sent to its default (0).
        """
        product = self.api.get_product_by_sku(sku)
        name = product.get("name", f"SKU {sku}") if product else f"SKU {sku}"

        self.conn.execute(
            """INSERT OR REPLACE INTO price_alerts (sku, name, target_price, current_price, added_at)
               VALUES (?,?,?,?,?)""",
            (sku, name[:200], target_price, product.get("salePrice") if product else None, datetime.utcnow().isoformat())
        )
        self.conn.commit()
        print(f"Alert added: {name} — alert at ${target_price}")

Handling Anti-Bot Protection

The API is clean — no bot detection. But web scraping for reviews and detailed product pages triggers Akamai Bot Manager.

TLS Fingerprinting

Standard Python httpx requests have a recognizable TLS fingerprint. Libraries like curl_cffi impersonate Chrome's TLS behavior:

from curl_cffi import requests as cffi_requests

def scrape_with_cffi(url: str, proxy: Optional[str] = None) -> Optional[str]:
    """Fetch a URL while impersonating Chrome's TLS fingerprint.

    Best-effort: any failure (HTTP error status, network error) is logged
    and reported as None rather than raised.
    """
    proxies = {"http": proxy, "https": proxy} if proxy else None

    try:
        response = cffi_requests.get(
            url,
            impersonate="chrome126",
            proxies=proxies,
            timeout=25,
        )
    except Exception as e:
        print(f"[ERROR] {e}")
        return None

    if response.status_code != 200:
        print(f"[ERROR] HTTP {response.status_code}")
        return None
    return response.text

ThorData Proxy Integration

For review scraping at scale, you need residential proxies. Datacenter IPs get flagged by Akamai within a handful of requests. ThorData's residential proxy network provides proxies with clean Akamai reputation scores — important because Akamai maintains a separate IP blocklist from other bot detection systems.

class ThorDataProxyPool:
    """Builds proxy URLs for the ThorData residential gateway."""

    def __init__(self, username: str, password: str):
        self.username = username
        self.password = password
        self.host = "gate.thordata.com"
        self.port = 9000

    def get_proxy(self, country: str = "US", session_id: Optional[str] = None) -> str:
        """Return a proxy URL, optionally pinned to a sticky session."""
        # Targeting options are encoded into the proxy username.
        segments = [self.username, "country", country]
        if session_id:
            segments += ["session", session_id]
        credentials = "-".join(segments)
        return f"http://{credentials}:{self.password}@{self.host}:{self.port}"

    def get_rotating(self) -> str:
        """Fresh US residential IP for each request."""
        return self.get_proxy("US")

    def get_sticky(self, session_id: str) -> str:
        """Sticky session — the gateway keeps the same IP for this id."""
        return self.get_proxy("US", session_id=session_id)


def scrape_reviews_with_proxy(
    sku: int,
    proxy_pool: ThorDataProxyPool,
    max_pages: int = 10,
) -> List[Dict]:
    """Scrape reviews through ThorData residential proxies.

    IP strategy: pages are grouped in threes and each group shares one
    sticky session, so the exit IP genuinely rotates every 3 pages.
    (The previous `page % 3 == 1` logic reused ONE sticky session id for
    all non-rotating pages, so pages 2,3,5,6,... never changed IP.)
    """
    from selectolax.parser import HTMLParser  # hoisted out of the page loop

    all_reviews = []

    for page in range(1, max_pages + 1):
        # One sticky session per 3-page group → new IP every 3 pages.
        session = f"sku{sku}g{(page - 1) // 3}"
        proxy = proxy_pool.get_sticky(session)
        url = f"https://www.bestbuy.com/site/reviews/{sku}?page={page}&pageSize=20"

        html = scrape_with_cffi(url, proxy=proxy)
        if not html:
            break

        tree = HTMLParser(html)
        nodes = tree.css("[data-testid='customer-review']") or tree.css(".review-item")

        if not nodes:
            break

        for node in nodes:
            title = node.css_first(".review-title")
            body = node.css_first(".review-body")
            rating_el = node.css_first("[aria-label*='Rating']")

            # Take the first integer token in the aria-label as the rating.
            rating = None
            if rating_el:
                for word in (rating_el.attributes.get("aria-label") or "").split():
                    try:
                        rating = int(word)
                        break
                    except ValueError:
                        continue

            all_reviews.append({
                "sku": sku,
                "page": page,
                "title": title.text(strip=True) if title else "",
                "body": body.text(strip=True) if body else "",
                "rating": rating,
            })

        print(f"  Page {page}: {len(nodes)} reviews")
        # Human-ish delay between pages.
        time.sleep(random.uniform(2.5, 5.0))

    return all_reviews

Full Data Pipeline

Combining API data and scraped reviews into a complete product intelligence pipeline:

def build_product_database(
    search_queries: List[str],
    db_path: str = "bestbuy.db",
    api_key: str = "",
    proxy_pool: Optional["ThorDataProxyPool"] = None,
    scrape_reviews: bool = True,
    max_products_per_query: int = 200,
) -> Dict:
    """Complete pipeline: search → specs → reviews → database.

    Args:
        search_queries: Search terms to run against the Products API.
        db_path: SQLite database file to create/update.
        api_key: Best Buy developer API key.
        proxy_pool: Optional residential proxy pool for review scraping.
        scrape_reviews: Also scrape review text for products with ratings.
        max_products_per_query: Cap on products processed per query.

    Returns:
        Stats dict: products_found, products_saved, reviews_saved, errors.
    """
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS products (
            sku INTEGER PRIMARY KEY,
            name TEXT,
            manufacturer TEXT,
            model TEXT,
            upc TEXT,
            sale_price REAL,
            regular_price REAL,
            on_sale INTEGER,
            discount_pct REAL,
            rating_avg REAL,
            rating_count INTEGER,
            category TEXT,
            specs TEXT,
            features TEXT,
            url TEXT,
            image TEXT,
            scraped_at TEXT
        );
        CREATE TABLE IF NOT EXISTS reviews (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sku INTEGER,
            title TEXT,
            body TEXT,
            rating INTEGER,
            date TEXT,
            author TEXT,
            verified INTEGER,
            FOREIGN KEY (sku) REFERENCES products(sku)
        );
        CREATE INDEX IF NOT EXISTS idx_reviews_sku ON reviews(sku);
        CREATE INDEX IF NOT EXISTS idx_products_price ON products(sale_price);
    """)
    conn.commit()

    api = BestBuyAPI(api_key)
    stats = {"products_found": 0, "products_saved": 0, "reviews_saved": 0, "errors": 0}

    try:
        for query in search_queries:
            print(f"\n[QUERY] {query}")
            products = api.search_paginated(query, max_pages=max_products_per_query // 100 + 1)
            stats["products_found"] += len(products)

            for product in products[:max_products_per_query]:
                sku = product.get("sku")
                if not sku:
                    continue

                # Skip SKUs already in the database (makes the run resumable).
                existing = conn.execute("SELECT sku FROM products WHERE sku = ?", (sku,)).fetchone()
                if existing:
                    continue

                # Get full specs
                specs = get_product_specs(api, sku)
                if not specs:
                    stats["errors"] += 1
                    continue

                conn.execute(
                    """INSERT OR REPLACE INTO products
                       (sku, name, manufacturer, model, upc, sale_price, regular_price, on_sale,
                        discount_pct, rating_avg, rating_count, category, specs, features, url, image, scraped_at)
                       VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                    (
                        specs["sku"],
                        specs["name"][:300],
                        specs.get("manufacturer"),
                        specs.get("model"),
                        specs.get("upc"),
                        specs["price"]["sale"],
                        specs["price"]["regular"],
                        int(specs["price"]["on_sale"]),
                        specs["price"]["discount_pct"],
                        specs["ratings"]["average"],
                        specs["ratings"]["count"],
                        " > ".join(specs["category_path"]),
                        json.dumps(specs.get("specs", {})),
                        json.dumps(specs.get("features", [])),
                        specs.get("url"),
                        specs.get("image"),
                        specs["scraped_at"],
                    )
                )
                conn.commit()
                stats["products_saved"] += 1

                # Scrape reviews if requested
                if scrape_reviews and specs["ratings"]["count"] and specs["ratings"]["count"] > 0:
                    proxy = proxy_pool.get_rotating() if proxy_pool else None
                    # BUG FIX: the boolean parameter `scrape_reviews` shadows
                    # the module-level scrape_reviews() function, so calling
                    # the name directly here would invoke a bool and raise
                    # TypeError. Resolve the function from module globals.
                    fetch_reviews = globals()["scrape_reviews"]
                    reviews = fetch_reviews(sku, max_pages=3, proxy=proxy)

                    for rev in reviews:
                        conn.execute(
                            "INSERT INTO reviews (sku, title, body, rating, date, author, verified) VALUES (?,?,?,?,?,?,?)",
                            (sku, rev.get("title"), rev.get("body"), rev.get("rating"),
                             rev.get("date"), rev.get("author"), int(rev.get("verified_purchase", False)))
                        )
                    conn.commit()
                    stats["reviews_saved"] += len(reviews)
                    print(f"  SKU {sku}: {len(reviews)} reviews")
    finally:
        # Always release the DB handle, even if a query/product blows up.
        conn.close()

    print(f"\nPipeline complete: {stats}")
    return stats


# Real-world usage example: build a local database for a few product
# categories. Requires a (free) Best Buy API key from developer.bestbuy.com.
if __name__ == "__main__":
    # One API search term per product category to crawl.
    QUERIES = [
        "rtx 4070 laptop gaming",
        "4k oled tv 55 inch",
        "true wireless earbuds noise cancelling",
        "mechanical keyboard",
    ]

    # Optional: uncomment (here and below) to route review scraping
    # through residential proxies.
    # pool = ThorDataProxyPool("YOUR_USER", "YOUR_PASS")
    build_product_database(
        QUERIES,
        api_key="YOUR_BESTBUY_API_KEY",
        # proxy_pool=pool,
        scrape_reviews=True,
    )

Tips for Production Use

Use the API first. It covers 80% of use cases without any bot detection headaches. The 50K daily limit is generous — even a comprehensive product database project rarely hits it. Save web scraping for the gaps the API doesn't fill.

Cache aggressively. Product specs don't change often. Cache API responses for 24 hours and only re-check prices and availability frequently. Historical price data is permanent — never re-fetch it.

SKU is king. Best Buy SKUs are stable identifiers that persist across product page redesigns. Use them as primary keys in your database, never URLs or product names.

Watch for open-box pricing. Best Buy has open-box deals that appear on product pages but aren't in the standard API response. That data requires scraping the product page HTML.

Rate limiting is generous on the API. The 50K/day limit resets at midnight Pacific. If you need more, rotate between multiple API keys (each requires a separate developer account but there's no cost).

Review scraping needs residential proxies. Akamai's detection becomes aggressive after 30+ requests from a datacenter IP. ThorData's residential proxy network handles the IP reputation aspect — make sure to also use curl_cffi for proper TLS fingerprint impersonation.

Best Buy data is valuable for electronics price comparison engines, product recommendation systems, review sentiment analysis, competitive intelligence for electronics manufacturers, and consumer research applications. The combination of their official API and targeted web scraping gives you one of the most comprehensive electronics product datasets available.