← Back to blog

Scraping ASOS Fashion Data via Internal API (2026)

Scraping ASOS Fashion Data via Internal API (2026)

ASOS runs a modern single-page application powered by a set of internal JSON APIs. Rather than fighting through server-rendered HTML, you can hit the same endpoints their frontend calls — api.asos.com/product/search/v2/ for catalog search and api.asos.com/product/catalogue/v4/ for stock and pricing — and get clean, structured data directly.

These endpoints return full catalog information including current and sale prices, per-size stock availability, brand metadata, color variants, customer ratings, and category data. ASOS runs frequent sales (up to 70% off in seasonal events), making price monitoring especially valuable. This guide covers the full technical stack: endpoint discovery, authentication header requirements, pagination, multi-region collection, anti-bot measures, ThorData proxy integration, and a price tracking database.


What Data Is Available

The ASOS internal API exposes considerably more than the public-facing UI suggests:

Product Search Response

Catalogue (Stock/Price) Response

Per product: - All color variants — each colorway as separate product - Per-size stock status — in stock, low stock, out of stock flags - Price per variant — prices can vary by size in some markets - Brand size vs. standardized size mapping

What Requires HTML Scraping

Some fields require hitting individual product pages: - Full product description text - Composition and care instructions - Detailed fit guide - All product images (the API returns only the primary) - Customer review text (not just aggregate rating)


Anti-Bot Measures

ASOS uses Akamai Bot Manager. Understanding the threat model:

Required Custom Headers

Every valid ASOS frontend request includes store-specific headers that the API validates:

Header Example Value Purpose
asos-c-name asos-web-1.0 Client identifier
asos-c-ismobile false Device type flag
asos-c-store US, GB, AU Store region
asos-c-plat web Platform identifier

Requests missing these headers receive 403 responses or return degraded data (empty prices, missing availability).

TLS Fingerprinting

Akamai inspects the TLS ClientHello. Python's default SSL stack is distinguishable from Chrome. Using custom cipher suite ordering reduces detection rate significantly.

Geo-Targeting and IP Consistency

ASOS operates separate stores for UK, US, Australia, and an international region. Sending a US-store request from a UK IP triggers Akamai's anomaly detection because IP location contradicts the store parameter. You need IP-store consistency.

Rate Limiting

The search endpoint handles moderate crawl rates but throttles after ~30–40 requests/minute from a single IP. Per-request pagination from one IP looks suspicious regardless of headers.


Setup

import httpx
import asyncio
import ssl
import json
import time
import random
import sqlite3
from pathlib import Path
from datetime import datetime

# Store configurations
STORE_CONFIGS = {
    "US": {
        "store": "US",
        "currency": "USD",
        "country": "US",
        "lang": "en-US",
        "channel": "desktop-web",
    },
    "GB": {
        "store": "GB",
        "currency": "GBP",
        "country": "GB",
        "lang": "en-GB",
        "channel": "desktop-web",
    },
    "AU": {
        "store": "AU",
        "currency": "AUD",
        "country": "AU",
        "lang": "en-AU",
        "channel": "desktop-web",
    },
    "ROW": {  # Rest of World
        "store": "ROW",
        "currency": "USD",
        "country": "US",
        "lang": "en-US",
        "channel": "desktop-web",
    },
}

BASE_SEARCH = "https://api.asos.com/product/search/v2/"
BASE_CATALOGUE = "https://api.asos.com/product/catalogue/v4/stockprice"
BASE_PRODUCT = "https://api.asos.com/product/catalogue/v4/products/"

KEY_STORE_VERSION = "ornjx7v-36"  # Update from live request if needed

def make_headers(store="US"):
    return {
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36"
        ),
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Origin": "https://www.asos.com",
        "Referer": "https://www.asos.com/",
        "asos-c-name": "asos-web-1.0",
        "asos-c-ismobile": "false",
        "asos-c-store": store,
        "asos-c-plat": "web",
    }

def make_ssl_context():
    """Create SSL context with Chrome-like cipher ordering."""
    ctx = ssl.create_default_context()
    # Chrome's preferred cipher suites
    ctx.set_ciphers(
        "TLS_AES_128_GCM_SHA256:TLS_AES_256_GCM_SHA384:"
        "TLS_CHACHA20_POLY1305_SHA256:ECDH+AESGCM:ECDH+CHACHA20:"
        "DH+AESGCM:DH+CHACHA20:ECDH+AES256:DH+AES256:ECDH+AES128:"
        "DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!eNULL:!MD5:!DSS"
    )
    return ctx

def make_client(store="US", proxy_url=None):
    """Create httpx client with proper headers and SSL for ASOS."""
    return httpx.Client(
        headers=make_headers(store),
        ssl=make_ssl_context(),
        proxies={"all://": proxy_url} if proxy_url else None,
        timeout=25.0,
        follow_redirects=True,
    )

Discovering the keyStoreDataversion

The keyStoreDataversion parameter is a versioned config key that rotates periodically. Fetch the current value from any live search response:

def get_current_key_version(store="US", proxy_url=None):
    """
    Fetch current keyStoreDataversion from a live ASOS search response.
    Call this at startup to ensure you have the current version.
    """
    config = STORE_CONFIGS[store]
    params = {
        "store": config["store"],
        "lang": config["lang"],
        "currency": config["currency"],
        "channel": config["channel"],
        "country": config["country"],
        "keyStoreDataversion": KEY_STORE_VERSION,
        "offset": 0,
        "limit": 1,
        "q": "dress",
    }

    with make_client(store, proxy_url) as client:
        try:
            resp = client.get(BASE_SEARCH, params=params)
            resp.raise_for_status()
            data = resp.json()

            # The version is echoed back in the response
            version = data.get("keyStoreDataversion") or KEY_STORE_VERSION
            print(f"Current keyStoreDataversion: {version}")
            return version
        except Exception as e:
            print(f"Could not fetch version: {e}, using default")
            return KEY_STORE_VERSION

# Always verify at startup
CURRENT_KEY_VERSION = get_current_key_version()

Product Search with Pagination

def search_products(
    query: str,
    store: str = "US",
    limit: int = 200,
    category_id: int = None,
    sort: str = None,
    proxy_url: str = None,
) -> list:
    """
    Search ASOS catalog and paginate through all results.
    sort options: freshness, priceAsc, priceDesc, priceDrop, rating, brand
    """
    config = STORE_CONFIGS[store]
    page_size = 72  # ASOS maximum per page
    all_results = []
    offset = 0

    with make_client(store, proxy_url) as client:
        while len(all_results) < limit:
            params = {
                "store": config["store"],
                "lang": config["lang"],
                "currency": config["currency"],
                "rowlength": "4",
                "channel": config["channel"],
                "country": config["country"],
                "keyStoreDataversion": CURRENT_KEY_VERSION,
                "offset": offset,
                "limit": min(page_size, limit - len(all_results)),
                "q": query,
            }
            if category_id:
                params["cid"] = category_id
            if sort:
                params["sort"] = sort

            try:
                resp = client.get(BASE_SEARCH, params=params)
                resp.raise_for_status()
                data = resp.json()
            except httpx.HTTPStatusError as e:
                print(f"Search error: {e.response.status_code} at offset {offset}")
                if e.response.status_code in (403, 429):
                    time.sleep(random.uniform(30, 60))
                break
            except Exception as e:
                print(f"Request error: {e}")
                break

            products = data.get("products", [])
            item_count = data.get("itemCount", 0)

            if not products:
                break

            for p in products:
                price = p.get("price", {})
                all_results.append({
                    "id": p.get("id"),
                    "name": p.get("name"),
                    "brand": p.get("brandName"),
                    "brand_id": p.get("brandId"),
                    "url": f"https://www.asos.com/{p.get('url', '')}",
                    "current_price": price.get("current", {}).get("value"),
                    "original_price": price.get("previous", {}).get("value"),
                    "currency": price.get("currency"),
                    "is_on_sale": price.get("isMarkedDown", False),
                    "discount_pct": price.get("discountPercentage"),
                    "colour": p.get("colour"),
                    "rating": p.get("rating"),
                    "rating_count": p.get("reviewsCount"),
                    "image_url": f"https://images.asos-media.com/products/{p.get('imageUrl', '')}",
                    "category": p.get("category"),
                    "gender": p.get("gender"),
                    "store": store,
                })

            print(f"  Offset {offset}: {len(products)} products (total: {len(all_results)}/{min(limit, item_count)})")
            offset += page_size

            if offset >= item_count or offset >= limit:
                break

            time.sleep(random.uniform(1.0, 2.5))

    return all_results[:limit]

# Example: get 500 women's dresses from ASOS US
dresses = search_products("midi dress", store="US", limit=300, proxy_url=proxy_url)
print(f"Found {len(dresses)} products")

# Show sale items
on_sale = [d for d in dresses if d["is_on_sale"]]
print(f"On sale: {len(on_sale)} ({len(on_sale)*100//len(dresses) if dresses else 0}%)")
for item in sorted(on_sale, key=lambda x: -(x.get("discount_pct") or 0))[:5]:
    print(f"  {item['brand']} — {item['name'][:50]}")
    print(f"    Now: {item['currency']} {item['current_price']} (was {item['original_price']}, -{item.get('discount_pct', '?')}%)")

Category Browsing

ASOS has a structured category tree. Browse by category ID instead of keyword search:

# Common ASOS category IDs (may change — verify from live requests)
ASOS_CATEGORIES = {
    "women_dresses": 8799,
    "women_tops": 4169,
    "women_jeans": 4169,
    "women_shoes": 4172,
    "men_shirts": 3602,
    "men_jeans": 3606,
    "men_shoes": 4209,
    "men_suits": 3606,
    "sportswear_women": 6435,
    "sportswear_men": 6422,
    "sale_women": 7014,
    "sale_men": 7013,
}

def browse_category(
    category_id: int,
    store: str = "US",
    sort: str = "freshness",
    limit: int = 500,
    proxy_url: str = None,
) -> list:
    """Browse ASOS by category ID rather than search query."""
    config = STORE_CONFIGS[store]
    page_size = 72
    all_results = []
    offset = 0

    with make_client(store, proxy_url) as client:
        while len(all_results) < limit:
            params = {
                "store": config["store"],
                "lang": config["lang"],
                "currency": config["currency"],
                "channel": config["channel"],
                "country": config["country"],
                "keyStoreDataversion": CURRENT_KEY_VERSION,
                "offset": offset,
                "limit": min(page_size, limit - len(all_results)),
                "cid": category_id,
                "sort": sort,
            }

            try:
                resp = client.get(BASE_SEARCH, params=params)
                resp.raise_for_status()
                data = resp.json()
            except Exception as e:
                print(f"Category browse error: {e}")
                break

            products = data.get("products", [])
            if not products:
                break

            for p in products:
                price = p.get("price", {})
                all_results.append({
                    "id": p.get("id"),
                    "name": p.get("name"),
                    "brand": p.get("brandName"),
                    "current_price": price.get("current", {}).get("value"),
                    "original_price": price.get("previous", {}).get("value"),
                    "is_on_sale": price.get("isMarkedDown", False),
                    "discount_pct": price.get("discountPercentage"),
                    "colour": p.get("colour"),
                    "rating": p.get("rating"),
                    "image_url": f"https://images.asos-media.com/products/{p.get('imageUrl', '')}",
                    "store": store,
                    "category_id": category_id,
                })

            item_count = data.get("itemCount", 0)
            offset += page_size

            if offset >= min(item_count, limit):
                break

            time.sleep(random.uniform(1.5, 2.5))

    return all_results[:limit]

# Browse the sale section
sale_items = browse_category(ASOS_CATEGORIES["sale_women"], store="US", sort="priceDrop", limit=200)
print(f"Sale items: {len(sale_items)}")

Size Availability (Catalogue Endpoint)

The search endpoint returns summary data. For per-size stock status, use the catalogue endpoint:

def get_size_availability(
    product_ids: list,
    store: str = "US",
    proxy_url: str = None,
) -> dict:
    """
    Fetch per-size stock availability for a batch of product IDs.
    Processes up to 50 IDs per request.
    Returns dict: {product_id: {"sizes": [...], "in_stock_count": N}}
    """
    config = STORE_CONFIGS[store]
    output = {}
    chunks = [product_ids[i:i+50] for i in range(0, len(product_ids), 50)]

    with make_client(store, proxy_url) as client:
        for chunk in chunks:
            params = {
                "productIds": ",".join(str(pid) for pid in chunk),
                "store": config["store"],
                "currency": config["currency"],
                "keyStoreDataversion": CURRENT_KEY_VERSION,
            }

            try:
                resp = client.get(BASE_CATALOGUE, params=params)
                resp.raise_for_status()
                data = resp.json()
            except Exception as e:
                print(f"Catalogue error: {e}")
                continue

            for product in data:
                pid = product.get("productId")
                if not pid:
                    continue

                variants = product.get("variants", [])
                sizes = []
                for v in variants:
                    sizes.append({
                        "size": v.get("brandSize") or v.get("size"),
                        "sku": v.get("sku"),
                        "in_stock": v.get("isInStock", False),
                        "low_stock": v.get("isLowInStock", False),
                        "price": v.get("price", {}).get("current", {}).get("value"),
                        "original_price": v.get("price", {}).get("previous", {}).get("value"),
                    })

                output[pid] = {
                    "sizes": sizes,
                    "in_stock_count": sum(1 for s in sizes if s["in_stock"]),
                    "low_stock_count": sum(1 for s in sizes if s["low_stock"]),
                    "total_sizes": len(sizes),
                    "has_stock": any(s["in_stock"] for s in sizes),
                }

            time.sleep(random.uniform(0.8, 1.5))

    return output

def enrich_with_sizes(products: list, store: str = "US", proxy_url: str = None) -> list:
    """Add size availability data to a product list."""
    ids = [p["id"] for p in products if p.get("id")]
    stock_map = get_size_availability(ids, store, proxy_url)

    for p in products:
        stock = stock_map.get(p.get("id"), {})
        p["sizes"] = stock.get("sizes", [])
        p["in_stock_sizes"] = stock.get("in_stock_count", 0)
        p["low_stock_sizes"] = stock.get("low_stock_count", 0)
        p["has_stock"] = stock.get("has_stock", True)

    return products

# Enrich first 100 dresses with size data
dresses_with_sizes = enrich_with_sizes(dresses[:100], store="US")
for d in dresses_with_sizes[:3]:
    in_stock = [s["size"] for s in d["sizes"] if s["in_stock"]]
    print(f"{d['name'][:50]}: {d['in_stock_sizes']}/{len(d['sizes'])} sizes in stock")
    print(f"  Available: {in_stock}")

ThorData Proxy Integration

ASOS's Akamai configuration specifically targets datacenter IP ranges. ThorData residential proxies route through real household connections in 195+ countries, providing the geographic IP consistency that Akamai expects.

Critical for ASOS: the store region and proxy country must match. A US-store request from a UK IP is anomalous; a UK-store request from a US IP is similarly flagged.

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000

# ThorData country codes map to ASOS stores
STORE_TO_COUNTRY = {
    "US": "us",
    "GB": "gb",
    "AU": "au",
    "ROW": "us",  # Use US IPs for ROW store
}

def get_proxy(store="US", session_id=None):
    """
    Get ThorData proxy URL matching the ASOS store region.
    session_id: use for sticky sessions across a scrape session.
    """
    country = STORE_TO_COUNTRY.get(store, "us")

    if session_id:
        user = f"{THORDATA_USER}-session-{session_id}-country-{country}"
    else:
        user = f"{THORDATA_USER}-country-{country}"

    return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"

def get_playwright_proxy(store="US", session_id=None):
    """Playwright proxy config dict."""
    country = STORE_TO_COUNTRY.get(store, "us")
    if session_id:
        user = f"{THORDATA_USER}-session-{session_id}-country-{country}"
    else:
        user = f"{THORDATA_USER}-country-{country}"

    return {
        "server": f"http://{THORDATA_HOST}:{THORDATA_PORT}",
        "username": user,
        "password": THORDATA_PASS,
    }

# Example: collect from all major stores simultaneously
def collect_multi_region(query, limit_per_store=200):
    """Scrape same query across all ASOS regional stores."""
    all_results = {}

    for store in ["US", "GB", "AU"]:
        print(f"\n=== Scraping {store} store ===")
        session_id = random.randint(10000, 99999)
        proxy = get_proxy(store=store, session_id=session_id)

        products = search_products(query, store=store, limit=limit_per_store, proxy_url=proxy)
        all_results[store] = products
        print(f"  {store}: {len(products)} products")

        if products:
            on_sale = [p for p in products if p["is_on_sale"]]
            avg_price = sum(p["current_price"] for p in products if p["current_price"]) / len(products)
            print(f"  On sale: {len(on_sale)} | Avg price: {products[0]['currency']} {avg_price:.2f}")

        time.sleep(random.uniform(5, 10))

    return all_results

results_by_store = collect_multi_region("black midi dress")

Price Tracking Database

ASOS runs frequent sales. Track prices over time to detect markdown events:

def init_db(db_path="asos_prices.db"):
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY,
            name TEXT,
            brand TEXT,
            category_id INTEGER,
            url TEXT,
            image_url TEXT,
            first_seen TEXT DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS price_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id INTEGER NOT NULL,
            store TEXT NOT NULL,
            current_price REAL,
            original_price REAL,
            currency TEXT,
            is_on_sale INTEGER DEFAULT 0,
            discount_pct REAL,
            in_stock_sizes INTEGER,
            total_sizes INTEGER,
            rating REAL,
            rating_count INTEGER,
            captured_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (product_id) REFERENCES products(id)
        );

        CREATE TABLE IF NOT EXISTS size_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id INTEGER NOT NULL,
            store TEXT NOT NULL,
            size TEXT,
            in_stock INTEGER DEFAULT 0,
            low_stock INTEGER DEFAULT 0,
            price REAL,
            captured_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (product_id) REFERENCES products(id)
        );

        CREATE INDEX IF NOT EXISTS idx_price_product ON price_snapshots(product_id, captured_at);
        CREATE INDEX IF NOT EXISTS idx_price_sale ON price_snapshots(is_on_sale, store);
        CREATE INDEX IF NOT EXISTS idx_product_brand ON products(brand);
    """)
    conn.commit()
    return conn

def save_products_batch(conn, products, store):
    """Save products and price snapshots."""
    now = datetime.utcnow().isoformat()

    for p in products:
        pid = p.get("id")
        if not pid:
            continue

        # Upsert product record
        conn.execute("""
            INSERT OR IGNORE INTO products (id, name, brand, url, image_url)
            VALUES (?, ?, ?, ?, ?)
        """, (pid, p.get("name"), p.get("brand"), p.get("url"), p.get("image_url")))

        # Update name if it changed
        conn.execute("""
            UPDATE products SET name = ?, brand = ? WHERE id = ?
        """, (p.get("name"), p.get("brand"), pid))

        # Price snapshot
        conn.execute("""
            INSERT INTO price_snapshots
            (product_id, store, current_price, original_price, currency,
             is_on_sale, discount_pct, in_stock_sizes, total_sizes, rating, rating_count, captured_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            pid, store,
            p.get("current_price"), p.get("original_price"), p.get("currency"),
            1 if p.get("is_on_sale") else 0,
            p.get("discount_pct"),
            p.get("in_stock_sizes"), len(p.get("sizes", [])),
            p.get("rating"), p.get("rating_count"),
            now,
        ))

        # Size snapshots
        for s in p.get("sizes", []):
            conn.execute("""
                INSERT INTO size_snapshots (product_id, store, size, in_stock, low_stock, price, captured_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                pid, store, s.get("size"),
                1 if s.get("in_stock") else 0,
                1 if s.get("low_stock") else 0,
                s.get("price"), now,
            ))

    conn.commit()

def find_new_sales(conn, store="US", hours_back=24):
    """Find products that went on sale in the last N hours."""
    cursor = conn.execute("""
        SELECT DISTINCT
            p.name, p.brand, p.url,
            new_snap.current_price, new_snap.original_price,
            new_snap.discount_pct,
            old_snap.current_price as prev_price
        FROM price_snapshots new_snap
        JOIN price_snapshots old_snap ON new_snap.product_id = old_snap.product_id
            AND new_snap.store = old_snap.store
        JOIN products p ON new_snap.product_id = p.id
        WHERE new_snap.is_on_sale = 1
          AND old_snap.is_on_sale = 0
          AND new_snap.store = ?
          AND new_snap.captured_at > datetime('now', '-' || ? || ' hours')
          AND old_snap.captured_at < new_snap.captured_at
        ORDER BY new_snap.discount_pct DESC
    """, (store, hours_back))

    return cursor.fetchall()

def price_history(conn, product_id, store="US", days=30):
    """Get price history for a specific product."""
    cursor = conn.execute("""
        SELECT captured_at, current_price, original_price, is_on_sale, discount_pct
        FROM price_snapshots
        WHERE product_id = ? AND store = ?
          AND captured_at > datetime('now', '-' || ? || ' days')
        ORDER BY captured_at ASC
    """, (product_id, store, days))
    return cursor.fetchall()

Sale Detection and Alerts

def check_for_sales(conn, store="US", min_discount_pct=20):
    """Check for significant price drops since last scrape."""
    new_sales = find_new_sales(conn, store)

    significant = [s for s in new_sales if (s[5] or 0) >= min_discount_pct]
    print(f"\n{len(significant)} new sale items (>= {min_discount_pct}% off) in {store} store:")

    for name, brand, url, curr_price, orig_price, disc_pct, prev_price in significant[:10]:
        print(f"  {brand} — {name[:50]}")
        print(f"    Now: {curr_price} | Was: {orig_price} | -{disc_pct:.0f}%")
        print(f"    {url}")

    return significant

def find_low_stock_opportunities(conn, store="US"):
    """Find in-stock items where most sizes are already gone."""
    cursor = conn.execute("""
        SELECT
            p.name, p.brand,
            ps.current_price, ps.in_stock_sizes, ps.total_sizes,
            ROUND(CAST(ps.in_stock_sizes AS REAL) / NULLIF(ps.total_sizes, 0) * 100) as stock_pct
        FROM price_snapshots ps
        JOIN products p ON ps.product_id = p.id
        WHERE ps.store = ?
          AND ps.is_on_sale = 1
          AND ps.in_stock_sizes > 0
          AND ps.in_stock_sizes <= 2
          AND ps.total_sizes >= 6
          AND ps.captured_at > datetime('now', '-1 day')
        ORDER BY ps.discount_pct DESC
        LIMIT 20
    """, (store,))

    print(f"\nOn-sale items with very limited stock in {store}:")
    for row in cursor.fetchall():
        name, brand, price, in_stock, total, stock_pct = row
        print(f"  {brand} — {name[:45]}: {in_stock}/{total} sizes left ({stock_pct:.0f}% available)")

    return cursor.fetchall()

Full Scraping Pipeline

def run_asos_pipeline(
    queries: list = None,
    category_ids: list = None,
    stores: list = None,
    db_path: str = "asos_tracker.db",
    limit_per_query: int = 200,
    include_sizes: bool = True,
):
    """
    Complete ASOS data collection pipeline.
    Scrapes catalog, enriches with size data, saves to DB.
    """
    if queries is None:
        queries = ["dress", "jeans", "sneakers", "jacket"]
    if stores is None:
        stores = ["US", "GB"]

    conn = init_db(db_path)
    total_products = 0

    for store in stores:
        print(f"\n{'='*50}")
        print(f"Store: {store}")
        print("="*50)

        session_id = random.randint(10000, 99999)
        proxy = get_proxy(store=store, session_id=session_id)

        # Query-based collection
        for query in queries:
            print(f"\nQuery: '{query}'")
            products = search_products(
                query, store=store, limit=limit_per_query,
                sort="priceDrop",  # Surface sale items
                proxy_url=proxy,
            )

            if include_sizes and products:
                print(f"  Enriching {len(products)} products with size data...")
                products = enrich_with_sizes(products[:100], store=store, proxy_url=proxy)

            save_products_batch(conn, products, store)
            total_products += len(products)
            print(f"  Saved {len(products)} products")
            time.sleep(random.uniform(3, 6))

        # Category browsing for complete coverage
        if category_ids:
            for cat_id in category_ids:
                print(f"\nCategory {cat_id}:")
                cat_products = browse_category(
                    cat_id, store=store, sort="freshness",
                    limit=300, proxy_url=proxy
                )
                if include_sizes and cat_products:
                    cat_products = enrich_with_sizes(cat_products[:100], store, proxy)
                save_products_batch(conn, cat_products, store)
                total_products += len(cat_products)
                print(f"  Saved {len(cat_products)} category products")
                time.sleep(random.uniform(3, 6))

        # Check for new sales
        check_for_sales(conn, store=store)

    print(f"\nPipeline complete: {total_products} products collected")

    # Print summary stats
    cursor = conn.execute("SELECT COUNT(DISTINCT product_id) FROM price_snapshots")
    total_tracked = cursor.fetchone()[0]
    cursor = conn.execute("SELECT COUNT(*) FROM price_snapshots WHERE is_on_sale = 1 AND captured_at > datetime('now', '-1 day')")
    on_sale_now = cursor.fetchone()[0]
    print(f"Database: {total_tracked} products tracked | {on_sale_now} on sale now")

if __name__ == "__main__":
    run_asos_pipeline(
        queries=["midi dress", "wide leg jeans", "chunky trainers"],
        stores=["US", "GB"],
        limit_per_query=150,
        include_sizes=True,
    )

Proxy Rotation Best Practices for ASOS

import random

def scrape_with_rotation(queries, store="US", db_path="asos.db"):
    """
    Scrape multiple queries with proper proxy rotation.
    Rotate sessions between queries, not between pages.
    Consistent session per query = looks like one user browsing.
    """
    conn = init_db(db_path)

    for query in queries:
        # Fresh session per query
        session_id = random.randint(10000, 99999)
        proxy = get_proxy(store=store, session_id=session_id)

        try:
            products = search_products(query, store=store, limit=200, proxy_url=proxy)
            save_products_batch(conn, products, store)
            print(f"'{query}': {len(products)} products saved")
        except Exception as e:
            print(f"'{query}' failed: {e}")
            # Try with fresh proxy
            proxy = get_proxy(store=store)
            try:
                products = search_products(query, store=store, limit=50, proxy_url=proxy)
                save_products_batch(conn, products, store)
                print(f"  Retry succeeded: {len(products)} products")
            except Exception as e2:
                print(f"  Retry also failed: {e2}")

        # Human-like delay between queries
        time.sleep(random.uniform(8, 20))

Real-World Use Cases

1. Fashion Price Aggregator

Build a comparison tool that tracks prices across ASOS, and other retailers:

def build_price_alert_feed(conn, store="US", max_discount=30):
    """Generate an RSS-like feed of items over a discount threshold."""
    cursor = conn.execute("""
        SELECT p.name, p.brand, p.url,
               ps.current_price, ps.original_price, ps.discount_pct,
               ps.in_stock_sizes, ps.currency
        FROM price_snapshots ps
        JOIN products p ON ps.product_id = p.id
        WHERE ps.store = ? AND ps.is_on_sale = 1
          AND ps.discount_pct >= ?
          AND ps.in_stock_sizes > 0
          AND ps.captured_at > datetime('now', '-6 hours')
        ORDER BY ps.discount_pct DESC
        LIMIT 50
    """, (store, max_discount))

    feed = []
    for row in cursor.fetchall():
        name, brand, url, curr, orig, disc, sizes_left, currency = row
        feed.append({
            "title": f"{brand} — {name[:60]} ({disc:.0f}% off)",
            "price": f"{currency} {curr} (was {orig})",
            "sizes_available": sizes_left,
            "url": url,
        })

    return feed

2. Brand Price Positioning Analysis

def analyze_brand_pricing(conn, store="US"):
    """Compare average prices and sale frequency by brand."""
    cursor = conn.execute("""
        SELECT
            p.brand,
            COUNT(DISTINCT ps.product_id) as products,
            AVG(ps.current_price) as avg_price,
            SUM(CASE WHEN ps.is_on_sale = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as sale_pct,
            AVG(CASE WHEN ps.is_on_sale = 1 THEN ps.discount_pct ELSE NULL END) as avg_discount
        FROM price_snapshots ps
        JOIN products p ON ps.product_id = p.id
        WHERE ps.store = ?
          AND ps.captured_at > datetime('now', '-7 days')
        GROUP BY p.brand
        HAVING products >= 10
        ORDER BY products DESC
        LIMIT 20
    """, (store,))

    print(f"\nBrand pricing analysis ({store} store):")
    for brand, products, avg_price, sale_pct, avg_disc in cursor.fetchall():
        disc_str = f" | avg discount {avg_disc:.0f}%" if avg_disc else ""
        print(f"  {brand}: {products} products | avg {avg_price:.0f} | {sale_pct:.0f}% on sale{disc_str}")

3. Inventory Depletion Tracker

Monitor when popular items sell out to understand demand signals:

def track_inventory_depletion(conn, store="US"):
    """Find items that significantly lost stock between snapshots."""
    cursor = conn.execute("""
        SELECT p.name, p.brand,
               late_snap.in_stock_sizes as current_sizes,
               early_snap.in_stock_sizes as prev_sizes,
               early_snap.in_stock_sizes - late_snap.in_stock_sizes as sizes_sold
        FROM price_snapshots late_snap
        JOIN price_snapshots early_snap
            ON late_snap.product_id = early_snap.product_id
            AND late_snap.store = early_snap.store
        JOIN products p ON late_snap.product_id = p.id
        WHERE late_snap.store = ?
          AND late_snap.captured_at > datetime('now', '-1 day')
          AND early_snap.captured_at > datetime('now', '-2 days')
          AND early_snap.captured_at < datetime('now', '-1 day')
          AND late_snap.in_stock_sizes < early_snap.in_stock_sizes
          AND early_snap.in_stock_sizes >= 4
        ORDER BY sizes_sold DESC
        LIMIT 15
    """, (store,))

    print(f"\nFastest-selling items in {store} (last 24h):")
    for name, brand, curr, prev, sold in cursor.fetchall():
        print(f"  {brand} — {name[:50]}: {sold} sizes sold ({prev} → {curr} remaining)")

Closing Tips

Verify keyStoreDataversion at startup — this value changes irregularly and silently breaks scrapes without returning errors. One startup check prevents a wasted run.

Keep store and proxy country consistent — ASOS's Akamai rules flag store/IP mismatches. ThorData's per-request country targeting makes this easy to automate — set the target country in the proxy username and pair it with the matching store config.

Rotate sessions between queries, not between pages — consistent pagination from one IP looks like normal browsing. Rotating proxies mid-pagination is the suspicious pattern.

Use the catalogue endpoint for size data — the search endpoint can't return per-size availability. Hit the catalogue endpoint in batches of 50 IDs to keep requests efficient.

Run at off-peak hours for ASOS's markets — UK and EU scraping has lower traffic pressure during US business hours and vice versa. Less real traffic = less aggressive bot scoring.

Track discount_pct changes, not just is_on_sale — ASOS sometimes adjusts sale percentages during events. A product going from 20% to 50% off is as important an event as it going from 0% to 20%.

With daily scrapes across a few hundred product queries, you build a comprehensive ASOS price history database within weeks — enough to identify sale patterns, predict markdown events, and surface deals before aggregator sites catch them.