Scraping Instacart Grocery Prices with Python (2026)
Instacart shows real-time grocery prices from multiple stores in your area — Costco, Kroger, Safeway, Aldi, and dozens more. That makes it a goldmine for price comparison data. The problem is there's no public API, and they actively fight scrapers.
Here's how to extract product prices, availability, and deal data from Instacart using Python.
How Instacart Structures Data
Instacart is a Next.js app that hydrates from server-rendered HTML. Product data lives in two places: embedded JSON-LD in the initial HTML, and XHR calls to their internal GraphQL API at https://www.instacart.com/graphql.
The HTML approach is simpler but gives you less data. The GraphQL approach gives you everything — prices, unit prices, stock status, store-specific pricing, and active coupons — but requires valid session cookies.
Understanding the data model first saves a lot of debugging time:
- Zones: Geographic service areas (e.g., "San Francisco Bay Area")
- Retailers: Individual store brands available in your zone
- Store locations: Specific stores within a retailer brand (e.g., Safeway at 123 Main St)
- Products: Items with retailer-specific pricing (same SKU can have different prices at different stores)
- Aisles: Category groupings within a store
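The hierarchy above can be sketched as plain dataclasses. This is only a mental model for the scraper's output; the field names are illustrative, not Instacart's actual schema:

```python
from dataclasses import dataclass, field

@dataclass
class Product:
    product_id: str
    name: str
    price: float          # retailer-specific: same SKU, different store, different price
    unit_price: str = ""  # e.g. "$0.08/oz"
    in_stock: bool = True

@dataclass
class StoreLocation:
    store_id: str
    address: str          # e.g. "123 Main St"
    products: list[Product] = field(default_factory=list)

@dataclass
class Retailer:
    slug: str             # e.g. "safeway"
    locations: list[StoreLocation] = field(default_factory=list)

@dataclass
class Zone:
    name: str             # e.g. "San Francisco Bay Area"
    retailers: list[Retailer] = field(default_factory=list)
```

Keeping this shape in mind explains why every price query below needs both a product identifier and a store identifier.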
Basic Product Scraping from HTML
For a quick start, parse the structured data Instacart embeds on product pages:
import httpx
from selectolax.parser import HTMLParser
import json
import re
import time
import random
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-CH-UA": '"Google Chrome";v="126", "Chromium";v="126"',
"Sec-CH-UA-Mobile": "?0",
"Sec-CH-UA-Platform": '"macOS"',
}
def scrape_product_page(url: str, proxy: str | None = None) -> dict | None:
    transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
    # Context manager guarantees the client is closed; the response body is
    # fully read before the client goes away, so r.text stays usable below
    with httpx.Client(headers=HEADERS, transport=transport,
                      follow_redirects=True, timeout=25) as client:
        r = client.get(url)
    if r.status_code != 200:
        return None
    tree = HTMLParser(r.text)
    # Extract JSON-LD product data (can be a single object or a list)
    for script in tree.css('script[type="application/ld+json"]'):
        try:
            data = json.loads(script.text())
        except json.JSONDecodeError:
            continue
        for item in (data if isinstance(data, list) else [data]):
            if item.get("@type") != "Product":
                continue
            offers = item.get("offers") or {}
            if isinstance(offers, list):
                offers = offers[0] if offers else {}
            brand = item.get("brand")
            return {
                "name": item.get("name"),
                # brand can be an object or a bare string in JSON-LD
                "brand": brand.get("name") if isinstance(brand, dict) else brand,
                "price": offers.get("price"),
                "currency": offers.get("priceCurrency"),
                "availability": offers.get("availability"),
                "image": item.get("image"),
                "description": item.get("description"),
                "sku": item.get("sku"),
                "gtin": item.get("gtin13") or item.get("gtin12"),
            }
# Fallback: extract from Next.js __NEXT_DATA__
next_data_script = tree.css_first("script#__NEXT_DATA__")
if next_data_script:
try:
next_data = json.loads(next_data_script.text())
product = (next_data
.get("props", {})
.get("pageProps", {})
.get("product", {}))
if product:
return {
"name": product.get("name"),
"brand": product.get("brand"),
"price": product.get("price"),
"unit_price": product.get("unitPrice"),
"in_stock": product.get("inStock"),
"size": product.get("size"),
}
except json.JSONDecodeError:
pass
return None
Using the Internal GraphQL API
For richer data — especially cross-store price comparison — hit the GraphQL endpoint directly. This requires session cookies from a logged-in (or location-set) browser session.
Getting the cookies: open DevTools in your browser while on Instacart, go to Application > Cookies, and copy _instacart_session. Also note the reese84 cookie value if present; this is Imperva's bot-detection token. While you're in DevTools, grab the persistedQuery sha256Hash values from the GraphQL requests in the Network tab: the hash strings in the payloads below are placeholders.
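If you copy the whole `Cookie:` request header from DevTools instead of hunting for individual values, a small helper can turn it into the dict httpx expects (the cookie values below are made up):

```python
def cookies_from_header(cookie_header: str) -> dict[str, str]:
    """Parse a raw 'Cookie:' header copied from DevTools into a dict."""
    cookies = {}
    for pair in cookie_header.split(";"):
        if "=" in pair:
            # partition keeps '=' characters inside the value intact
            name, _, value = pair.strip().partition("=")
            cookies[name] = value
    return cookies

jar = cookies_from_header("_instacart_session=abc123; reese84=eyJhbGci")
# jar["_instacart_session"] == "abc123"
```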
class InstacartScraper:
GRAPHQL_URL = "https://www.instacart.com/graphql"
def __init__(self, session_cookie: str, postal_code: str = "94105",
proxy: str | None = None):
transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
self.client = httpx.Client(
headers={
"Content-Type": "application/json",
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 Chrome/126.0.0.0 Safari/537.36"
),
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.9",
"X-Client-Identifier": "web",
"Origin": "https://www.instacart.com",
"Referer": "https://www.instacart.com/",
},
cookies={"_instacart_session": session_cookie},
transport=transport,
timeout=25,
)
self.postal_code = postal_code
def search_products(self, query: str, store_id: str,
limit: int = 20, offset: int = 0) -> list[dict]:
"""Search for products within a specific store."""
payload = {
"operationName": "SearchResultsPlacements",
"variables": {
"query": query,
"storeId": store_id,
"first": limit,
"after": str(offset),
"postal_code": self.postal_code,
"includeDetails": True,
},
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "search_results_hash_placeholder",
},
},
}
r = self.client.post(self.GRAPHQL_URL, json=payload)
if r.status_code == 401:
raise ValueError("Session expired — refresh cookies")
r.raise_for_status()
data = r.json()
items = []
placements = (data.get("data", {})
.get("searchResultsPlacements", {})
.get("placements", []))
for placement in placements:
for product in placement.get("products", []):
items.append(self._normalize_product(product, store_id))
return items
def _normalize_product(self, product: dict, store_id: str) -> dict:
"""Normalize a product dict from GraphQL response."""
# Price may be nested differently depending on store
price_raw = (product.get("price")
or product.get("displayPrice")
or product.get("originalPrice", ""))
        price_clean = re.sub(r"[^\d.]", "", str(price_raw)) if price_raw else ""
        unit_price_raw = product.get("pricePerUnit", "")
        # Strip currency symbols but keep the amount and unit, e.g. "$0.08/oz" -> "0.08/oz"
        unit_price = re.sub(r"[^\w./ ]", "", str(unit_price_raw)).strip() if unit_price_raw else ""
return {
"name": product.get("name", ""),
"brand": product.get("brand", ""),
"price": price_clean,
"unit_price": unit_price,
"size": product.get("size", ""),
"in_stock": product.get("inStock", product.get("available", False)),
"store_id": store_id,
"product_id": product.get("id", ""),
"image_url": product.get("imageUrl", ""),
"categories": product.get("categories", []),
}
def get_store_list(self) -> list[dict]:
"""Get available stores for the configured postal code."""
payload = {
"operationName": "GetRetailers",
"variables": {
"postal_code": self.postal_code,
"showNearby": True,
},
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "retailers_hash_placeholder",
},
},
}
r = self.client.post(self.GRAPHQL_URL, json=payload)
r.raise_for_status()
stores = []
retailers = (r.json().get("data", {})
.get("retailers", {})
.get("retailers", []))
for ret in retailers:
stores.append({
"id": ret.get("id"),
"slug": ret.get("slug"),
"name": ret.get("name"),
"logo_url": ret.get("logoUrl"),
"delivery_fee": ret.get("deliveryFee"),
"min_order": ret.get("minOrderAmount"),
})
return stores
Cross-Store Price Comparison
The real value is comparing prices for the same product across multiple stores; on Instacart, the price of an identical item can differ by 30-50% between stores:
def compare_prices(
scraper: InstacartScraper,
product_name: str,
store_ids: list[str],
delay_range: tuple = (2.0, 4.0),
) -> list[dict]:
"""Compare prices for a product across multiple stores."""
all_results = []
for store_id in store_ids:
try:
products = scraper.search_products(product_name, store_id, limit=5)
for p in products:
# Only keep results that actually match (basic name check)
search_words = set(product_name.lower().split())
product_words = set(p["name"].lower().split())
if len(search_words & product_words) >= 2:
all_results.append(p)
except httpx.HTTPStatusError as e:
print(f"Store {store_id} failed: {e.response.status_code}")
if e.response.status_code == 429:
time.sleep(30)
delay = random.uniform(*delay_range)
time.sleep(delay)
# Sort by numeric price
def price_sort_key(x):
try:
return float(x.get("price", "999") or "999")
except ValueError:
return 999.0
all_results.sort(key=price_sort_key)
return all_results
def price_comparison_report(results: list[dict], product_query: str) -> str:
"""Generate a text comparison table."""
if not results:
return "No results found."
lines = [f"\nPrice comparison: '{product_query}'", "-" * 60]
for r in results:
in_stock = "IN STOCK" if r.get("in_stock") else "out of stock"
unit = f" ({r['unit_price']})" if r.get("unit_price") else ""
lines.append(
f" {r['store_id']:>12}: ${r['price']:<8} {unit:<20} {r['name'][:30]:<30} [{in_stock}]"
)
return "\n".join(lines)
# Example usage
stores = ["costco", "kroger", "safeway", "aldi", "target", "whole-foods"]
results = compare_prices(scraper, "organic whole milk gallon", stores)
print(price_comparison_report(results, "organic whole milk gallon"))
Dealing with Anti-Bot Measures
Instacart uses Imperva (formerly Incapsula) for bot detection, plus their own fingerprinting layer.
The Imperva reese84 cookie problem. On first visit from a new IP, Imperva serves a JavaScript challenge that must execute in a real browser to generate a valid reese84 cookie. Plain HTTP clients like httpx can't solve this — you need either:
- A real browser session (Playwright/Puppeteer) to generate the initial cookie
- A proxy provider that pre-solves these challenges
- Reusing an existing reese84 value (they expire, but usually last several hours)
Getting a valid reese84 cookie with Playwright:
from playwright.sync_api import sync_playwright
def get_instacart_cookies(postal_code: str = "94105") -> dict:
    """Launch a real browser to get valid Instacart session cookies.

    Note: the address-entry step that actually applies postal_code is
    omitted here; Instacart's address-modal selectors change frequently.
    """
    with sync_playwright() as p:
        # If headless mode gets challenged, try headless=False
        browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent=(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Safari/537.36"
),
locale="en-US",
timezone_id="America/Los_Angeles",
)
page = context.new_page()
# Visit Instacart and set location
page.goto("https://www.instacart.com", wait_until="networkidle")
page.wait_for_timeout(2000)
# Extract cookies
cookies = {c["name"]: c["value"] for c in context.cookies()}
browser.close()
return cookies
IP reputation scoring. Datacenter IPs get blocked immediately. Even many "residential" proxy providers have IPs that Instacart has already flagged. You need clean residential proxies with high reputation scores.
ThorData's residential proxies work particularly well for Instacart because they offer city-level geo-targeting — important since Instacart pricing is zip-code specific and you need IPs that match the delivery area you're scraping. A San Francisco zip code should use Bay Area residential IPs, not random US IPs.
# ThorData proxy with city-level targeting
def get_proxy(country: str = "US", city: str = "") -> str:
"""Build ThorData proxy URL with optional geo-targeting."""
user = "YOUR_THORDATA_USER"
password = "YOUR_THORDATA_PASS"
if city:
return f"http://{user}-country-{country}-city-{city}:{password}@proxy.thordata.com:9000"
return f"http://{user}-country-{country}:{password}@proxy.thordata.com:9000"
# Match proxy location to postal code
proxy = get_proxy(country="US", city="SanFrancisco")
scraper = InstacartScraper(session_cookie, postal_code="94105", proxy=proxy)
Session fingerprinting. Instacart ties sessions to device fingerprints. Keep your proxy IP, user agent, and cookies consistent within each scraping session. Don't reuse a session that was initialized on one IP with a different IP later.
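One way to keep those three things consistent is to bundle them into a single immutable profile object, created once per session. This is a sketch; the fields are illustrative:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class SessionProfile:
    """Immutable bundle of fingerprint-relevant session state.

    Create one per scraping session; never mix the cookie from one
    profile with the proxy or user agent of another.
    """
    proxy_url: str
    user_agent: str
    session_cookie: str
    reese84: str = ""

    def as_client_kwargs(self) -> dict:
        # Keyword arguments you can splat into an HTTP client constructor
        cookies = {"_instacart_session": self.session_cookie}
        if self.reese84:
            cookies["reese84"] = self.reese84
        return {
            "headers": {"User-Agent": self.user_agent},
            "cookies": cookies,
        }
```

`frozen=True` makes accidental mid-session mutation (the exact mistake fingerprinting catches) a `FrozenInstanceError` instead of a silent block.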
Rate limiting. More than 1 request per 2 seconds from the same session triggers soft blocks — empty results instead of explicit 403s. Space requests 2-4 seconds apart, with occasional longer pauses.
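A small helper keeps that pacing honest, adding a longer pause every so often so the request timing doesn't look metronomic (the 15-request interval is an arbitrary choice, not a known Instacart threshold):

```python
import random
import time

def polite_delay(request_count: int) -> float:
    """Return a delay honoring the ~1 request / 2-4s pacing, with an
    occasional longer pause to break up the timing pattern."""
    delay = random.uniform(2.0, 4.0)
    if request_count and request_count % 15 == 0:
        delay += random.uniform(10.0, 25.0)  # periodic long pause
    return delay

# In a scraping loop:
# for i, store_id in enumerate(store_ids):
#     ...fetch...
#     time.sleep(polite_delay(i))
```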
Paginating Product Search Results
For scraping a complete category or all products from a store, you need to paginate:
def scrape_full_category(
scraper: InstacartScraper,
store_id: str,
category_slug: str,
max_products: int = 500,
) -> list[dict]:
"""Scrape all products from a specific category in a store."""
all_products = []
offset = 0
page_size = 30
while offset < max_products:
try:
# Category browsing uses a different GraphQL operation
payload = {
"operationName": "BrowseAislePlacements",
"variables": {
"storeId": store_id,
"slug": category_slug,
"first": page_size,
"after": str(offset),
"postal_code": scraper.postal_code,
},
"extensions": {
"persistedQuery": {"version": 1, "sha256Hash": "aisle_hash"},
},
}
r = scraper.client.post(scraper.GRAPHQL_URL, json=payload)
if r.status_code == 429:
print("Rate limited — sleeping 30s")
time.sleep(30)
continue
r.raise_for_status()
data = r.json()
products = (data.get("data", {})
.get("browseAislePlacements", {})
.get("products", []))
if not products:
break
for p in products:
all_products.append(scraper._normalize_product(p, store_id))
offset += page_size
print(f" Fetched {len(all_products)} products from {category_slug}")
time.sleep(random.uniform(2, 4))
except Exception as e:
print(f"Error at offset {offset}: {e}")
time.sleep(10)
break
return all_products
Tracking Deals and Sales
Instacart has a dedicated deals section per store. You can monitor active coupons and sale prices:
def get_store_deals(scraper: InstacartScraper, store_id: str) -> list[dict]:
"""Get active coupons and sale prices for a store."""
payload = {
"operationName": "StoreCoupons",
"variables": {
"storeId": store_id,
"first": 50,
},
"extensions": {
"persistedQuery": {"version": 1, "sha256Hash": "coupons_hash_placeholder"},
},
}
r = scraper.client.post(scraper.GRAPHQL_URL, json=payload)
r.raise_for_status()
deals = []
edges = (r.json().get("data", {})
.get("storeCoupons", {})
.get("edges", []))
for edge in edges:
coupon = edge.get("node", {})
deals.append({
"description": coupon.get("description"),
"discount": coupon.get("discountText"),
"discount_type": coupon.get("discountType"),
"min_purchase": coupon.get("minimumPurchase"),
"max_discount": coupon.get("maxDiscount"),
"expiry": coupon.get("expiresAt"),
"products": [p.get("name") for p in coupon.get("products", [])],
"coupon_id": coupon.get("id"),
})
return deals
import sqlite3
from datetime import datetime

def monitor_deals(scraper: InstacartScraper, store_ids: list[str],
                  db_conn: sqlite3.Connection):
    """Track deals across stores over time."""
    now = datetime.utcnow().isoformat()
db_conn.execute("""
CREATE TABLE IF NOT EXISTS deals (
id TEXT,
store_id TEXT,
description TEXT,
discount TEXT,
discount_type TEXT,
min_purchase REAL,
expiry TEXT,
products TEXT,
seen_at TEXT,
PRIMARY KEY (id, store_id)
)
""")
for store_id in store_ids:
deals = get_store_deals(scraper, store_id)
for deal in deals:
db_conn.execute(
"INSERT OR REPLACE INTO deals VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(deal.get("coupon_id", ""), store_id, deal["description"],
deal["discount"], deal.get("discount_type"),
float(deal["min_purchase"] or 0) if deal.get("min_purchase") else None,
deal.get("expiry"),
json.dumps(deal["products"]), now)
)
db_conn.commit()
print(f"{store_id}: {len(deals)} active deals")
time.sleep(random.uniform(2, 4))
Storing Price History in SQLite
For price tracking over time, use SQLite with timestamps:
import sqlite3
from datetime import datetime
def init_grocery_db(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS products (
product_id TEXT,
store_id TEXT,
name TEXT,
brand TEXT,
size TEXT,
image_url TEXT,
PRIMARY KEY (product_id, store_id)
);
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT,
store_id TEXT,
price REAL,
unit_price TEXT,
in_stock INTEGER,
scraped_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_price_history_product
ON price_history(product_id, store_id, scraped_at);
CREATE TABLE IF NOT EXISTS price_alerts (
product_id TEXT,
store_id TEXT,
target_price REAL,
alert_email TEXT,
created_at TEXT,
PRIMARY KEY (product_id, store_id, target_price)
);
""")
conn.commit()
return conn
def store_prices(db_conn: sqlite3.Connection, products: list[dict]):
    """Store current prices with timestamp."""
    # Space-separated timestamps compare correctly against SQLite's
    # datetime('now', ...) strings used in the queries below
    now = datetime.utcnow().isoformat(sep=" ", timespec="seconds")
    for p in products:
        # product_id may be present but empty, so use `or`, not a .get default
        pid = p.get("product_id") or p["name"]
        # Upsert product metadata
        db_conn.execute(
            "INSERT OR REPLACE INTO products (product_id, store_id, name, brand, size, image_url) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (pid, p["store_id"], p["name"], p.get("brand"),
             p.get("size"), p.get("image_url"))
        )
        # Record price snapshot
        try:
            price_val = float(p["price"]) if p.get("price") else None
        except ValueError:
            price_val = None
        if price_val is not None:
            db_conn.execute(
                "INSERT INTO price_history (product_id, store_id, price, unit_price, in_stock, scraped_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (pid, p["store_id"], price_val, p.get("unit_price"),
                 int(p.get("in_stock", False)), now)
            )
    db_conn.commit()
def get_price_trend(db_conn: sqlite3.Connection,
product_id: str, store_id: str,
days: int = 30) -> list[dict]:
"""Get price history for a product over the past N days."""
rows = db_conn.execute("""
SELECT price, unit_price, in_stock, scraped_at
FROM price_history
WHERE product_id = ? AND store_id = ?
AND scraped_at >= datetime('now', ?)
ORDER BY scraped_at
""", (product_id, store_id, f"-{days} days")).fetchall()
return [
{"price": r[0], "unit_price": r[1], "in_stock": bool(r[2]), "date": r[3]}
for r in rows
]
def find_price_drops(db_conn: sqlite3.Connection,
threshold_pct: float = 10.0) -> list[dict]:
"""Find products whose price has dropped by threshold_pct recently."""
# Compare most recent price to 7-day ago price
rows = db_conn.execute("""
WITH latest AS (
SELECT product_id, store_id, price, scraped_at
FROM price_history
WHERE scraped_at = (
SELECT MAX(scraped_at) FROM price_history h2
WHERE h2.product_id = price_history.product_id
AND h2.store_id = price_history.store_id
)
),
week_ago AS (
SELECT product_id, store_id, AVG(price) as avg_price
FROM price_history
WHERE scraped_at BETWEEN datetime('now', '-8 days')
AND datetime('now', '-6 days')
GROUP BY product_id, store_id
)
        SELECT
            l.product_id, l.store_id, p.name,
            l.price AS current_price,
            w.avg_price AS prev_price,
            ROUND((w.avg_price - l.price) / w.avg_price * 100, 1) AS drop_pct
        FROM latest l
        JOIN week_ago w ON l.product_id = w.product_id AND l.store_id = w.store_id
        JOIN products p ON l.product_id = p.product_id AND l.store_id = p.store_id
        -- repeat the expression: standard SQL can't use a SELECT alias in WHERE,
        -- and the avg_price > 0 guard avoids division by zero
        WHERE w.avg_price > 0
          AND (w.avg_price - l.price) / w.avg_price * 100 >= ?
        ORDER BY drop_pct DESC
""", (threshold_pct,)).fetchall()
return [
{"product_id": r[0], "store_id": r[1], "name": r[2],
"current_price": r[3], "prev_price": r[4], "drop_pct": r[5]}
for r in rows
]
Practical Tips
Zip code matters more than you think. The same item at the same store chain can be priced differently by location. Instacart reflects actual store-level pricing, so a Safeway in San Francisco may charge different prices than a Safeway in Sacramento. Always set a specific delivery address, not just a city.
Stock changes hourly. Instacart reflects real-time store inventory. If you're tracking availability, scrape at consistent times each day for comparable data. Early morning tends to show more accurate stock than late evening.
Unit prices are your friend. The pricePerUnit field lets you do true apples-to-apples comparisons across different package sizes. A 64oz bottle at $4.99 ($0.078/oz) is cheaper than a 32oz at $2.99 ($0.093/oz) even though the sticker price is higher.
Store IDs change. Instacart sometimes reassigns store identifiers when a location changes or a chain rebrands. Snapshot store metadata periodically and re-validate your store IDs each week.
Pagination limits. GraphQL queries typically return max 30 products per request. For full category scrapes, calculate the total count first and plan your pagination loop accordingly.
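Planning the loop is simple arithmetic once you have the total count:

```python
import math

def pagination_plan(total_count: int, page_size: int = 30) -> list[int]:
    """Offsets needed to cover total_count items at page_size per request."""
    pages = math.ceil(total_count / page_size)
    return [i * page_size for i in range(pages)]

pagination_plan(95)  # -> [0, 30, 60, 90], i.e. 4 requests
```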
Sessions expire. The _instacart_session cookie typically lasts 24-72 hours. Build logic to detect expiry (watch for 401 responses or empty results on known products) and refresh the cookie before it causes pipeline failures.
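A retry wrapper is one way to structure that refresh. This sketch assumes the ValueError raised by search_products above and any cookie-refresh callable, such as the Playwright helper from earlier:

```python
import time

def with_session_retry(scraper, fn, refresh_cookies, max_retries: int = 1):
    """Run fn(scraper); on session expiry, refresh cookies and retry.

    `refresh_cookies` is any callable returning a fresh cookie dict,
    e.g. the Playwright-based get_instacart_cookies() shown earlier.
    """
    for attempt in range(max_retries + 1):
        try:
            return fn(scraper)
        except ValueError as e:  # search_products raises ValueError on 401
            if "Session expired" not in str(e) or attempt == max_retries:
                raise
            fresh = refresh_cookies()
            # httpx.Cookies.set replaces the stale session cookie in place
            scraper.client.cookies.set(
                "_instacart_session", fresh.get("_instacart_session", "")
            )
            time.sleep(2)

# results = with_session_retry(
#     scraper,
#     lambda s: s.search_products("milk", "safeway"),
#     get_instacart_cookies,
# )
```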
Grocery price data is useful for personal budgeting tools, competitive intelligence for CPG brands, regional cost-of-living analysis, and building deal-alert services. Keep your scraping volume reasonable — pulling a full store catalog every hour isn't necessary and will get you blocked faster than any other behavior.