← Back to blog

How to Scrape Product Hunt Launches in 2026: Upvotes, Comments & Maker Data

How to Scrape Product Hunt Launches in 2026: Upvotes, Comments & Maker Data

Product Hunt is where startups launch and the tech community votes. Daily launches, upvote counts, comment threads, and maker profiles — it's a goldmine for competitive intelligence, trend analysis, and understanding what resonates with early adopters.

Product Hunt provides a public GraphQL API that's surprisingly generous. With the right queries, you can pull launch data, maker profiles, and full comment threads without authentication for basic access. For heavier use, a developer token (free) unlocks higher rate limits.

What Data Product Hunt Exposes

Each launch exposes its name, tagline, description, vote and comment counts, topics, media, and review stats; each maker profile includes a name, username, headline, and Twitter handle.

Product Hunt's API and Protections

Product Hunt is more open than most platforms, but still has guardrails:

  1. GraphQL API — The official API at api.producthunt.com/v2/api/graphql is the primary data source. It works without auth for basic queries but rate limits kick in quickly.
  2. Rate limiting — Unauthenticated: ~30 requests per 15 minutes. With developer token: ~450 requests per 15 minutes. Exceeding limits returns 429 responses.
  3. Query complexity limits — The GraphQL API rejects queries that request too many nested fields. You need to keep queries focused.
  4. Cloudflare on the website — The web frontend uses Cloudflare, but the API endpoint has lighter protection.
  5. Token requirements for some fields — Detailed maker data and historical launches require a bearer token.

Dependencies and Setup

pip install httpx fake-useragent playwright
playwright install chromium

Method 1: The GraphQL API (No Auth)

Basic launch data is available without any authentication:

import json
import random
import time
from datetime import datetime, timedelta, timezone

import httpx

# Prefer fake_useragent's live UA pool; fall back to a small static list
# when the package is unavailable.
try:
    from fake_useragent import UserAgent
except ImportError:
    UserAgent = None

if UserAgent is not None:
    _ua_pool = UserAgent()

    def get_ua():
        """Return a random browser User-Agent string."""
        return _ua_pool.random
else:
    USER_AGENTS = [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/126.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/126.0.0.0 Safari/537.36",
    ]

    def get_ua():
        """Return a random browser User-Agent string."""
        return random.choice(USER_AGENTS)

PH_API = "https://api.producthunt.com/v2/api/graphql"


def fetch_daily_launches(date: str = None, proxy: str = None, max_retries: int = 3) -> list:
    """
    Fetch Product Hunt launches for a specific date via the public GraphQL API.

    Args:
        date: launch day as "YYYY-MM-DD". Defaults to today.
        proxy: optional proxy URL routed through httpx.
        max_retries: how many times to wait-and-retry on HTTP 429 before
            giving up. (The previous version retried via unbounded recursion,
            which could loop forever on a persistent rate limit.)

    Returns:
        List of launch dicts sorted by votes descending; empty list on error.
    """
    if date is None:
        date = datetime.now().strftime("%Y-%m-%d")

    query = """
    query GetDailyPosts($postedAfter: DateTime!, $postedBefore: DateTime!) {
        posts(postedAfter: $postedAfter, postedBefore: $postedBefore, first: 50, order: VOTES) {
            edges {
                node {
                    id
                    name
                    tagline
                    description
                    votesCount
                    commentsCount
                    createdAt
                    url
                    website
                    reviewsRating
                    reviewsCount
                    topics { edges { node { name slug } } }
                    thumbnail { url }
                    makers {
                        id
                        name
                        username
                        headline
                        twitterUsername
                    }
                    media { type url videoUrl }
                }
            }
        }
    }
    """

    # Browser-like headers keep the unauthenticated endpoint happy.
    headers = {
        "User-Agent": get_ua(),
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Origin": "https://www.producthunt.com",
        "Referer": "https://www.producthunt.com/",
    }

    variables = {
        "postedAfter": f"{date}T00:00:00Z",
        "postedBefore": f"{date}T23:59:59Z",
    }

    client_kwargs = {"headers": headers, "follow_redirects": True, "timeout": 20}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    # Bounded retry loop replaces the old unbounded recursion on 429.
    with httpx.Client(**client_kwargs) as client:
        for attempt in range(max_retries + 1):
            resp = client.post(PH_API, json={"query": query, "variables": variables})
            if resp.status_code != 429:
                break
            if attempt == max_retries:
                print("Rate limited and retry budget exhausted; giving up.")
                return []
            retry_after = int(resp.headers.get("Retry-After", 60))
            print(f"Rate limited. Waiting {retry_after}s...")
            time.sleep(retry_after)

    if resp.status_code != 200:
        print(f"API error: HTTP {resp.status_code}")
        return []

    data = resp.json()
    if "errors" in data:
        print(f"GraphQL errors: {data['errors']}")
        return []

    launches = []
    # "data" / "posts" can come back as JSON null on partial failures;
    # `or {}` keeps the chained lookups from raising AttributeError.
    edges = ((data.get("data") or {}).get("posts") or {}).get("edges", [])
    for edge in edges:
        node = edge["node"]
        launches.append({
            "id": node["id"],
            "name": node["name"],
            "tagline": node["tagline"],
            "description": (node.get("description") or "")[:400],
            "votes": node["votesCount"],
            "comments": node["commentsCount"],
            "launched_at": node["createdAt"],
            "url": node["url"],
            "website": node.get("website"),
            "reviews_rating": node.get("reviewsRating"),
            "reviews_count": node.get("reviewsCount", 0),
            "topics": [t["node"]["name"] for t in node.get("topics", {}).get("edges", [])],
            "makers": [
                {
                    "id": m["id"],
                    "name": m["name"],
                    "username": m.get("username"),
                    "headline": m.get("headline"),
                    "twitter": m.get("twitterUsername"),
                }
                for m in node.get("makers", [])
            ],
            "thumbnail": node.get("thumbnail", {}).get("url") if node.get("thumbnail") else None,
            "media_count": len(node.get("media", [])),
            "has_video": any(m.get("videoUrl") for m in node.get("media", [])),
        })

    return sorted(launches, key=lambda x: x["votes"], reverse=True)


# Example: fetch and display today's top five launches.
today = datetime.now().strftime("%Y-%m-%d")
launches = fetch_daily_launches(today)
print("Today's top launches:")
for rank, launch in enumerate(launches[:5], start=1):
    topic_summary = ", ".join(launch["topics"][:3])
    print(f"  #{rank} {launch['name']} — {launch['votes']} upvotes — {topic_summary}")

Method 2: Authenticated Access for Full Data

A free developer token unlocks higher limits and more fields. Register at producthunt.com/v2/oauth/applications:

def fetch_launches_authenticated(
    date: str,
    token: str,
    proxy: str = None,
    first: int = 50,
) -> list:
    """
    Fetch a single day's launches with a developer token, paginating fully.

    Bug fix: the query previously filtered only on postedAfter, so every call
    pulled ALL launches newer than `date` — per-day callers such as
    collect_weekly_launches stored heavy duplicates. The query now bounds the
    window with postedBefore as well (same pattern as fetch_daily_launches).

    Args:
        date: launch day as "YYYY-MM-DD".
        token: Product Hunt developer bearer token.
        proxy: optional proxy URL routed through httpx.
        first: page size per GraphQL request.

    Returns:
        List of launch dicts sorted by votes descending.
    """
    query = """
    query GetPosts($postedAfter: DateTime!, $postedBefore: DateTime!, $first: Int!, $after: String) {
        posts(postedAfter: $postedAfter, postedBefore: $postedBefore, first: $first, after: $after, order: VOTES) {
            edges {
                node {
                    id name tagline description
                    votesCount commentsCount
                    website createdAt featuredAt
                    reviewsRating reviewsCount
                    pricingType
                    makers {
                        id name username headline
                        twitterUsername followersCount
                    }
                    topics { edges { node { name slug } } }
                    media { type url videoUrl }
                    thumbnail { url }
                }
            }
            pageInfo { hasNextPage endCursor }
        }
    }
    """

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": get_ua(),
    }

    client_kwargs = {"headers": headers, "timeout": 20}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    all_launches = []
    cursor = None
    retries = 0  # cap 429 retries so a stuck rate limit can't loop forever

    with httpx.Client(**client_kwargs) as client:
        while True:
            variables = {
                "postedAfter": f"{date}T00:00:00Z",
                "postedBefore": f"{date}T23:59:59Z",
                "first": first,
            }
            if cursor:
                variables["after"] = cursor

            resp = client.post(PH_API, json={"query": query, "variables": variables})

            if resp.status_code == 429:
                if retries >= 3:
                    break
                retries += 1
                retry_after = int(resp.headers.get("Retry-After", 60))
                print(f"Rate limited. Waiting {retry_after}s...")
                time.sleep(retry_after)
                continue

            if resp.status_code != 200:
                break

            data = resp.json()
            # "data"/"posts" can be null on GraphQL errors; guard the chain.
            posts_data = (data.get("data") or {}).get("posts") or {}
            edges = posts_data.get("edges", [])

            for edge in edges:
                node = edge["node"]
                all_launches.append({
                    "id": node["id"],
                    "name": node["name"],
                    "tagline": node["tagline"],
                    "description": (node.get("description") or "")[:600],
                    "votes": node["votesCount"],
                    "comments": node["commentsCount"],
                    "website": node.get("website"),
                    "launched_at": node["createdAt"],
                    "featured_at": node.get("featuredAt"),
                    "reviews_rating": node.get("reviewsRating"),
                    "reviews_count": node.get("reviewsCount", 0),
                    "pricing_type": node.get("pricingType"),
                    "topics": [t["node"]["name"] for t in node.get("topics", {}).get("edges", [])],
                    "makers": [
                        {
                            "id": m["id"],
                            "name": m["name"],
                            "username": m.get("username"),
                            "twitter": m.get("twitterUsername"),
                            "followers": m.get("followersCount", 0),
                        }
                        for m in node.get("makers", [])
                    ],
                    "thumbnail": node.get("thumbnail", {}).get("url") if node.get("thumbnail") else None,
                    "has_video": any(m.get("videoUrl") for m in node.get("media", [])),
                })

            page_info = posts_data.get("pageInfo", {})
            if not page_info.get("hasNextPage"):
                break
            cursor = page_info["endCursor"]
            time.sleep(random.uniform(1, 2))  # polite inter-page delay

    return sorted(all_launches, key=lambda x: x["votes"], reverse=True)

Scraping Comments and Discussions

Comment threads are where the real insights live — user feedback, feature requests, competitor comparisons:

def fetch_post_comments(post_id: str, token: str = None, proxy: str = None,
                        max_retries: int = 3) -> list:
    """
    Fetch all comments (plus first-level replies) for a post, paginating.

    Fixes:
      - "data" and "post" can come back as JSON null (bad ID, removed post);
        the old chained .get() calls then raised AttributeError on None.
      - "user" can be null for deleted accounts; guard before indexing.
      - 429 responses are now retried (bounded), matching the sibling fetchers.

    Args:
        post_id: Product Hunt post ID.
        token: optional developer bearer token for higher rate limits.
        proxy: optional proxy URL routed through httpx.
        max_retries: total 429 wait-and-retry budget.

    Returns:
        Flat list of comment dicts, each with a nested "replies" list.
    """
    query = """
    query GetComments($postId: ID!, $first: Int!, $after: String) {
        post(id: $postId) {
            comments(first: $first, after: $after, order: VOTES) {
                edges {
                    node {
                        id
                        body
                        votesCount
                        createdAt
                        user {
                            name
                            username
                            headline
                        }
                        replies {
                            edges {
                                node {
                                    id
                                    body
                                    votesCount
                                    createdAt
                                    user { name username }
                                }
                            }
                        }
                    }
                }
                pageInfo { hasNextPage endCursor }
            }
        }
    }
    """

    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": get_ua(),
    }
    if token:
        headers["Authorization"] = f"Bearer {token}"

    client_kwargs = {"headers": headers, "timeout": 20}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    all_comments = []
    cursor = None
    retries = 0

    with httpx.Client(**client_kwargs) as client:
        while True:
            payload = {
                "query": query,
                "variables": {"postId": post_id, "first": 20, "after": cursor},
            }
            resp = client.post(PH_API, json=payload)

            # Back off on rate limiting instead of silently dropping data.
            if resp.status_code == 429 and retries < max_retries:
                retries += 1
                time.sleep(int(resp.headers.get("Retry-After", 60)))
                continue

            if resp.status_code != 200:
                break

            # Guard every level: each can legitimately be null in the response.
            post = (resp.json().get("data") or {}).get("post") or {}
            comments_data = post.get("comments") or {}
            edges = comments_data.get("edges", [])

            for edge in edges:
                node = edge["node"]
                user = node.get("user") or {}  # deleted accounts return null
                comment = {
                    "id": node["id"],
                    "body": node["body"],
                    "votes": node["votesCount"],
                    "author": user.get("name"),
                    "username": user.get("username"),
                    "author_headline": user.get("headline", ""),
                    "created_at": node["createdAt"],
                    "replies": [],
                }
                for reply_edge in (node.get("replies") or {}).get("edges", []):
                    rn = reply_edge["node"]
                    reply_user = rn.get("user") or {}
                    comment["replies"].append({
                        "id": rn["id"],
                        "body": rn["body"],
                        "author": reply_user.get("name"),
                        "username": reply_user.get("username"),
                        "votes": rn["votesCount"],
                        "created_at": rn["createdAt"],
                    })
                all_comments.append(comment)

            page_info = comments_data.get("pageInfo", {})
            if not page_info.get("hasNextPage"):
                break
            cursor = page_info["endCursor"]
            time.sleep(random.uniform(1.5, 3.0))  # polite inter-page delay

    return all_comments

Tracking the Leaderboard and Vote Velocity

For active launches (same-day tracking), capturing vote snapshots over time reveals momentum:

import sqlite3
from datetime import datetime

def init_ph_db(db_path: str = "producthunt.db") -> sqlite3.Connection:
    """Create (if needed) the Product Hunt tracking schema and return a connection.

    Tables:
        launches       — one row per product launch.
        vote_snapshots — time series of vote/comment counts per launch.
        comments_data  — flattened comment records keyed to a launch.
    """
    db = sqlite3.connect(db_path)
    schema = """
        CREATE TABLE IF NOT EXISTS launches (
            id TEXT PRIMARY KEY,
            name TEXT,
            tagline TEXT,
            description TEXT,
            votes INTEGER DEFAULT 0,
            comments INTEGER DEFAULT 0,
            reviews_rating REAL,
            reviews_count INTEGER DEFAULT 0,
            website TEXT,
            pricing_type TEXT,
            launched_at TEXT,
            featured_at TEXT,
            topics TEXT,
            makers TEXT,
            thumbnail TEXT,
            has_video BOOLEAN DEFAULT 0,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS vote_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            launch_id TEXT,
            votes INTEGER,
            comments INTEGER,
            snapshot_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (launch_id) REFERENCES launches(id)
        );

        CREATE TABLE IF NOT EXISTS comments_data (
            id TEXT PRIMARY KEY,
            launch_id TEXT,
            body TEXT,
            votes INTEGER DEFAULT 0,
            author TEXT,
            username TEXT,
            author_headline TEXT,
            created_at TEXT,
            FOREIGN KEY (launch_id) REFERENCES launches(id)
        );

        CREATE INDEX IF NOT EXISTS idx_launches_votes ON launches(votes DESC);
        CREATE INDEX IF NOT EXISTS idx_launches_date ON launches(launched_at);
        CREATE INDEX IF NOT EXISTS idx_snapshots_launch ON vote_snapshots(launch_id);
    """
    db.executescript(schema)
    db.commit()
    return db


def save_launch(conn: sqlite3.Connection, launch: dict):
    """Upsert one launch row, then append a vote/comment snapshot for it."""
    # Build the parameter tuple up front; list fields are stored as JSON text.
    row = (
        launch["id"],
        launch["name"],
        launch["tagline"],
        launch.get("description"),
        launch["votes"],
        launch["comments"],
        launch.get("reviews_rating"),
        launch.get("reviews_count", 0),
        launch.get("website"),
        launch.get("pricing_type"),
        launch["launched_at"],
        launch.get("featured_at"),
        json.dumps(launch.get("topics", [])),
        json.dumps(launch.get("makers", [])),
        launch.get("thumbnail"),
        int(launch.get("has_video", False)),
    )
    conn.execute(
        """INSERT OR REPLACE INTO launches
           (id, name, tagline, description, votes, comments, reviews_rating, reviews_count,
            website, pricing_type, launched_at, featured_at, topics, makers, thumbnail, has_video)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        row,
    )
    conn.execute(
        "INSERT INTO vote_snapshots (launch_id, votes, comments) VALUES (?, ?, ?)",
        (launch["id"], launch["votes"], launch["comments"]),
    )
    conn.commit()


def compute_vote_velocity(conn: sqlite3.Connection, launch_id: str, hours: int = 6) -> dict:
    """
    Compute vote velocity (votes gained per hour) over the past N hours.

    Bug fix: SQLite's CURRENT_TIMESTAMP writes "YYYY-MM-DD HH:MM:SS" (space
    separator, UTC), but the previous cutoff used datetime.isoformat(), whose
    "T" separator sorts *above* a space in the string comparison — so every
    same-day snapshot was silently excluded. The cutoff now uses the exact
    format SQLite writes. Also drops the shadowing in-function re-imports and
    the deprecated datetime.utcnow().

    Args:
        conn: open connection to the tracking database.
        launch_id: launches.id value to analyze.
        hours: size of the trailing window.

    Returns:
        dict with votes_gained / hours_tracked / votes_per_hour /
        current_votes, or a "Not enough data" message when fewer than two
        snapshots fall inside the window.
    """
    # Match SQLite's own CURRENT_TIMESTAMP format so the string comparison in
    # the WHERE clause behaves like a chronological comparison.
    since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")

    cursor = conn.execute("""
        SELECT votes, snapshot_at FROM vote_snapshots
        WHERE launch_id = ? AND snapshot_at >= ?
        ORDER BY snapshot_at ASC
    """, (launch_id, since))
    snapshots = cursor.fetchall()

    if len(snapshots) < 2:
        return {"launch_id": launch_id, "velocity": None, "message": "Not enough data"}

    first_votes, first_time = snapshots[0]
    last_votes, last_time = snapshots[-1]
    votes_gained = last_votes - first_votes

    # fromisoformat accepts both "YYYY-MM-DD HH:MM:SS" and ISO "T" forms;
    # normalize a trailing "Z" in case callers stored ISO-8601 UTC strings.
    t1 = datetime.fromisoformat(first_time.replace("Z", "+00:00"))
    t2 = datetime.fromisoformat(last_time.replace("Z", "+00:00"))
    hours_elapsed = (t2 - t1).total_seconds() / 3600

    velocity = votes_gained / hours_elapsed if hours_elapsed > 0 else 0

    return {
        "launch_id": launch_id,
        "votes_gained": votes_gained,
        "hours_tracked": round(hours_elapsed, 2),
        "votes_per_hour": round(velocity, 1),
        "current_votes": last_votes,
    }

Historical Collection: Week in Review

Collect launches for the past week and build a trend dataset:

def collect_weekly_launches(
    days: int = 7,
    token: str = None,
    proxy: str = None,
    db_path: str = "producthunt.db",
):
    """Collect and store launches from the past N days."""
    conn = init_ph_db(db_path)
    today = datetime.now()

    total_launches = 0
    for offset in range(days):
        day = (today - timedelta(days=offset)).strftime("%Y-%m-%d")
        print(f"Collecting {day}...")

        # Authenticated fetch gets more fields and higher limits when possible.
        if token:
            day_launches = fetch_launches_authenticated(day, token=token, proxy=proxy)
        else:
            day_launches = fetch_daily_launches(day, proxy=proxy)

        for item in day_launches:
            save_launch(conn, item)
        total_launches += len(day_launches)

        if day_launches:
            top = day_launches[0]
            print(f"  {len(day_launches)} launches (#{top['name']} topped with {top['votes']} votes)")
        else:
            print("  No data")

        # Pause between days, but not after the final one.
        if offset < days - 1:
            time.sleep(random.uniform(5, 10))

    conn.close()
    print(f"\nCollected {total_launches} total launches over {days} days")


def analyze_topic_trends(db_path: str = "producthunt.db") -> list:
    """Rank topics by average votes to identify what the community is excited about."""
    conn = sqlite3.connect(db_path)
    rows = conn.execute("""
        SELECT topics, votes FROM launches
        WHERE topics != '[]' AND votes > 10
    """).fetchall()
    conn.close()

    from collections import defaultdict
    counts = defaultdict(int)
    vote_totals = defaultdict(int)

    # topics is stored as a JSON-encoded list; skip rows that fail to decode.
    for topics_json, votes in rows:
        try:
            for topic in json.loads(topics_json):
                counts[topic] += 1
                vote_totals[topic] += votes
        except (json.JSONDecodeError, TypeError):
            continue

    # Keep only topics with enough launches for a meaningful average.
    ranked = []
    for topic, n in counts.items():
        if n < 5:
            continue
        ranked.append({
            "topic": topic,
            "launch_count": n,
            "avg_votes": round(vote_totals[topic] / n, 1),
            "total_votes": vote_totals[topic],
        })

    ranked.sort(key=lambda r: r["avg_votes"], reverse=True)
    return ranked

Playwright Fallback for Website Scraping

When the API limits are exhausted and you need to scrape the website directly:

import asyncio
from playwright.async_api import async_playwright

async def scrape_daily_page(
    date: str = None,
    proxy: dict = None,
) -> list:
    """
    Scrape the Product Hunt daily page via Playwright.

    Fallback for when GraphQL API limits are exhausted.

    Fixes: the vote-parsing helper was re-defined inside the per-card loop
    (now hoisted once per call), and get_attribute() can return None even
    when the anchor exists — the old `href.startswith` then raised inside
    the broad except, silently dropping the card.

    Args:
        date: optional "YYYY-MM-DD" day to load via the ?day= query param.
        proxy: dict with 'server', 'username', 'password' (Playwright format).

    Returns:
        List of {"name", "tagline", "votes", "url"} dicts, sorted by votes
        descending.
    """

    def _parse_votes(raw: str) -> int:
        # Vote labels can be abbreviated ("1.2K"); normalize to an int.
        raw = (raw or "").strip()
        if raw.endswith("K"):
            return int(float(raw[:-1]) * 1000)
        try:
            return int(raw)
        except ValueError:
            return 0

    async with async_playwright() as p:
        launch_kwargs = {
            "headless": True,
            "args": [
                # Hide the most common headless-automation fingerprint flags.
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
            ]
        }
        if proxy:
            launch_kwargs["proxy"] = proxy

        browser = await p.chromium.launch(**launch_kwargs)
        context = await browser.new_context(
            viewport={"width": 1440, "height": 900},
            user_agent=get_ua(),
            locale="en-US",
            timezone_id="America/New_York",
        )
        # Mask navigator.webdriver before any page script runs.
        await context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
        """)

        page = await context.new_page()
        url = f"https://www.producthunt.com?day={date}" if date else "https://www.producthunt.com"
        await page.goto(url, wait_until="networkidle", timeout=30000)
        await page.wait_for_timeout(3000)

        products = []
        # Scroll a few times so lazy-loaded cards render.
        for _ in range(3):
            await page.keyboard.press("End")
            await page.wait_for_timeout(1500)

        cards = await page.query_selector_all("[data-test='post-item'], [class*='post-item']")

        for card in cards:
            try:
                name_el = await card.query_selector("h3")
                tagline_el = await card.query_selector("[data-test='post-tagline'], [class*='tagline']")
                vote_el = await card.query_selector("[data-test='vote-button'], [aria-label*='vote']")
                link_el = await card.query_selector("a[href*='/posts/']")

                name = await name_el.inner_text() if name_el else ""
                tagline = await tagline_el.inner_text() if tagline_el else ""
                vote_text = await vote_el.inner_text() if vote_el else "0"
                # get_attribute can return None even when the element exists.
                href = (await link_el.get_attribute("href") if link_el else "") or ""

                products.append({
                    "name": name.strip(),
                    "tagline": tagline.strip(),
                    "votes": _parse_votes(vote_text),
                    "url": f"https://www.producthunt.com{href}" if href.startswith("/") else href,
                })
            except Exception:
                # Card layouts vary; skip anything that doesn't match.
                continue

        await browser.close()
        return sorted(products, key=lambda x: x["votes"], reverse=True)

Proxy Considerations

The Product Hunt GraphQL API is more lenient than most targets — it accepts requests from clean datacenter IPs if you stay within rate limits. But if you need sustained scraping beyond the free tier limits, residential proxies help distribute your requests.

ThorData's proxy network lets you rotate IPs between requests, staying under the per-IP rate limits while maintaining throughput. For Product Hunt specifically, you don't always need residential — datacenter proxies work for the API, and you only need residential for scraping the website directly.

# Replace with real ThorData credentials before running.
PROXY = "http://YOUR_USER:[email protected]:9000"

# Playwright proxy config
# Playwright takes a proxy dict (server/username/password), not a URL string.
playwright_proxy = {
    "server": "http://proxy.thordata.com:9000",
    "username": "YOUR_USER",
    "password": "YOUR_PASS",
}

# Full weekly run
if __name__ == "__main__":
    # Developer token from producthunt.com/v2/oauth/applications.
    TOKEN = "your_ph_developer_token"
    collect_weekly_launches(days=7, token=TOKEN, proxy=PROXY)

    # Analyze trends
    topics = analyze_topic_trends()
    print("\nTop performing topics:")
    for t in topics[:10]:
        print(f"  {t['topic']}: avg {t['avg_votes']:.0f} votes across {t['launch_count']} launches")

Comment Analysis: Market Research Intelligence

The real value in Product Hunt comments is qualitative market intelligence:

import re
from collections import Counter

def analyze_comments_for_insights(comments: list) -> dict:
    """
    Extract market research signals from Product Hunt comment threads.

    Scans comment bodies for feature-request and pain-point phrasings and
    surfaces the highest-voted comments.

    Fixes: removes the unused `competitor_mentions` Counter and `use_cases`
    list (never populated or returned), and the redundant re.IGNORECASE flag
    (the text is lowercased before matching).

    Args:
        comments: comment dicts as produced by fetch_post_comments; each needs
            at least "body", "votes", and "author".

    Returns:
        dict with total_comments, feature_requests (up to 20), pain_points
        (up to 15), and top_voted_comments (up to 10, body truncated to 200).
    """
    FEATURE_PATTERNS = [
        r"(?:would (?:love|like|appreciate)|wish (?:you|it) (?:had|could|would)|please add|need a|missing a?|looking for)\s+([^.!?]{10,60})",
        r"(?:feature request|suggestion):\s*([^.!?]{10,80})",
    ]

    PAIN_PATTERNS = [
        r"(?:problem with|issue with|frustrated with|annoying that|hate (?:that|when)|doesn't work)\s+([^.!?]{10,60})",
    ]

    # Lowercase once up front so the patterns match case-insensitively.
    all_text = "\n".join(c.get("body", "") for c in comments).lower()

    feature_requests = []
    for pattern in FEATURE_PATTERNS:
        for match in re.finditer(pattern, all_text):
            feature_requests.append(match.group(1).strip())

    pain_points = []
    for pattern in PAIN_PATTERNS:
        for match in re.finditer(pattern, all_text):
            pain_points.append(match.group(1).strip())

    # Top comments ranked by vote count.
    top_comments = sorted(comments, key=lambda c: c.get("votes", 0), reverse=True)[:10]

    return {
        "total_comments": len(comments),
        "feature_requests": feature_requests[:20],
        "pain_points": pain_points[:15],
        "top_voted_comments": [
            {"body": c["body"][:200], "votes": c["votes"], "author": c["author"]}
            for c in top_comments
        ],
    }

Product Hunt's API Terms of Service allow data access for personal and non-commercial use. Don't use scraped data to build a competing product directory. Respect rate limits — Product Hunt's community team actively monitors API abuse and will revoke tokens. If you need bulk historical data, reach out to their partnerships team directly.

Key Takeaways