How to Scrape Walmart Product Data with Python in 2026

Walmart is the largest retailer in the world by revenue, and walmart.com is the second-largest US e-commerce platform after Amazon. For price intelligence, competitor monitoring, product research, and market analysis, Walmart's product catalog is an essential data source. Their 170 million+ active customers and hundreds of millions of SKUs make it one of the richest retail datasets available on the public web.

Unlike Amazon's hybrid server-rendered pages, Walmart relies heavily on a GraphQL API that powers its frontend. This is both good and bad news for scrapers: the API returns clean structured data, but it's protected by sophisticated bot detection (PerimeterX/HUMAN Security). This guide covers the full technical stack for extracting Walmart product data in 2026 — from initial page access through to production-grade price monitoring pipelines.

Understanding Walmart's Architecture

Before diving into code, it helps to understand what you're working with on Walmart.com as of 2026.

The page loads with some server-rendered content (for SEO), then hydrates via GraphQL calls. Product data is available both in the embedded JSON state within the HTML and via the API.

Method 1: Scraping the Embedded JSON State

The fastest and most reliable approach is extracting data from the __NEXT_DATA__ script tag that Walmart embeds in their server-rendered HTML. This doesn't require running JavaScript or making additional API calls:

import httpx
import json
import re
from typing import Optional

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    # "br" needs the optional brotli package (pip install "httpx[brotli]"); drop it otherwise
    "Accept-Encoding": "gzip, deflate, br",
    "Referer": "https://www.walmart.com/",
    "sec-fetch-dest": "document",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "same-origin",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
}

def extract_product_id(product_url: str) -> Optional[str]:
    """Extract product ID from a Walmart URL."""
    # Format: walmart.com/ip/Product-Name/123456789
    # Also: walmart.com/ip/123456789
    match = re.search(r"/ip/(?:[^/]+/)?(\d+)", product_url)
    return match.group(1) if match else None


def scrape_product_page(product_url: str) -> Optional[dict]:
    """
    Scrape a Walmart product page by extracting the embedded JSON state.
    Most reliable method — doesn't depend on API schema.
    """
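    try:
        # The context manager closes the connection pool even if the request fails
        with httpx.Client(timeout=30, headers=HEADERS, follow_redirects=True) as client:
            resp = client.get(product_url)
        if resp.status_code == 403:
            return {"error": "bot_detection", "status": 403}
        if resp.status_code == 404:
            return {"error": "not_found", "status": 404}
        resp.raise_for_status()
    except httpx.TimeoutException:
        return {"error": "timeout"}
    except httpx.HTTPStatusError as e:
        # Any other non-2xx status raised by raise_for_status()
        return {"error": "http_error", "status": e.response.status_code}
    except httpx.RequestError as e:
        # Connection resets, DNS failures, and similar transport problems
        return {"error": f"request_failed: {e}"}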

    html = resp.text

    # Extract __NEXT_DATA__ (Next.js page props embedded in HTML)
    match = re.search(r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', html, re.DOTALL)
    if not match:
        # Fallback: look for window.__WML_REDUX_INITIAL_STATE__
        match = re.search(r'window\.__WML_REDUX_INITIAL_STATE__\s*=\s*({.*?});', html, re.DOTALL)

    if not match:
        return {"error": "no_embedded_json", "html_length": len(html)}

    try:
        page_data = json.loads(match.group(1))
    except json.JSONDecodeError as e:
        return {"error": f"json_parse_error: {e}"}

    # Navigate the Next.js data structure to find product info
    # Path varies slightly by page type
    product = None
    paths_to_try = [
        ["props", "pageProps", "initialData", "data", "product"],
        ["props", "pageProps", "product"],
        ["props", "initialState", "product", "products"],
    ]

    for path in paths_to_try:
        node = page_data
        for key in path:
            node = node.get(key, {}) if isinstance(node, dict) else {}
        if node:
            product = node
            break

    if not product:
        return {"error": "product_not_found_in_json", "keys": list(page_data.get("props", {}).keys())}

    return normalize_walmart_product(product, product_url)


def normalize_walmart_product(raw: dict, url: str) -> dict:
    """Normalize a raw Walmart product dict into a clean structure."""
    # Price info
    price_info = raw.get("priceInfo") or {}
    current_price = price_info.get("currentPrice") or {}
    was_price = price_info.get("wasPrice") or {}
    unit_price = price_info.get("unitPrice") or {}

    # Availability
    # availabilityStatus can be present but null, so guard before calling upper()
    availability = (raw.get("availabilityStatus") or "").upper()

    # Fulfillment options (shipping/pickup/delivery)
    fulfillment = raw.get("fulfillmentType") or []

    # Images
    images = []
    image_info = raw.get("imageInfo") or {}
    for img in (image_info.get("allImages") or []):
        if img.get("url"):
            images.append(img["url"])

    # Variants
    variants = []
    variant_criteria = raw.get("variantCriteria") or []
    for criterion in variant_criteria:
        variants.append({
            "type": criterion.get("name", ""),
            "options": [v.get("name", "") for v in (criterion.get("values") or [])],
        })

    # Seller info
    seller_info = raw.get("sellerInfo") or {}

    return {
        "item_id": raw.get("usItemId") or raw.get("itemId", ""),
        "name": raw.get("name", ""),
        "brand": raw.get("brand", ""),
        "model": raw.get("model", ""),
        "url": url,
        "short_description": raw.get("shortDescription", ""),
        "price": current_price.get("price"),
        "price_string": current_price.get("priceString", ""),
        "was_price": was_price.get("price"),
        "was_price_string": was_price.get("priceString", ""),
        "unit_price": unit_price.get("price"),
        "unit_price_unit": unit_price.get("unitOfMeasure", ""),
        "in_stock": availability in ("IN_STOCK", "AVAILABLE"),
        "availability_status": availability,
        "fulfillment_types": fulfillment,
        "rating": raw.get("averageRating", 0),
        "review_count": raw.get("numberOfReviews", 0),
        "seller_id": seller_info.get("sellerId", ""),
        "seller_name": seller_info.get("sellerDisplayName", ""),
        "is_walmart_fulfilled": seller_info.get("type") == "WALMART",
        "images": images[:5],  # First 5 images
        "variants": variants,
        "categories": [
            c.get("name", "") for c in (raw.get("categories") or [])
        ],
        "upc": raw.get("upc", ""),
        "gtin": raw.get("gtin13", ""),
    }
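
The path-walking loop in scrape_product_page can be factored into a small reusable helper. The sketch below is one way to do it, not part of the original code: dig and first_match are hypothetical names, and the sample payload is made up to mirror the first entry in paths_to_try.

```python
from typing import Any, Optional, Sequence


def dig(data: Any, path: Sequence[str], default: Any = None) -> Any:
    """Walk nested dicts along `path`; return `default` if any level is missing."""
    node = data
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


def first_match(data: dict, paths: Sequence[Sequence[str]]) -> Optional[Any]:
    """Return the first non-empty value found among several candidate paths."""
    for path in paths:
        value = dig(data, path)
        if value:
            return value
    return None


# Made-up payload shaped like the first path in paths_to_try
sample = {"props": {"pageProps": {"initialData": {"data": {"product": {"name": "Demo"}}}}}}
candidates = [
    ["props", "pageProps", "product"],
    ["props", "pageProps", "initialData", "data", "product"],
]
print(first_match(sample, candidates))
```

Keeping the candidate paths in a plain list makes it easy to add a new one when Walmart reshuffles the page props.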

Method 2: Direct GraphQL API Calls

When you need batch processing and want to avoid loading full HTML pages, call Walmart's GraphQL endpoint directly. The schema changes periodically, so check for updates when things break:

import httpx
import json
from typing import Optional

GRAPHQL_ENDPOINT = "https://www.walmart.com/orchestra/graphql"

GRAPHQL_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
    "Accept": "application/json",
    "Content-Type": "application/json",
    "X-O-PLATFORM": "rweb",
    "X-O-SEGMENT": "oaoh",
    "X-O-GQL-QUERY": "query GetProductDetail",
    "Referer": "https://www.walmart.com/",
    "Origin": "https://www.walmart.com",
}

PRODUCT_QUERY = """
query GetProductDetail($itemId: String!) {
    product(itemId: $itemId) {
        usItemId
        name
        brand
        shortDescription
        model
        upc
        averageRating
        numberOfReviews
        priceInfo {
            currentPrice {
                price
                priceString
                currencyCode
            }
            wasPrice {
                price
                priceString
            }
            unitPrice {
                price
                priceString
                unitOfMeasure
            }
            priceRanges {
                minPrice { price priceString }
                maxPrice { price priceString }
            }
        }
        availabilityStatus
        sellerInfo {
            sellerId
            sellerDisplayName
            type
        }
        fulfillmentType
        imageInfo {
            thumbnailUrl
            allImages { url }
        }
        categories {
            name
            url
        }
        variantCriteria {
            name
            isVariantTypeSwatch
            values {
                name
                id
                isAvailable
            }
        }
    }
}
"""

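def get_product_graphql(item_id: str, client: Optional[httpx.Client] = None) -> Optional[dict]:
    """Fetch a single product via the GraphQL API."""
    # Pass a shared client when fetching in batches; the fallback client
    # created here is single-use and is not closed by this function.
    if client is None:
        client = httpx.Client(timeout=30, headers=GRAPHQL_HEADERS)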

    payload = {
        "query": PRODUCT_QUERY,
        "variables": {"itemId": str(item_id)},
    }

    try:
        resp = client.post(GRAPHQL_ENDPOINT, json=payload)

        if resp.status_code == 403:
            return None  # Bot detection triggered

        resp.raise_for_status()
        data = resp.json()

        if "errors" in data:
            return {"errors": data["errors"]}

        product = data.get("data", {}).get("product")
        if product:
            return normalize_walmart_product(product, f"https://www.walmart.com/ip/{item_id}")

        return None

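    except (httpx.HTTPStatusError, httpx.RequestError, ValueError):
        # Non-2xx statuses from raise_for_status(), timeouts and network
        # failures, or a non-JSON body (for example an HTML challenge page)
        return None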


def get_products_batch(item_ids: list[str], delay: float = 2.0) -> list[dict]:
    """Fetch multiple products with rate limiting."""
    import time
    import random

    client = httpx.Client(timeout=30, headers=GRAPHQL_HEADERS)
    results = []

    for i, item_id in enumerate(item_ids):
        product = get_product_graphql(item_id, client)
        if product:
            results.append(product)
            print(f"  [{i+1}/{len(item_ids)}] {product.get('name', item_id)[:50]}")
        else:
            print(f"  [{i+1}/{len(item_ids)}] {item_id} — failed")
            results.append({"item_id": item_id, "error": "not_found"})

        # Randomized delay
        wait = delay + random.uniform(0, delay * 0.5)
        time.sleep(wait)

    client.close()
    return results
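
get_products_batch waits a randomized interval between items but records a failure on the first miss. A common extension, shown here as an assumption rather than something the original pipeline does, is to retry empty results with exponential backoff and jitter. The names backoff_delay and fetch_with_retry and the default values are hypothetical.

```python
import random
import time
from typing import Callable, Optional


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter backoff: a random wait in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


def fetch_with_retry(fetch: Callable[[str], Optional[dict]], item_id: str,
                     max_attempts: int = 3,
                     sleep: Callable[[float], None] = time.sleep) -> Optional[dict]:
    """Call fetch(item_id) until it returns a truthy result or attempts run out."""
    for attempt in range(max_attempts):
        result = fetch(item_id)
        if result:
            return result
        if attempt < max_attempts - 1:
            sleep(backoff_delay(attempt))
    return None
```

With a shared client this could be called as fetch_with_retry(lambda i: get_product_graphql(i, client), item_id). Note that get_product_graphql returns a truthy dict for GraphQL-level errors, so only None results (403s, timeouts, network failures) would be retried.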

Method 3: Playwright with Intercepted GraphQL

The most robust approach for anti-bot-heavy environments — use a real browser and intercept the GraphQL responses as they happen:

import asyncio
import json
from playwright.async_api import async_playwright

async def scrape_product_playwright(product_url: str, proxy: dict = None) -> dict:
    """
    Use a real Chromium browser to load the product page
    and intercept the GraphQL response.
    """
    graphql_data = {}

    async with async_playwright() as p:
        launch_opts = {"headless": True}
        if proxy:
            launch_opts["proxy"] = proxy

        browser = await p.chromium.launch(**launch_opts)
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/127.0.0.0 Safari/537.36",
            viewport={"width": 1920, "height": 1080},
            locale="en-US",
            timezone_id="America/New_York",
        )

        # Inject stealth overrides before any page load
        await context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {get: () => false});
            Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
            Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]});
            window.chrome = { runtime: {} };
        """)

        page = await context.new_page()

        # Capture GraphQL responses
        async def handle_response(response):
            if "orchestra/graphql" in response.url:
                try:
                    data = await response.json()
                    if data.get("data", {}).get("product"):
                        graphql_data.update(data["data"]["product"])
                except Exception:
                    pass

        page.on("response", handle_response)

        # Navigate to the product page
        await page.goto(product_url, wait_until="networkidle", timeout=30000)
        await page.wait_for_timeout(2000)

        # If no GraphQL data captured, extract from DOM
        if not graphql_data:
            # Try __NEXT_DATA__
            next_data_text = await page.evaluate("""
                () => {
                    const el = document.getElementById('__NEXT_DATA__');
                    return el ? el.textContent : null;
                }
            """)
            if next_data_text:
                try:
                    nd = json.loads(next_data_text)
                    # Navigate to product in the data structure
                    product_node = (
                        nd.get("props", {})
                        .get("pageProps", {})
                        .get("initialData", {})
                        .get("data", {})
                        .get("product", {})
                    )
                    if product_node:
                        graphql_data = product_node
                except json.JSONDecodeError:
                    pass

        # Final DOM fallback: read visible fields straight from the page.
        # These are flat, display-formatted values (price is text), so they
        # are returned as-is instead of going through normalize_walmart_product(),
        # which expects the nested GraphQL shape and would drop the price.
        dom_data = None
        if not graphql_data:
            dom_data = await page.evaluate("""
                () => {
                    const getContent = (selector) => {
                        const el = document.querySelector(selector);
                        return el ? el.textContent.trim() : '';
                    };
                    const getAttr = (selector, attr) => {
                        const el = document.querySelector(selector);
                        return el ? el.getAttribute(attr) : '';
                    };
                    return {
                        name: getContent('h1[itemprop="name"], [data-automation-id="product-title"]'),
                        price: getContent('[itemprop="price"], [data-automation-id="product-price"] .f2'),
                        rating: getAttr('[itemprop="ratingValue"]', 'content'),
                        review_count: getAttr('[itemprop="reviewCount"]', 'content'),
                        in_stock: !!document.querySelector('[data-automation-id="add-to-cart-btn"]'),
                    };
                }
            """)

        await browser.close()

    if graphql_data:
        return normalize_walmart_product(graphql_data, product_url)
    if dom_data and dom_data.get("name"):
        return {**dom_data, "url": product_url, "source": "dom_fallback"}
    return {"error": "no_data_extracted", "url": product_url}
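
The final DOM fallback reads the price as display text rather than a number. If you want a float for comparisons, a small parser helps. This is a minimal sketch that assumes US-style strings such as "$1,299.99"; parse_price is a hypothetical helper, not something Walmart or Playwright provides.

```python
import re
from typing import Optional


def parse_price(text: str) -> Optional[float]:
    """Pull the first numeric amount out of a display string like '$1,299.99'."""
    if not text:
        return None
    # One digit, then digits or thousands separators, then optional decimals
    match = re.search(r"\d[\d,]*(?:\.\d+)?", text)
    if not match:
        return None
    return float(match.group(0).replace(",", ""))


print(parse_price("$1,299.99"))
print(parse_price("Now $24.97"))
```

Strings with no digits at all, such as an out-of-stock label, come back as None so the caller can skip them.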

Scraping Search Results

Walmart search results are also GraphQL-driven. Here's how to paginate through them:

async def scrape_walmart_search(
    query: str,
    max_pages: int = 5,
    sort_by: str = "best_match",  # best_match | price_low | price_high | rating
    proxy: dict = None,
) -> list[dict]:
    """
    Scrape Walmart search results pages.
    Returns list of product summaries.
    """
    all_products = []

    async with async_playwright() as p:
        launch_opts = {"headless": True}
        if proxy:
            launch_opts["proxy"] = proxy

        browser = await p.chromium.launch(**launch_opts)
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/127.0.0.0 Safari/537.36",
            viewport={"width": 1920, "height": 1080},
        )
        await context.add_init_script(
            "Object.defineProperty(navigator,'webdriver',{get:()=>false});"
        )
        page = await context.new_page()

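        # URL-encode the query so spaces and symbols survive in the URL
        # (local import keeps this snippet self-contained)
        from urllib.parse import quote_plus
        encoded_query = quote_plus(query)

        for pg_num in range(1, max_pages + 1):
            sort_param = f"&sort={sort_by}" if sort_by != "best_match" else ""
            url = f"https://www.walmart.com/search?q={encoded_query}&page={pg_num}{sort_param}"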

            await page.goto(url, wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(2500)

            # Extract product cards
            products = await page.evaluate("""
                () => {
                    const cards = document.querySelectorAll('[data-item-id]');
                    return Array.from(cards).map(card => {
                        const nameEl = card.querySelector('[data-automation-id="product-title"] span');
                        const priceEl = card.querySelector('[data-automation-id="product-price"] .f2');
                        const ratingEl = card.querySelector('[data-testid="product-ratings"]');
                        const reviewEl = card.querySelector('[data-testid="product-reviews"]');
                        const linkEl = card.querySelector('a[link-identifier="linkText"]');
                        const imgEl = card.querySelector('img[data-testid="productTile-atf-image"]');
                        const badgeEl = card.querySelector('[data-testid="item-badge"]');

                        return {
                            item_id: card.getAttribute('data-item-id'),
                            name: nameEl ? nameEl.textContent.trim() : '',
                            price: priceEl ? priceEl.textContent.trim() : '',
                            rating: ratingEl ? ratingEl.textContent.trim() : '',
                            reviews: reviewEl ? reviewEl.textContent.trim() : '',
                            url: linkEl ? 'https://www.walmart.com' + linkEl.getAttribute('href') : '',
                            thumbnail: imgEl ? imgEl.getAttribute('src') : '',
                            badge: badgeEl ? badgeEl.textContent.trim() : '',
                        };
                    }).filter(p => p.name && p.item_id);
                }
            """)

            all_products.extend(products)
            print(f"  Page {pg_num}: {len(products)} products (total {len(all_products)})")

            # Pseudo-random delay between pages: 3, 5, or 7 seconds,
            # chosen by hashing the query and page number
            await page.wait_for_timeout(3000 + 2000 * (hash(query + str(pg_num)) % 3))

        await browser.close()

    return all_products
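
When paginating, the same item may show up on more than one page, so it is worth de-duplicating before storing results. The helper below is a hypothetical sketch that keeps the first occurrence of each item_id and preserves order; the sample rows are made up.

```python
def dedupe_by_item_id(products: list[dict]) -> list[dict]:
    """Keep the first product seen for each item_id, preserving input order."""
    seen = set()
    unique = []
    for product in products:
        item_id = product.get("item_id")
        if not item_id or item_id in seen:
            continue  # skip rows without an ID and repeats
        seen.add(item_id)
        unique.append(product)
    return unique


# Made-up rows in the shape returned by scrape_walmart_search
rows = [
    {"item_id": "111", "name": "Widget"},
    {"item_id": "222", "name": "Gadget"},
    {"item_id": "111", "name": "Widget (repeat)"},
]
print([p["item_id"] for p in dedupe_by_item_id(rows)])
```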

Extracting Product Reviews

Walmart reviews have their own API endpoint. They're paginated with a limit and offset pattern:

import httpx
import time

def get_walmart_reviews(
    item_id: str,
    max_pages: int = 5,
    sort_by: str = "relevancy",  # relevancy | submission-desc | rating-desc | rating-asc
) -> list[dict]:
    """
    Fetch product reviews from Walmart's review API.
    """
    client = httpx.Client(
        timeout=30,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 Chrome/127.0.0.0 Safari/537.36",
            "Accept": "application/json",
        }
    )

    all_reviews = []
    limit = 50

    # The endpoint is constant, so define it once outside the loop
    url = "https://www.walmart.com/reviews/api/fetch/v3"

    for page in range(1, max_pages + 1):
        offset = (page - 1) * limit
        params = {
            "itemId": item_id,
            "limit": limit,
            "offset": offset,
            "sort": sort_by,
            "filters": "",
        }

        try:
            resp = client.get(url, params=params)
            if resp.status_code != 200:
                break
            data = resp.json()
        except Exception as e:
            print(f"  Reviews page {page} failed: {e}")
            break

        reviews = data.get("reviews") or []
        for r in reviews:
            all_reviews.append({
                "id": r.get("reviewId", ""),
                "rating": r.get("rating", 0),
                "title": r.get("title", ""),
                "text": r.get("reviewText", ""),
                "author": r.get("authorId", ""),
                "date": r.get("reviewSubmissionTime", ""),
                "verified_purchase": r.get("badgeLabel") == "Verified Purchase",
                "helpful_votes": r.get("positiveFeedback", 0),
                "not_helpful_votes": r.get("negativeFeedback", 0),
            })

        total = data.get("totalResults", 0)
        if offset + limit >= total:
            break

        time.sleep(1.5)

    client.close()
    return all_reviews


def analyze_sentiment(reviews: list[dict]) -> dict:
    """Summarize review ratings as a rough sentiment proxy (does not parse review text)."""
    if not reviews:
        return {}

    ratings = [r["rating"] for r in reviews]
    avg_rating = sum(ratings) / len(ratings)

    # Rating distribution
    dist = {i: ratings.count(i) for i in range(1, 6)}

    # Verified purchase ratio
    verified = sum(1 for r in reviews if r.get("verified_purchase"))

    # Helpful vote analysis
    helpful_reviews = sorted(reviews, key=lambda r: r.get("helpful_votes", 0), reverse=True)

    return {
        "total_reviews_analyzed": len(reviews),
        "avg_rating": round(avg_rating, 2),
        "rating_distribution": dist,
        "verified_purchase_pct": round(100 * verified / len(reviews), 1),
        "top_helpful_review": helpful_reviews[0]["title"] if helpful_reviews else "",
        "five_star_pct": round(100 * dist.get(5, 0) / len(ratings), 1),
        "one_star_pct": round(100 * dist.get(1, 0) / len(ratings), 1),
    }
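
analyze_sentiment works from star ratings only. To get a rough sense of what low-rated reviews actually complain about, you can count frequent words in their text. This is a deliberately simple sketch: top_terms and the tiny stopword list are hypothetical, and the sample reviews are made up.

```python
import re
from collections import Counter

# Minimal stopword list for illustration; a real one would be much longer
STOPWORDS = {"the", "and", "was", "this", "for", "after", "with", "that", "not"}


def top_terms(reviews: list[dict], max_rating: int = 2, n: int = 5) -> list[tuple]:
    """Most common words in reviews rated `max_rating` stars or lower."""
    counts = Counter()
    for review in reviews:
        if review.get("rating", 0) > max_rating:
            continue
        words = re.findall(r"[a-z']+", (review.get("text") or "").lower())
        # Ignore stopwords and very short tokens
        counts.update(w for w in words if w not in STOPWORDS and len(w) > 2)
    return counts.most_common(n)


sample_reviews = [
    {"rating": 1, "text": "Battery died after a week"},
    {"rating": 2, "text": "Battery drains fast"},
    {"rating": 5, "text": "Great battery"},
]
print(top_terms(sample_reviews, n=1))
```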

Price History Tracking and Database Schema

For competitor monitoring, you need historical data. Here's a complete SQLite schema for tracking Walmart prices over time:

import sqlite3
from datetime import datetime, date

def init_walmart_db(db_path: str = "walmart_tracker.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS products (
            item_id TEXT PRIMARY KEY,
            name TEXT,
            brand TEXT,
            model TEXT,
            url TEXT,
            category TEXT,
            seller_id TEXT,
            seller_name TEXT,
            upc TEXT,
            first_seen DATE,
            last_seen DATE
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            item_id TEXT NOT NULL,
            price REAL,
            was_price REAL,
            in_stock INTEGER,
            rating REAL,
            review_count INTEGER,
            snapshot_date DATE,
            snapshot_ts TEXT,
            FOREIGN KEY (item_id) REFERENCES products(item_id)
        )
    """)

    # One snapshot per item per day. This unique index also serves
    # (item_id, snapshot_date) lookups, so a second index on the same
    # columns would be redundant.
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_price_snapshot_unique
        ON price_snapshots(item_id, snapshot_date)
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            item_id TEXT,
            old_price REAL,
            new_price REAL,
            change_pct REAL,
            alert_type TEXT,
            alert_date DATE,
            notified INTEGER DEFAULT 0
        )
    """)

    conn.commit()
    return conn


def upsert_product(conn: sqlite3.Connection, product: dict):
    """Insert or update a product record."""
    today = date.today().isoformat()
    conn.execute("""
        INSERT INTO products (item_id, name, brand, model, url, category,
                              seller_id, seller_name, upc, first_seen, last_seen)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(item_id) DO UPDATE SET
            name=excluded.name, brand=excluded.brand, model=excluded.model,
            seller_id=excluded.seller_id, seller_name=excluded.seller_name,
            last_seen=excluded.last_seen
    """, (
        product.get("item_id"), product.get("name"), product.get("brand"),
        product.get("model"), product.get("url"),
        ",".join(product.get("categories", [])),
        product.get("seller_id"), product.get("seller_name"),
        product.get("upc"), today, today,
    ))
    conn.commit()


def record_snapshot(conn: sqlite3.Connection, product: dict):
    """Record a price snapshot. One per item per day."""
    today = date.today().isoformat()
    now = datetime.utcnow().isoformat()

    # Check for price change
    prev = conn.execute("""
        SELECT price FROM price_snapshots
        WHERE item_id = ? AND snapshot_date < ?
        ORDER BY snapshot_date DESC LIMIT 1
    """, (product["item_id"], today)).fetchone()

    new_price = product.get("price")
    old_price = prev[0] if prev else None

    conn.execute("""
        INSERT OR REPLACE INTO price_snapshots
        (item_id, price, was_price, in_stock, rating, review_count, snapshot_date, snapshot_ts)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        product["item_id"], new_price, product.get("was_price"),
        int(product.get("in_stock", True)),
        product.get("rating"), product.get("review_count"),
        today, now,
    ))

    # Generate alert if significant price change
    if old_price and new_price and old_price > 0:
        change_pct = (new_price - old_price) / old_price * 100
        if abs(change_pct) >= 10:
            alert_type = "DROP" if change_pct < 0 else "INCREASE"
            conn.execute("""
                INSERT INTO price_alerts
                (item_id, old_price, new_price, change_pct, alert_type, alert_date)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (product["item_id"], old_price, new_price,
                  round(change_pct, 2), alert_type, today))

    conn.commit()


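def get_price_history(conn: sqlite3.Connection, item_id: str, days: int = 90) -> list[dict]:
    """Return up to `days` of the most recent daily snapshots, newest first.

    `days` caps the row count (one row per item per day), so days with no
    snapshot are not counted against the limit.
    """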
    rows = conn.execute("""
        SELECT snapshot_date, price, was_price, in_stock, rating, review_count
        FROM price_snapshots
        WHERE item_id = ?
        ORDER BY snapshot_date DESC
        LIMIT ?
    """, (item_id, days)).fetchall()

    return [
        {
            "date": r[0], "price": r[1], "was_price": r[2],
            "in_stock": bool(r[3]), "rating": r[4], "reviews": r[5],
        }
        for r in rows
    ]


def get_pending_alerts(conn: sqlite3.Connection) -> list[dict]:
    """Get unnotified price alerts."""
    rows = conn.execute("""
        SELECT a.item_id, p.name, a.old_price, a.new_price,
               a.change_pct, a.alert_type, a.alert_date
        FROM price_alerts a
        JOIN products p ON a.item_id = p.item_id
        WHERE a.notified = 0
        ORDER BY ABS(a.change_pct) DESC
    """).fetchall()

    return [
        {
            "item_id": r[0], "name": r[1], "old_price": r[2],
            "new_price": r[3], "change_pct": r[4], "type": r[5], "date": r[6],
        }
        for r in rows
    ]
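
record_snapshot depends on the unique (item_id, snapshot_date) index so that INSERT OR REPLACE keeps a single row per item per day. The self-contained sketch below demonstrates that behavior against an in-memory database, using a trimmed-down table and made-up prices. classify_change is a hypothetical helper that mirrors the 10% alert threshold.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE price_snapshots (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        item_id TEXT NOT NULL,
        price REAL,
        snapshot_date DATE
    )
""")
conn.execute(
    "CREATE UNIQUE INDEX idx_unique ON price_snapshots(item_id, snapshot_date)"
)


def save(item_id: str, price: float, day: str) -> None:
    """Insert a snapshot, replacing any existing row for the same item and day."""
    conn.execute(
        "INSERT OR REPLACE INTO price_snapshots (item_id, price, snapshot_date) "
        "VALUES (?, ?, ?)",
        (item_id, price, day),
    )


def classify_change(old_price: float, new_price: float, threshold_pct: float = 10.0):
    """Return (change_pct, alert_type); alert_type is None below the threshold."""
    change_pct = (new_price - old_price) / old_price * 100
    if abs(change_pct) < threshold_pct:
        return round(change_pct, 2), None
    return round(change_pct, 2), "DROP" if change_pct < 0 else "INCREASE"


save("123", 19.99, "2026-01-01")
save("123", 17.99, "2026-01-01")  # same day: replaces the earlier row
save("123", 15.99, "2026-01-02")

rows = conn.execute(
    "SELECT snapshot_date, price FROM price_snapshots "
    "WHERE item_id = ? ORDER BY snapshot_date",
    ("123",),
).fetchall()
print(rows)
print(classify_change(rows[0][1], rows[1][1]))
```

Because the second write for 2026-01-01 replaces the first, re-running the scraper on the same day updates the snapshot instead of adding a duplicate.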

Handling Walmart's PerimeterX Bot Detection

Walmart uses PerimeterX (now HUMAN Security) as their primary bot detection layer. Understanding how it works helps you defeat it:

Layer 1: IP reputation scoring. PerimeterX maintains a database of known datacenter IP ranges, proxy services, and high-risk IP blocks. Requests from datacenter IPs are typically challenged or blocked outright. This is why a residential proxy network such as ThorData's is effectively required for Walmart scraping: residential IPs carry genuine ISP attribution and are far more likely to pass this check.

Layer 2: TLS/HTTP fingerprinting. Your TLS handshake reveals what HTTP library you're using. requests, httpx, and curl all have distinctive TLS fingerprints that PerimeterX recognizes. Playwright-launched Chromium uses a real browser fingerprint.

Layer 3: Browser fingerprinting. JavaScript probes check navigator.webdriver, Canvas and WebGL rendering, audio context timing, available fonts, screen dimensions, plugin arrays, and dozens of other browser properties.

Layer 4: Behavioral scoring. Mouse movement patterns, scroll velocity, click timing, time-on-page, and navigation path all contribute to a behavioral risk score. Automated sessions that navigate directly to product data without browsing get flagged.

Layer 5: The _px3 cookie. PerimeterX sets a behavioral fingerprint cookie after the initial visit. Subsequent requests with a missing or invalid _px3 trigger re-challenges.
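
Whichever layer trips, the scraper needs to notice that it received a challenge instead of product data. The heuristic below is a rough sketch: looks_blocked is a hypothetical helper, and the marker strings are assumptions about what a challenge page contains, so verify them against a block page you actually receive.

```python
# Assumed markers for a challenge page; confirm against real responses
BLOCK_MARKERS = ("px-captcha", "robot or human")


def looks_blocked(status_code: int, html: str) -> bool:
    """Guess whether a response is a bot challenge rather than real content."""
    if status_code in (403, 429):
        return True
    lowered = html.lower()
    return any(marker in lowered for marker in BLOCK_MARKERS)


print(looks_blocked(200, '<div id="px-captcha"></div>'))
print(looks_blocked(200, "<html><body>Product page</body></html>"))
```

Checking the body as well as the status code matters because a challenge can arrive with a 200 response.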

Mitigation strategy that works in practice:

async def create_stealth_walmart_context(playwright, proxy_config: dict = None):
    """Create a properly configured stealth browser context for Walmart."""

    launch_kwargs = {"headless": True}
    if proxy_config:
        launch_kwargs["proxy"] = proxy_config

    browser = await playwright.chromium.launch(**launch_kwargs)

    context = await browser.new_context(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                   "AppleWebKit/537.36 (KHTML, like Gecko) "
                   "Chrome/127.0.0.0 Safari/537.36",
        viewport={"width": 1920, "height": 1080},
        locale="en-US",
        timezone_id="America/Chicago",  # Walmart HQ timezone
        geolocation={"latitude": 36.3729, "longitude": -94.2088},  # Bentonville AR
        permissions=["geolocation"],
        color_scheme="light",
        device_scale_factor=1,
        has_touch=False,
        is_mobile=False,
    )

    # Override automation tells
    await context.add_init_script("""
        // Remove webdriver flag
        Object.defineProperty(navigator, 'webdriver', {get: () => undefined});

        // Realistic plugin array
        Object.defineProperty(navigator, 'plugins', {
            get: () => [
                {name: 'Chrome PDF Plugin'}, {name: 'Chrome PDF Viewer'},
                {name: 'Native Client'},
            ]
        });

        // Languages
        Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});

        // Chrome runtime (headless Chrome lacks this)
        if (!window.chrome) {
            window.chrome = {
                app: {isInstalled: false},
                webstore: {onInstallStageChanged: {}, onDownloadProgress: {}},
                runtime: {
                    PlatformOs: {MAC: 'mac', WIN: 'win', ANDROID: 'android', CROS: 'cros', LINUX: 'linux', OPENBSD: 'openbsd'},
                    PlatformArch: {ARM: 'arm', X86_32: 'x86-32', X86_64: 'x86-64'},
                    PlatformNaclArch: {ARM: 'arm', X86_32: 'x86-32', X86_64: 'x86-64'},
                    RequestUpdateCheckStatus: {THROTTLED: 'throttled', NO_UPDATE: 'no_update', UPDATE_AVAILABLE: 'update_available'},
                    OnInstalledReason: {INSTALL: 'install', UPDATE: 'update', CHROME_UPDATE: 'chrome_update', SHARED_MODULE_UPDATE: 'shared_module_update'},
                    OnRestartRequiredReason: {APP_UPDATE: 'app_update', OS_UPDATE: 'os_update', PERIODIC: 'periodic'},
                },
            };
        }
    """)

    return browser, context


# Walmart scraping rate guidelines:
# - Search pages: 5-8 requests per minute per IP
# - Product pages: 10-12 requests per minute per IP
# - Review pages: 15-20 requests per minute per IP (less protected)
# - Always randomize delays: 3-8 seconds between product pages
# - Rotate browser context every 30-50 requests (reset cookies + fingerprint)
# - Rotate proxy every 20-30 requests
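
The guidelines above can be turned into a small scheduling helper. This is a sketch under stated assumptions: rotation_plan and product_page_delay are hypothetical names, and the defaults (context every 40 requests, proxy every 25, 3 to 8 second delays) are simply picked from within the ranges listed.

```python
import random
from typing import Optional


def rotation_plan(request_index: int, context_every: int = 40,
                  proxy_every: int = 25) -> dict:
    """For the Nth request (0-based), say what to rotate before sending it."""
    due = lambda every: request_index > 0 and request_index % every == 0
    return {
        "rotate_context": due(context_every),
        "rotate_proxy": due(proxy_every),
    }


def product_page_delay(rng: Optional[random.Random] = None) -> float:
    """Randomized wait between product pages, in seconds (3 to 8)."""
    rng = rng or random.Random()
    return rng.uniform(3.0, 8.0)


for index in (0, 25, 40, 200):
    print(index, rotation_plan(index))
```

Since the proxy is passed at browser launch in create_stealth_walmart_context, rotating the proxy in this setup means relaunching the browser with a new proxy_config.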

Production Monitoring Pipeline

Putting it all together for production-grade competitor monitoring:

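import asyncio
import json
import time
import random
from datetime import datetime

from playwright.async_api import async_playwright

# init_walmart_db, upsert_product, record_snapshot, normalize_walmart_product,
# and create_stealth_walmart_context are defined in the sections above.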

async def monitor_product_list(
    item_ids: list[str],
    db_path: str = "walmart_tracker.db",
    proxy_config: dict = None,
    requests_per_context: int = 40,
) -> dict:
    """
    Production pipeline: scrape a list of Walmart items and track prices.
    Rotates browser contexts to stay under detection thresholds.
    """
    db = init_walmart_db(db_path)
    results = {"success": 0, "failed": 0, "price_drops": []}

    async with async_playwright() as p:
        browser, context = await create_stealth_walmart_context(p, proxy_config)
        page = await context.new_page()
        requests_in_context = 0

        for i, item_id in enumerate(item_ids):
            # Rotate context periodically
            if requests_in_context >= requests_per_context:
                print(f"  Rotating browser context after {requests_in_context} requests...")
                await browser.close()
                browser, context = await create_stealth_walmart_context(p, proxy_config)
                page = await context.new_page()
                requests_in_context = 0
                await asyncio.sleep(3)

            url = f"https://www.walmart.com/ip/{item_id}"
            print(f"  [{i+1}/{len(item_ids)}] Scraping item {item_id}...")

            try:
                await page.goto(url, wait_until="networkidle", timeout=30000)
                await asyncio.sleep(random.uniform(2, 4))

                # Extract __NEXT_DATA__
                next_data_text = await page.evaluate("""
                    () => {
                        const el = document.getElementById('__NEXT_DATA__');
                        return el ? el.textContent : null;
                    }
                """)

                product = None
                if next_data_text:
                    nd = json.loads(next_data_text)
                    raw_product = (
                        nd.get("props", {})
                        .get("pageProps", {})
                        .get("initialData", {})
                        .get("data", {})
                        .get("product", {})
                    )
                    if raw_product:
                        product = normalize_walmart_product(raw_product, url)

                if product and product.get("item_id"):
                    upsert_product(db, product)
                    record_snapshot(db, product)
                    results["success"] += 1
                    print(f"    {product['name'][:40]} | ${product.get('price')} | "
                          f"{'In Stock' if product.get('in_stock') else 'Out of Stock'}")
                else:
                    results["failed"] += 1
                    print("    Could not extract product data")

                requests_in_context += 1

            except Exception as e:
                print(f"    Error: {e}")
                results["failed"] += 1
                requests_in_context += 1

            # Randomized delay between products
            delay = random.uniform(3, 7)
            await asyncio.sleep(delay)

        await browser.close()

    # Check for price alerts
    alerts = get_pending_alerts(db)
    if alerts:
        print(f"\nPrice alerts ({len(alerts)}):")
        for alert in alerts:
            print(f"  {alert['type']}: {alert['name'][:40]}")
            print(f"    ${alert['old_price']} -> ${alert['new_price']} ({alert['change_pct']:+.1f}%)")
        results["price_drops"] = [a for a in alerts if a["type"] == "DROP"]

    db.close()
    return results
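
One fragile spot in the pipeline above is the chain of `.get(..., {})` calls. The default only applies when a key is absent, so if the page ships an explicit JSON `null` at any level, the next `.get` raises `AttributeError` and the item lands in the generic error branch. A small traversal helper avoids that. This is a sketch under the assumption that the product lives at the same path used above; `dig` and `PRODUCT_PATH` are illustrative names, not part of any library.

```python
from typing import Any


def dig(data: Any, *keys: str) -> Any:
    """Walk nested dicts; return None if any level is missing or not a dict."""
    current = data
    for key in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


# Same path the pipeline reads from __NEXT_DATA__ (taken from the code above).
PRODUCT_PATH = ("props", "pageProps", "initialData", "data", "product")

# An explicit null partway down the tree no longer raises.
sample = {"props": {"pageProps": {"initialData": None}}}
print(dig(sample, *PRODUCT_PATH))  # prints None
```

With this in place, `raw_product = dig(nd, *PRODUCT_PATH)` would replace the chained lookups, and a missing product falls through to the "could not extract" branch.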

Category and Department Scraping

For broader market research, scrape entire product categories:

async def scrape_walmart_category(
    category_url: str,
    max_pages: int = 10,
    proxy: dict = None,
) -> list[dict]:
    """
    Scrape a Walmart category page (e.g., /browse/electronics).
    Handles infinite scroll / pagination.
    """
    all_items = []

    async with async_playwright() as p:
        browser, context = await create_stealth_walmart_context(p, proxy)
        page = await context.new_page()

        for pg in range(1, max_pages + 1):
            url = f"{category_url}?page={pg}" if "?" not in category_url else f"{category_url}&page={pg}"
            await page.goto(url, wait_until="networkidle", timeout=30000)
            await asyncio.sleep(3)

            # Extract product grid
            items = await page.evaluate("""
                () => {
                    const cards = document.querySelectorAll('[data-item-id]');
                    return Array.from(cards).map(card => {
                        const titleEl = card.querySelector('[data-automation-id="product-title"]');
                        const priceEl = card.querySelector('[data-automation-id="product-price"] .f2');
                        const linkEl = card.querySelector('a[href*="/ip/"]');
                        return {
                            item_id: card.getAttribute('data-item-id'),
                            name: titleEl ? titleEl.textContent.trim() : '',
                            price_text: priceEl ? priceEl.textContent.trim() : '',
                            url: linkEl ? 'https://www.walmart.com' + linkEl.getAttribute('href') : '',
                        };
                    }).filter(i => i.item_id && i.name);
                }
            """)

            if not items:
                print(f"  Page {pg}: no items found, stopping")
                break

            all_items.extend(items)
            print(f"  Page {pg}: {len(items)} items (total: {len(all_items)})")

            await asyncio.sleep(random.uniform(3, 6))

        await browser.close()

    return all_items
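
The category scraper returns `price_text` as a raw string, so it needs a numeric conversion before it can be stored or compared. The sketch below assumes the text looks like "$12.98" or "Now $1,299.00"; the exact formats Walmart renders are not confirmed here, and `parse_price_text` and `dedupe_items` are hypothetical helper names. It also assumes the same item can appear on more than one page, so it keeps only the first occurrence of each `item_id`.

```python
import re
from typing import Optional

# A number with optional thousands separators and up to two decimals.
_NUM = r"(\d{1,3}(?:,\d{3})+|\d+)(?:\.(\d{1,2}))?"
_DOLLAR_RE = re.compile(r"\$\s*" + _NUM)   # preferred: number after a "$"
_PLAIN_RE = re.compile(_NUM)               # fallback: first number in the text


def parse_price_text(text: Optional[str]) -> Optional[float]:
    """Return the first dollar amount in the text, or None if there is none."""
    text = text or ""
    match = _DOLLAR_RE.search(text) or _PLAIN_RE.search(text)
    if not match:
        return None
    whole = match.group(1).replace(",", "")
    cents = match.group(2) or "0"
    return float(f"{whole}.{cents}")


def dedupe_items(items: list[dict]) -> list[dict]:
    """Keep the first occurrence of each item_id and attach a numeric price."""
    seen: dict[str, dict] = {}
    for item in items:
        item_id = item.get("item_id")
        if item_id and item_id not in seen:
            price = parse_price_text(item.get("price_text", ""))
            seen[item_id] = {**item, "price": price}
    return list(seen.values())
```

Calling `dedupe_items(all_items)` on the scraper's output gives one row per item with a `price` field that is either a float or `None`, which is easier to load into the tracking database than the raw text.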

Common Gotchas

Walmart's prices are geo-locked. If you're accessing from outside the US, many products return null prices or different pricing. Always use US residential proxies. ThorData with country-us targeting ensures you see US pricing.

Third-party sellers. Walmart Marketplace has millions of third-party products alongside Walmart's own inventory. The seller_name and seller_id fields distinguish them. Third-party sellers often have different return policies and pricing logic.

Item ID vs URL slug. Walmart product URLs include a human-readable slug that changes when names change, but the numeric item ID at the end is permanent. Use item ID as your primary key.

Availability status granularity. Don't just check "IN_STOCK" vs "OUT_OF_STOCK". The availability field can return IN_STOCK, OUT_OF_STOCK, UNAVAILABLE, LIMITED_QUANTITY, ROLLBACK, and others. Map these appropriately for your use case.

The was_price vs rollback distinction. A wasPrice field indicates that the price was reduced from an earlier price. A "ROLLBACK" badge marks a temporary promotional price. The two behave differently over time: a rollback is expected to end, so record it as a promotion rather than as a lasting price change.

Anti-bot detection evolves. PerimeterX updates their detection methods regularly. If your scraper suddenly starts failing at scale, inspect what's changed in the page's JavaScript before diving into complex evasion — sometimes a simple header update is all that's needed.
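
Two of the gotchas above, item IDs and availability values, translate directly into small normalization helpers. The following is a sketch rather than a definitive mapping: it assumes product URLs follow the `/ip/<slug>/<id>` or `/ip/<id>` shape used in this guide, and the grouping of availability values into "purchasable" and "not purchasable" is an illustrative choice you should adapt to your use case. The function names are hypothetical.

```python
import re
from typing import Optional
from urllib.parse import urlparse

# Trailing numeric ID, with or without a slug segment before it.
_ITEM_ID_RE = re.compile(r"/ip/(?:[^/]+/)?(\d+)/?$")

# Assumed grouping; anything not listed is reported as "unknown".
PURCHASABLE = {"IN_STOCK", "LIMITED_QUANTITY"}
NOT_PURCHASABLE = {"OUT_OF_STOCK", "UNAVAILABLE"}


def extract_item_id(url: str) -> Optional[str]:
    """Return the numeric item ID from a product URL, ignoring the slug."""
    path = urlparse(url).path  # drops query string and fragment
    match = _ITEM_ID_RE.search(path)
    return match.group(1) if match else None


def classify_availability(status: Optional[str]) -> str:
    """Map a raw availability value onto a coarse three-way status."""
    normalized = (status or "").strip().upper()
    if normalized in PURCHASABLE:
        return "purchasable"
    if normalized in NOT_PURCHASABLE:
        return "not_purchasable"
    return "unknown"
```

Returning "unknown" for unrecognized values, instead of defaulting to out of stock, makes new or unexpected statuses visible in your data so you can decide how to map them.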

Walmart's Terms of Use prohibit scraping. The hiQ Labs v. LinkedIn (2022) Ninth Circuit ruling established that scraping publicly accessible data doesn't automatically violate the Computer Fraud and Abuse Act. Walmart's public product pages are accessible to anyone — the terms create a contractual restriction, not a criminal one.

For commercial use at scale, consider Walmart's official affiliate and partner APIs — the Walmart Affiliate API provides product data with proper authorization. For research, competitive analysis, and personal monitoring at reasonable volumes, keep your scraping rates conservative, don't hammer their servers, and store only what you need for your specific use case.

Conclusion

Walmart scraping in 2026 requires real browser automation for reliable results, because the PerimeterX bot detection blocks naive HTTP clients quickly. Playwright gives you a genuine Chromium fingerprint, and the embedded __NEXT_DATA__ JSON is a cleaner extraction target than parsing rendered HTML. For production monitoring at scale, ThorData residential proxies paired with context rotation every 30-50 requests keep you under detection thresholds. The price tracking database design shown here handles the core use case well: track prices daily, alert on drops, and you have actionable competitor intelligence within a few days of collecting data.