
How to Scrape Product Hunt in 2026: Products, Upvotes & Maker Profiles

Product Hunt is a goldmine for startup intelligence — daily product launches, real upvote data, and direct access to founders. Whether you're tracking competitors, sourcing leads, or building a trend analysis tool, getting this data programmatically saves hours of manual work.

This guide covers two approaches: the official GraphQL API v2 for structured data access, and Playwright-based web scraping as a fallback when the API hits its limits.

What You Can Extract

Between the API and the rendered site, you can pull product metadata (name, tagline, description, launch URL, website), engagement numbers (upvotes, comments, reviews and ratings), launch timing (created and featured dates), topic tags, media (screenshots and demo videos), and full maker profiles (username, headline, Twitter handle, follower count). The queries later in this guide cover all of these fields.

Product Hunt's Anti-Bot Measures

  1. API rate limits — The official GraphQL API v2 caps authenticated requests at 900 per day per token. Unauthenticated requests get far fewer (~200 per day).
  2. Browser fingerprinting — The web interface runs fingerprint checks via JavaScript that flag headless browsers, unusual screen dimensions, and missing plugins.
  3. Aggressive throttling on unauthenticated requests — Hit the API without a token a few times and you get soft-blocked. Keep going and it's a hard block on the IP.
  4. Dynamic JavaScript rendering — Product cards are injected by React after page load. Raw HTML fetches return a shell. You need a real browser or the API.
  5. Bot detection patterns — Consistent timing between requests, missing referer headers, and unusual user agents all trigger flags.
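
That last point, consistent timing, is the easiest flag to avoid: never sleep a fixed interval between requests. A tiny jitter helper in the same spirit as the `random.uniform` sleeps used later in this guide:

```python
import random


def jittered_delay(base: float = 2.0, spread: float = 1.5) -> float:
    """Return a randomized pause: at least `base` seconds, up to `base + spread`."""
    return base + random.uniform(0.0, spread)
```

Call `time.sleep(jittered_delay())` between requests instead of a fixed `time.sleep(2)`; the varying gaps look far less mechanical.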

Dependencies and Setup

pip install httpx playwright
playwright install chromium

Method 1: The Official GraphQL API (v2)

Get an API token from Product Hunt's developer settings: create an application and use the developer token it gives you (not the OAuth client credentials; the direct developer token is enough for server-side scripts).

import httpx
import time
import random
import json
from typing import Optional

API_URL = "https://api.producthunt.com/v2/api/graphql"
TOKEN = "your_api_token_here"

PRODUCTS_QUERY = """
query GetProducts($first: Int!, $after: String) {
  posts(first: $first, after: $after, order: VOTES) {
    pageInfo {
      hasNextPage
      endCursor
    }
    edges {
      node {
        id
        name
        tagline
        description
        votesCount
        commentsCount
        url
        website
        createdAt
        featuredAt
        reviewsCount
        reviewsRating
        pricingType
        topics {
          edges {
            node {
              name
              slug
            }
          }
        }
        makers {
          id
          name
          username
          headline
          profileImage
          websiteUrl
          twitterUsername
          followersCount
        }
        media {
          type
          url
          videoUrl
        }
        thumbnail {
          url
        }
      }
    }
  }
}
"""


def make_headers(token: Optional[str] = None) -> dict:
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/126.0.0.0",
    }
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return headers


def fetch_products(
    cursor: Optional[str] = None,
    first: int = 20,
    token: Optional[str] = None,
    proxy: Optional[str] = None,
) -> dict:
    """Fetch a page of Product Hunt products."""
    variables = {"first": first}
    if cursor:
        variables["after"] = cursor

    client_kwargs = {
        "headers": make_headers(token),
        "timeout": 30,
    }
    if proxy:
        # httpx >= 0.26 takes a single proxy URL; older versions
        # used proxies={"all://": proxy} instead
        client_kwargs["proxy"] = proxy

    with httpx.Client(**client_kwargs) as client:
        response = client.post(
            API_URL,
            json={"query": PRODUCTS_QUERY, "variables": variables},
        )

    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 60))
        print(f"Rate limited. Waiting {retry_after}s...")
        time.sleep(retry_after)
        return fetch_products(cursor, first, token, proxy)

    response.raise_for_status()
    data = response.json()

    if "errors" in data:
        raise Exception(f"GraphQL errors: {data['errors']}")

    return data["data"]


def scrape_all_products(
    max_pages: int = 20,
    token: Optional[str] = None,
    proxy: Optional[str] = None,
) -> list:
    """Scrape products with cursor pagination, up to max_pages pages."""
    products = []
    cursor = None

    for page in range(max_pages):
        data = fetch_products(cursor=cursor, token=token, proxy=proxy)
        page_data = data["posts"]

        for edge in page_data["edges"]:
            node = edge["node"]
            products.append({
                "id": node["id"],
                "name": node["name"],
                "tagline": node["tagline"],
                "description": node.get("description", ""),
                "votes": node["votesCount"],
                "comments": node["commentsCount"],
                "url": node["url"],
                "website": node.get("website"),
                "created_at": node["createdAt"],
                "featured_at": node.get("featuredAt"),
                "reviews_count": node.get("reviewsCount", 0),
                "reviews_rating": node.get("reviewsRating"),
                "pricing_type": node.get("pricingType"),
                "topics": [e["node"]["name"] for e in node["topics"]["edges"]],
                "makers": [
                    {
                        "id": m["id"],
                        "name": m["name"],
                        "username": m["username"],
                        "headline": m.get("headline"),
                        "twitter": m.get("twitterUsername"),
                        "website": m.get("websiteUrl"),
                        "followers": m.get("followersCount", 0),
                    }
                    for m in node["makers"]
                ],
                "thumbnail": node["thumbnail"]["url"] if node.get("thumbnail") else None,
                "media_count": len(node.get("media", [])),
                "has_video": any(m.get("videoUrl") for m in node.get("media", [])),
            })

        print(f"Page {page+1}: {len(page_data['edges'])} products ({len(products)} total)")

        if not page_data["pageInfo"]["hasNextPage"]:
            print("No more pages")
            break

        cursor = page_data["pageInfo"]["endCursor"]
        time.sleep(random.uniform(1.5, 3.0))

    return products
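
One caveat with `order: VOTES`: vote counts move while you paginate, so a product can shift across a page boundary and show up twice in the collected list. A cheap dedupe pass by id on the output of scrape_all_products guards against that:

```python
def dedupe_by_id(products: list[dict]) -> list[dict]:
    """Drop duplicate products, keeping the first occurrence of each id."""
    seen: set[str] = set()
    unique: list[dict] = []
    for product in products:
        if product["id"] not in seen:
            seen.add(product["id"])
            unique.append(product)
    return unique
```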

Fetching a Specific Product by Slug

PRODUCT_BY_SLUG_QUERY = """
query GetProduct($slug: String!) {
  post(slug: $slug) {
    id
    name
    tagline
    description
    body
    votesCount
    commentsCount
    url
    website
    createdAt
    featuredAt
    reviewsCount
    reviewsRating
    pricingType
    makers {
      id
      name
      username
      headline
      twitterUsername
      websiteUrl
      followersCount
    }
    topics {
      edges {
        node { name slug }
      }
    }
    comments(first: 10, order: VOTES) {
      edges {
        node {
          id
          body
          votesCount
          user { name username headline }
          createdAt
        }
      }
    }
    media {
      type
      url
      videoUrl
    }
  }
}
"""


def get_product_by_slug(
    slug: str,
    token: Optional[str] = None,
    proxy: Optional[str] = None,
) -> dict | None:
    """Get a specific product by its URL slug."""
    client_kwargs = {
        "headers": make_headers(token),
        "timeout": 30,
    }
    if proxy:
        # httpx >= 0.26 takes a single proxy URL (older versions used proxies=)
        client_kwargs["proxy"] = proxy

    with httpx.Client(**client_kwargs) as client:
        response = client.post(
            API_URL,
            json={"query": PRODUCT_BY_SLUG_QUERY, "variables": {"slug": slug}},
        )
        response.raise_for_status()
        data = response.json()

    if "errors" in data:
        print(f"GraphQL errors for {slug}: {data['errors']}")
        return None

    return data["data"].get("post")


# Example: fetch a specific product
product = get_product_by_slug("cursor", token=TOKEN)
if product:
    print(f"{product['name']}: {product['votesCount']} votes")
    print(f"Made by: {', '.join(m['name'] for m in product['makers'])}")
    print(f"Topics: {', '.join(e['node']['name'] for e in product['topics']['edges'])}")
    top_comment = product['comments']['edges'][0]['node'] if product['comments']['edges'] else None
    if top_comment:
        print(f"Top comment ({top_comment['votesCount']} votes): {top_comment['body'][:100]}...")

Fetching Maker Profiles

Dig into maker profiles to understand founders and their portfolios:

MAKER_QUERY = """
query GetMaker($username: String!) {
  user(username: $username) {
    id
    name
    username
    headline
    websiteUrl
    twitterUsername
    followersCount
    followingsCount
    votedProducts(first: 5) {
      edges {
        node { name votesCount }
      }
    }
    madePosts(first: 10, order: VOTES) {
      edges {
        node {
          id name tagline votesCount
          createdAt url
        }
      }
    }
  }
}
"""


def get_maker_profile(username: str, token: Optional[str] = None, proxy: Optional[str] = None) -> dict | None:
    """Fetch a maker's full profile and their products."""
    client_kwargs = {"headers": make_headers(token), "timeout": 30}
    if proxy:
        # httpx >= 0.26 takes a single proxy URL (older versions used proxies=)
        client_kwargs["proxy"] = proxy

    with httpx.Client(**client_kwargs) as client:
        response = client.post(
            API_URL,
            json={"query": MAKER_QUERY, "variables": {"username": username}},
        )
        response.raise_for_status()
        data = response.json()

    if "errors" in data:
        return None

    user = data["data"].get("user")
    if not user:
        return None

    # Parse made products
    made_posts = [
        {
            "id": e["node"]["id"],
            "name": e["node"]["name"],
            "tagline": e["node"]["tagline"],
            "votes": e["node"]["votesCount"],
            "created_at": e["node"]["createdAt"],
            "url": e["node"]["url"],
        }
        for e in user.get("madePosts", {}).get("edges", [])
    ]

    return {
        "id": user["id"],
        "name": user["name"],
        "username": user["username"],
        "headline": user.get("headline"),
        "website": user.get("websiteUrl"),
        "twitter": user.get("twitterUsername"),
        "followers": user.get("followersCount", 0),
        "following": user.get("followingsCount", 0),
        "products": made_posts,
        "total_products": len(made_posts),
        "total_votes_received": sum(p["votes"] for p in made_posts),
    }

Method 2: Web Scraping with Playwright

When the API limit is exhausted, fall back to Playwright:

import asyncio
import random

from playwright.async_api import async_playwright

async def scrape_daily_page(date: str = None, proxy: dict = None) -> list:
    """
    Scrape the Product Hunt homepage or a specific date's page.
    date format: YYYY-MM-DD (optional, defaults to today)
    proxy: dict with 'server', 'username', 'password'
    """
    products = []

    async with async_playwright() as p:
        launch_kwargs = {
            "headless": True,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-dev-shm-usage",
            ]
        }
        if proxy:
            launch_kwargs["proxy"] = proxy

        browser = await p.chromium.launch(**launch_kwargs)
        context = await browser.new_context(
            viewport={"width": 1440, "height": 900},
            user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/125.0.0.0 Safari/537.36"
            ),
            locale="en-US",
            timezone_id="America/New_York",
        )

        await context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4] });
            window.chrome = { runtime: {} };
        """)

        page = await context.new_page()
        url = f"https://www.producthunt.com?day={date}" if date else "https://www.producthunt.com"
        await page.goto(url, wait_until="networkidle", timeout=30000)
        await page.wait_for_timeout(3000)

        # Wait for product cards
        try:
            await page.wait_for_selector(
                "[data-test='post-item'], [class*='post-item']",
                timeout=15000
            )
        except Exception:
            print("Product cards not found — possible Cloudflare block")
            await browser.close()
            return []

        # Scroll to load lazy content
        for _ in range(3):
            await page.keyboard.press("End")
            await asyncio.sleep(1.5)

        cards = await page.query_selector_all(
            "[data-test='post-item'], [class*='post-item']"
        )

        def parse_votes(raw: str) -> int:
            """Turn a vote label like '1.2K' into an integer."""
            raw = raw.strip()
            if raw.upper().endswith("K"):
                return int(float(raw[:-1]) * 1000)
            try:
                return int(raw)
            except ValueError:
                return 0

        for card in cards:
            try:
                name_el = await card.query_selector("h3")
                tagline_el = await card.query_selector("[data-test='post-tagline'], [class*='tagline']")
                vote_el = await card.query_selector("[data-test='vote-button'], [aria-label*='vote']")
                link_el = await card.query_selector("a[href*='/posts/']")
                topic_els = await card.query_selector_all("[class*='topic'], [data-test*='topic']")

                name = await name_el.inner_text() if name_el else ""
                tagline = await tagline_el.inner_text() if tagline_el else ""
                vote_text = await vote_el.inner_text() if vote_el else "0"
                href = await link_el.get_attribute("href") if link_el else ""

                topics = []
                for tel in topic_els[:3]:
                    t = await tel.inner_text()
                    if t.strip():
                        topics.append(t.strip())

                products.append({
                    "name": name.strip(),
                    "tagline": tagline.strip(),
                    "votes": parse_votes(vote_text),
                    "url": f"https://www.producthunt.com{href}" if href.startswith("/") else href,
                    "topics": topics,
                })

            except Exception:
                continue

        await browser.close()

    return sorted(products, key=lambda x: x["votes"], reverse=True)

Scaling with Residential Proxies

Once you're running this at volume — pulling historical data, monitoring daily launches, tracking upvote velocity — the 900/day API cap and IP-based throttling become real problems. Rotating residential proxies solve both: each request comes from a different real residential IP, so Product Hunt's rate limiting and bot detection see unrelated users rather than a single scraper.

ThorData's residential proxy network provides automatic rotation with solid coverage. Plugging it into the API client is straightforward:

PROXY_URL = "http://user:[email protected]:9000"

# For API requests via httpx
products = scrape_all_products(
    max_pages=20,
    token=TOKEN,
    proxy=PROXY_URL,
)

# For Playwright
playwright_proxy = {
    "server": "http://proxy.thordata.com:9000",
    "username": "YOUR_USER",
    "password": "YOUR_PASS",
}
# results = asyncio.run(scrape_daily_page(proxy=playwright_proxy))
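
A rotating gateway like the one above handles rotation server-side behind a single endpoint. If you instead hold a static list of proxy endpoints, a small round-robin helper keeps the call sites clean (the URLs below are placeholders):

```python
from itertools import cycle

# Hypothetical endpoints; substitute your own gateway entries
PROXY_POOL = [
    "http://user:[email protected]:9000",
    "http://user:[email protected]:9001",
]

_rotation = cycle(PROXY_POOL)


def next_proxy() -> str:
    """Hand out proxy URLs round-robin, one per request."""
    return next(_rotation)
```

Then each call site becomes `fetch_products(token=TOKEN, proxy=next_proxy())`.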

Storing the Data

import json
import sqlite3

def init_db(db_path: str = "producthunt.db") -> sqlite3.Connection:
    """Initialize the Product Hunt database schema."""
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS products (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            tagline TEXT,
            description TEXT,
            votes_count INTEGER DEFAULT 0,
            comments_count INTEGER DEFAULT 0,
            url TEXT,
            website TEXT,
            created_at TEXT,
            featured_at TEXT,
            topics TEXT,
            pricing_type TEXT,
            reviews_count INTEGER DEFAULT 0,
            reviews_rating REAL,
            thumbnail_url TEXT,
            has_video BOOLEAN DEFAULT 0,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS makers (
            id TEXT,
            product_id TEXT,
            name TEXT,
            username TEXT,
            twitter TEXT,
            website TEXT,
            followers INTEGER DEFAULT 0,
            headline TEXT,
            PRIMARY KEY (id, product_id),
            FOREIGN KEY (product_id) REFERENCES products(id)
        );

        CREATE TABLE IF NOT EXISTS vote_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT,
            votes_count INTEGER,
            recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_products_votes ON products(votes_count DESC);
        CREATE INDEX IF NOT EXISTS idx_products_created ON products(created_at);
        CREATE INDEX IF NOT EXISTS idx_products_featured ON products(featured_at);
        CREATE INDEX IF NOT EXISTS idx_products_topics ON products(topics);
    """)
    conn.commit()
    return conn
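
The vote_history table is what makes trend analysis possible: each scrape appends a snapshot, and the gain between snapshots is a product's vote velocity. A self-contained sketch of the kind of query you can run against it, assuming vote counts are non-decreasing (close enough for upvotes in practice, so MAX minus MIN equals last minus first):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE vote_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        product_id TEXT,
        votes_count INTEGER,
        recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
# Two snapshots six hours apart for the same product
conn.execute("INSERT INTO vote_history (product_id, votes_count, recorded_at) "
             "VALUES ('p1', 100, '2026-01-01 00:00:00')")
conn.execute("INSERT INTO vote_history (product_id, votes_count, recorded_at) "
             "VALUES ('p1', 160, '2026-01-01 06:00:00')")

# Votes gained per product across all of its snapshots
gained = conn.execute("""
    SELECT product_id, MAX(votes_count) - MIN(votes_count) AS gained
    FROM vote_history
    GROUP BY product_id
""").fetchone()
```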


def save_products(conn: sqlite3.Connection, products: list) -> int:
    """Save products to DB, returns count of new/updated records."""
    saved = 0
    for product in products:
        existing = conn.execute(
            "SELECT votes_count FROM products WHERE id = ?", (product["id"],)
        ).fetchone()

        conn.execute(
            """INSERT OR REPLACE INTO products
               (id, name, tagline, description, votes_count, comments_count, url,
                website, created_at, featured_at, topics, pricing_type,
                reviews_count, reviews_rating, thumbnail_url, has_video)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                product["id"], product["name"], product["tagline"],
                product.get("description", ""),
                product["votes"], product["comments"],
                product["url"], product.get("website"),
                product.get("created_at"), product.get("featured_at"),
                json.dumps(product.get("topics", [])),
                product.get("pricing_type"),
                product.get("reviews_count", 0),
                product.get("reviews_rating"),
                product.get("thumbnail"),
                int(product.get("has_video", False)),
            )
        )

        # Track vote changes
        if existing and existing[0] != product["votes"]:
            conn.execute(
                "INSERT INTO vote_history (product_id, votes_count) VALUES (?, ?)",
                (product["id"], product["votes"])
            )

        for maker in product.get("makers", []):
            conn.execute(
                """INSERT OR REPLACE INTO makers
                   (id, product_id, name, username, twitter, website, followers, headline)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    maker["id"], product["id"], maker["name"],
                    maker["username"], maker.get("twitter"),
                    maker.get("website"), maker.get("followers", 0),
                    maker.get("headline"),
                )
            )

        saved += 1

    conn.commit()
    return saved
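
One subtlety in save_products: SQLite's INSERT OR REPLACE deletes the old row and inserts a fresh one, so any column you don't supply (scraped_at in the schema above) snaps back to its default on every re-scrape. If you want a column to survive updates, ON CONFLICT ... DO UPDATE (SQLite 3.24+) touches only the columns you list. A self-contained comparison, with a hypothetical first_seen column standing in for anything you want preserved:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE products (
        id TEXT PRIMARY KEY,
        name TEXT,
        votes_count INTEGER,
        first_seen TEXT DEFAULT 'unknown'
    )
""")
conn.execute("INSERT INTO products (id, name, votes_count, first_seen) "
             "VALUES ('p1', 'Demo', 10, '2026-01-01')")

# INSERT OR REPLACE rewrites the whole row: first_seen falls back to its default
conn.execute("INSERT OR REPLACE INTO products (id, name, votes_count) VALUES ('p1', 'Demo', 20)")
replaced = conn.execute("SELECT votes_count, first_seen FROM products WHERE id = 'p1'").fetchone()

# ON CONFLICT DO UPDATE touches only the listed columns, so first_seen survives
conn.execute("UPDATE products SET first_seen = '2026-01-01' WHERE id = 'p1'")  # restore
conn.execute("""
    INSERT INTO products (id, name, votes_count) VALUES ('p1', 'Demo', 25)
    ON CONFLICT(id) DO UPDATE SET votes_count = excluded.votes_count
""")
upserted = conn.execute("SELECT votes_count, first_seen FROM products WHERE id = 'p1'").fetchone()
```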

Analyzing the Data

def analyze_products(conn: sqlite3.Connection) -> dict:
    """Run comprehensive analytics on the stored product data."""

    # Top 10 by votes
    top_products = conn.execute("""
        SELECT name, votes_count, tagline, featured_at
        FROM products ORDER BY votes_count DESC LIMIT 10
    """).fetchall()

    # Products per topic
    topics_count = {}
    for row in conn.execute("SELECT topics FROM products WHERE topics != '[]'"):
        for topic in json.loads(row[0]):
            topics_count[topic] = topics_count.get(topic, 0) + 1

    # Most prolific makers
    top_makers = conn.execute("""
        SELECT m.name, m.username, m.followers,
               COUNT(DISTINCT m.product_id) as products_made,
               SUM(p.votes_count) as total_votes
        FROM makers m
        JOIN products p ON m.product_id = p.id
        GROUP BY m.id
        HAVING products_made >= 2
        ORDER BY total_votes DESC
        LIMIT 10
    """).fetchall()

    # Pricing distribution
    pricing = conn.execute("""
        SELECT pricing_type, COUNT(*) as count, AVG(votes_count) as avg_votes
        FROM products
        WHERE pricing_type IS NOT NULL
        GROUP BY pricing_type
        ORDER BY avg_votes DESC
    """).fetchall()

    # Video vs non-video performance
    video_stats = conn.execute("""
        SELECT has_video,
               COUNT(*) as count,
               AVG(votes_count) as avg_votes,
               AVG(comments_count) as avg_comments
        FROM products
        GROUP BY has_video
    """).fetchall()

    print("=== Top 10 Products by Votes ===")
    for name, votes, tagline, featured in top_products:
        print(f"  {votes:5d} — {name} ({(featured or 'n/a')[:10]}): {(tagline or '')[:50]}")

    print("\n=== Top Topics ===")
    for topic, count in sorted(topics_count.items(), key=lambda x: -x[1])[:10]:
        print(f"  {topic}: {count} products")

    print("\n=== Top Makers ===")
    for name, username, followers, products_made, total_votes in top_makers:
        print(f"  @{username} ({name}): {products_made} products, {total_votes:,} total votes, {followers:,} followers")

    print("\n=== Pricing Type Performance ===")
    for ptype, count, avg_votes in pricing:
        print(f"  {ptype}: {count} products, avg {avg_votes:.0f} votes")

    print("\n=== Video vs No Video ===")
    for has_vid, count, avg_v, avg_c in video_stats:
        label = "With video" if has_vid else "No video"
        print(f"  {label}: {count} products, avg {avg_v:.0f} votes, avg {avg_c:.1f} comments")

    return {
        "top_products": top_products,
        "top_topics": sorted(topics_count.items(), key=lambda x: -x[1])[:20],
        "top_makers": top_makers,
    }


# Full run
if __name__ == "__main__":
    PROXY_URL = "http://user:[email protected]:9000"

    conn = init_db()
    products = scrape_all_products(max_pages=20, token=TOKEN, proxy=PROXY_URL)
    count = save_products(conn, products)
    print(f"Saved {count} products")

    analyze_products(conn)
    conn.close()

Product Hunt's Terms of Service prohibit scraping that circumvents technical measures or harvests personal data at scale. Use the official API where possible, respect the rate limits, and avoid storing personally identifiable information on makers beyond what's publicly visible on their profiles. If you're building a commercial product on top of this data, review their developer policy before shipping.

The API approach is generally safer than Playwright-based scraping since you're using an explicitly provided interface. Keep your API key secret, don't share it across projects, and monitor for rate limit responses to avoid token revocation.

Conclusion

Product Hunt's GraphQL API v2 is one of the cleaner startup data sources available. For most use cases — monitoring launches, researching makers, analyzing trends — the official API with its structured queries is the right approach. The Playwright fallback covers the gaps: an exhausted API quota, or page data the API doesn't expose.

For volume work beyond the 900/day API limit, ThorData's residential proxies provide the IP diversity needed to scale without triggering Product Hunt's rate limits. Combined with cursor pagination, vote history tracking, and SQLite storage, you'll have a production-grade startup intelligence pipeline in a few hundred lines of Python.