Scraping Instacart Grocery Prices with Python (2026)

Instacart shows real-time grocery prices from multiple stores in your area — Costco, Kroger, Safeway, Aldi, and dozens more. That makes it a goldmine for price comparison data. The problem is there's no public API, and they actively fight scrapers.

Here's how to extract product prices, availability, and deal data from Instacart using Python.

How Instacart Structures Data

Instacart is a Next.js app that hydrates from server-rendered HTML. Product data lives in two places: embedded JSON-LD in the initial HTML, and XHR calls to their internal GraphQL API at https://www.instacart.com/graphql.

The HTML approach is simpler but gives you less data. The GraphQL approach gives you everything — prices, unit prices, stock status, store-specific pricing, and active coupons — but requires valid session cookies.

Understanding the data model first saves a lot of debugging time.
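Before writing a full parser, it helps to confirm which of the two sources a given page actually exposes. A quick first-pass check (naive substring matching on the raw HTML, assuming you've already fetched a page):

```python
def probe_data_sources(html: str) -> dict:
    """Report which embedded data sources a page exposes.

    Substring matching is crude but fine as a first pass — JSON-LD pages
    contain an application/ld+json script tag, and Next.js pages embed a
    script with id __NEXT_DATA__.
    """
    return {
        "json_ld": "application/ld+json" in html,
        "next_data": "__NEXT_DATA__" in html,
    }
```

If a page has neither, you're likely looking at a bot-challenge interstitial rather than real product HTML — a useful signal in its own right.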

Basic Product Scraping from HTML

For a quick start, parse the structured data Instacart embeds on product pages:

import httpx
from selectolax.parser import HTMLParser
import json
import re
import time
import random

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/126.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "DNT": "1",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-CH-UA": '"Google Chrome";v="126", "Chromium";v="126"',
    "Sec-CH-UA-Mobile": "?0",
    "Sec-CH-UA-Platform": '"macOS"',
}


def scrape_product_page(url: str, proxy: str | None = None) -> dict | None:
    transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
    with httpx.Client(headers=HEADERS, transport=transport,
                      follow_redirects=True, timeout=25) as client:
        r = client.get(url)
        if r.status_code != 200:
            return None

    # The response body is already read, so parsing can happen after the client closes
    tree = HTMLParser(r.text)

    # Extract JSON-LD product data
    # Extract JSON-LD product data (may be a single object or a list of objects)
    for script in tree.css('script[type="application/ld+json"]'):
        try:
            data = json.loads(script.text())
        except json.JSONDecodeError:
            continue
        candidates = data if isinstance(data, list) else [data]
        for item in candidates:
            if item.get("@type") != "Product":
                continue
            # brand may be a nested object or a plain string; offers may be a list
            brand = item.get("brand")
            offers = item.get("offers") or {}
            if isinstance(offers, list):
                offers = offers[0] if offers else {}
            return {
                "name": item.get("name"),
                "brand": brand.get("name") if isinstance(brand, dict) else brand,
                "price": offers.get("price"),
                "currency": offers.get("priceCurrency"),
                "availability": offers.get("availability"),
                "image": item.get("image"),
                "description": item.get("description"),
                "sku": item.get("sku"),
                "gtin": item.get("gtin13") or item.get("gtin12"),
            }

    # Fallback: extract from Next.js __NEXT_DATA__
    next_data_script = tree.css_first("script#__NEXT_DATA__")
    if next_data_script:
        try:
            next_data = json.loads(next_data_script.text())
            product = (next_data
                       .get("props", {})
                       .get("pageProps", {})
                       .get("product", {}))
            if product:
                return {
                    "name": product.get("name"),
                    "brand": product.get("brand"),
                    "price": product.get("price"),
                    "unit_price": product.get("unitPrice"),
                    "in_stock": product.get("inStock"),
                    "size": product.get("size"),
                }
        except json.JSONDecodeError:
            pass

    return None

Using the Internal GraphQL API

For richer data — especially cross-store price comparison — hit the GraphQL endpoint directly. This requires session cookies from a logged-in (or location-set) browser session. Note that the persistedQuery sha256Hash values in the code below are placeholders: capture the real hashes from the Network tab in DevTools, and expect them to rotate as Instacart ships frontend changes.

Getting the cookies: open DevTools in your browser while on Instacart, go to Application > Cookies, and copy _instacart_session. Also note the reese84 cookie value if present — this is Imperva's bot detection token.
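Rather than copying cookies one at a time, you can paste the entire Cookie request header from DevTools and split it yourself. This helper is a convenience sketch (not part of any Instacart API), and the cookie values shown are placeholders:

```python
def parse_cookie_header(raw: str) -> dict:
    """Split a raw 'Cookie:' header string (copied from DevTools) into a dict."""
    cookies = {}
    for part in raw.split(";"):
        if "=" in part:
            name, _, value = part.strip().partition("=")
            cookies[name] = value
    return cookies


cookies = parse_cookie_header(
    "_instacart_session=abc123; reese84=xyz789"
)
# The resulting dict can be passed straight to httpx.Client(cookies=...)
```

Including reese84 alongside _instacart_session matters: requests carrying only the session cookie are more likely to hit the Imperva challenge.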

class InstacartScraper:
    GRAPHQL_URL = "https://www.instacart.com/graphql"

    def __init__(self, session_cookie: str, postal_code: str = "94105",
                 proxy: str | None = None):
        transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
        self.client = httpx.Client(
            headers={
                "Content-Type": "application/json",
                "User-Agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 Chrome/126.0.0.0 Safari/537.36"
                ),
                "Accept": "application/json",
                "Accept-Language": "en-US,en;q=0.9",
                "X-Client-Identifier": "web",
                "Origin": "https://www.instacart.com",
                "Referer": "https://www.instacart.com/",
            },
            cookies={"_instacart_session": session_cookie},
            transport=transport,
            timeout=25,
        )
        self.postal_code = postal_code

    def search_products(self, query: str, store_id: str,
                        limit: int = 20, offset: int = 0) -> list[dict]:
        """Search for products within a specific store."""
        payload = {
            "operationName": "SearchResultsPlacements",
            "variables": {
                "query": query,
                "storeId": store_id,
                "first": limit,
                "after": str(offset),
                "postal_code": self.postal_code,
                "includeDetails": True,
            },
            "extensions": {
                "persistedQuery": {
                    "version": 1,
                    "sha256Hash": "search_results_hash_placeholder",
                },
            },
        }

        r = self.client.post(self.GRAPHQL_URL, json=payload)

        if r.status_code == 401:
            raise ValueError("Session expired — refresh cookies")
        r.raise_for_status()

        data = r.json()
        items = []
        placements = (data.get("data", {})
                      .get("searchResultsPlacements", {})
                      .get("placements", []))

        for placement in placements:
            for product in placement.get("products", []):
                items.append(self._normalize_product(product, store_id))

        return items

    def _normalize_product(self, product: dict, store_id: str) -> dict:
        """Normalize a product dict from GraphQL response."""
        # Price may be nested differently depending on store
        price_raw = (product.get("price")
                     or product.get("displayPrice")
                     or product.get("originalPrice", ""))
        price_clean = re.sub(r"[^\d.]", "", str(price_raw)) if price_raw else ""

        unit_price_raw = product.get("pricePerUnit", "")
        # Strip currency symbols but keep digits, separators, and unit text (e.g. "$0.12/oz" -> "0.12/oz")
        unit_price = re.sub(r"[^\d./a-zA-Z ]", "", str(unit_price_raw)) if unit_price_raw else ""

        return {
            "name": product.get("name", ""),
            "brand": product.get("brand", ""),
            "price": price_clean,
            "unit_price": unit_price,
            "size": product.get("size", ""),
            "in_stock": product.get("inStock", product.get("available", False)),
            "store_id": store_id,
            "product_id": product.get("id", ""),
            "image_url": product.get("imageUrl", ""),
            "categories": product.get("categories", []),
        }

    def get_store_list(self) -> list[dict]:
        """Get available stores for the configured postal code."""
        payload = {
            "operationName": "GetRetailers",
            "variables": {
                "postal_code": self.postal_code,
                "showNearby": True,
            },
            "extensions": {
                "persistedQuery": {
                    "version": 1,
                    "sha256Hash": "retailers_hash_placeholder",
                },
            },
        }
        r = self.client.post(self.GRAPHQL_URL, json=payload)
        r.raise_for_status()

        stores = []
        retailers = (r.json().get("data", {})
                     .get("retailers", {})
                     .get("retailers", []))

        for ret in retailers:
            stores.append({
                "id": ret.get("id"),
                "slug": ret.get("slug"),
                "name": ret.get("name"),
                "logo_url": ret.get("logoUrl"),
                "delivery_fee": ret.get("deliveryFee"),
                "min_order": ret.get("minOrderAmount"),
            })

        return stores

Cross-Store Price Comparison

The real value is comparing prices for the same product across multiple stores. Same-item price differences on Instacart can be 30-50% for identical products:

def compare_prices(
    scraper: InstacartScraper,
    product_name: str,
    store_ids: list[str],
    delay_range: tuple[float, float] = (2.0, 4.0),
) -> list[dict]:
    """Compare prices for a product across multiple stores."""
    all_results = []

    for store_id in store_ids:
        try:
            products = scraper.search_products(product_name, store_id, limit=5)
            for p in products:
                # Only keep results that actually match (basic name check)
                search_words = set(product_name.lower().split())
                product_words = set(p["name"].lower().split())
                if len(search_words & product_words) >= 2:
                    all_results.append(p)
        except httpx.HTTPStatusError as e:
            print(f"Store {store_id} failed: {e.response.status_code}")
            if e.response.status_code == 429:
                time.sleep(30)

        delay = random.uniform(*delay_range)
        time.sleep(delay)

    # Sort by numeric price
    def price_sort_key(x):
        try:
            return float(x.get("price", "999") or "999")
        except ValueError:
            return 999.0

    all_results.sort(key=price_sort_key)
    return all_results


def price_comparison_report(results: list[dict], product_query: str) -> str:
    """Generate a text comparison table."""
    if not results:
        return "No results found."

    lines = [f"\nPrice comparison: '{product_query}'", "-" * 60]
    for r in results:
        in_stock = "IN STOCK" if r.get("in_stock") else "out of stock"
        unit = f" ({r['unit_price']})" if r.get("unit_price") else ""
        lines.append(
            f"  {r['store_id']:>12}: ${r['price']:<8} {unit:<20} {r['name'][:30]:<30} [{in_stock}]"
        )
    return "\n".join(lines)


# Example usage (assumes `scraper` was constructed as shown above)
stores = ["costco", "kroger", "safeway", "aldi", "target", "whole-foods"]
results = compare_prices(scraper, "organic whole milk gallon", stores)
print(price_comparison_report(results, "organic whole milk gallon"))

Dealing with Anti-Bot Measures

Instacart uses Imperva (formerly Incapsula) for bot detection, plus their own fingerprinting layer.

The Imperva reese84 cookie problem. On first visit from a new IP, Imperva serves a JavaScript challenge that must execute in a real browser to generate a valid reese84 cookie. Plain HTTP clients like httpx can't solve this — you need either:

  1. A real browser session (Playwright/Puppeteer) to generate the initial cookie
  2. A proxy provider that pre-solves these challenges
  3. Reusing an existing reese84 value (they expire but last several hours)

Getting a valid reese84 cookie with Playwright:

from playwright.sync_api import sync_playwright

def get_instacart_cookies(postal_code: str = "94105") -> dict:
    """Launch a real browser to get valid Instacart session cookies."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/126.0.0.0 Safari/537.36"
            ),
            locale="en-US",
            timezone_id="America/Los_Angeles",
        )
        page = context.new_page()

        # Visit Instacart. Setting the delivery address for `postal_code`
        # requires extra UI steps (the address modal) that are not shown here.
        page.goto("https://www.instacart.com", wait_until="networkidle")
        page.wait_for_timeout(2000)

        # Extract cookies
        cookies = {c["name"]: c["value"] for c in context.cookies()}
        browser.close()

    return cookies

IP reputation scoring. Datacenter IPs get blocked immediately. Even many "residential" proxy providers have IPs that Instacart has already flagged. You need clean residential proxies with high reputation scores.

ThorData's residential proxies work particularly well for Instacart because they offer city-level geo-targeting — important since Instacart pricing is zip-code specific and you need IPs that match the delivery area you're scraping. A San Francisco zip code should use Bay Area residential IPs, not random US IPs.

# ThorData proxy with city-level targeting
def get_proxy(country: str = "US", city: str = "") -> str:
    """Build ThorData proxy URL with optional geo-targeting."""
    user = "YOUR_THORDATA_USER"
    password = "YOUR_THORDATA_PASS"
    if city:
        return f"http://{user}-country-{country}-city-{city}:{password}@proxy.thordata.com:9000"
    return f"http://{user}-country-{country}:{password}@proxy.thordata.com:9000"

# Match proxy location to postal code
proxy = get_proxy(country="US", city="SanFrancisco")
scraper = InstacartScraper(session_cookie, postal_code="94105", proxy=proxy)

Session fingerprinting. Instacart ties sessions to device fingerprints. Keep your proxy IP, user agent, and cookies consistent within each scraping session. Don't reuse a session that was initialized on one IP with a different IP later.
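One way to avoid accidental mismatches is to keep the identity pieces in a single object so a proxy can't drift away from the cookies it was initialized with. A minimal sketch (not from Instacart's code — the class and method names are made up for illustration):

```python
from dataclasses import dataclass, field


@dataclass
class ScrapeSession:
    """Bundle the identity pieces that must stay consistent together."""
    proxy: str
    user_agent: str
    cookies: dict = field(default_factory=dict)

    def fingerprint_matches(self, other: "ScrapeSession") -> bool:
        # A session initialized on one IP/UA should never be reused with another
        return self.proxy == other.proxy and self.user_agent == other.user_agent
```

Building your httpx client from a ScrapeSession instance (rather than from loose variables) makes it structurally hard to mix a new proxy with old cookies.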

Rate limiting. More than 1 request per 2 seconds from the same session triggers soft blocks — empty results instead of explicit 403s. Space requests 2-4 seconds apart, with occasional longer pauses.
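The spacing-plus-occasional-pause pattern can be wrapped in a small helper so every request path uses the same pacing. A sketch (the class name and defaults are my own, tuned to the 2-4 second guidance above):

```python
import random
import time


class Throttle:
    """Sleep 2-4s between requests, with a longer pause every N requests."""

    def __init__(self, base=(2.0, 4.0), long_every=15, long=(20.0, 40.0)):
        self.base = base
        self.long_every = long_every
        self.long = long
        self.count = 0

    def next_delay(self) -> float:
        self.count += 1
        # Every long_every-th request gets a long "human taking a break" pause
        rng = self.long if self.count % self.long_every == 0 else self.base
        return random.uniform(*rng)

    def wait(self):
        time.sleep(self.next_delay())
```

Call throttle.wait() before each request instead of sprinkling time.sleep calls through the code.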

Paginating Product Search Results

For scraping a complete category or all products from a store, you need to paginate:

def scrape_full_category(
    scraper: InstacartScraper,
    store_id: str,
    category_slug: str,
    max_products: int = 500,
) -> list[dict]:
    """Scrape all products from a specific category in a store."""
    all_products = []
    offset = 0
    page_size = 30

    while offset < max_products:
        try:
            # Category browsing uses a different GraphQL operation
            payload = {
                "operationName": "BrowseAislePlacements",
                "variables": {
                    "storeId": store_id,
                    "slug": category_slug,
                    "first": page_size,
                    "after": str(offset),
                    "postal_code": scraper.postal_code,
                },
                "extensions": {
                    "persistedQuery": {"version": 1, "sha256Hash": "aisle_hash"},
                },
            }

            r = scraper.client.post(scraper.GRAPHQL_URL, json=payload)
            if r.status_code == 429:
                print("Rate limited — sleeping 30s")
                time.sleep(30)
                continue
            r.raise_for_status()
            data = r.json()

            products = (data.get("data", {})
                        .get("browseAislePlacements", {})
                        .get("products", []))

            if not products:
                break

            for p in products:
                all_products.append(scraper._normalize_product(p, store_id))

            offset += page_size
            print(f"  Fetched {len(all_products)} products from {category_slug}")
            time.sleep(random.uniform(2, 4))

        except Exception as e:
            print(f"Error at offset {offset}: {e}")
            time.sleep(10)
            break

    return all_products

Tracking Deals and Sales

Instacart has a dedicated deals section per store. You can monitor active coupons and sale prices:

def get_store_deals(scraper: InstacartScraper, store_id: str) -> list[dict]:
    """Get active coupons and sale prices for a store."""
    payload = {
        "operationName": "StoreCoupons",
        "variables": {
            "storeId": store_id,
            "first": 50,
        },
        "extensions": {
            "persistedQuery": {"version": 1, "sha256Hash": "coupons_hash_placeholder"},
        },
    }

    r = scraper.client.post(scraper.GRAPHQL_URL, json=payload)
    r.raise_for_status()

    deals = []
    edges = (r.json().get("data", {})
             .get("storeCoupons", {})
             .get("edges", []))

    for edge in edges:
        coupon = edge.get("node", {})
        deals.append({
            "description": coupon.get("description"),
            "discount": coupon.get("discountText"),
            "discount_type": coupon.get("discountType"),
            "min_purchase": coupon.get("minimumPurchase"),
            "max_discount": coupon.get("maxDiscount"),
            "expiry": coupon.get("expiresAt"),
            "products": [p.get("name") for p in coupon.get("products", [])],
            "coupon_id": coupon.get("id"),
        })

    return deals


# Requires sqlite3 and datetime, imported in the storage section below
def monitor_deals(scraper: InstacartScraper, store_ids: list[str],
                  db_conn: sqlite3.Connection):
    """Track deals across stores over time."""
    now = datetime.utcnow().isoformat()

    db_conn.execute("""
        CREATE TABLE IF NOT EXISTS deals (
            id TEXT,
            store_id TEXT,
            description TEXT,
            discount TEXT,
            discount_type TEXT,
            min_purchase REAL,
            expiry TEXT,
            products TEXT,
            seen_at TEXT,
            PRIMARY KEY (id, store_id)
        )
    """)

    for store_id in store_ids:
        deals = get_store_deals(scraper, store_id)
        for deal in deals:
            db_conn.execute(
                "INSERT OR REPLACE INTO deals VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (deal.get("coupon_id", ""), store_id, deal["description"],
                 deal["discount"], deal.get("discount_type"),
                 float(deal["min_purchase"] or 0) if deal.get("min_purchase") else None,
                 deal.get("expiry"),
                 json.dumps(deal["products"]), now)
            )
        db_conn.commit()
        print(f"{store_id}: {len(deals)} active deals")
        time.sleep(random.uniform(2, 4))

Storing Price History in SQLite

For price tracking over time, use SQLite with timestamps:

import sqlite3
from datetime import datetime

def init_grocery_db(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS products (
            product_id      TEXT,
            store_id        TEXT,
            name            TEXT,
            brand           TEXT,
            size            TEXT,
            image_url       TEXT,
            PRIMARY KEY (product_id, store_id)
        );

        CREATE TABLE IF NOT EXISTS price_history (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id      TEXT,
            store_id        TEXT,
            price           REAL,
            unit_price      TEXT,
            in_stock        INTEGER,
            scraped_at      TEXT
        );

        CREATE INDEX IF NOT EXISTS idx_price_history_product
            ON price_history(product_id, store_id, scraped_at);

        CREATE TABLE IF NOT EXISTS price_alerts (
            product_id      TEXT,
            store_id        TEXT,
            target_price    REAL,
            alert_email     TEXT,
            created_at      TEXT,
            PRIMARY KEY (product_id, store_id, target_price)
        );
    """)
    conn.commit()
    return conn


def store_prices(db_conn: sqlite3.Connection, products: list[dict]):
    """Store current prices with timestamp."""
    now = datetime.utcnow().isoformat()

    for p in products:
        # Upsert product metadata
        db_conn.execute(
            "INSERT OR REPLACE INTO products (product_id, store_id, name, brand, size, image_url) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (p.get("product_id", p["name"]), p["store_id"],
             p["name"], p.get("brand"), p.get("size"), p.get("image_url"))
        )

        # Record price snapshot
        try:
            price_val = float(p["price"]) if p.get("price") else None
        except ValueError:
            price_val = None

        if price_val is not None:
            db_conn.execute(
                "INSERT INTO price_history (product_id, store_id, price, unit_price, in_stock, scraped_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (p.get("product_id", p["name"]), p["store_id"],
                 price_val, p.get("unit_price"),
                 int(p.get("in_stock", False)), now)
            )

    db_conn.commit()


def get_price_trend(db_conn: sqlite3.Connection,
                    product_id: str, store_id: str,
                    days: int = 30) -> list[dict]:
    """Get price history for a product over the past N days."""
    rows = db_conn.execute("""
        SELECT price, unit_price, in_stock, scraped_at
        FROM price_history
        WHERE product_id = ? AND store_id = ?
          AND scraped_at >= datetime('now', ?)
        ORDER BY scraped_at
    """, (product_id, store_id, f"-{days} days")).fetchall()

    return [
        {"price": r[0], "unit_price": r[1], "in_stock": bool(r[2]), "date": r[3]}
        for r in rows
    ]


def find_price_drops(db_conn: sqlite3.Connection,
                     threshold_pct: float = 10.0) -> list[dict]:
    """Find products whose price has dropped by threshold_pct recently."""
    # Compare most recent price to 7-day ago price
    rows = db_conn.execute("""
        WITH latest AS (
            SELECT product_id, store_id, price, scraped_at
            FROM price_history
            WHERE scraped_at = (
                SELECT MAX(scraped_at) FROM price_history h2
                WHERE h2.product_id = price_history.product_id
                  AND h2.store_id = price_history.store_id
            )
        ),
        week_ago AS (
            SELECT product_id, store_id, AVG(price) as avg_price
            FROM price_history
            WHERE scraped_at BETWEEN datetime('now', '-8 days')
                                 AND datetime('now', '-6 days')
            GROUP BY product_id, store_id
        )
        SELECT
            l.product_id, l.store_id, p.name,
            l.price as current_price,
            w.avg_price as prev_price,
            ROUND((w.avg_price - l.price) / w.avg_price * 100, 1) as drop_pct
        FROM latest l
        JOIN week_ago w ON l.product_id = w.product_id AND l.store_id = w.store_id
        JOIN products p ON l.product_id = p.product_id AND l.store_id = p.store_id
        WHERE w.avg_price > 0
          AND (w.avg_price - l.price) / w.avg_price * 100 >= ?
        ORDER BY drop_pct DESC
    """, (threshold_pct,)).fetchall()

    return [
        {"product_id": r[0], "store_id": r[1], "name": r[2],
         "current_price": r[3], "prev_price": r[4], "drop_pct": r[5]}
        for r in rows
    ]

Practical Tips

Zip code matters more than you think. The same item at the same store chain can be priced differently by location. Instacart reflects actual store-level pricing, so a Safeway in San Francisco may charge different prices than a Safeway in Sacramento. Always set a specific delivery address, not just a city.

Stock changes hourly. Instacart reflects real-time store inventory. If you're tracking availability, scrape at consistent times each day for comparable data. Early morning tends to show more accurate stock than late evening.

Unit prices are your friend. The pricePerUnit field lets you do true apples-to-apples comparisons across different package sizes. A 64oz bottle at $4.99 ($0.078/oz) is cheaper than a 32oz at $2.99 ($0.093/oz) even though the sticker price is higher.
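When pricePerUnit is missing or unparseable, you can derive it yourself from the sticker price and package size. A tiny helper reproducing the arithmetic above:

```python
def price_per_oz(price: float, size_oz: float) -> float:
    """Unit price in dollars per ounce, rounded to 3 decimals."""
    return round(price / size_oz, 3)


price_per_oz(4.99, 64)  # -> 0.078 — the bigger bottle
price_per_oz(2.99, 32)  # -> 0.093 — costs less per ounce
```

Normalize sizes to one unit (ounces, or grams for metric listings) before comparing, since Instacart mixes oz, lb, and fl oz across products.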

Store IDs change. Instacart sometimes reassigns store identifiers when a location changes or a chain rebrands. Store metadata periodically and validate your store IDs each week.
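A weekly validation pass can reuse get_store_list from the scraper class above. Sketch (any object with a get_store_list method returning dicts with an "id" key works):

```python
def validate_store_ids(scraper, cached_ids: list[str]) -> dict:
    """Split cached store IDs into still-valid and stale sets."""
    current = {s["id"] for s in scraper.get_store_list()}
    return {
        "valid": [i for i in cached_ids if i in current],
        "stale": [i for i in cached_ids if i not in current],
    }
```

Run this before each scrape cycle and drop (or re-resolve) anything in "stale" rather than letting a dead store ID silently return empty results.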

Pagination limits. GraphQL queries typically return max 30 products per request. For full category scrapes, calculate the total count first and plan your pagination loop accordingly.

Sessions expire. The _instacart_session cookie typically lasts 24-72 hours. Build logic to detect expiry (watch for 401 responses or empty results on known products) and refresh the cookie before it causes pipeline failures.
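The expiry check can be a small heuristic on the search response shape used earlier (a sketch — it assumes the searchResultsPlacements layout from the GraphQL examples above, and a query you know normally returns results):

```python
def is_session_expired(status_code: int, data: dict) -> bool:
    """Heuristic: 401s, or empty placements for a known-good search query."""
    if status_code == 401:
        return True
    placements = (data.get("data") or {}).get("searchResultsPlacements") or {}
    return not placements.get("placements")
```

Probe with a common query like "milk" at the start of each run; if this returns True, regenerate cookies (e.g. via the Playwright flow above) before starting the real scrape.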

Grocery price data is useful for personal budgeting tools, competitive intelligence for CPG brands, regional cost-of-living analysis, and building deal-alert services. Keep your scraping volume reasonable — pulling a full store catalog every hour isn't necessary and will get you blocked faster than any other behavior.