How to Scrape Etsy Listings in 2026: Shops, Products & Reviews

Etsy hosts over 9 million active sellers and 96 million buyers. Whether you are doing competitive analysis, price monitoring, market research on handmade goods, or building a dataset of independent creator products, Etsy's data is rich — product titles, pricing with variants, shop analytics, review sentiment, bestseller tags, and more.

The official Etsy API (v3) exists but requires OAuth and an approved app, which involves a review process. For most research use cases, that overhead is not worth it. The good news: Etsy's own frontend calls an internal AJAX API (its "bespoke" endpoints) that returns structured JSON from public pages, no authentication required.

This guide covers every angle: what data Etsy exposes, how their bot protection works, the AJAX endpoints that return clean JSON, scraping listing detail pages, extracting reviews, and storing everything in SQLite.

What You Can Extract

Etsy product and shop pages contain dense structured data:

- Listing data: title, description, price and currency (including variants), tags, image URLs, quantity sold, bestseller badge
- Shop data: shop name, total sales, location, listing count, Star Seller status, announcement text
- Engagement data: favorite counts, star ratings, review counts
- Review data: rating, review text, buyer name, date, and seller replies

Etsy's Anti-Bot Architecture

Etsy has layered defenses that have grown more sophisticated over the past two years:

Cloudflare integration. All Etsy endpoints run through Cloudflare's full bot management suite. JS challenges and Turnstile CAPTCHAs trigger on suspicious traffic — fresh IPs with no browsing history, linear pagination patterns, missing browser headers.

Aggressive rate limiting. More than 20 requests per minute from one IP triggers soft blocks. Soft blocks return 200 responses with empty results — you will not know you have been throttled unless you check the response payload.

Session tracking. Etsy tracks browsing patterns across requests. Crawling product pages linearly gets flagged faster than varied navigation. Requests without valid session cookies get challenged on many endpoints.

IP reputation scoring. Datacenter IP ranges are challenged immediately. Etsy's Cloudflare configuration specifically rejects ASNs associated with cloud hosting providers and proxy data centers.

Dynamic content loading. Some listing data loads via JavaScript after the initial page render. The AJAX endpoints described below bypass this problem entirely since they return JSON directly.
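Before parsing any response, it helps to detect when Cloudflare served a challenge page instead of real content. A minimal heuristic sketch — the status codes and HTML markers here are assumptions based on typical Cloudflare challenge pages, not Etsy-specific guarantees:

```python
def looks_like_cf_challenge(status_code: int, body: str) -> bool:
    """Heuristic check for a Cloudflare challenge or Turnstile interstitial.

    The markers below commonly appear on challenge pages, but this is a
    best-effort guess, not an official signal.
    """
    if status_code in (403, 503):
        return True
    markers = ("Just a moment...", "cf-chl", "challenge-platform", "cf_chl_opt")
    return any(m in body for m in markers)
```

Run this check on every response body before handing it to a parser, and treat a hit the same way as a soft block: back off and retry later.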

Method 1: The Bespoke AJAX API

Etsy's frontend makes requests to an internal bespoke API for search and shop data. This endpoint returns clean JSON without browser rendering:

import httpx
import json
import time
import random
from fake_useragent import UserAgent

ua = UserAgent()

def search_etsy_shops(
    query: str,
    page: int = 1,
    proxy: str = None,
) -> list[dict]:
    """
    Search Etsy shops using the bespoke AJAX endpoint.
    Returns shop listings with metadata including sales, ratings, and badges.
    """
    url = "https://www.etsy.com/api/v3/ajax/bespoke/member/neu/specs/async_search_results"

    params = {
        "q": query,
        "ref": "search_bar",
        "search_type": "shops",
        "page": page,
    }

    headers = {
        "User-Agent": ua.random,
        "Accept": "application/json",
        "Referer": f"https://www.etsy.com/search?q={query}&search_type=shops",
        "X-Requested-With": "XMLHttpRequest",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
    }

    client_kwargs = {"headers": headers, "follow_redirects": True, "timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url, params=params)

    if resp.status_code != 200:
        return []

    data = resp.json()
    shops = []

    for node in data.get("nodes", []):
        if node.get("type") != "search":
            continue
        for result in node.get("data", {}).get("results", []):
            shops.append({
                "shop_name": result.get("shop_name"),
                "shop_id": result.get("shop_id"),
                "title": result.get("title"),
                "url": result.get("url"),
                "num_favorers": result.get("num_favorers"),
                "star_seller": result.get("is_star_seller", False),
                "num_sales": result.get("num_sales"),
                "rating": result.get("rating"),
                "review_count": result.get("review_count"),
            })

    return shops


# Paginate through multiple pages
def search_etsy_all_pages(
    query: str, max_pages: int = 5, proxy: str = None
) -> list[dict]:
    """Fetch multiple pages of Etsy search results."""
    all_shops = []
    for page in range(1, max_pages + 1):
        shops = search_etsy_shops(query, page=page, proxy=proxy)
        if not shops:
            break
        all_shops.extend(shops)
        print(f"  Page {page}: {len(shops)} shops")
        time.sleep(random.uniform(4, 8))
    return all_shops

Method 2: Listing Search via Products Endpoint

The products search endpoint follows a similar pattern but returns individual listings instead of shops:

def search_etsy_listings(
    query: str,
    page: int = 1,
    min_price: float = None,
    max_price: float = None,
    proxy: str = None,
) -> list[dict]:
    """
    Search Etsy product listings using the AJAX search endpoint.
    Returns individual listings with price, rating, seller info.
    """
    url = "https://www.etsy.com/api/v3/ajax/bespoke/member/neu/specs/async_search_results"

    params = {
        "q": query,
        "ref": "search_bar",
        "search_type": "all",
        "page": page,
    }

    if min_price is not None:
        params["min"] = int(min_price)
    if max_price is not None:
        params["max"] = int(max_price)

    headers = {
        "User-Agent": ua.random,
        "Accept": "application/json",
        "Referer": f"https://www.etsy.com/search?q={query}",
        "X-Requested-With": "XMLHttpRequest",
        "Accept-Language": "en-US,en;q=0.9",
    }

    client_kwargs = {"headers": headers, "follow_redirects": True, "timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url, params=params)

    if resp.status_code != 200:
        return []

    data = resp.json()
    listings = []

    for node in data.get("nodes", []):
        if node.get("type") != "search":
            continue
        for result in node.get("data", {}).get("results", []):
            if result.get("type") != "listing":
                continue
            listings.append({
                "listing_id": result.get("listing_id"),
                "title": result.get("title"),
                "url": result.get("url"),
                "price": result.get("price", {}).get("amount"),
                "currency": result.get("price", {}).get("currency"),
                "price_formatted": result.get("price", {}).get("string"),
                "shop_name": result.get("shop_name"),
                "num_favorers": result.get("num_favorers"),
                "star_seller": result.get("is_star_seller", False),
                "bestseller": result.get("is_bestseller", False),
                "image_url": result.get("main_image", {}).get("url_570xN"),
            })

    return listings

Method 3: Scraping Listing Detail Pages

Individual listing pages embed product data in JSON-LD and a hydration script. This gives you the most complete data:

def scrape_etsy_listing(listing_url: str, proxy: str = None) -> dict:
    """Scrape a single Etsy listing page for full product details."""
    headers = {
        "User-Agent": ua.random,
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://www.google.com/",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "cross-site",
        "Cache-Control": "no-cache",
    }

    client_kwargs = {
        "headers": headers,
        "follow_redirects": True,
        "timeout": 15,
        "cookies": {},  # Empty cookies — let them be set naturally
    }
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(listing_url)

    if resp.status_code != 200:
        return {"error": f"Status {resp.status_code}", "url": listing_url}

    listing = {"url": listing_url}

    # Extract JSON-LD product data
    import re
    ld_match = re.search(
        r'<script type="application/ld\+json">(.*?)</script>',
        resp.text, re.DOTALL,
    )
    if ld_match:
        try:
            ld = json.loads(ld_match.group(1))
            if isinstance(ld, list):
                ld = next(
                    (item for item in ld if item.get("@type") == "Product"),
                    ld[0] if ld else {}
                )
            listing["name"] = ld.get("name")
            listing["description"] = ld.get("description", "")[:1000]
            listing["image"] = (
                ld.get("image", [None])[0]
                if isinstance(ld.get("image"), list)
                else ld.get("image")
            )

            offers = ld.get("offers", {})
            if isinstance(offers, list):
                offers = offers[0]
            listing["price"] = offers.get("price")
            listing["currency"] = offers.get("priceCurrency")
            listing["availability"] = offers.get("availability", "").split("/")[-1]

            agg = ld.get("aggregateRating", {})
            listing["rating"] = agg.get("ratingValue")
            listing["review_count"] = agg.get("reviewCount")
        except (json.JSONDecodeError, StopIteration, IndexError):
            pass

    # Extract shop info from embedded page state
    shop_match = re.search(r'"shop_name"\s*:\s*"([^"]+)"', resp.text)
    if shop_match:
        listing["shop_name"] = shop_match.group(1)

    sales_match = re.search(r'"num_transactions"\s*:\s*(\d+)', resp.text)
    if sales_match:
        listing["shop_sales"] = int(sales_match.group(1))

    # Extract listing tags
    tags_match = re.search(r'"tags"\s*:\s*(\[[^\]]+\])', resp.text)
    if tags_match:
        try:
            tags_raw = json.loads(tags_match.group(1))
            listing["tags"] = [t if isinstance(t, str) else t.get("value", "") for t in tags_raw]
        except json.JSONDecodeError:
            pass

    # Extract quantity available
    qty_match = re.search(r'"quantity_sold"\s*:\s*(\d+)', resp.text)
    if qty_match:
        listing["quantity_sold"] = int(qty_match.group(1))

    return listing
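The JSON-LD step is easy to verify offline against a synthetic page, which is useful when debugging selector drift without burning requests. The sample HTML below is illustrative, not a real Etsy page:

```python
import json
import re

def extract_product_ld(html: str) -> dict:
    """Pull the first JSON-LD <script> block and return the Product object, or {}."""
    m = re.search(
        r'<script type="application/ld\+json">(.*?)</script>', html, re.DOTALL
    )
    if not m:
        return {}
    ld = json.loads(m.group(1))
    if isinstance(ld, list):
        ld = next((item for item in ld if item.get("@type") == "Product"), {})
    return ld if ld.get("@type") == "Product" else {}

sample = (
    '<html><head><script type="application/ld+json">'
    '{"@type": "Product", "name": "Test Mug", '
    '"offers": {"price": "28.00", "priceCurrency": "USD"}}'
    '</script></head></html>'
)
product = extract_product_ld(sample)
print(product["name"], product["offers"]["price"])  # → Test Mug 28.00
```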

Scraping Shop Pages

Etsy shop pages contain aggregate shop data and the full listing grid:

def scrape_etsy_shop(shop_name: str, proxy: str = None) -> dict:
    """Scrape an Etsy shop page for shop metadata and listings."""
    url = f"https://www.etsy.com/shop/{shop_name}"

    headers = {
        "User-Agent": ua.random,
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://www.etsy.com/",
    }

    client_kwargs = {"headers": headers, "follow_redirects": True, "timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url)

    if resp.status_code != 200:
        return {"error": f"Status {resp.status_code}", "shop_name": shop_name}

    import re
    from selectolax.parser import HTMLParser

    tree = HTMLParser(resp.text)
    shop = {"shop_name": shop_name, "url": url}

    # Sales count
    sales_match = re.search(r'"num_transactions"\s*:\s*(\d+)', resp.text)
    if sales_match:
        shop["total_sales"] = int(sales_match.group(1))

    # Shop location
    location_node = tree.css_first('[data-region="shop-location"]')
    if location_node:
        shop["location"] = location_node.text(strip=True)

    # Star Seller badge
    shop["star_seller"] = bool(tree.css_first('[data-seller-type="star_seller"]'))

    # Shop announcement
    announcement = tree.css_first(".shop-announcement-content")
    if announcement:
        shop["announcement"] = announcement.text(strip=True)[:500]

    # Listing count
    count_match = re.search(r'"listingCount"\s*:\s*(\d+)', resp.text)
    if count_match:
        shop["listing_count"] = int(count_match.group(1))

    return shop

Scraping Reviews

Etsy loads reviews via a separate API endpoint. You can hit it directly with the listing ID:

def scrape_listing_reviews(
    listing_id: str,
    max_pages: int = 3,
    proxy: str = None,
) -> list[dict]:
    """Fetch all reviews for an Etsy listing across multiple pages."""
    all_reviews = []

    for page in range(1, max_pages + 1):
        url = (
            f"https://www.etsy.com/api/v3/ajax/bespoke/public/neu"
            f"/specs/reviews/{listing_id}"
        )
        params = {"page": page, "sort_by": "recent"}

        headers = {
            "User-Agent": ua.random,
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": f"https://www.etsy.com/listing/{listing_id}",
        }

        client_kwargs = {"headers": headers, "follow_redirects": True, "timeout": 15}
        if proxy:
            client_kwargs["proxies"] = {"all://": proxy}

        with httpx.Client(**client_kwargs) as client:
            resp = client.get(url, params=params)

        if resp.status_code != 200:
            break

        data = resp.json()
        reviews_page = data.get("reviews", [])

        if not reviews_page:
            break

        for review in reviews_page:
            all_reviews.append({
                "rating": review.get("rating"),
                "review_text": review.get("review"),
                "buyer": review.get("reviewer", {}).get("name"),
                "date": review.get("created_at"),
                "transaction_title": review.get("transaction_title"),
                "seller_reply": review.get("seller_reply"),
                "listing_id": listing_id,
            })

        time.sleep(random.uniform(2, 4))

    return all_reviews
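Once reviews are in memory, quick aggregates such as average rating and seller-reply rate can be computed locally before anything touches the database. The field names below follow the dicts built above:

```python
def summarize_reviews(reviews: list[dict]) -> dict:
    """Aggregate a list of review dicts into simple shop-facing stats."""
    if not reviews:
        return {"count": 0, "avg_rating": None, "reply_rate": 0.0}
    ratings = [r["rating"] for r in reviews if r.get("rating") is not None]
    replied = sum(1 for r in reviews if r.get("seller_reply"))
    return {
        "count": len(reviews),
        "avg_rating": round(sum(ratings) / len(ratings), 2) if ratings else None,
        "reply_rate": round(replied / len(reviews), 2),
    }

demo = [
    {"rating": 5, "seller_reply": "Thanks so much!"},
    {"rating": 4, "seller_reply": None},
    {"rating": 3, "seller_reply": None},
]
print(summarize_reviews(demo))
# → {'count': 3, 'avg_rating': 4.0, 'reply_rate': 0.33}
```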

SQLite Schema

import sqlite3

def init_etsy_db(db_path: str = "etsy.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS shops (
            shop_name TEXT PRIMARY KEY,
            total_sales INTEGER,
            location TEXT,
            star_seller INTEGER DEFAULT 0,
            listing_count INTEGER,
            announcement TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS listings (
            listing_id TEXT UNIQUE,  -- unique so the reviews foreign key has a valid parent
            url TEXT PRIMARY KEY,
            name TEXT,
            shop_name TEXT,
            price REAL,
            currency TEXT,
            rating REAL,
            review_count INTEGER,
            shop_sales INTEGER,
            availability TEXT,
            description TEXT,
            tags TEXT,
            quantity_sold INTEGER,
            image_url TEXT,
            star_seller INTEGER DEFAULT 0,
            bestseller INTEGER DEFAULT 0,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (shop_name) REFERENCES shops(shop_name)
        );

        CREATE TABLE IF NOT EXISTS reviews (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            listing_id TEXT,
            rating INTEGER,
            review_text TEXT,
            buyer TEXT,
            date TEXT,
            transaction_title TEXT,
            seller_reply TEXT,
            FOREIGN KEY (listing_id) REFERENCES listings(listing_id)
        );

        CREATE INDEX IF NOT EXISTS idx_listings_shop
            ON listings(shop_name);

        CREATE INDEX IF NOT EXISTS idx_reviews_listing
            ON reviews(listing_id);
    """)
    conn.commit()
    return conn


def save_listing(conn: sqlite3.Connection, listing: dict):
    conn.execute(
        """INSERT OR REPLACE INTO listings
           (listing_id, url, name, shop_name, price, currency, rating,
            review_count, shop_sales, availability, description, tags,
            quantity_sold, image_url, star_seller, bestseller)
           VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
        (
            listing.get("listing_id"),
            listing.get("url"),
            listing.get("name"),
            listing.get("shop_name"),
            listing.get("price"),
            listing.get("currency"),
            listing.get("rating"),
            listing.get("review_count"),
            listing.get("shop_sales"),
            listing.get("availability"),
            listing.get("description"),
            json.dumps(listing.get("tags", [])),
            listing.get("quantity_sold"),
            listing.get("image_url"),
            int(listing.get("star_seller", False)),
            int(listing.get("bestseller", False)),
        ),
    )
    conn.commit()


def save_reviews(conn: sqlite3.Connection, reviews: list[dict]):
    conn.executemany(
        """INSERT INTO reviews
           (listing_id, rating, review_text, buyer, date, transaction_title, seller_reply)
           VALUES (?,?,?,?,?,?,?)""",
        [
            (
                r.get("listing_id"), r.get("rating"), r.get("review_text"),
                r.get("buyer"), r.get("date"), r.get("transaction_title"),
                r.get("seller_reply"),
            )
            for r in reviews
        ],
    )
    conn.commit()

Error Handling and Anti-Soft-Block Detection

Etsy's soft blocks return 200 with empty results — you will not know you have been throttled unless you validate response content:

import time
import random

def is_soft_blocked(data: dict) -> bool:
    """Check if Etsy returned an empty soft-block response."""
    nodes = data.get("nodes", [])
    if not nodes:
        return True
    for node in nodes:
        if node.get("type") == "search":
            results = node.get("data", {}).get("results", [])
            if results:
                return False
    return True


def search_etsy_with_retry(
    query: str,
    proxy: str = None,
    max_retries: int = 3,
) -> list[dict]:
    """Search Etsy with soft-block detection and retry with new proxy."""
    for attempt in range(max_retries):
        shops = search_etsy_shops(query, proxy=proxy)

        if shops:
            return shops

        print(f"  Empty response on attempt {attempt + 1} (possible soft block)")
        # Longer delay before retry
        time.sleep(random.uniform(30, 60))

    return []

Proxy Configuration

Etsy's bot detection is heavily IP-reputation based. Datacenter IPs get challenged within the first few requests. Residential IPs that look like normal Etsy shoppers are required for reliable access.

ThorData's residential proxy network handles Etsy well — the IPs come from real ISPs and pass Cloudflare's ASN checks without triggering JS challenges. For Etsy specifically, US residential IPs perform best since that is where the majority of buyer traffic originates.

PROXY = "http://USER:[email protected]:9000"

# Scrape top shops for a niche
shops = search_etsy_shops("handmade leather wallet", proxy=PROXY)
print(f"Found {len(shops)} shops")

conn = init_etsy_db()
for shop in shops[:5]:
    print(f"  {shop['shop_name']} — Favorites: {shop.get('num_favorers', 'N/A')}")
    time.sleep(random.uniform(4, 10))

    # Fetch listing details for each shop
    shop_data = scrape_etsy_shop(shop["shop_name"], proxy=PROXY)
    # save to db...
    time.sleep(random.uniform(6, 12))

Keep delays between 4-10 seconds for AJAX endpoints and 8-15 seconds between full page loads. Going faster than this will trigger soft blocks that silently return empty results.
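Those pacing rules can live in one helper so every call site uses the same jitter. The ranges below mirror the guidance above; tune them against your own block rate. The injectable `sleeper` parameter is a convenience for testing, not anything Etsy-specific:

```python
import random
import time

# Per-request delay ranges in seconds, matching the pacing guidance above
DELAYS = {
    "ajax": (4, 10),   # JSON endpoints
    "page": (8, 15),   # full HTML page loads
}

def polite_sleep(kind: str = "ajax", sleeper=time.sleep) -> float:
    """Sleep for a random interval appropriate to the request type.

    Returns the chosen delay so callers can log it.
    """
    low, high = DELAYS[kind]
    delay = random.uniform(low, high)
    sleeper(delay)
    return delay
```

Call `polite_sleep("ajax")` after each search request and `polite_sleep("page")` after each listing or shop page fetch.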

Complete Scraping Pipeline

def run_etsy_pipeline(
    query: str,
    db_path: str = "etsy.db",
    proxy: str = None,
    max_pages: int = 3,
):
    """
    Full pipeline:
    1. Search shops by query
    2. Scrape each shop's top listings
    3. Fetch reviews for each listing
    4. Store everything in SQLite
    """
    conn = init_etsy_db(db_path)

    print(f"Searching Etsy for: {query}")
    shops = search_etsy_all_pages(query, max_pages=max_pages, proxy=proxy)
    print(f"Found {len(shops)} shops total")

    # Fetch the listing search results once; the query is the same for every shop
    all_listings = search_etsy_listings(query=query, proxy=proxy)

    for shop in shops:
        shop_name = shop.get("shop_name")
        if not shop_name:
            continue

        print(f"\nProcessing shop: {shop_name}")

        # Save basic shop data
        conn.execute(
            """INSERT OR REPLACE INTO shops
               (shop_name, total_sales, star_seller)
               VALUES (?, ?, ?)""",
            (shop_name, shop.get("num_sales"), int(shop.get("star_seller", False))),
        )
        conn.commit()

        # Filter the prefetched results down to this shop's listings
        shop_listings = [l for l in all_listings if l.get("shop_name") == shop_name]

        for listing in shop_listings[:5]:  # Top 5 listings per shop
            url = listing.get("url")
            if not url:
                continue

            print(f"  Listing: {listing.get('title', '')[:50]}")

            # Get full listing details
            details = scrape_etsy_listing(url, proxy=proxy)
            details["listing_id"] = listing.get("listing_id")
            details["star_seller"] = listing.get("star_seller", False)
            details["bestseller"] = listing.get("bestseller", False)

            save_listing(conn, details)

            # Fetch reviews
            lid = listing.get("listing_id")
            if lid:
                reviews = scrape_listing_reviews(lid, max_pages=2, proxy=proxy)
                save_reviews(conn, reviews)
                print(f"    {len(reviews)} reviews saved")

            time.sleep(random.uniform(8, 15))

    conn.close()
    print(f"\nPipeline complete. Data saved to {db_path}")


# Run it
PROXY = "http://USER:[email protected]:9000"
run_etsy_pipeline("handmade ceramic mug", proxy=PROXY)

Analyzing Etsy Market Data with SQL

Once you have data in SQLite, you can run market analysis queries:

import sqlite3

conn = sqlite3.connect("etsy.db")

# Top-performing shops by average rating
top_shops = conn.execute("""
    SELECT l.shop_name, COUNT(*) AS listing_count,
           AVG(l.rating) AS avg_rating, SUM(l.review_count) AS total_reviews,
           AVG(l.price) AS avg_price_usd
    FROM listings l
    WHERE l.rating IS NOT NULL
    GROUP BY l.shop_name
    HAVING listing_count >= 3
    ORDER BY avg_rating DESC, total_reviews DESC
    LIMIT 20
""").fetchall()

# Price distribution for a niche
price_dist = conn.execute("""
    SELECT
        CASE
            WHEN price < 10 THEN 'Under $10'
            WHEN price < 25 THEN '$10-$25'
            WHEN price < 50 THEN '$25-$50'
            WHEN price < 100 THEN '$50-$100'
            ELSE 'Over $100'
        END AS price_band,
        COUNT(*) AS listing_count,
        AVG(review_count) AS avg_reviews
    FROM listings
    WHERE price IS NOT NULL
    GROUP BY price_band
    ORDER BY MIN(price)
""").fetchall()

# Most common tags across scraped listings
tags_freq = conn.execute("""
    SELECT tag_value, COUNT(*) AS frequency
    FROM (
        SELECT json_each.value AS tag_value
        FROM listings, json_each(tags)
        WHERE tags IS NOT NULL AND tags != '[]'
    )
    GROUP BY tag_value
    ORDER BY frequency DESC
    LIMIT 30
""").fetchall()

# Star sellers vs non-star sellers — price and review comparison
star_seller_stats = conn.execute("""
    SELECT star_seller,
           COUNT(*) AS count,
           AVG(price) AS avg_price,
           AVG(review_count) AS avg_reviews,
           AVG(rating) AS avg_rating
    FROM listings
    WHERE rating IS NOT NULL
    GROUP BY star_seller
""").fetchall()

for row in star_seller_stats:
    label = "Star Seller" if row[0] else "Regular Seller"
    print(f"{label}: {row[1]} listings, avg ${row[2]:.2f}, {row[3]:.0f} reviews, {row[4]:.2f} stars")

Extracting Shop Performance Metrics

Understanding what makes a successful Etsy shop requires looking at multiple signals together:

def analyze_shop_performance(conn: sqlite3.Connection) -> list[dict]:
    """
    Compute shop-level performance metrics from scraped data.
    Returns shops ranked by composite performance score.
    """
    rows = conn.execute("""
        SELECT
            l.shop_name,
            COUNT(*) AS listing_count,
            AVG(l.price) AS avg_price,
            AVG(l.rating) AS avg_rating,
            SUM(l.review_count) AS total_reviews,
            SUM(CASE WHEN l.star_seller = 1 THEN 1 ELSE 0 END) AS star_seller_listings,
            SUM(CASE WHEN l.bestseller = 1 THEN 1 ELSE 0 END) AS bestseller_count,
            MAX(l.shop_sales) AS estimated_sales,
            AVG(l.quantity_sold) AS avg_quantity_sold
        FROM listings l
        GROUP BY l.shop_name
        HAVING listing_count >= 2
        ORDER BY total_reviews DESC
    """).fetchall()

    shops = []
    for row in rows:
        shops.append({
            "shop_name": row[0],
            "listing_count": row[1],
            "avg_price_usd": round(row[2] or 0, 2),
            "avg_rating": round(row[3] or 0, 2),
            "total_reviews": row[4] or 0,
            "is_star_seller": row[5] > 0,
            "bestseller_count": row[6] or 0,
            "estimated_sales": row[7],
            "avg_quantity_sold": round(row[8] or 0, 0),
        })

    return shops


shops = analyze_shop_performance(conn)
for s in shops[:10]:
    print(
        f"{s['shop_name']:<30} "
        f"Rating: {s['avg_rating']:.2f} "
        f"Reviews: {s['total_reviews']:>6} "
        f"Avg Price: ${s['avg_price_usd']:>7.2f}"
    )

Legal and Ethical Considerations

Etsy's Terms of Service prohibit automated data collection, and its robots.txt blocks most scraping paths. Keep this work within low-impact, private use:

- Personal competitive research and price monitoring
- Academic research and market analysis
- One-off datasets for private use

Do not:

- Republish Etsy listings as your own catalog
- Use pricing data to automatically undercut sellers
- Scrape and store seller personal information
- Build tools that enable others to do the above at scale

For commercial use cases, Etsy's official Open API v3 with proper OAuth approval is the correct path.

Tracking Trends Over Time

The real value in Etsy data comes from watching trends over time, not single snapshots. Here is how to build a longitudinal dataset:

import sqlite3
import json
import time
import random
from datetime import date, datetime

def run_weekly_snapshot(
    niche_queries: list,
    db_path: str = "etsy.db",
    proxy: str = None,
):
    """
    Run weekly market snapshots for a set of niches.
    Call this function once per week via cron.
    """
    conn = init_etsy_db(db_path)
    snapshot_date = date.today().isoformat()

    print(f"Weekly Etsy snapshot: {snapshot_date}")

    for query in niche_queries:
        print(f"\nNiche: {query}")

        # Search listings
        listings = search_etsy_all_pages(query, max_pages=3, proxy=proxy)
        print(f"  Found {len(listings)} listings")

        # Extract market stats
        prices = [l.get("price") for l in listings if l.get("price")]
        favorites = [l.get("num_favorers") for l in listings if l.get("num_favorers")]
        star_count = sum(1 for l in listings if l.get("star_seller"))
        bestseller_count = sum(1 for l in listings if l.get("bestseller"))

        import statistics

        if prices:
            stats = {
                "snapshot_date": snapshot_date,
                "niche": query,
                "listing_count": len(listings),
                "median_price": statistics.median(prices),
                "avg_price": round(statistics.mean(prices), 2),
                "min_price": min(prices),
                "max_price": max(prices),
                "pct_star_seller": round(star_count / len(listings) * 100, 1) if listings else 0,
                "pct_bestseller": round(bestseller_count / len(listings) * 100, 1) if listings else 0,
                "avg_favorites": round(statistics.mean(favorites), 1) if favorites else 0,
            }

            # Store snapshot
            conn.execute(
                """CREATE TABLE IF NOT EXISTS market_snapshots (
                   snapshot_date TEXT, niche TEXT, listing_count INTEGER,
                   median_price REAL, avg_price REAL, min_price REAL, max_price REAL,
                   pct_star_seller REAL, pct_bestseller REAL, avg_favorites REAL,
                   PRIMARY KEY (snapshot_date, niche)
                )""",
            )
            conn.execute(
                """INSERT OR REPLACE INTO market_snapshots
                   VALUES (?,?,?,?,?,?,?,?,?,?)""",
                tuple(stats.values()),
            )
            conn.commit()

            print(f"  Median price: ${stats['median_price']:.2f}")
            print(f"  Star sellers: {stats['pct_star_seller']}%")

        # Save individual listings too
        for listing in listings:
            save_listing(conn, listing)

        time.sleep(random.uniform(15, 30))

    conn.close()


# Set up for weekly cron
NICHES = [
    "handmade ceramic mug",
    "personalized jewelry gift",
    "digital wedding invitation",
    "custom portrait print",
    "crochet baby blanket",
]

run_weekly_snapshot(NICHES, proxy="http://USER:[email protected]:9000")

Price Elasticity Analysis

Understanding how price affects favoriting and sales is key to pricing strategy research:

def analyze_price_elasticity(conn: sqlite3.Connection, niche: str = None) -> dict:
    """
    Analyze the relationship between price and engagement metrics.
    Returns correlation data and optimal price ranges.
    """
    query = """
        SELECT price, num_favorers, review_count, star_seller, bestseller
        FROM listings
        WHERE price IS NOT NULL AND price > 0 AND price < 1000
    """
    params = ()
    if niche:
        query += " AND (name LIKE ? OR tags LIKE ?)"
        params = (f'%{niche}%', f'%{niche}%')

    rows = conn.execute(query, params).fetchall()

    if len(rows) < 10:
        return {"error": "insufficient_data", "count": len(rows)}

    # Group into price buckets
    buckets = {
        "under_10": [],
        "10_to_25": [],
        "25_to_50": [],
        "50_to_100": [],
        "100_to_250": [],
        "over_250": [],
    }

    for price, faves, reviews, star, best in rows:
        engagement = (faves or 0) + (reviews or 0) * 5  # Reviews worth 5x favorites

        if price < 10:
            buckets["under_10"].append((price, engagement, star, best))
        elif price < 25:
            buckets["10_to_25"].append((price, engagement, star, best))
        elif price < 50:
            buckets["25_to_50"].append((price, engagement, star, best))
        elif price < 100:
            buckets["50_to_100"].append((price, engagement, star, best))
        elif price < 250:
            buckets["100_to_250"].append((price, engagement, star, best))
        else:
            buckets["over_250"].append((price, engagement, star, best))

    import statistics

    analysis = {}
    for bucket, data in buckets.items():
        if len(data) >= 3:
            engagements = [d[1] for d in data]
            star_rate = sum(1 for d in data if d[2]) / len(data) * 100
            analysis[bucket] = {
                "count": len(data),
                "avg_engagement": round(statistics.mean(engagements), 1),
                "median_engagement": round(statistics.median(engagements), 1),
                "star_seller_pct": round(star_rate, 1),
                "avg_price": round(statistics.mean(d[0] for d in data), 2),
            }

    return {
        "niche": niche,
        "total_listings": len(rows),
        "by_price_bucket": analysis,
        "optimal_range": max(
            analysis.items(),
            key=lambda x: x[1]["avg_engagement"]
        )[0] if analysis else None,
    }
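
One possible refactor of the `if`/`elif` ladder above: since the bucket edges are sorted, a binary search maps a price to its bucket index directly. A small standalone sketch (edges and labels copied from the function above):

```python
from bisect import bisect_right

# Bucket edges and labels mirror the dict keys used above.
EDGES = [10, 25, 50, 100, 250]
LABELS = ["under_10", "10_to_25", "25_to_50",
          "50_to_100", "100_to_250", "over_250"]

def price_bucket(price: float) -> str:
    # bisect_right returns how many edges are <= price,
    # which is exactly the index of the matching label.
    return LABELS[bisect_right(EDGES, price)]

print(price_bucket(7.5))    # → under_10
print(price_bucket(149.0))  # → 100_to_250
```

Adding a new bucket then means touching two lists instead of rewriting the chain.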

Niche Saturation Scoring

Determine how saturated a niche is before entering it:

def score_niche_opportunity(
    query: str,
    proxy: str = None,
) -> dict:
    """
    Score a potential Etsy niche on opportunity vs. saturation.
    Returns a composite score with supporting metrics.
    """
    # Collect sample data
    listings = search_etsy_listings(query, page=1, proxy=proxy)

    if not listings:
        return {"error": "no_data", "query": query}

    prices = [l["price"] for l in listings if l.get("price")]
    favorites = [l["num_favorers"] for l in listings if l.get("num_favorers")]
    star_count = sum(1 for l in listings if l.get("star_seller"))
    bestseller_count = sum(1 for l in listings if l.get("bestseller"))

    import statistics

    if not prices:
        return {"error": "no_prices", "query": query}

    avg_price = statistics.mean(prices)
    avg_favorites = statistics.mean(favorites) if favorites else 0

    # Saturation signals (higher = more saturated)
    saturation_factors = {
        "high_star_seller_pct": (star_count / len(listings)) > 0.3,  # Many established sellers
        "price_compression": max(prices) / avg_price < 2,  # Prices bunched together
        "low_favorites_avg": avg_favorites < 100,  # Low engagement
        "many_bestsellers": (bestseller_count / len(listings)) > 0.4,
    }

    saturation_count = sum(saturation_factors.values())

    # Opportunity signals (higher = better opportunity)
    opportunity_factors = {
        "high_avg_price": avg_price > 25,  # Worth the effort
        "high_favorites": avg_favorites > 500,  # Proven demand
        "few_star_sellers": (star_count / len(listings)) < 0.15,  # Less competition
        "price_spread": max(prices) / avg_price > 3,  # Room for premium positioning
    }

    opportunity_count = sum(opportunity_factors.values())

    return {
        "query": query,
        "sample_size": len(listings),
        "avg_price": round(avg_price, 2),
        "avg_favorites": round(avg_favorites, 1),
        "pct_star_seller": round(star_count / len(listings) * 100, 1),
        "saturation_score": saturation_count,  # 0-4, lower is less saturated
        "opportunity_score": opportunity_count,  # 0-4, higher is better
        "net_opportunity": opportunity_count - saturation_count,  # > 0 means worth exploring
        "saturation_signals": saturation_factors,
        "opportunity_signals": opportunity_factors,
    }
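
The factor scoring is just a count of `True` booleans — Python sums `True` as 1. A standalone sketch of that arithmetic with fabricated sample listings (real data comes from `search_etsy_listings`):

```python
import statistics

# Fabricated sample listings, for illustration only.
listings = [
    {"price": 18.0, "num_favorers": 40},
    {"price": 22.0, "num_favorers": 75},
    {"price": 95.0, "num_favorers": 900},
]
prices = [l["price"] for l in listings]
avg_price = statistics.mean(prices)         # 45.0
price_spread = max(prices) / avg_price > 3  # 95 / 45 ≈ 2.1 → False
high_avg_price = avg_price > 25             # True
# sum() over booleans counts how many factors fired.
opportunity = sum([price_spread, high_avg_price])  # 1
```

Each factor is deliberately a blunt threshold; the composite score is only meant to triage niches for a closer look, not to replace it.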


# Evaluate multiple niches
niches = [
    "personalized dog collar",
    "custom phone case",
    "hand painted portrait",
    "digital planner template",
    "resin ocean tray",
]

PROXY = "http://USER:[email protected]:9000"
print("Niche opportunity analysis:")
for niche in niches:
    score = score_niche_opportunity(niche, proxy=PROXY)
    if "error" in score:
        print(f"  {niche:<35} skipped: {score['error']}")
    else:
        print(f"  {niche:<35} net={score['net_opportunity']:+d}  price=${score['avg_price']:.0f}  faves={score['avg_favorites']:.0f}")
    time.sleep(random.uniform(10, 20))
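
With several niches scored, sorting by `net_opportunity` surfaces the strongest candidates first. A minimal sketch over hypothetical score dicts shaped like `score_niche_opportunity` output:

```python
# Hypothetical scores, for illustration only.
scores = [
    {"query": "custom phone case", "net_opportunity": -3},
    {"query": "resin ocean tray", "net_opportunity": 2},
    {"query": "digital planner template", "net_opportunity": 1},
]
# Highest net opportunity first.
ranked = sorted(scores, key=lambda s: s["net_opportunity"], reverse=True)
for s in ranked:
    print(f"{s['net_opportunity']:+d}  {s['query']}")
```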

Competitor Shop Monitoring

Track specific competitor shops over time:

def monitor_competitor_shops(
    shop_names: list,
    db_path: str = "etsy.db",
    proxy: str = None,
):
    """
    Monitor a list of competitor shops weekly.
    Tracks listing count, total reviews, and new products.
    """
    conn = init_etsy_db(db_path)
    today = date.today().isoformat()

    for shop_name in shop_names:
        print(f"Monitoring: {shop_name}")

        # Get shop data via API
        try:
            shop = scrape_etsy_shop(shop_name, proxy=proxy)
        except Exception as e:
            print(f"  Error: {e}")
            continue

        # Store snapshot
        conn.execute(
            """INSERT OR REPLACE INTO shops
               (shop_name, total_sales, star_seller, listing_count)
               VALUES (?, ?, ?, ?)""",
            (shop_name, shop.get("total_sales"),
             int(shop.get("star_seller", False)),
             shop.get("listing_count")),
        )
        conn.commit()

        print(f"  Sales: {shop.get('total_sales', 'N/A')}")
        print(f"  Listings: {shop.get('listing_count', 'N/A')}")
        time.sleep(random.uniform(10, 20))

    conn.close()
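
Note that INSERT OR REPLACE keeps only the latest snapshot per shop. If you instead append rows to a hypothetical `shop_snapshots` table with a `scraped_date` column, week-over-week deltas fall out of a self-join — a sketch with an in-memory database:

```python
import sqlite3

# Hypothetical snapshot schema; the shops table elsewhere in this
# guide stores only the latest row per shop.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE shop_snapshots
                (shop_name TEXT, scraped_date TEXT, total_sales INTEGER)""")
conn.executemany(
    "INSERT INTO shop_snapshots VALUES (?, ?, ?)",
    [("CozyKnits", "2026-01-05", 1200), ("CozyKnits", "2026-01-12", 1260)],
)
# Self-join: current week minus previous week per shop.
row = conn.execute(
    """SELECT cur.shop_name, cur.total_sales - prev.total_sales AS delta
       FROM shop_snapshots cur
       JOIN shop_snapshots prev ON prev.shop_name = cur.shop_name
       WHERE cur.scraped_date = '2026-01-12'
         AND prev.scraped_date = '2026-01-05'"""
).fetchone()
# row == ("CozyKnits", 60)
```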

Key Takeaways for Etsy Scraping in 2026