How to Scrape Walmart Product Data with Python in 2026
Walmart is the largest retailer in the world by revenue, and walmart.com is the second-largest US e-commerce platform after Amazon. For price intelligence, competitor monitoring, product research, and market analysis, Walmart's product catalog is an essential data source. Their 170 million+ active customers and hundreds of millions of SKUs make it one of the richest retail datasets available on the public web.
Unlike Amazon's hybrid server-rendered pages, Walmart relies heavily on a GraphQL API that powers its frontend. This is both good and bad news for scrapers: the API returns clean structured data, but it's protected by sophisticated bot detection (PerimeterX/HUMAN Security). This guide covers the full technical stack for extracting Walmart product data in 2026 — from initial page access through to production-grade price monitoring pipelines.
Understanding Walmart's Architecture
Before diving into code, it helps to understand what you're working with. Walmart.com's architecture circa 2026:
- Frontend: React SPA with server-side rendering for SEO
- API layer: GraphQL endpoint at walmart.com/orchestra/graphql
- Bot protection: PerimeterX (now HUMAN Security) + Cloudflare
- Content delivery: Akamai CDN with edge caching
- Session management: _px3 cookie from PerimeterX tracks behavioral fingerprint
The page loads with some server-rendered content (for SEO), then hydrates via GraphQL calls. Product data is available both in the embedded JSON state within the HTML and via the API.
Method 1: Scraping the Embedded JSON State
The fastest and most reliable approach is extracting data from the __NEXT_DATA__ script tag that Walmart embeds in their server-rendered HTML. This doesn't require running JavaScript or making additional API calls:
import httpx
import json
import re
from typing import Optional

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "Referer": "https://www.walmart.com/",
    "sec-fetch-dest": "document",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "same-origin",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
}
def extract_product_id(product_url: str) -> Optional[str]:
    """Extract product ID from a Walmart URL."""
    # Format: walmart.com/ip/Product-Name/123456789
    # Also: walmart.com/ip/123456789
    match = re.search(r"/ip/(?:[^/]+/)?(\d+)", product_url)
    return match.group(1) if match else None
def scrape_product_page(product_url: str) -> Optional[dict]:
    """
    Scrape a Walmart product page by extracting the embedded JSON state.
    Most reliable method — doesn't depend on API schema.
    """
    with httpx.Client(timeout=30, headers=HEADERS, follow_redirects=True) as client:
        try:
            resp = client.get(product_url)
            if resp.status_code == 403:
                return {"error": "bot_detection", "status": 403}
            if resp.status_code == 404:
                return {"error": "not_found", "status": 404}
            resp.raise_for_status()
        except httpx.TimeoutException:
            return {"error": "timeout"}
        except httpx.HTTPStatusError as e:
            return {"error": f"http_{e.response.status_code}"}
    html = resp.text
    # Extract __NEXT_DATA__ (Next.js page props embedded in HTML)
    match = re.search(r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', html, re.DOTALL)
    if not match:
        # Fallback: look for window.__WML_REDUX_INITIAL_STATE__
        match = re.search(r'window\.__WML_REDUX_INITIAL_STATE__\s*=\s*({.*?});', html, re.DOTALL)
    if not match:
        return {"error": "no_embedded_json", "html_length": len(html)}
    try:
        page_data = json.loads(match.group(1))
    except json.JSONDecodeError as e:
        return {"error": f"json_parse_error: {e}"}
    # Navigate the Next.js data structure to find product info
    # Path varies slightly by page type
    product = None
    paths_to_try = [
        ["props", "pageProps", "initialData", "data", "product"],
        ["props", "pageProps", "product"],
        ["props", "initialState", "product", "products"],
    ]
    for path in paths_to_try:
        node = page_data
        for key in path:
            node = node.get(key, {}) if isinstance(node, dict) else {}
        if node:
            product = node
            break
    if not product:
        return {"error": "product_not_found_in_json", "keys": list(page_data.get("props", {}).keys())}
    return normalize_walmart_product(product, product_url)
def normalize_walmart_product(raw: dict, url: str) -> dict:
    """Normalize a raw Walmart product dict into a clean structure."""
    # Price info
    price_info = raw.get("priceInfo") or {}
    current_price = price_info.get("currentPrice") or {}
    was_price = price_info.get("wasPrice") or {}
    unit_price = price_info.get("unitPrice") or {}
    # Availability
    availability = raw.get("availabilityStatus", "").upper()
    # Fulfillment options (shipping/pickup/delivery)
    fulfillment = raw.get("fulfillmentType") or []
    # Images
    images = []
    image_info = raw.get("imageInfo") or {}
    for img in (image_info.get("allImages") or []):
        if img.get("url"):
            images.append(img["url"])
    # Variants
    variants = []
    variant_criteria = raw.get("variantCriteria") or []
    for criterion in variant_criteria:
        variants.append({
            "type": criterion.get("name", ""),
            "options": [v.get("name", "") for v in (criterion.get("values") or [])],
        })
    # Seller info
    seller_info = raw.get("sellerInfo") or {}
    return {
        "item_id": raw.get("usItemId") or raw.get("itemId", ""),
        "name": raw.get("name", ""),
        "brand": raw.get("brand", ""),
        "model": raw.get("model", ""),
        "url": url,
        "short_description": raw.get("shortDescription", ""),
        "price": current_price.get("price"),
        "price_string": current_price.get("priceString", ""),
        "was_price": was_price.get("price"),
        "was_price_string": was_price.get("priceString", ""),
        "unit_price": unit_price.get("price"),
        "unit_price_unit": unit_price.get("unitOfMeasure", ""),
        "in_stock": availability in ("IN_STOCK", "AVAILABLE"),
        "availability_status": availability,
        "fulfillment_types": fulfillment,
        "rating": raw.get("averageRating", 0),
        "review_count": raw.get("numberOfReviews", 0),
        "seller_id": seller_info.get("sellerId", ""),
        "seller_name": seller_info.get("sellerDisplayName", ""),
        "is_walmart_fulfilled": seller_info.get("type") == "WALMART",
        "images": images[:5],  # First 5 images
        "variants": variants,
        "categories": [
            c.get("name", "") for c in (raw.get("categories") or [])
        ],
        "upc": raw.get("upc", ""),
        "gtin": raw.get("gtin13", ""),
    }
Method 2: Direct GraphQL API Calls
When you need batch processing and want to avoid loading full HTML pages, call Walmart's GraphQL endpoint directly. The schema changes periodically, so check for updates when things break:
import httpx
import json
from typing import Optional

GRAPHQL_ENDPOINT = "https://www.walmart.com/orchestra/graphql"

GRAPHQL_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
    "Accept": "application/json",
    "Content-Type": "application/json",
    "X-O-PLATFORM": "rweb",
    "X-O-SEGMENT": "oaoh",
    "X-O-GQL-QUERY": "query GetProductDetail",
    "Referer": "https://www.walmart.com/",
    "Origin": "https://www.walmart.com",
}
PRODUCT_QUERY = """
query GetProductDetail($itemId: String!) {
  product(itemId: $itemId) {
    usItemId
    name
    brand
    shortDescription
    model
    upc
    averageRating
    numberOfReviews
    priceInfo {
      currentPrice {
        price
        priceString
        currencyCode
      }
      wasPrice {
        price
        priceString
      }
      unitPrice {
        price
        priceString
        unitOfMeasure
      }
      priceRanges {
        minPrice { price priceString }
        maxPrice { price priceString }
      }
    }
    availabilityStatus
    sellerInfo {
      sellerId
      sellerDisplayName
      type
    }
    fulfillmentType
    imageInfo {
      thumbnailUrl
      allImages { url }
    }
    categories {
      name
      url
    }
    variantCriteria {
      name
      isVariantTypeSwatch
      values {
        name
        id
        isAvailable
      }
    }
  }
}
"""
def get_product_graphql(item_id: str, client: Optional[httpx.Client] = None) -> Optional[dict]:
    """Fetch a single product via the GraphQL API."""
    own_client = client is None
    if own_client:
        client = httpx.Client(timeout=30, headers=GRAPHQL_HEADERS)
    payload = {
        "query": PRODUCT_QUERY,
        "variables": {"itemId": str(item_id)},
    }
    try:
        resp = client.post(GRAPHQL_ENDPOINT, json=payload)
        if resp.status_code == 403:
            return None  # Bot detection triggered
        resp.raise_for_status()
        data = resp.json()
        if "errors" in data:
            return {"errors": data["errors"]}
        product = data.get("data", {}).get("product")
        if product:
            return normalize_walmart_product(product, f"https://www.walmart.com/ip/{item_id}")
        return None
    except (httpx.TimeoutException, httpx.NetworkError):
        return None
    finally:
        # Only close a client we created ourselves
        if own_client:
            client.close()
def get_products_batch(item_ids: list[str], delay: float = 2.0) -> list[dict]:
    """Fetch multiple products with rate limiting."""
    import time
    import random
    client = httpx.Client(timeout=30, headers=GRAPHQL_HEADERS)
    results = []
    for i, item_id in enumerate(item_ids):
        product = get_product_graphql(item_id, client)
        if product:
            results.append(product)
            print(f"  [{i+1}/{len(item_ids)}] {product.get('name', item_id)[:50]}")
        else:
            print(f"  [{i+1}/{len(item_ids)}] {item_id} — failed")
            results.append({"item_id": item_id, "error": "not_found"})
        # Randomized delay
        wait = delay + random.uniform(0, delay * 0.5)
        time.sleep(wait)
    client.close()
    return results
Method 3: Playwright with Intercepted GraphQL
The most robust approach for anti-bot-heavy environments — use a real browser and intercept the GraphQL responses as they happen:
import asyncio
import json
from playwright.async_api import async_playwright

async def scrape_product_playwright(product_url: str, proxy: dict = None) -> dict:
    """
    Use a real Chromium browser to load the product page
    and intercept the GraphQL response.
    """
    graphql_data = {}
    async with async_playwright() as p:
        launch_opts = {"headless": True}
        if proxy:
            launch_opts["proxy"] = proxy
        browser = await p.chromium.launch(**launch_opts)
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/127.0.0.0 Safari/537.36",
            viewport={"width": 1920, "height": 1080},
            locale="en-US",
            timezone_id="America/New_York",
        )
        # Inject stealth overrides before any page load
        await context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {get: () => false});
            Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
            Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3]});
            window.chrome = { runtime: {} };
        """)
        page = await context.new_page()

        # Capture GraphQL responses as the page makes them
        async def handle_response(response):
            if "orchestra/graphql" in response.url:
                try:
                    data = await response.json()
                    if data.get("data", {}).get("product"):
                        graphql_data.update(data["data"]["product"])
                except Exception:
                    pass

        page.on("response", handle_response)
        # Navigate to the product page
        await page.goto(product_url, wait_until="networkidle", timeout=30000)
        await page.wait_for_timeout(2000)
        # If no GraphQL data captured, extract from DOM
        if not graphql_data:
            # Try __NEXT_DATA__
            next_data_text = await page.evaluate("""
                () => {
                    const el = document.getElementById('__NEXT_DATA__');
                    return el ? el.textContent : null;
                }
            """)
            if next_data_text:
                try:
                    nd = json.loads(next_data_text)
                    # Navigate to product in the data structure
                    product_node = (
                        nd.get("props", {})
                        .get("pageProps", {})
                        .get("initialData", {})
                        .get("data", {})
                        .get("product", {})
                    )
                    if product_node:
                        graphql_data = product_node
                except json.JSONDecodeError:
                    pass
        # Final DOM fallback
        if not graphql_data:
            graphql_data = await page.evaluate("""
                () => {
                    const getContent = (selector) => {
                        const el = document.querySelector(selector);
                        return el ? el.textContent.trim() : '';
                    };
                    const getAttr = (selector, attr) => {
                        const el = document.querySelector(selector);
                        return el ? el.getAttribute(attr) : '';
                    };
                    return {
                        name: getContent('h1[itemprop="name"], [data-automation-id="product-title"]'),
                        price: getContent('[itemprop="price"], [data-automation-id="product-price"] .f2'),
                        rating: getAttr('[itemprop="ratingValue"]', 'content'),
                        review_count: getAttr('[itemprop="reviewCount"]', 'content'),
                        in_stock: !!document.querySelector('[data-automation-id="add-to-cart-btn"]'),
                    };
                }
            """)
        await browser.close()
    if graphql_data:
        return normalize_walmart_product(graphql_data, product_url)
    return {"error": "no_data_extracted", "url": product_url}
Scraping Search Results
Walmart search results are also GraphQL-driven. Here's how to paginate through them:
import random
from urllib.parse import quote_plus

async def scrape_walmart_search(
    query: str,
    max_pages: int = 5,
    sort_by: str = "best_match",  # best_match | price_low | price_high | rating
    proxy: dict = None,
) -> list[dict]:
    """
    Scrape Walmart search results pages.
    Returns list of product summaries.
    """
    all_products = []
    async with async_playwright() as p:
        launch_opts = {"headless": True}
        if proxy:
            launch_opts["proxy"] = proxy
        browser = await p.chromium.launch(**launch_opts)
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/127.0.0.0 Safari/537.36",
            viewport={"width": 1920, "height": 1080},
        )
        await context.add_init_script(
            "Object.defineProperty(navigator,'webdriver',{get:()=>false});"
        )
        page = await context.new_page()
        for pg_num in range(1, max_pages + 1):
            sort_param = f"&sort={sort_by}" if sort_by != "best_match" else ""
            url = f"https://www.walmart.com/search?q={quote_plus(query)}&page={pg_num}{sort_param}"
            await page.goto(url, wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(2500)
            # Extract product cards
            products = await page.evaluate("""
                () => {
                    const cards = document.querySelectorAll('[data-item-id]');
                    return Array.from(cards).map(card => {
                        const nameEl = card.querySelector('[data-automation-id="product-title"] span');
                        const priceEl = card.querySelector('[data-automation-id="product-price"] .f2');
                        const ratingEl = card.querySelector('[data-testid="product-ratings"]');
                        const reviewEl = card.querySelector('[data-testid="product-reviews"]');
                        const linkEl = card.querySelector('a[link-identifier="linkText"]');
                        const imgEl = card.querySelector('img[data-testid="productTile-atf-image"]');
                        const badgeEl = card.querySelector('[data-testid="item-badge"]');
                        return {
                            item_id: card.getAttribute('data-item-id'),
                            name: nameEl ? nameEl.textContent.trim() : '',
                            price: priceEl ? priceEl.textContent.trim() : '',
                            rating: ratingEl ? ratingEl.textContent.trim() : '',
                            reviews: reviewEl ? reviewEl.textContent.trim() : '',
                            url: linkEl ? 'https://www.walmart.com' + linkEl.getAttribute('href') : '',
                            thumbnail: imgEl ? imgEl.getAttribute('src') : '',
                            badge: badgeEl ? badgeEl.textContent.trim() : '',
                        };
                    }).filter(p => p.name && p.item_id);
                }
            """)
            all_products.extend(products)
            print(f"  Page {pg_num}: {len(products)} products (total {len(all_products)})")
            # Random delay between pages
            await page.wait_for_timeout(int(random.uniform(3000, 7000)))
        await browser.close()
    return all_products
Extracting Product Reviews
Walmart reviews have their own API endpoint. They're paginated with a limit and offset pattern:
import httpx
import time

def get_walmart_reviews(
    item_id: str,
    max_pages: int = 5,
    sort_by: str = "relevancy",  # relevancy | submission-desc | rating-desc | rating-asc
) -> list[dict]:
    """
    Fetch product reviews from Walmart's review API.
    """
    client = httpx.Client(
        timeout=30,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 Chrome/127.0.0.0 Safari/537.36",
            "Accept": "application/json",
        },
    )
    all_reviews = []
    limit = 50
    url = "https://www.walmart.com/reviews/api/fetch/v3"
    for page in range(1, max_pages + 1):
        offset = (page - 1) * limit
        params = {
            "itemId": item_id,
            "limit": limit,
            "offset": offset,
            "sort": sort_by,
            "filters": "",
        }
        try:
            resp = client.get(url, params=params)
            if resp.status_code != 200:
                break
            data = resp.json()
        except Exception as e:
            print(f"  Reviews page {page} failed: {e}")
            break
        reviews = data.get("reviews") or []
        for r in reviews:
            all_reviews.append({
                "id": r.get("reviewId", ""),
                "rating": r.get("rating", 0),
                "title": r.get("title", ""),
                "text": r.get("reviewText", ""),
                "author": r.get("authorId", ""),
                "date": r.get("reviewSubmissionTime", ""),
                "verified_purchase": r.get("badgeLabel") == "Verified Purchase",
                "helpful_votes": r.get("positiveFeedback", 0),
                "not_helpful_votes": r.get("negativeFeedback", 0),
            })
        total = data.get("totalResults", 0)
        if offset + limit >= total:
            break
        time.sleep(1.5)
    client.close()
    return all_reviews
def analyze_sentiment(reviews: list[dict]) -> dict:
    """Summary statistics over review ratings (rating-based, not NLP sentiment)."""
    if not reviews:
        return {}
    ratings = [r["rating"] for r in reviews]
    avg_rating = sum(ratings) / len(ratings)
    # Rating distribution
    dist = {i: ratings.count(i) for i in range(1, 6)}
    # Verified purchase ratio
    verified = sum(1 for r in reviews if r.get("verified_purchase"))
    # Helpful vote analysis
    helpful_reviews = sorted(reviews, key=lambda r: r.get("helpful_votes", 0), reverse=True)
    return {
        "total_reviews_analyzed": len(reviews),
        "avg_rating": round(avg_rating, 2),
        "rating_distribution": dist,
        "verified_purchase_pct": round(100 * verified / len(reviews), 1),
        "top_helpful_review": helpful_reviews[0]["title"] if helpful_reviews else "",
        "five_star_pct": round(100 * dist.get(5, 0) / len(ratings), 1),
        "one_star_pct": round(100 * dist.get(1, 0) / len(ratings), 1),
    }
Price History Tracking and Database Schema
For competitor monitoring, you need historical data. Here's a complete SQLite schema for tracking Walmart prices over time:
import sqlite3
from datetime import datetime, date

def init_walmart_db(db_path: str = "walmart_tracker.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS products (
            item_id TEXT PRIMARY KEY,
            name TEXT,
            brand TEXT,
            model TEXT,
            url TEXT,
            category TEXT,
            seller_id TEXT,
            seller_name TEXT,
            upc TEXT,
            first_seen DATE,
            last_seen DATE
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            item_id TEXT NOT NULL,
            price REAL,
            was_price REAL,
            in_stock INTEGER,
            rating REAL,
            review_count INTEGER,
            snapshot_date DATE,
            snapshot_ts TEXT,
            FOREIGN KEY (item_id) REFERENCES products(item_id)
        )
    """)
    # One snapshot per item per day; this index also serves
    # (item_id, snapshot_date) lookups, so no second index is needed
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_price_snapshot_unique
        ON price_snapshots(item_id, snapshot_date)
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            item_id TEXT,
            old_price REAL,
            new_price REAL,
            change_pct REAL,
            alert_type TEXT,
            alert_date DATE,
            notified INTEGER DEFAULT 0
        )
    """)
    conn.commit()
    return conn
def upsert_product(conn: sqlite3.Connection, product: dict):
    """Insert or update a product record."""
    today = date.today().isoformat()
    conn.execute("""
        INSERT INTO products (item_id, name, brand, model, url, category,
                              seller_id, seller_name, upc, first_seen, last_seen)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(item_id) DO UPDATE SET
            name=excluded.name, brand=excluded.brand, model=excluded.model,
            seller_id=excluded.seller_id, seller_name=excluded.seller_name,
            last_seen=excluded.last_seen
    """, (
        product.get("item_id"), product.get("name"), product.get("brand"),
        product.get("model"), product.get("url"),
        ",".join(product.get("categories", [])),
        product.get("seller_id"), product.get("seller_name"),
        product.get("upc"), today, today,
    ))
    conn.commit()
def record_snapshot(conn: sqlite3.Connection, product: dict):
    """Record a price snapshot. One per item per day."""
    today = date.today().isoformat()
    now = datetime.utcnow().isoformat()
    # Check for price change
    prev = conn.execute("""
        SELECT price FROM price_snapshots
        WHERE item_id = ? AND snapshot_date < ?
        ORDER BY snapshot_date DESC LIMIT 1
    """, (product["item_id"], today)).fetchone()
    new_price = product.get("price")
    old_price = prev[0] if prev else None
    conn.execute("""
        INSERT OR REPLACE INTO price_snapshots
        (item_id, price, was_price, in_stock, rating, review_count, snapshot_date, snapshot_ts)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        product["item_id"], new_price, product.get("was_price"),
        int(product.get("in_stock", True)),
        product.get("rating"), product.get("review_count"),
        today, now,
    ))
    # Generate alert if significant price change
    if old_price and new_price and old_price > 0:
        change_pct = (new_price - old_price) / old_price * 100
        if abs(change_pct) >= 10:
            alert_type = "DROP" if change_pct < 0 else "INCREASE"
            conn.execute("""
                INSERT INTO price_alerts
                (item_id, old_price, new_price, change_pct, alert_type, alert_date)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (product["item_id"], old_price, new_price,
                  round(change_pct, 2), alert_type, today))
    conn.commit()
def get_price_history(conn: sqlite3.Connection, item_id: str, days: int = 90) -> list[dict]:
    """Retrieve price history for a product."""
    rows = conn.execute("""
        SELECT snapshot_date, price, was_price, in_stock, rating, review_count
        FROM price_snapshots
        WHERE item_id = ?
        ORDER BY snapshot_date DESC
        LIMIT ?
    """, (item_id, days)).fetchall()
    return [
        {
            "date": r[0], "price": r[1], "was_price": r[2],
            "in_stock": bool(r[3]), "rating": r[4], "reviews": r[5],
        }
        for r in rows
    ]
def get_pending_alerts(conn: sqlite3.Connection) -> list[dict]:
    """Get unnotified price alerts."""
    rows = conn.execute("""
        SELECT a.item_id, p.name, a.old_price, a.new_price,
               a.change_pct, a.alert_type, a.alert_date
        FROM price_alerts a
        JOIN products p ON a.item_id = p.item_id
        WHERE a.notified = 0
        ORDER BY ABS(a.change_pct) DESC
    """).fetchall()
    return [
        {
            "item_id": r[0], "name": r[1], "old_price": r[2],
            "new_price": r[3], "change_pct": r[4], "type": r[5], "date": r[6],
        }
        for r in rows
    ]
Handling Walmart's PerimeterX Bot Detection
Walmart uses PerimeterX (now HUMAN Security) as their primary bot detection layer. Understanding how it works helps you defeat it:
Layer 1: IP reputation scoring. PerimeterX maintains a database of known datacenter IP ranges, proxy services, and high-risk IP blocks. Any request from a datacenter IP triggers an immediate challenge or block. This is why ThorData's residential proxy network is essential for Walmart scraping — their residential IPs have genuine ISP attribution and pass this check reliably.
Layer 2: TLS/HTTP fingerprinting. Your TLS handshake reveals what HTTP library you're using. requests, httpx, and curl all have distinctive TLS fingerprints that PerimeterX recognizes. Playwright-launched Chromium uses a real browser fingerprint.
Layer 3: Browser fingerprinting. JavaScript probes check navigator.webdriver, Canvas and WebGL rendering, audio context timing, available fonts, screen dimensions, plugin arrays, and dozens of other browser properties.
Layer 4: Behavioral scoring. Mouse movement patterns, scroll velocity, click timing, time-on-page, and navigation path all contribute to a behavioral risk score. Automated sessions that navigate directly to product data without browsing get flagged.
Layer 5: The _px3 cookie. PerimeterX sets a behavioral fingerprint cookie after the initial visit. Subsequent requests with a missing or invalid _px3 trigger re-challenges.
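A practical first step, before any evasion work, is simply recognizing when PerimeterX has intercepted a request so the pipeline can back off and rotate identity instead of parsing a block page. Here is a minimal detector sketch; the marker strings are assumptions drawn from commonly reported PX challenge pages, not a guaranteed contract, so verify them against real responses:

```python
# Heuristic PerimeterX block-page detector. The markers below are
# assumptions based on commonly observed PX challenge pages; confirm
# them against real blocked responses before relying on them.
BLOCK_MARKERS = (
    "px-captcha",    # challenge widget container id (assumed)
    "press & hold",  # human-verification prompt text (assumed)
    "/px/captcha",   # challenge script path (assumed)
)

def looks_blocked(status_code: int, html: str) -> bool:
    """Return True if the response looks like a bot challenge rather than a product page."""
    if status_code in (403, 412, 429):
        return True
    lowered = html.lower()
    return any(marker in lowered for marker in BLOCK_MARKERS)
```

Calling this on every response before parsing lets you rotate proxy and browser context on the first hit, rather than burning more requests from an already-flagged identity.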
Mitigation strategy that works in practice:
async def create_stealth_walmart_context(playwright, proxy_config: dict = None):
    """Create a properly configured stealth browser context for Walmart."""
    launch_kwargs = {"headless": True}
    if proxy_config:
        launch_kwargs["proxy"] = proxy_config
    browser = await playwright.chromium.launch(**launch_kwargs)
    context = await browser.new_context(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                   "AppleWebKit/537.36 (KHTML, like Gecko) "
                   "Chrome/127.0.0.0 Safari/537.36",
        viewport={"width": 1920, "height": 1080},
        locale="en-US",
        timezone_id="America/Chicago",  # Walmart HQ timezone
        geolocation={"latitude": 36.3729, "longitude": -94.2088},  # Bentonville AR
        permissions=["geolocation"],
        color_scheme="light",
        device_scale_factor=1,
        has_touch=False,
        is_mobile=False,
    )
    # Override automation tells
    await context.add_init_script("""
        // Remove webdriver flag
        Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        // Realistic plugin array
        Object.defineProperty(navigator, 'plugins', {
            get: () => [
                {name: 'Chrome PDF Plugin'}, {name: 'Chrome PDF Viewer'},
                {name: 'Native Client'},
            ]
        });
        // Languages
        Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
        // Chrome runtime (headless Chrome lacks this)
        if (!window.chrome) {
            window.chrome = {
                app: {isInstalled: false},
                webstore: {onInstallStageChanged: {}, onDownloadProgress: {}},
                runtime: {
                    PlatformOs: {MAC: 'mac', WIN: 'win', ANDROID: 'android', CROS: 'cros', LINUX: 'linux', OPENBSD: 'openbsd'},
                    PlatformArch: {ARM: 'arm', X86_32: 'x86-32', X86_64: 'x86-64'},
                    PlatformNaclArch: {ARM: 'arm', X86_32: 'x86-32', X86_64: 'x86-64'},
                    RequestUpdateCheckStatus: {THROTTLED: 'throttled', NO_UPDATE: 'no_update', UPDATE_AVAILABLE: 'update_available'},
                    OnInstalledReason: {INSTALL: 'install', UPDATE: 'update', CHROME_UPDATE: 'chrome_update', SHARED_MODULE_UPDATE: 'shared_module_update'},
                    OnRestartRequiredReason: {APP_UPDATE: 'app_update', OS_UPDATE: 'os_update', PERIODIC: 'periodic'},
                },
            };
        }
    """)
    return browser, context

# Walmart scraping rate guidelines:
# - Search pages: 5-8 requests per minute per IP
# - Product pages: 10-12 requests per minute per IP
# - Review pages: 15-20 requests per minute per IP (less protected)
# - Always randomize delays: 3-8 seconds between product pages
# - Rotate browser context every 30-50 requests (reset cookies + fingerprint)
# - Rotate proxy every 20-30 requests
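The guidelines above can be encoded as a small pacing helper so every request path shares one throttling policy. The per-endpoint budgets below mirror the comments and are tuning assumptions, not documented limits:

```python
import random
import time

# Requests-per-minute budgets per endpoint type, mirroring the guideline
# comments above. These are empirical tuning assumptions, not API limits.
RPM_BUDGETS = {"search": 6, "product": 10, "reviews": 15}

class Pacer:
    """Sleeps between requests to stay under a per-endpoint RPM budget,
    with random jitter so the cadence is not machine-regular."""

    def __init__(self, budgets: dict = RPM_BUDGETS):
        self.budgets = budgets
        self.last_request: dict[str, float] = {}

    def delay_for(self, endpoint: str) -> float:
        # Base gap derived from the budget, plus up to +50% jitter
        base = 60.0 / self.budgets.get(endpoint, 10)
        return base + random.uniform(0, base * 0.5)

    def wait(self, endpoint: str) -> None:
        # Sleep only for whatever portion of the gap hasn't already elapsed
        now = time.monotonic()
        elapsed = now - self.last_request.get(endpoint, 0.0)
        remaining = self.delay_for(endpoint) - elapsed
        if remaining > 0:
            time.sleep(remaining)
        self.last_request[endpoint] = time.monotonic()
```

Calling `pacer.wait("product")` before each product fetch keeps the loop honest even when parsing time varies, because the pacer accounts for time already spent since the last request.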
Production Monitoring Pipeline
Putting it all together for production-grade competitor monitoring:
import asyncio
import json
import random
from playwright.async_api import async_playwright

async def monitor_product_list(
    item_ids: list[str],
    db_path: str = "walmart_tracker.db",
    proxy_config: dict = None,
    requests_per_context: int = 40,
) -> dict:
    """
    Production pipeline: scrape a list of Walmart items and track prices.
    Rotates browser contexts to stay under detection thresholds.
    """
    db = init_walmart_db(db_path)
    results = {"success": 0, "failed": 0, "price_drops": []}
    async with async_playwright() as p:
        browser, context = await create_stealth_walmart_context(p, proxy_config)
        page = await context.new_page()
        requests_in_context = 0
        for i, item_id in enumerate(item_ids):
            # Rotate context periodically
            if requests_in_context >= requests_per_context:
                print(f"  Rotating browser context after {requests_in_context} requests...")
                await browser.close()
                browser, context = await create_stealth_walmart_context(p, proxy_config)
                page = await context.new_page()
                requests_in_context = 0
                await asyncio.sleep(3)
            url = f"https://www.walmart.com/ip/{item_id}"
            print(f"  [{i+1}/{len(item_ids)}] Scraping item {item_id}...")
            try:
                await page.goto(url, wait_until="networkidle", timeout=30000)
                await asyncio.sleep(random.uniform(2, 4))
                # Extract __NEXT_DATA__
                next_data_text = await page.evaluate("""
                    () => {
                        const el = document.getElementById('__NEXT_DATA__');
                        return el ? el.textContent : null;
                    }
                """)
                product = None
                if next_data_text:
                    nd = json.loads(next_data_text)
                    raw_product = (
                        nd.get("props", {})
                        .get("pageProps", {})
                        .get("initialData", {})
                        .get("data", {})
                        .get("product", {})
                    )
                    if raw_product:
                        product = normalize_walmart_product(raw_product, url)
                if product and product.get("item_id"):
                    upsert_product(db, product)
                    record_snapshot(db, product)
                    results["success"] += 1
                    print(f"    {product['name'][:40]} | ${product.get('price')} | "
                          f"{'In Stock' if product.get('in_stock') else 'Out of Stock'}")
                else:
                    results["failed"] += 1
                    print("    Could not extract product data")
                requests_in_context += 1
            except Exception as e:
                print(f"    Error: {e}")
                results["failed"] += 1
                requests_in_context += 1
            # Randomized delay between products
            delay = random.uniform(3, 7)
            await asyncio.sleep(delay)
        await browser.close()
    # Check for price alerts
    alerts = get_pending_alerts(db)
    if alerts:
        print(f"\nPrice alerts ({len(alerts)}):")
        for alert in alerts:
            print(f"  {alert['type']}: {alert['name'][:40]}")
            print(f"    ${alert['old_price']} -> ${alert['new_price']} ({alert['change_pct']:+.1f}%)")
        results["price_drops"] = [a for a in alerts if a["type"] == "DROP"]
    db.close()
    return results
Category and Department Scraping
For broader market research, scrape entire product categories:
async def scrape_walmart_category(
    category_url: str,
    max_pages: int = 10,
    proxy: dict = None,
) -> list[dict]:
    """
    Scrape a Walmart category page (e.g., /browse/electronics).
    Handles infinite scroll / pagination.
    """
    all_items = []
    async with async_playwright() as p:
        browser, context = await create_stealth_walmart_context(p, proxy)
        page = await context.new_page()
        for pg in range(1, max_pages + 1):
            url = f"{category_url}?page={pg}" if "?" not in category_url else f"{category_url}&page={pg}"
            await page.goto(url, wait_until="networkidle", timeout=30000)
            await asyncio.sleep(3)
            # Extract product grid
            items = await page.evaluate("""
                () => {
                    const cards = document.querySelectorAll('[data-item-id]');
                    return Array.from(cards).map(card => {
                        const titleEl = card.querySelector('[data-automation-id="product-title"]');
                        const priceEl = card.querySelector('[data-automation-id="product-price"] .f2');
                        const linkEl = card.querySelector('a[href*="/ip/"]');
                        return {
                            item_id: card.getAttribute('data-item-id'),
                            name: titleEl ? titleEl.textContent.trim() : '',
                            price_text: priceEl ? priceEl.textContent.trim() : '',
                            url: linkEl ? 'https://www.walmart.com' + linkEl.getAttribute('href') : '',
                        };
                    }).filter(i => i.item_id && i.name);
                }
            """)
            if not items:
                print(f"  Page {pg}: no items found, stopping")
                break
            all_items.extend(items)
            print(f"  Page {pg}: {len(items)} items (total: {len(all_items)})")
            await asyncio.sleep(random.uniform(3, 6))
        await browser.close()
    return all_items
Common Gotchas
Walmart's prices are geo-locked. If you're accessing from outside the US, many products return null prices or different pricing. Always use US residential proxies. ThorData with country-us targeting ensures you see US pricing.
Third-party sellers. Walmart Marketplace has millions of third-party products alongside Walmart's own inventory. The seller_name and seller_id fields distinguish them. Third-party sellers often have different return policies and pricing logic.
Item ID vs URL slug. Walmart product URLs include a human-readable slug that changes when names change, but the numeric item ID at the end is permanent. Use item ID as your primary key.
Availability status granularity. Don't just check "IN_STOCK" vs "OUT_OF_STOCK". The availability field can return IN_STOCK, OUT_OF_STOCK, UNAVAILABLE, LIMITED_QUANTITY, ROLLBACK, and others. Map these appropriately for your use case.
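One way to handle that granularity is to collapse the raw statuses into the coarse buckets your pipeline actually acts on. The mapping below covers the statuses mentioned above and fails safe on anything unrecognized; whether ROLLBACK belongs in the available bucket is itself an assumption worth verifying against live data:

```python
# Map Walmart's granular availability statuses onto coarse buckets.
# Status strings are the ones discussed above; anything unrecognized
# is treated as unavailable so the pipeline fails safe.
AVAILABILITY_BUCKETS = {
    "IN_STOCK": "available",
    "ROLLBACK": "available",        # rollback items are purchasable (assumption)
    "LIMITED_QUANTITY": "low_stock",
    "OUT_OF_STOCK": "unavailable",
    "UNAVAILABLE": "unavailable",
}

def availability_bucket(status: str) -> str:
    """Normalize a raw availability status into available / low_stock / unavailable."""
    return AVAILABILITY_BUCKETS.get((status or "").upper(), "unavailable")
```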
The was_price vs rollback distinction. A wasPrice field indicates the price was reduced from an earlier regular price. A "ROLLBACK" badge marks a temporary promotional price. The two imply different things about how long the lower price is likely to last.
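To keep those two cases separate in stored data, you can classify each snapshot from fields already extracted earlier. This is a sketch: the badge field name comes from the search-card extractor, not the product API, so check the raw payload for where the rollback flag actually lives on current pages:

```python
def classify_discount(product: dict) -> str:
    """Classify a product snapshot's discount type.

    'rollback' -> temporary promotional price (badge-driven; field name assumed)
    'markdown' -> was_price present and higher than the current price
    'none'     -> no discount signal
    """
    badge = (product.get("badge") or "").lower()
    if "rollback" in badge or product.get("availability_status") == "ROLLBACK":
        return "rollback"
    price = product.get("price")
    was = product.get("was_price")
    if price is not None and was is not None and was > price:
        return "markdown"
    return "none"
```

Storing this label alongside each price snapshot lets you exclude short-lived rollbacks when computing a competitor's baseline price.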
Anti-bot detection evolves. PerimeterX updates their detection methods regularly. If your scraper suddenly starts failing at scale, inspect what's changed in the page's JavaScript before diving into complex evasion — sometimes a simple header update is all that's needed.
Legal Considerations
Walmart's Terms of Use prohibit scraping. The hiQ Labs v. LinkedIn (2022) Ninth Circuit ruling established that scraping publicly accessible data doesn't automatically violate the Computer Fraud and Abuse Act. Walmart's public product pages are accessible to anyone — the terms create a contractual restriction, not a criminal one.
For commercial use at scale, consider Walmart's official affiliate and partner APIs — the Walmart Affiliate API provides product data with proper authorization. For research, competitive analysis, and personal monitoring at reasonable volumes, keep your scraping rates conservative, don't hammer their servers, and store only what you need for your specific use case.
Conclusion
Walmart scraping in 2026 requires real browser automation for reliable results — the PerimeterX bot detection blocks naive HTTP clients quickly. Playwright gives you a genuine Chromium fingerprint, and the embedded __NEXT_DATA__ JSON is a cleaner extraction target than parsing rendered HTML. For production monitoring at scale, ThorData residential proxies paired with context rotation every 30-40 requests keeps you under detection thresholds. The price tracking database design shown here handles the core use case well: track it daily, alert on drops, and you have actionable competitor intelligence with a few days of data.