How to Scrape eBay Auction Data in 2026: Sold Listings, Prices & Seller Stats

eBay processes over $70 billion in gross merchandise volume annually. For resellers, market researchers, and pricing tool builders, eBay's sold listing data is the closest thing to a real-time price oracle — it tells you what items actually sell for, not what sellers hope to get.

eBay offers official APIs (Finding API, Browse API), and they cover active listings well. But for sold/completed items at scale, historical price trends, and seller analytics, you'll need a mix of API calls and targeted web scraping. This guide covers everything: both APIs, HTML scraping, full pagination, robust error handling, anti-detection, data storage, and proxy strategy.

What Data Can You Extract?

Between the APIs and scraping, you can access:

eBay's Anti-Bot Measures

eBay takes scraping seriously. Their defenses are layered and effective against naive scrapers:

1. API rate limits. The Finding API allows 5,000 calls/day per App ID. The Browse API has similar daily limits plus per-second throttling. Exceed them and you get HTTP 429 responses; back off exponentially before retrying.

2. Aggressive web scraping detection. eBay uses Akamai Bot Manager on their web pages. It fingerprints TLS characteristics, JavaScript execution environment, mouse movement patterns, and request timing. Browser emulation that fails the JS challenge gets a CAPTCHA.

3. CAPTCHA walls. After 50-100 page requests from a single IP, you'll hit a CAPTCHA interstitial. Datacenter IPs often get CAPTCHAs on the very first request.

4. Session-based tracking. eBay tracks request patterns across sessions using cookies and browser fingerprinting. Even with rotating IPs, making the same search with identical parameters at consistent intervals flags automated behavior.

5. Detailed sold listing data is web-only. The Finding API can return completed items, but detailed sold listing data (exact bid history, precise sale time, all bidders) requires scraping the web pages.
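Points 1 and 4 both come down to timing: retries after a 429 should wait longer each time, and fixed intervals are easy to spot. One common way to get both is exponential backoff with "full jitter". The sketch below is illustrative only; `backoff_delay` and its defaults are hypothetical names chosen for this example, not part of any eBay SDK.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a randomized wait in seconds for a 0-based retry attempt.

    Full jitter: draw uniformly between 0 and min(cap, base * 2**attempt),
    so the ceiling doubles per attempt but no two waits look alike.
    """
    source = rng or random
    ceiling = min(cap, base * (2 ** attempt))
    return source.uniform(0, ceiling)


if __name__ == "__main__":
    for attempt in range(5):
        print(f"attempt {attempt}: wait {backoff_delay(attempt):.2f}s")
```

A caller would sleep for `backoff_delay(attempt)` after each 429 and give up after a fixed number of attempts.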

For scraping sold listings at scale, residential proxies are non-negotiable. eBay's Akamai setup blocks datacenter IPs almost instantly. ThorData residential proxies pass eBay's fingerprinting because they're real residential IPs — not datacenter IPs in disguise. Their 195+ country coverage lets you access region-specific eBay sites (ebay.de, ebay.co.uk) with appropriate local IPs.

Setting Up: eBay APIs

Register at developer.ebay.com to get API credentials. The sandbox is free; production keys require an approved app.

pip install requests beautifulsoup4 pandas
# sqlite3 ships with Python's standard library, so it is not installed via pip

Finding API: Search Active and Sold Listings

import requests
import time
import random
import json
import sqlite3
import logging
from datetime import datetime
from typing import Optional
from bs4 import BeautifulSoup

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

EBAY_APP_ID = "YOUR_APP_ID"
FINDING_API_URL = "https://svcs.ebay.com/services/search/FindingService/v1"

def search_ebay_finding(
    keywords: str,
    sold: bool = False,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    limit: int = 100,
    page: int = 1,
    sort: str = "EndTimeSoonest",
    category_id: Optional[str] = None,
) -> dict:
    """
    Search eBay via Finding API.

    sold: If True, search completed/sold items only.
    sort options: EndTimeSoonest, StartTimeNewest, BidCountMost, PricePlusShippingLowest
    Returns raw API response dict.
    """
    operation = "findCompletedItems" if sold else "findItemsByKeywords"

    params = {
        "OPERATION-NAME": operation,
        "SERVICE-VERSION": "1.0.0",
        "SECURITY-APPNAME": EBAY_APP_ID,
        "RESPONSE-DATA-FORMAT": "JSON",
        "REST-PAYLOAD": "",
        "keywords": keywords,
        "paginationInput.entriesPerPage": min(limit, 100),
        "paginationInput.pageNumber": page,
        "sortOrder": sort,
        "outputSelector": "SellerInfo,StoreInfo,PictureURLSuperSize",
    }

    filter_idx = 0

    if sold:
        params["itemFilter(0).name"] = "SoldItemsOnly"
        params["itemFilter(0).value"] = "true"
        filter_idx = 1

    if min_price is not None:
        params[f"itemFilter({filter_idx}).name"] = "MinPrice"
        params[f"itemFilter({filter_idx}).value"] = str(min_price)
        filter_idx += 1

    if max_price is not None:
        params[f"itemFilter({filter_idx}).name"] = "MaxPrice"
        params[f"itemFilter({filter_idx}).value"] = str(max_price)
        filter_idx += 1

    if category_id:
        params["categoryId"] = category_id

    try:
        resp = requests.get(FINDING_API_URL, params=params, timeout=15)
        resp.raise_for_status()
        return resp.json()
    except requests.RequestException as e:
        logger.error(f"Finding API error: {e}")
        return {}


def parse_finding_results(data: dict, sold: bool = False) -> list:
    """Parse items from Finding API response."""
    key = "findCompletedItemsResponse" if sold else "findItemsByKeywordsResponse"
    response = data.get(key, [{}])[0]
    items_data = response.get("searchResult", [{}])[0].get("item", [])

    items = []
    for item in items_data:
        try:
            price_info = item.get("sellingStatus", [{}])[0]
            listing_info = item.get("listingInfo", [{}])[0]
            seller_info = item.get("sellerInfo", [{}])[0]
            shipping_info = item.get("shippingInfo", [{}])[0]
            condition_info = item.get("condition", [{}])[0]

            items.append({
                "item_id": item.get("itemId", [None])[0],
                "title": item.get("title", [None])[0],
                "price": float(price_info.get("currentPrice", [{}])[0].get("__value__", 0)),
                "currency": price_info.get("currentPrice", [{}])[0].get("@currencyId", "USD"),
                "bid_count": int(price_info.get("bidCount", [0])[0]),
                "sell_state": price_info.get("sellingState", [None])[0],
                "condition": condition_info.get("conditionDisplayName", [None])[0],
                "condition_id": condition_info.get("conditionId", [None])[0],
                "listing_type": listing_info.get("listingType", [None])[0],
                "end_time": listing_info.get("endTime", [None])[0],
                "start_time": listing_info.get("startTime", [None])[0],
                "buy_it_now": listing_info.get("buyItNowAvailable", ["false"])[0] == "true",
                "seller": seller_info.get("sellerUserName", [None])[0],
                "feedback_score": int(seller_info.get("feedbackScore", [0])[0]),
                "feedback_pct": seller_info.get("positiveFeedbackPercent", [None])[0],
                "top_rated": seller_info.get("topRatedSeller", ["false"])[0] == "true",
                "ship_type": shipping_info.get("shippingType", [None])[0],
                "url": item.get("viewItemURL", [None])[0],
                "gallery_url": item.get("galleryURL", [None])[0],
                "location": item.get("location", [None])[0],
                "country": item.get("country", [None])[0],
                "category_id": item.get("primaryCategory", [{}])[0].get("categoryId", [None])[0],
                "category_name": item.get("primaryCategory", [{}])[0].get("categoryName", [None])[0],
            })
        except (IndexError, KeyError, ValueError) as e:
            logger.debug(f"Error parsing item: {e}")
            continue

    return items


def get_pagination_info(data: dict, sold: bool = False) -> dict:
    """Extract pagination metadata from Finding API response."""
    key = "findCompletedItemsResponse" if sold else "findItemsByKeywordsResponse"
    response = data.get(key, [{}])[0]
    pagination = response.get("paginationOutput", [{}])[0]

    return {
        "total_entries": int(pagination.get("totalEntries", [0])[0]),
        "entries_per_page": int(pagination.get("entriesPerPage", [0])[0]),
        "page_number": int(pagination.get("pageNumber", [1])[0]),
        "total_pages": int(pagination.get("totalPages", [0])[0]),
    }


def search_sold_items_paginated(
    keywords: str,
    max_results: int = 500,
    min_price: float = None,
    max_price: float = None,
    delay_range: tuple = (0.5, 1.5),
) -> list:
    """
    Paginate through all sold items for a keyword search.
    Finding API caps at 10 pages (100 results/page = 1,000 max).
    """
    all_items = []
    page = 1

    while len(all_items) < max_results:
        logger.info(f"Fetching sold items page {page} for '{keywords}'")

        data = search_ebay_finding(
            keywords, sold=True,
            min_price=min_price, max_price=max_price,
            limit=100, page=page,
        )

        items = parse_finding_results(data, sold=True)
        if not items:
            logger.info("No more items")
            break

        all_items.extend(items)

        pagination = get_pagination_info(data, sold=True)
        total_pages = pagination.get("total_pages", 1)

        logger.info(
            f"Page {page}/{total_pages}: "
            f"{len(items)} items, {len(all_items)} total"
        )

        if page >= min(total_pages, 10):  # Finding API max 10 pages
            break

        page += 1
        time.sleep(random.uniform(*delay_range))

    return all_items[:max_results]
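The repeated `[0]` indexing above exists because the Finding API's JSON format wraps nearly every value in a single-element list. A small helper can make that pattern explicit and tolerate empty lists. This is a sketch: `first` is a hypothetical helper, and `sample_item` is hand-written in the shape the parser above expects, not captured eBay output.

```python
def first(container: dict, key: str, default=None):
    """Return the first element of a list-wrapped value, or default."""
    value = container.get(key)
    if isinstance(value, list) and value:
        return value[0]
    return default


# Hand-written sample mirroring the structure parse_finding_results() reads.
sample_item = {
    "itemId": ["1234567890"],
    "title": ["Example listing"],
    "sellingStatus": [{
        "currentPrice": [{"@currencyId": "USD", "__value__": "42.50"}],
        "sellingState": ["EndedWithSales"],
    }],
}

status = first(sample_item, "sellingStatus", {})
price = first(status, "currentPrice", {})

item_id = first(sample_item, "itemId")
amount = float(price.get("__value__", 0))
bid_count = int(first(status, "bidCount", 0))  # field absent here, so the default is used

print(item_id, amount, bid_count)
```

Unlike `item.get(key, [None])[0]`, the helper does not raise `IndexError` when a key is present but holds an empty list.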

Browse API: Detailed Item Data

The Browse API requires OAuth authentication but returns richer data:

import base64

CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
BROWSE_API_URL = "https://api.ebay.com/buy/browse/v1"


def get_browse_token() -> Optional[str]:
    """Get OAuth token for Browse API."""
    credentials = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()

    try:
        resp = requests.post(
            "https://api.ebay.com/identity/v1/oauth2/token",
            headers={
                "Authorization": f"Basic {credentials}",
                "Content-Type": "application/x-www-form-urlencoded",
            },
            data="grant_type=client_credentials&scope=https://api.ebay.com/oauth/api_scope",
            timeout=15,
        )
        resp.raise_for_status()
        return resp.json().get("access_token")
    except requests.RequestException as e:
        logger.error(f"Token request failed: {e}")
        return None


def search_browse_api(
    query: str,
    token: str,
    limit: int = 200,
    offset: int = 0,
    filters: str = None,
    marketplace_id: str = "EBAY_US",
) -> dict:
    """
    Search via Browse API.

    filters examples:
      "price:[10..50],priceCurrency:USD"
      "buyingOptions:{AUCTION}"
      "conditions:{NEW}"
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "X-EBAY-C-MARKETPLACE-ID": marketplace_id,
        "Content-Type": "application/json",
    }

    params = {
        "q": query,
        "limit": min(limit, 200),
        "offset": offset,
        "fieldgroups": "EXTENDED",
    }

    if filters:
        params["filter"] = filters

    try:
        resp = requests.get(
            f"{BROWSE_API_URL}/item_summary/search",
            headers=headers, params=params, timeout=15,
        )
        resp.raise_for_status()
        return resp.json()
    except requests.HTTPError as e:
        logger.error(f"Browse API HTTP error: {e}")
        if e.response.status_code == 429:
            retry_after = int(e.response.headers.get("retry-after", 60))
            logger.warning(f"Rate limited, waiting {retry_after}s")
            time.sleep(retry_after)
        return {}
    except requests.RequestException as e:
        logger.error(f"Browse API error: {e}")
        return {}


def get_item_detail_browse(item_id: str, token: str) -> Optional[dict]:
    """Fetch complete item details via Browse API."""
    headers = {
        "Authorization": f"Bearer {token}",
        "X-EBAY-C-MARKETPLACE-ID": "EBAY_US",
    }

    try:
        resp = requests.get(
            f"{BROWSE_API_URL}/item/v1|{item_id}|0",
            headers=headers, timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()

        # Extract item specifics (brand, model, etc.)
        aspects = {}
        for aspect in data.get("localizedAspects", []):
            aspects[aspect.get("name", "")] = aspect.get("value", "")

        return {
            "item_id": data.get("itemId"),
            "title": data.get("title"),
            "price": data.get("price", {}).get("value"),
            "currency": data.get("price", {}).get("currency"),
            "current_bid": data.get("currentBidPrice", {}).get("value"),
            "bid_count": data.get("bidCount"),
            "end_date": data.get("itemEndDate"),
            "condition": data.get("condition"),
            "condition_id": data.get("conditionId"),
            "category": data.get("categoryPath"),
            "description": data.get("description", "")[:1000],
            "seller_username": data.get("seller", {}).get("username"),
            "seller_feedback_pct": data.get("seller", {}).get("feedbackPercentage"),
            "seller_feedback_score": data.get("seller", {}).get("feedbackScore"),
            # sellerAccountType is the account type (business vs. individual), not a country
            "seller_account_type": data.get("seller", {}).get("sellerAccountType"),
            "item_location": data.get("itemLocation", {}).get("country"),
            "buying_options": data.get("buyingOptions", []),
            # `or [{}]` guards against a present-but-empty shippingOptions list
            "shipping_cost": (data.get("shippingOptions") or [{}])[0].get("shippingCost", {}).get("value"),
            "ship_to_locations": data.get("shipToLocations", {}).get("regionIncluded", []),
            "images": [img.get("imageUrl") for img in data.get("additionalImages", [])],
            "aspects": aspects,
        }
    except requests.RequestException as e:
        logger.error(f"Error fetching item {item_id}: {e}")
        return None


def search_all_pages_browse(
    query: str,
    token: str,
    max_results: int = 1000,
    filters: str = None,
    delay: float = 0.5,
) -> list:
    """
    Paginate through Browse API results.
    eBay caps total results at 10,000 per query regardless of pagination.
    Max 200 per request.
    """
    all_items = []
    offset = 0

    while len(all_items) < max_results:
        data = search_browse_api(query, token, limit=200, offset=offset, filters=filters)

        items = data.get("itemSummaries", [])
        if not items:
            break

        for item in items:
            all_items.append({
                "item_id": item.get("itemId"),
                "title": item.get("title"),
                "price": item.get("price", {}).get("value"),
                "currency": item.get("price", {}).get("currency"),
                "buying_options": item.get("buyingOptions", []),
                "condition": item.get("condition"),
                "seller": item.get("seller", {}).get("username"),
                "seller_feedback_pct": item.get("seller", {}).get("feedbackPercentage"),
                # itemLocation describes where the item is, not where it ships to
                "item_country": item.get("itemLocation", {}).get("country"),
                "shipping_cost": (item.get("shippingOptions") or [{}])[0].get("shippingCost", {}).get("value"),
                "url": item.get("itemWebUrl"),
                "image": item.get("image", {}).get("imageUrl"),
            })

        total = int(data.get("total", 0))
        offset += 200

        logger.info(f"Browse API: {len(all_items)}/{min(total, max_results)} items fetched")

        if offset >= total or offset >= 10000:
            break

        time.sleep(delay)

    return all_items[:max_results]
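Long pagination runs can outlive a single OAuth token, so it helps to cache the token and refresh it shortly before expiry rather than requesting a new one per call. The sketch below assumes the token response carries the standard OAuth 2.0 `expires_in` field alongside `access_token`; `TokenCache` and its `fetch` callback are hypothetical names for this example, and `get_browse_token()` above would need to return both values to plug in.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Cache an OAuth token and refresh it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        clock: Callable[[], float] = time.monotonic,
        safety_margin: float = 60.0,
    ):
        self._fetch = fetch            # returns (access_token, expires_in_seconds)
        self._clock = clock            # injectable so the cache can be tested
        self._margin = safety_margin   # refresh this many seconds early
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in
        return self._token


if __name__ == "__main__":
    # Stand-in fetcher; a real one would POST to the token endpoint.
    cache = TokenCache(lambda: ("example-token", 7200.0))
    print(cache.get())
```

Passing the clock in as a parameter keeps the refresh logic testable without real waiting.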

Scraping Sold Listing Details from HTML

The API tells you an item sold and the price. For bid history and granular sale data, you scrape the listing page:

SCRAPE_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/126.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    # "br" omitted: requests only decodes Brotli when the optional brotli package is installed
    "Accept-Encoding": "gzip, deflate",
    "DNT": "1",
    "Upgrade-Insecure-Requests": "1",
}


def scrape_sold_listing(
    item_url: str,
    session: requests.Session,
    proxy: str = None,
) -> dict:
    """
    Scrape detailed data from a sold/completed eBay listing page.

    proxy format: 'http://USER:PASS@host:port'
    Returns dict with enriched data not available in APIs.
    """
    if proxy:
        proxies = {"http": proxy, "https": proxy}
    else:
        proxies = None

    try:
        resp = session.get(item_url, proxies=proxies, timeout=20)
        resp.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Failed to scrape {item_url}: {e}")
        return {}

    soup = BeautifulSoup(resp.text, "html.parser")
    result = {}

    # Final sale price
    price_el = soup.select_one(".x-price-primary span.ux-textspans")
    if price_el:
        result["final_price_text"] = price_el.get_text(strip=True)

    # Bid count (auction listings): first text span that mentions "bid" and contains a digit
    for el in soup.select("span.ux-textspans"):
        text = el.get_text(strip=True)
        if "bid" in text.lower() and any(c.isdigit() for c in text):
            result["bid_count_text"] = text
            break

    # Watchers count
    for el in soup.select("span.ux-textspans"):
        text = el.get_text(strip=True)
        if "watcher" in text.lower():
            result["watchers_text"] = text
            break

    # Item condition details
    condition_el = soup.select_one("[data-testid='x-item-condition-text']")
    if condition_el:
        result["condition_detail"] = condition_el.get_text(strip=True)

    # Item specifics table
    specifics = {}
    for section in soup.select(".ux-layout-section--column"):
        label_els = section.select(".ux-labels-values__labels span.ux-textspans")
        value_els = section.select(".ux-labels-values__values span.ux-textspans")
        for label, value in zip(label_els, value_els):
            key = label.get_text(strip=True).rstrip(":")
            val = value.get_text(strip=True)
            if key and val:
                specifics[key] = val
    result["item_specifics"] = specifics

    # Returns policy
    returns_el = soup.select_one(".d-returns-minview")
    if returns_el:
        result["returns_policy"] = returns_el.get_text(strip=True)[:200]

    return result


def make_scraping_session() -> requests.Session:
    """Create a requests session with browser-like headers."""
    session = requests.Session()
    session.headers.update(SCRAPE_HEADERS)
    return session
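The scraper stores the price as raw text (`final_price_text`), which has to be converted to a number before it can sit next to the API's `price` column. A minimal sketch of that conversion follows. `parse_price_text` is a hypothetical helper, and it assumes US-style formatting (comma as thousands separator, dot as decimal point); prices rendered as `12,50` on European sites would need different handling.

```python
import re
from typing import Optional, Tuple

# Optional 2-3 letter prefix (e.g. "US", "GBP"), optional symbol, then the amount.
_PRICE_RE = re.compile(
    r"(?P<cur>[A-Z]{2,3})?\s*(?P<sym>[$£€])?\s*(?P<num>\d[\d,]*(?:\.\d+)?)"
)


def parse_price_text(text: str) -> Optional[Tuple[Optional[str], float]]:
    """Return (currency_prefix, amount) from text like 'US $1,234.56', or None."""
    match = _PRICE_RE.search(text)
    if not match:
        return None
    amount = float(match.group("num").replace(",", ""))
    return match.group("cur"), amount


if __name__ == "__main__":
    for raw in ["US $1,234.56", "$19.99", "GBP 12.00", "no price here"]:
        print(repr(raw), "->", parse_price_text(raw))
```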

Building a Price History Database

def init_database(db_path: str = "ebay_prices.db") -> sqlite3.Connection:
    """Initialize price history database."""
    conn = sqlite3.connect(db_path)

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS sold_items (
            item_id TEXT PRIMARY KEY,
            title TEXT,
            search_query TEXT,
            price REAL,
            currency TEXT,
            bid_count INTEGER DEFAULT 0,
            condition TEXT,
            condition_id TEXT,
            listing_type TEXT,
            end_time TEXT,
            start_time TEXT,
            seller TEXT,
            feedback_score INTEGER DEFAULT 0,
            feedback_pct TEXT,
            top_rated BOOLEAN DEFAULT 0,
            location TEXT,
            country TEXT,
            category_id TEXT,
            category_name TEXT,
            item_specifics TEXT,
            url TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_query ON sold_items(search_query);
        CREATE INDEX IF NOT EXISTS idx_price ON sold_items(search_query, price);
        CREATE INDEX IF NOT EXISTS idx_end_time ON sold_items(end_time);
        CREATE INDEX IF NOT EXISTS idx_seller ON sold_items(seller);
        CREATE INDEX IF NOT EXISTS idx_category ON sold_items(category_id);

        CREATE TABLE IF NOT EXISTS price_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            search_query TEXT,
            snapshot_date TEXT,
            item_count INTEGER,
            avg_price REAL,
            median_price REAL,
            min_price REAL,
            max_price REAL,
            avg_bid_count REAL,
            auction_count INTEGER,
            fixed_price_count INTEGER,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_snapshot_date
        ON price_snapshots(search_query, snapshot_date);
    """)

    conn.commit()
    return conn


def save_sold_items(
    conn: sqlite3.Connection,
    items: list,
    query: str,
    enrichments: dict = None,
) -> int:
    """
    Save sold item records.
    enrichments: dict of item_id -> scraped extra data
    """
    if enrichments is None:
        enrichments = {}

    saved = 0
    for item in items:
        enrich = enrichments.get(item.get("item_id"), {})
        specifics = enrich.get("item_specifics", {})

        try:
            conn.execute("""
                INSERT OR REPLACE INTO sold_items
                (item_id, title, search_query, price, currency, bid_count,
                 condition, condition_id, listing_type, end_time, start_time,
                 seller, feedback_score, feedback_pct, top_rated,
                 location, country, category_id, category_name, item_specifics, url)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                item.get("item_id"), item.get("title"), query,
                item.get("price"), item.get("currency"),
                item.get("bid_count", 0), item.get("condition"),
                item.get("condition_id"), item.get("listing_type"),
                item.get("end_time"), item.get("start_time"),
                item.get("seller"), item.get("feedback_score"),
                item.get("feedback_pct"), item.get("top_rated", False),
                item.get("location"), item.get("country"),
                item.get("category_id"), item.get("category_name"),
                json.dumps(specifics), item.get("url"),
            ))
            saved += 1
        except sqlite3.Error as e:
            logger.error(f"DB error: {e}")

    conn.commit()
    return saved


def take_price_snapshot(conn: sqlite3.Connection, query: str) -> dict:
    """Calculate and store price statistics for a query."""
    cursor = conn.execute("""
        SELECT price, bid_count, listing_type
        FROM sold_items
        WHERE search_query = ?
        ORDER BY price
    """, (query,))

    rows = cursor.fetchall()
    if not rows:
        return {}

    prices = [r[0] for r in rows if r[0] is not None]
    bid_counts = [r[1] for r in rows]
    types = [r[2] for r in rows]

    # Median calculation
    sorted_prices = sorted(prices)
    n = len(sorted_prices)
    if n % 2 == 0 and n > 0:
        median = (sorted_prices[n // 2 - 1] + sorted_prices[n // 2]) / 2
    elif n > 0:
        median = sorted_prices[n // 2]
    else:
        median = 0

    today = datetime.now().strftime("%Y-%m-%d")
    snapshot = {
        "search_query": query,
        "snapshot_date": today,
        "item_count": len(prices),
        "avg_price": round(sum(prices) / len(prices), 2) if prices else 0,
        "median_price": round(median, 2),
        "min_price": round(min(prices), 2) if prices else 0,
        "max_price": round(max(prices), 2) if prices else 0,
        "avg_bid_count": round(sum(bid_counts) / len(bid_counts), 1) if bid_counts else 0,
        "auction_count": sum(1 for t in types if t and "Auction" in t),
        "fixed_price_count": sum(1 for t in types if t and "FixedPrice" in t),
    }

    conn.execute("""
        INSERT OR REPLACE INTO price_snapshots
        (search_query, snapshot_date, item_count, avg_price, median_price,
         min_price, max_price, avg_bid_count, auction_count, fixed_price_count)
        VALUES (:search_query, :snapshot_date, :item_count, :avg_price, :median_price,
                :min_price, :max_price, :avg_bid_count, :auction_count, :fixed_price_count)
    """, snapshot)
    conn.commit()

    return snapshot


def get_price_stats(conn: sqlite3.Connection, query: str) -> dict:
    """Get current price statistics for a search query."""
    cursor = conn.execute("""
        SELECT
            COUNT(*) as count,
            AVG(price) as avg,
            MIN(price) as min,
            MAX(price) as max,
            AVG(CASE WHEN bid_count > 0 THEN price END) as avg_auction,
            AVG(CASE WHEN listing_type = 'FixedPrice' THEN price END) as avg_fixed,
            AVG(bid_count) as avg_bids,
            SUM(CASE WHEN bid_count > 0 THEN 1 ELSE 0 END) as auction_count,
            COUNT(*) - SUM(CASE WHEN bid_count > 0 THEN 1 ELSE 0 END) as fixed_count
        FROM sold_items WHERE search_query = ?
    """, (query,))
    row = cursor.fetchone()

    return {
        "query": query,
        "total_sold": row[0],
        "avg_price": round(row[1] or 0, 2),
        "min_price": round(row[2] or 0, 2),
        "max_price": round(row[3] or 0, 2),
        "avg_auction_price": round(row[4] or 0, 2),
        "avg_buy_now_price": round(row[5] or 0, 2),
        "avg_bid_count": round(row[6] or 0, 1),
        "auction_count": row[7] or 0,
        "fixed_price_count": row[8] or 0,
    }
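Once several daily snapshots exist, the `price_snapshots` table can answer trend questions directly. The sketch below computes the percentage change in median price between the earliest and latest snapshot for a query. `median_price_change` is a hypothetical helper, and the demo uses an in-memory database with only the columns it needs, filled with made-up numbers.

```python
import sqlite3
from typing import Optional


def median_price_change(conn: sqlite3.Connection, query: str) -> Optional[float]:
    """Percent change in median price from the first to the last snapshot."""
    rows = conn.execute(
        "SELECT snapshot_date, median_price FROM price_snapshots "
        "WHERE search_query = ? ORDER BY snapshot_date",  # ISO dates sort as text
        (query,),
    ).fetchall()
    if len(rows) < 2:
        return None
    first_price, last_price = rows[0][1], rows[-1][1]
    if not first_price:
        return None  # avoid dividing by zero or None
    return round((last_price - first_price) / first_price * 100, 2)


if __name__ == "__main__":
    demo = sqlite3.connect(":memory:")
    demo.execute(
        "CREATE TABLE price_snapshots "
        "(search_query TEXT, snapshot_date TEXT, median_price REAL)"
    )
    demo.executemany(
        "INSERT INTO price_snapshots VALUES (?, ?, ?)",
        [
            ("demo query", "2026-01-01", 100.0),  # illustrative values only
            ("demo query", "2026-01-08", 104.0),
            ("demo query", "2026-01-15", 110.0),
        ],
    )
    print(median_price_change(demo, "demo query"))
```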

Proxy Configuration for Scale

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000


def get_proxy_url(country: str = None, session_id: str = None) -> str:
    """
    Build ThorData proxy URL with optional targeting.
    country: US, GB, DE, AU, etc.
    session_id: sticky session (same IP for duration)
    """
    user = THORDATA_USER
    if country:
        user += f"-country-{country}"
    if session_id:
        user += f"-session-{session_id}"
    return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"


def scrape_sold_listings_with_proxy(
    item_urls: list,
    max_per_session: int = 30,
) -> dict:
    """
    Scrape sold listing pages with proxy rotation.
    Rotates IP every max_per_session requests to avoid detection.
    """
    enrichments = {}
    session_num = 0

    for i, url in enumerate(item_urls):
        # Rotate session every N requests
        if i % max_per_session == 0:
            session_num += 1
            proxy = get_proxy_url(session_id=f"ebay-{session_num}")
            scrape_session = make_scraping_session()
            logger.info(f"Rotating to session {session_num}")

        # Listing URLs may carry a title slug before the ID, so take the last path segment
        item_id = url.split("?")[0].rstrip("/").split("/")[-1] if "/itm/" in url else str(i)

        data = scrape_sold_listing(url, scrape_session, proxy=proxy)
        if data:
            enrichments[item_id] = data

        # Human-like delays
        time.sleep(random.uniform(2.0, 4.5))

        if (i + 1) % 10 == 0:
            logger.info(f"Scraped {i+1}/{len(item_urls)} listing pages")

    return enrichments
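The enrichment dict is keyed by item ID, so the ID pulled from each URL has to match the `item_id` returned by the API. Listing URLs can appear with or without a title slug between `/itm/` and the numeric ID, so a regex on the URL path is a sturdier way to extract it. This is a sketch: `extract_item_id` is a hypothetical helper, and the "nine or more digits" rule is an assumption about ID length rather than a documented guarantee.

```python
import re
from typing import Optional
from urllib.parse import urlparse

# /itm/<id> or /itm/<slug>/<id>, with an optional trailing slash.
_ITEM_ID_RE = re.compile(r"/itm/(?:[^/]+/)?(\d{9,})/?$")


def extract_item_id(url: str) -> Optional[str]:
    """Return the numeric listing ID from an eBay item URL, or None."""
    path = urlparse(url).path  # drops the query string and fragment
    match = _ITEM_ID_RE.search(path)
    return match.group(1) if match else None


if __name__ == "__main__":
    examples = [
        "https://www.ebay.com/itm/123456789012?hash=abc",
        "https://www.ebay.com/itm/Vintage-Levis-501/123456789012?x=1",
        "https://www.ebay.com/sch/i.html?_nkw=lego",
    ]
    for example in examples:
        print(example, "->", extract_item_id(example))
```

Returning `None` for unrecognized URLs lets the caller skip them instead of storing enrichment data under a key that will never match.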

Complete Pipeline

def build_price_database(
    queries: list,
    max_per_query: int = 200,
    enrich_top_n: int = 0,  # Set > 0 to scrape listing page details
    db_path: str = "ebay_prices.db",
) -> None:
    """
    Complete price research pipeline.
    Collects sold listings for multiple queries and builds price history.
    """
    conn = init_database(db_path)

    for query in queries:
        logger.info(f"\nProcessing: {query}")

        # Collect sold items via Finding API
        items = search_sold_items_paginated(
            query,
            max_results=max_per_query,
            delay_range=(0.5, 1.2),
        )

        if not items:
            logger.warning(f"No results for: {query}")
            continue

        # Optionally enrich top items with HTML scraping
        enrichments = {}
        if enrich_top_n > 0:
            top_items = sorted(items, key=lambda x: x.get("bid_count", 0), reverse=True)
            urls = [item["url"] for item in top_items[:enrich_top_n] if item.get("url")]
            enrichments = scrape_sold_listings_with_proxy(urls)

        # Save to database
        saved = save_sold_items(conn, items, query, enrichments=enrichments)
        logger.info(f"Saved {saved}/{len(items)} items for '{query}'")

        # Create price snapshot
        snapshot = take_price_snapshot(conn, query)

        # Report stats
        stats = get_price_stats(conn, query)
        print(f"\n{query}:")
        print(f"  Total sold: {stats['total_sold']}")
        print(f"  Price range: ${stats['min_price']:.2f} - ${stats['max_price']:.2f}")
        print(f"  Average price: ${stats['avg_price']:.2f}")
        print(f"  Median price: ${snapshot.get('median_price', 0):.2f}")
        print(f"  Auction avg: ${stats['avg_auction_price']:.2f}")
        print(f"  Buy Now avg: ${stats['avg_buy_now_price']:.2f}")
        print(f"  Avg bids: {stats['avg_bid_count']:.1f}")

        time.sleep(3)

    conn.close()
    logger.info("Price database build complete.")


if __name__ == "__main__":
    build_price_database(
        queries=[
            "iPhone 15 Pro Max 256GB unlocked",
            "Nintendo Switch OLED",
            "Lego Technic",
            "vintage Levis 501",
            "Air Jordan 1 Retro",
        ],
        max_per_query=200,
        enrich_top_n=0,
        db_path="ebay_prices.db",
    )

eBay's API Terms of Use allow most data collection for analytics and comparison purposes; that is what the APIs are built for. Web scraping is restricted by eBay's Terms of Service, although price-tracking tools have operated for years: Terapeak (which eBay acquired) on eBay, and CamelCamelCamel and Keepa on Amazon.

Practical guidelines:

- Do collect pricing data for analytics and comparison
- Do use the official APIs first before scraping HTML
- Do not scrape user personal data (buyer identities, contact info)
- Do not build an eBay clone or replicate their product inventory
- Do not hammer their servers; keep request rates reasonable
- For production use, eBay's Marketplace Insights API is worth investigating

Key Takeaways

  1. Finding API for sold items: findCompletedItems with the SoldItemsOnly filter is the fastest path to sold pricing data. 5,000 calls/day covers extensive product research.

  2. Browse API for active listings: better-structured data. It requires OAuth and returns up to 10,000 results per query, so use filters to split larger result sets.

  3. HTML scraping for bid history: detailed auction data requires the listing page. eBay's Akamai detection blocks datacenter IPs aggressively. ThorData residential proxies are the reliable solution.

  4. SQLite for price history: store everything. Price history gains value as it accumulates; three months of sold data supports trend analysis that a point-in-time scrape cannot.

  5. Auction vs. Buy It Now separately: they tell different market stories. Track both price distributions independently for better pricing intelligence.

  6. Rate-limit even with the API: 5,000 calls/day sounds like a lot until you're tracking 50 product categories with daily updates.
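The arithmetic behind the last point is worth doing before scheduling a job. The sketch below estimates how many API calls a tracking plan needs per day and checks it against the 5,000-call limit with some headroom. The function names and the 20% headroom default are assumptions made for this example.

```python
import math


def daily_calls_needed(
    num_queries: int,
    results_per_query: int,
    per_page: int = 100,
    runs_per_day: int = 1,
) -> int:
    """One call per result page, per query, per scheduled run."""
    pages = math.ceil(results_per_query / per_page)
    return num_queries * pages * runs_per_day


def fits_budget(calls: int, daily_limit: int = 5000, headroom: float = 0.2) -> bool:
    """True if calls stay within the limit after reserving headroom for retries."""
    return calls <= daily_limit * (1 - headroom)


if __name__ == "__main__":
    plan = daily_calls_needed(num_queries=250, results_per_query=1000)
    print(plan, fits_budget(plan))
```

For example, 50 categories with five queries each, fetched to the full 1,000 results once a day, already comes to 2,500 calls before any retries.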