← Back to blog

How to Scrape Costco Deals in 2026 (Weekly Offers, Kirkland Data, Warehouse Pricing)

How to Scrape Costco Deals in 2026 (Weekly Offers, Kirkland Data, Warehouse Pricing)

Costco doesn't have a public API. Their website is one of the most hostile to scrapers in all of e-commerce. But the data is valuable — warehouse pricing, Kirkland brand comparisons, and weekly deal rotations are gold for price comparison tools and consumer research.

Here's how to get it done.

What Data You Can Extract

From Costco's public website:

What's behind the login wall: in-store-only prices (differ from online), purchase history, warehouse-specific inventory levels.

Why Costco Is Harder Than Other Retailers

Most e-commerce sites are scrape-friendly compared to Costco. Here's what makes it challenging:

This is where proxy quality makes the biggest difference. Cheap datacenter proxies won't even get past the Imperva challenge. You need clean residential IPs that haven't been flagged. I've had consistent results with ThorData's residential proxy network — their IPs pass Imperva's reputation checks, which is the hardest part of scraping Costco.

Working Python Code

This scraper uses Playwright to handle JavaScript rendering, combined with residential proxies to bypass Imperva:

import asyncio
import json
from playwright.async_api import async_playwright

PROXY_CONFIG = {
    "server": "http://proxy.thordata.com:9000",
    "username": "USER",
    "password": "PASS",
}


async def scrape_costco_search(query: str, max_pages: int = 3) -> list[dict]:
    """Search Costco and extract product listings."""
    products = []

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            proxy=PROXY_CONFIG,
        )
        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/126.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1440, "height": 900},
        )
        page = await context.new_page()

        # Navigate and wait for Imperva challenge to resolve
        url = f"https://www.costco.com/CatalogSearch?dept=All&keyword={query}"
        await page.goto(url, wait_until="networkidle", timeout=30000)
        await page.wait_for_timeout(3000)  # extra wait for JS

        for page_num in range(max_pages):
            # Extract product data from the page
            items = await page.evaluate("""
                () => {
                    const products = [];
                    document.querySelectorAll('[automation-id="productList"] .product').forEach(el => {
                        const name = el.querySelector('.description')?.textContent?.trim();
                        const priceEl = el.querySelector('.price');
                        const price = priceEl?.textContent?.trim()?.replace(/[^0-9.]/g, '');
                        const ratingEl = el.querySelector('.ratings .value');
                        const rating = ratingEl?.textContent?.trim();
                        const reviewEl = el.querySelector('.ratings .count');
                        const reviews = reviewEl?.textContent?.replace(/[^0-9]/g, '');
                        const link = el.querySelector('a.description')?.href;
                        const img = el.querySelector('img.product-img')?.src;
                        const itemNum = el.querySelector('.item-num')?.textContent?.replace('Item ', '');

                        if (name && price) {
                            products.push({
                                name, price: parseFloat(price),
                                rating: rating ? parseFloat(rating) : null,
                                reviews: reviews ? parseInt(reviews) : 0,
                                url: link, image: img,
                                item_number: itemNum?.trim(),
                            });
                        }
                    });
                    return products;
                }
            """)
            products.extend(items)

            # Try next page
            next_btn = await page.query_selector('a[aria-label="Next"]')
            if not next_btn or page_num == max_pages - 1:
                break
            await next_btn.click()
            await page.wait_for_load_state("networkidle")
            await page.wait_for_timeout(2000)

        await browser.close()

    return products


async def scrape_product_details(product_url: str) -> dict:
    """Scrape full details from a Costco product page."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            proxy=PROXY_CONFIG,
        )
        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/126.0.0.0 Safari/537.36"
            ),
        )
        page = await context.new_page()
        await page.goto(product_url, wait_until="networkidle", timeout=30000)
        await page.wait_for_timeout(3000)

        details = await page.evaluate("""
            () => {
                const name = document.querySelector('h1[automation-id="productName"]')
                    ?.textContent?.trim();
                const price = document.querySelector('[automation-id="productPrice"]')
                    ?.textContent?.trim()?.replace(/[^0-9.]/g, '');
                const desc = document.querySelector('#product-detail-description')
                    ?.textContent?.trim();
                const specs = {};
                document.querySelectorAll('.product-info-specs tr').forEach(row => {
                    const key = row.querySelector('th')?.textContent?.trim();
                    const val = row.querySelector('td')?.textContent?.trim();
                    if (key && val) specs[key] = val;
                });
                const images = [];
                document.querySelectorAll('.product-image-carousel img').forEach(img => {
                    if (img.src) images.push(img.src);
                });
                const shipping = document.querySelector('.shipping-info')
                    ?.textContent?.trim();

                return {
                    name, price: price ? parseFloat(price) : null,
                    description: desc, specifications: specs,
                    images, shipping_info: shipping,
                };
            }
        """)

        await browser.close()
    return details


async def scrape_weekly_deals() -> list[dict]:
    """Scrape current weekly deals / coupon book items."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            proxy=PROXY_CONFIG,
        )
        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/126.0.0.0 Safari/537.36"
            ),
        )
        page = await context.new_page()
        await page.goto(
            "https://www.costco.com/warehouse-savings.html",
            wait_until="networkidle",
            timeout=30000,
        )
        await page.wait_for_timeout(3000)

        deals = await page.evaluate("""
            () => {
                const items = [];
                document.querySelectorAll('.product').forEach(el => {
                    const name = el.querySelector('.description')?.textContent?.trim();
                    const original = el.querySelector('.strike-price')
                        ?.textContent?.replace(/[^0-9.]/g, '');
                    const sale = el.querySelector('.sale-price, .price')
                        ?.textContent?.replace(/[^0-9.]/g, '');
                    const savings = el.querySelector('.savings')
                        ?.textContent?.trim();
                    const validThru = el.querySelector('.valid-dates')
                        ?.textContent?.trim();
                    if (name) {
                        items.push({
                            name,
                            original_price: original ? parseFloat(original) : null,
                            sale_price: sale ? parseFloat(sale) : null,
                            savings: savings,
                            valid_through: validThru,
                        });
                    }
                });
                return items;
            }
        """)

        await browser.close()
    return deals


if __name__ == "__main__":
    async def main():
        # Search for Kirkland products
        print("Searching: Kirkland Signature\n")
        products = await scrape_costco_search("kirkland signature", max_pages=2)
        for p in products[:10]:
            rating = f" ★{p['rating']}" if p["rating"] else ""
            print(f"  ${p['price']:.2f} — {p['name'][:70]}{rating}")

        print(f"\n  Total found: {len(products)} products")

        # Get weekly deals
        print("\nWeekly deals:\n")
        deals = await scrape_weekly_deals()
        for d in deals[:10]:
            save = f" (save {d['savings']})" if d["savings"] else ""
            print(f"  ${d['sale_price']:.2f} — {d['name'][:60]}{save}")

    asyncio.run(main())

Installing Dependencies

pip install playwright httpx && playwright install chromium

Playwright is necessary here because Costco's Imperva protection requires full JavaScript execution. Lighter approaches like httpx alone won't work.

Kirkland Brand Price Tracking

Kirkland Signature is Costco's private label and one of the most interesting datasets to track. Prices change infrequently but when they do, it signals broader supply chain shifts.

Build a simple tracker by running the search scraper weekly and storing results:

import sqlite3
from datetime import date

def save_prices(products: list[dict], db_path: str = "costco_prices.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS prices (
            item_number TEXT, name TEXT, price REAL,
            date TEXT, PRIMARY KEY (item_number, date)
        )
    """)
    for p in products:
        if p.get("item_number"):
            conn.execute(
                "INSERT OR REPLACE INTO prices VALUES (?, ?, ?, ?)",
                (p["item_number"], p["name"], p["price"], date.today().isoformat()),
            )
    conn.commit()
    conn.close()

Practical Tips

Costco's terms prohibit automated access. Scraping publicly visible product listings and pricing for personal research is generally low-risk. Building a competing retail service with their data is not. Stick to publicly accessible pages and don't circumvent any authentication barriers.

Tracking Price Changes Over Time

Costco's pricing model is different from most retailers. Prices change infrequently but when they do, changes are permanent until the next adjustment. Here is a SQLite-backed tracker:

import sqlite3
from datetime import date, datetime

def init_price_tracker(db_path: str = "costco_prices.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS products (
            item_number TEXT PRIMARY KEY,
            name TEXT,
            url TEXT,
            category TEXT,
            image_url TEXT,
            first_seen TEXT,
            last_seen TEXT
        );

        CREATE TABLE IF NOT EXISTS price_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            item_number TEXT,
            price REAL,
            sale_price REAL,
            savings TEXT,
            valid_through TEXT,
            recorded_at TEXT,
            FOREIGN KEY (item_number) REFERENCES products(item_number)
        );

        CREATE TABLE IF NOT EXISTS price_alerts (
            item_number TEXT PRIMARY KEY,
            target_price REAL,
            email TEXT,
            active INTEGER DEFAULT 1
        );

        CREATE INDEX IF NOT EXISTS idx_price_item
            ON price_history(item_number, recorded_at DESC);
    """)
    conn.commit()
    return conn


def record_price(
    conn: sqlite3.Connection,
    item: dict,
    sale_price: float = None,
    savings: str = None,
    valid_through: str = None,
):
    """Record a price observation for a product."""
    now = date.today().isoformat()

    # Upsert product record
    conn.execute(
        """INSERT INTO products (item_number, name, url, image_url, first_seen, last_seen)
           VALUES (?, ?, ?, ?, ?, ?)
           ON CONFLICT(item_number) DO UPDATE SET
               name=excluded.name, last_seen=excluded.last_seen""",
        (item.get("item_number"), item.get("name"), item.get("url"),
         item.get("image"), now, now)
    )

    # Check if price has changed since last record
    last_price = conn.execute(
        """SELECT price FROM price_history
           WHERE item_number = ?
           ORDER BY recorded_at DESC LIMIT 1""",
        (item.get("item_number"),)
    ).fetchone()

    current_price = item.get("price")
    if last_price is None or last_price[0] != current_price:
        conn.execute(
            """INSERT INTO price_history
               (item_number, price, sale_price, savings, valid_through, recorded_at)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (item.get("item_number"), current_price, sale_price,
             savings, valid_through, datetime.now().isoformat())
        )

    conn.commit()


def get_price_drops(conn: sqlite3.Connection, min_drop_pct: float = 10.0) -> list:
    """Find products where current price is significantly below historical average."""
    return conn.execute("""
        WITH latest AS (
            SELECT item_number, price, recorded_at,
                   ROW_NUMBER() OVER (PARTITION BY item_number ORDER BY recorded_at DESC) rn
            FROM price_history
        ),
        historical AS (
            SELECT item_number, AVG(price) as avg_price
            FROM price_history
            WHERE recorded_at < datetime('now', '-7 days')
            GROUP BY item_number
        )
        SELECT p.name, l.price as current_price,
               h.avg_price as historical_avg,
               ROUND((h.avg_price - l.price) / h.avg_price * 100, 1) as drop_pct
        FROM latest l
        JOIN historical h ON l.item_number = h.item_number
        JOIN products p ON l.item_number = p.item_number
        WHERE l.rn = 1
          AND h.avg_price > 0
          AND (h.avg_price - l.price) / h.avg_price * 100 >= ?
        ORDER BY drop_pct DESC
    """, (min_drop_pct,)).fetchall()

Category-Based Product Discovery

Instead of searching, browse Costco's category URLs directly for more systematic coverage:

COSTCO_CATEGORIES = {
    "electronics": "https://www.costco.com/electronics.html",
    "grocery": "https://www.costco.com/grocery.html",
    "health-beauty": "https://www.costco.com/health-beauty.html",
    "clothing": "https://www.costco.com/clothing.html",
    "garden": "https://www.costco.com/garden-patio.html",
    "kitchen": "https://www.costco.com/kitchen.html",
    "furniture": "https://www.costco.com/furniture.html",
    "toys": "https://www.costco.com/toys-games.html",
    "sporting-goods": "https://www.costco.com/sports-fitness.html",
    "auto": "https://www.costco.com/auto-accessories.html",
}

async def scrape_category(category_name: str, category_url: str) -> list:
    """Scrape all products from a Costco category page."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            proxy=PROXY_CONFIG,
        )
        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/126.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1440, "height": 900},
        )
        page = await context.new_page()

        await page.goto(category_url, wait_until="networkidle", timeout=30000)
        await page.wait_for_timeout(3000)

        # Scroll to load lazy products
        for _ in range(5):
            await page.evaluate("window.scrollBy(0, window.innerHeight)")
            await page.wait_for_timeout(1500)

        products = await page.evaluate("""
            () => {
                const items = [];
                document.querySelectorAll('[automation-id="productList"] .product').forEach(el => {
                    const name = el.querySelector('.description')?.textContent?.trim();
                    const price = el.querySelector('.price')?.textContent?.replace(/[^0-9.]/g, '');
                    const rating = el.querySelector('.ratings .value')?.textContent?.trim();
                    const link = el.querySelector('a.description')?.href;
                    const itemNum = el.querySelector('.item-num')?.textContent?.replace('Item ', '').trim();
                    const img = el.querySelector('img.product-img')?.src;
                    if (name && price) {
                        items.push({name, price: parseFloat(price), rating, url: link,
                                   item_number: itemNum, image: img});
                    }
                });
                return items;
            }
        """)

        await browser.close()

        # Tag with category
        for p in products:
            p["category"] = category_name

        return products


async def scrape_all_categories() -> list:
    """Scrape all major Costco categories systematically."""
    import asyncio
    import random

    all_products = []
    for category_name, url in COSTCO_CATEGORIES.items():
        print(f"Scraping category: {category_name}")
        products = await scrape_category(category_name, url)
        all_products.extend(products)
        print(f"  Found {len(products)} products")
        await asyncio.sleep(random.uniform(15, 30))

    return all_products

Kirkland Signature Data Analysis

The Kirkland brand is particularly interesting for pricing analysis. Here is how to build a Kirkland product database:

async def collect_kirkland_products(db_path: str = "costco_prices.db") -> list:
    """Collect all currently listed Kirkland Signature products."""
    products = await scrape_costco_search("kirkland signature", max_pages=5)

    # Filter to Kirkland-only
    kirkland = [p for p in products if "kirkland" in p.get("name", "").lower()]

    conn = init_price_tracker(db_path)
    for product in kirkland:
        if product.get("item_number"):
            record_price(conn, product)
    conn.close()

    print(f"Collected {len(kirkland)} Kirkland products")
    return kirkland


def analyze_kirkland_by_category(products: list) -> dict:
    """Group Kirkland products by category and compute pricing stats."""
    import re
    import statistics

    # Group by detected category
    categories = {}
    category_keywords = {
        "organic": ["organic"],
        "supplements": ["vitamin", "supplement", "omega", "probiotic", "protein"],
        "snacks": ["snack", "nut", "chip", "cracker", "trail mix", "popcorn"],
        "beverages": ["water", "coffee", "tea", "juice", "olive oil"],
        "household": ["laundry", "detergent", "paper", "trash", "zip"],
        "personal_care": ["shampoo", "conditioner", "soap", "dental", "floss"],
        "meat": ["chicken", "beef", "salmon", "shrimp", "turkey"],
        "dairy": ["butter", "cheese", "yogurt", "milk", "cream"],
    }

    for product in products:
        name_lower = product.get("name", "").lower()
        assigned = "other"
        for cat, keywords in category_keywords.items():
            if any(kw in name_lower for kw in keywords):
                assigned = cat
                break
        categories.setdefault(assigned, []).append(product.get("price", 0))

    analysis = {}
    for cat, prices in categories.items():
        valid_prices = [p for p in prices if p > 0]
        if valid_prices:
            analysis[cat] = {
                "count": len(valid_prices),
                "min": min(valid_prices),
                "max": max(valid_prices),
                "median": round(statistics.median(valid_prices), 2),
                "avg": round(statistics.mean(valid_prices), 2),
            }

    return dict(sorted(analysis.items(), key=lambda x: x[1]["count"], reverse=True))

Integrating with Price Alert Services

For personal use, connect price drops to notifications:

import smtplib
from email.mime.text import MIMEText

def send_price_alert(
    product_name: str,
    item_number: str,
    current_price: float,
    original_price: float,
    product_url: str,
    to_email: str,
    from_email: str,
    smtp_password: str,
):
    """Send email notification when a tracked product drops in price."""
    savings_pct = round((original_price - current_price) / original_price * 100, 1)
    savings_amt = round(original_price - current_price, 2)

    subject = f"Price Drop: {product_name[:50]} ({savings_pct}% off)"
    body = f"""
Price Alert: {product_name}

Current Price: ${current_price:.2f}
Previous Price: ${original_price:.2f}
Savings: ${savings_amt:.2f} ({savings_pct}% off)

Product URL: {product_url}
Item Number: {item_number}
    """.strip()

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = from_email
    msg["To"] = to_email

    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
        server.login(from_email, smtp_password)
        server.sendmail(from_email, to_email, msg.as_string())


def check_and_alert(db_path: str = "costco_prices.db", alert_email: str = None):
    """Check for price drops on watched items and send alerts."""
    conn = sqlite3.connect(db_path)

    drops = get_price_drops(conn, min_drop_pct=15.0)

    for drop in drops:
        name, current_price, historical_avg, drop_pct = drop
        print(f"Drop: {name} - ${current_price:.2f} (was ~${historical_avg:.2f}, {drop_pct}% off)")

        if alert_email:
            product_url = conn.execute(
                "SELECT url FROM products WHERE name = ?", (name,)
            ).fetchone()
            url = product_url[0] if product_url else ""

            send_price_alert(
                product_name=name,
                item_number="",
                current_price=current_price,
                original_price=historical_avg,
                product_url=url,
                to_email=alert_email,
                from_email="[email protected]",
                smtp_password="app_password",
            )

    conn.close()

Automating with a Cron Job

Run the scraper daily and let it build a price history automatically:

# Add to crontab: crontab -e
# Run every day at 6am
0 6 * * * cd /home/user/costco-tracker && python3 scrape.py >> logs/scrape.log 2>&1
# scrape.py -- daily scraper script
import asyncio
import sqlite3
from datetime import date

async def daily_scrape():
    conn = init_price_tracker("costco_prices.db")

    print(f"Daily Costco scrape: {date.today().isoformat()}")

    # Weekly deals
    deals = await scrape_weekly_deals()
    print(f"Weekly deals: {len(deals)} items")
    for deal in deals:
        if deal.get("sale_price"):
            record_price(
                conn, deal,
                sale_price=deal.get("sale_price"),
                savings=deal.get("savings"),
                valid_through=deal.get("valid_through"),
            )

    # Kirkland products
    kirkland = await scrape_costco_search("kirkland signature", max_pages=3)
    print(f"Kirkland products: {len(kirkland)} items")
    for product in kirkland:
        if product.get("item_number"):
            record_price(conn, product)

    # Check for price drops
    drops = get_price_drops(conn, min_drop_pct=20.0)
    if drops:
        print(f"\nPrice drops found: {len(drops)}")
        for drop in drops:
            print(f"  {drop[0]}: ${drop[1]:.2f} (was ${drop[2]:.2f}, -{drop[3]}%)")

    conn.close()
    print("Done.")

asyncio.run(daily_scrape())

Costco's terms prohibit automated access. Scraping publicly visible product listings and pricing for personal research is generally low-risk legally. Building a competing retail service with their data is not. Key boundaries: