How to Scrape Product Hunt Launches in 2026: Upvotes, Comments & Maker Data
Product Hunt is where startups launch and the tech community votes. Daily launches, upvote counts, comment threads, and maker profiles — it's a goldmine for competitive intelligence, trend analysis, and understanding what resonates with early adopters.
Product Hunt provides a public GraphQL API that's surprisingly generous. With the right queries, you can pull launch data, maker profiles, and full comment threads without authentication for basic access. For heavier use, a developer token (free) unlocks higher rate limits.
What Data Product Hunt Exposes
Each launch and profile contains:
- Product launches — name, tagline, description, thumbnail, launch date
- Upvote counts — total votes, real-time during launch day
- Comments — full threads with author info, timestamps, vote counts
- Maker profiles — name, headline, Twitter handle, products made/hunted
- Topics — category tags (SaaS, AI, Developer Tools, etc.)
- Rankings — daily, weekly, monthly leaderboard positions
- Media — screenshots, videos, gallery images
- External links — product website, direct links
- Review data — star ratings from users who've used the product
- Launch streak — maker's consecutive days of launches
Product Hunt's API and Protections
Product Hunt is more open than most platforms, but still has guardrails:
- GraphQL API — The official API at api.producthunt.com/v2/api/graphql is the primary data source. It works without auth for basic queries, but rate limits kick in quickly.
- Rate limiting — Unauthenticated: ~30 requests per 15 minutes. With a developer token: ~450 requests per 15 minutes. Exceeding either limit returns 429 responses.
- Query complexity limits — The GraphQL API rejects queries that request too many nested fields. You need to keep queries focused.
- Cloudflare on the website — The web frontend uses Cloudflare, but the API endpoint has lighter protection.
- Token requirements for some fields — Detailed maker data and historical launches require a bearer token.
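Rather than reacting to 429s after the fact, you can pace requests client-side. This is a sketch that assumes the ~450 requests per 15 minutes figure above; tune the budget to whatever your token actually allows.

```python
import time
from collections import deque

class SlidingWindowLimiter:
    """Pace requests so we never exceed max_requests per rolling window.

    Defaults assume the ~450 requests / 15 minutes token limit; adjust
    if the limits you observe differ.
    """

    def __init__(self, max_requests: int = 450, window_seconds: float = 900.0):
        self.max_requests = max_requests
        self.window = window_seconds
        self.timestamps = deque()  # monotonic times of recent requests

    def acquire(self, now: float = None) -> float:
        """Reserve one request slot; returns seconds slept (0 if none)."""
        now = time.monotonic() if now is None else now
        # Evict timestamps that have aged out of the window
        while self.timestamps and now - self.timestamps[0] >= self.window:
            self.timestamps.popleft()
        waited = 0.0
        if len(self.timestamps) >= self.max_requests:
            # Sleep until the oldest request falls out of the window
            waited = self.window - (now - self.timestamps[0])
            time.sleep(waited)
            now += waited
            while self.timestamps and now - self.timestamps[0] >= self.window:
                self.timestamps.popleft()
        self.timestamps.append(now)
        return waited
```

Call `limiter.acquire()` immediately before each POST to the API; the limiter blocks only when the budget is exhausted.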
Dependencies and Setup
pip install httpx fake-useragent playwright
playwright install chromium
Method 1: The GraphQL API (No Auth)
Basic launch data is available without any authentication:
import httpx
import json
import time
import random
from datetime import datetime, timedelta
try:
from fake_useragent import UserAgent
ua = UserAgent()
def get_ua():
return ua.random
except ImportError:
USER_AGENTS = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/126.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/126.0.0.0 Safari/537.36",
]
def get_ua():
return random.choice(USER_AGENTS)
PH_API = "https://api.producthunt.com/v2/api/graphql"
def fetch_daily_launches(date: str = None, proxy: str = None) -> list:
"""
Fetch Product Hunt launches for a specific date.
date format: YYYY-MM-DD. Defaults to today.
"""
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
query = """
query GetDailyPosts($postedAfter: DateTime!, $postedBefore: DateTime!) {
posts(postedAfter: $postedAfter, postedBefore: $postedBefore, first: 50, order: VOTES) {
edges {
node {
id
name
tagline
description
votesCount
commentsCount
createdAt
url
website
reviewsRating
reviewsCount
topics { edges { node { name slug } } }
thumbnail { url }
makers {
id
name
username
headline
twitterUsername
}
media { type url videoUrl }
}
}
}
}
"""
headers = {
"User-Agent": get_ua(),
"Accept": "application/json",
"Content-Type": "application/json",
"Origin": "https://www.producthunt.com",
"Referer": "https://www.producthunt.com/",
}
variables = {
"postedAfter": f"{date}T00:00:00Z",
"postedBefore": f"{date}T23:59:59Z",
}
client_kwargs = {"headers": headers, "follow_redirects": True, "timeout": 20}
if proxy:
client_kwargs["proxies"] = {"all://": proxy}
with httpx.Client(**client_kwargs) as client:
resp = client.post(PH_API, json={"query": query, "variables": variables})
if resp.status_code == 429:
retry_after = int(resp.headers.get("Retry-After", 60))
print(f"Rate limited. Waiting {retry_after}s...")
time.sleep(retry_after)
return fetch_daily_launches(date, proxy=proxy)
if resp.status_code != 200:
print(f"API error: HTTP {resp.status_code}")
return []
data = resp.json()
if "errors" in data:
print(f"GraphQL errors: {data['errors']}")
return []
launches = []
edges = data.get("data", {}).get("posts", {}).get("edges", [])
for edge in edges:
node = edge["node"]
launches.append({
"id": node["id"],
"name": node["name"],
"tagline": node["tagline"],
"description": (node.get("description") or "")[:400],
"votes": node["votesCount"],
"comments": node["commentsCount"],
"launched_at": node["createdAt"],
"url": node["url"],
"website": node.get("website"),
"reviews_rating": node.get("reviewsRating"),
"reviews_count": node.get("reviewsCount", 0),
"topics": [t["node"]["name"] for t in node.get("topics", {}).get("edges", [])],
"makers": [
{
"id": m["id"],
"name": m["name"],
"username": m.get("username"),
"headline": m.get("headline"),
"twitter": m.get("twitterUsername"),
}
for m in node.get("makers", [])
],
"thumbnail": node.get("thumbnail", {}).get("url") if node.get("thumbnail") else None,
"media_count": len(node.get("media", [])),
"has_video": any(m.get("videoUrl") for m in node.get("media", [])),
})
return sorted(launches, key=lambda x: x["votes"], reverse=True)
# Example: get today's launches
today = datetime.now().strftime("%Y-%m-%d")
launches = fetch_daily_launches(today)
print(f"Today's top launches:")
for i, l in enumerate(launches[:5], 1):
print(f" #{i} {l['name']} — {l['votes']} upvotes — {', '.join(l['topics'][:3])}")
Method 2: Authenticated Access for Full Data
A free developer token unlocks higher limits and more fields. Register at producthunt.com/v2/oauth/applications:
def fetch_launches_authenticated(
date: str,
token: str,
proxy: str = None,
first: int = 50,
) -> list:
"""Fetch launches with a developer token for higher rate limits and more data."""
query = """
query GetPosts($postedAfter: DateTime!, $first: Int!, $after: String) {
posts(postedAfter: $postedAfter, first: $first, after: $after, order: VOTES) {
edges {
node {
id name tagline description
votesCount commentsCount
website createdAt featuredAt
reviewsRating reviewsCount
pricingType
makers {
id name username headline
twitterUsername followersCount
}
topics { edges { node { name slug } } }
media { type url videoUrl }
thumbnail { url }
}
}
pageInfo { hasNextPage endCursor }
}
}
"""
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": get_ua(),
}
client_kwargs = {"headers": headers, "timeout": 20}
if proxy:
client_kwargs["proxies"] = {"all://": proxy}
all_launches = []
cursor = None
with httpx.Client(**client_kwargs) as client:
while True:
variables = {
"postedAfter": f"{date}T00:00:00Z",
"first": first,
}
if cursor:
variables["after"] = cursor
resp = client.post(PH_API, json={"query": query, "variables": variables})
if resp.status_code == 429:
retry_after = int(resp.headers.get("Retry-After", 60))
print(f"Rate limited. Waiting {retry_after}s...")
time.sleep(retry_after)
continue
if resp.status_code != 200:
break
data = resp.json()
posts_data = data.get("data", {}).get("posts", {})
edges = posts_data.get("edges", [])
for edge in edges:
node = edge["node"]
all_launches.append({
"id": node["id"],
"name": node["name"],
"tagline": node["tagline"],
"description": (node.get("description") or "")[:600],
"votes": node["votesCount"],
"comments": node["commentsCount"],
"website": node.get("website"),
"launched_at": node["createdAt"],
"featured_at": node.get("featuredAt"),
"reviews_rating": node.get("reviewsRating"),
"reviews_count": node.get("reviewsCount", 0),
"pricing_type": node.get("pricingType"),
"topics": [t["node"]["name"] for t in node.get("topics", {}).get("edges", [])],
"makers": [
{
"id": m["id"],
"name": m["name"],
"username": m.get("username"),
"twitter": m.get("twitterUsername"),
"followers": m.get("followersCount", 0),
}
for m in node.get("makers", [])
],
"thumbnail": node.get("thumbnail", {}).get("url") if node.get("thumbnail") else None,
"has_video": any(m.get("videoUrl") for m in node.get("media", [])),
})
page_info = posts_data.get("pageInfo", {})
if not page_info.get("hasNextPage"):
break
cursor = page_info["endCursor"]
time.sleep(random.uniform(1, 2))
return sorted(all_launches, key=lambda x: x["votes"], reverse=True)
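If you register an OAuth application rather than copying a developer token directly, you can fetch a token programmatically. The endpoint path and the `client_credentials` grant below follow standard OAuth2 conventions; confirm both against Product Hunt's current API docs before relying on them.

```python
# Token endpoint per standard OAuth2 conventions; verify against the docs.
TOKEN_URL = "https://api.producthunt.com/v2/oauth/token"

def build_token_request(client_id: str, client_secret: str) -> dict:
    """JSON payload for a server-to-server (client_credentials) token grant."""
    return {
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
    }

def bearer_headers(token: str) -> dict:
    """Headers every authenticated GraphQL request needs."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

# Usage (with httpx from the earlier setup):
#   resp = httpx.post(TOKEN_URL, json=build_token_request(ID, SECRET), timeout=20)
#   token = resp.json()["access_token"]
```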
Scraping Comments and Discussions
Comment threads are where the real insights live — user feedback, feature requests, competitor comparisons:
def fetch_post_comments(post_id: str, token: str = None, proxy: str = None) -> list:
"""Fetch all comments for a Product Hunt post with pagination."""
query = """
query GetComments($postId: ID!, $first: Int!, $after: String) {
post(id: $postId) {
comments(first: $first, after: $after, order: VOTES) {
edges {
node {
id
body
votesCount
createdAt
user {
name
username
headline
}
replies {
edges {
node {
id
body
votesCount
createdAt
user { name username }
}
}
}
}
}
pageInfo { hasNextPage endCursor }
}
}
}
"""
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": get_ua(),
}
if token:
headers["Authorization"] = f"Bearer {token}"
client_kwargs = {"headers": headers, "timeout": 20}
if proxy:
client_kwargs["proxies"] = {"all://": proxy}
all_comments = []
cursor = None
with httpx.Client(**client_kwargs) as client:
while True:
payload = {
"query": query,
"variables": {"postId": post_id, "first": 20, "after": cursor},
}
resp = client.post(PH_API, json=payload)
if resp.status_code != 200:
break
comments_data = resp.json().get("data", {}).get("post", {}).get("comments", {})
edges = comments_data.get("edges", [])
for edge in edges:
node = edge["node"]
comment = {
"id": node["id"],
"body": node["body"],
"votes": node["votesCount"],
"author": node["user"]["name"],
"username": node["user"]["username"],
"author_headline": node["user"].get("headline", ""),
"created_at": node["createdAt"],
"replies": [],
}
for reply_edge in node.get("replies", {}).get("edges", []):
rn = reply_edge["node"]
comment["replies"].append({
"id": rn["id"],
"body": rn["body"],
"author": rn["user"]["name"],
"username": rn["user"]["username"],
"votes": rn["votesCount"],
"created_at": rn["createdAt"],
})
all_comments.append(comment)
page_info = comments_data.get("pageInfo", {})
if not page_info.get("hasNextPage"):
break
cursor = page_info["endCursor"]
time.sleep(random.uniform(1.5, 3.0))
return all_comments
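fetch_post_comments returns nested threads, which is fine for reading but awkward for storage or bulk analysis. A small helper can flatten the structure into uniform rows, keeping a parent_id so the thread shape stays recoverable:

```python
def flatten_comments(comments: list, launch_id: str) -> list:
    """Flatten top-level comments and their replies into uniform rows.

    Expects the dict shape produced by fetch_post_comments above; each
    reply row carries the parent comment's id in parent_id.
    """
    rows = []
    for c in comments:
        rows.append({
            "id": c["id"], "launch_id": launch_id, "parent_id": None,
            "body": c["body"], "votes": c.get("votes", 0),
            "author": c.get("author"), "username": c.get("username"),
            "created_at": c.get("created_at"),
        })
        for r in c.get("replies", []):
            rows.append({
                "id": r["id"], "launch_id": launch_id, "parent_id": c["id"],
                "body": r["body"], "votes": r.get("votes", 0),
                "author": r.get("author"), "username": r.get("username"),
                "created_at": r.get("created_at"),
            })
    return rows
```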
Tracking the Leaderboard and Vote Velocity
For active launches (same-day tracking), capturing vote snapshots over time reveals momentum:
import sqlite3
from datetime import datetime
def init_ph_db(db_path: str = "producthunt.db") -> sqlite3.Connection:
"""Initialize the Product Hunt tracking database."""
conn = sqlite3.connect(db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS launches (
id TEXT PRIMARY KEY,
name TEXT,
tagline TEXT,
description TEXT,
votes INTEGER DEFAULT 0,
comments INTEGER DEFAULT 0,
reviews_rating REAL,
reviews_count INTEGER DEFAULT 0,
website TEXT,
pricing_type TEXT,
launched_at TEXT,
featured_at TEXT,
topics TEXT,
makers TEXT,
thumbnail TEXT,
has_video BOOLEAN DEFAULT 0,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS vote_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
launch_id TEXT,
votes INTEGER,
comments INTEGER,
snapshot_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (launch_id) REFERENCES launches(id)
);
CREATE TABLE IF NOT EXISTS comments_data (
id TEXT PRIMARY KEY,
launch_id TEXT,
body TEXT,
votes INTEGER DEFAULT 0,
author TEXT,
username TEXT,
author_headline TEXT,
created_at TEXT,
FOREIGN KEY (launch_id) REFERENCES launches(id)
);
CREATE INDEX IF NOT EXISTS idx_launches_votes ON launches(votes DESC);
CREATE INDEX IF NOT EXISTS idx_launches_date ON launches(launched_at);
CREATE INDEX IF NOT EXISTS idx_snapshots_launch ON vote_snapshots(launch_id);
""")
conn.commit()
return conn
def save_launch(conn: sqlite3.Connection, launch: dict):
"""Save or update a launch record and create a vote snapshot."""
conn.execute(
"""INSERT OR REPLACE INTO launches
(id, name, tagline, description, votes, comments, reviews_rating, reviews_count,
website, pricing_type, launched_at, featured_at, topics, makers, thumbnail, has_video)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
launch["id"], launch["name"], launch["tagline"],
launch.get("description"), launch["votes"], launch["comments"],
launch.get("reviews_rating"), launch.get("reviews_count", 0),
launch.get("website"), launch.get("pricing_type"),
launch["launched_at"], launch.get("featured_at"),
json.dumps(launch.get("topics", [])),
json.dumps(launch.get("makers", [])),
launch.get("thumbnail"), int(launch.get("has_video", False)),
)
)
conn.execute(
"INSERT INTO vote_snapshots (launch_id, votes, comments) VALUES (?, ?, ?)",
(launch["id"], launch["votes"], launch["comments"])
)
conn.commit()
def compute_vote_velocity(conn: sqlite3.Connection, launch_id: str, hours: int = 6) -> dict:
"""Compute vote velocity over the past N hours."""
# SQLite's CURRENT_TIMESTAMP stores "YYYY-MM-DD HH:MM:SS" (UTC), so the
# cutoff string must use the same format for the string comparison to work
since = (datetime.utcnow() - timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
cursor = conn.execute("""
SELECT votes, snapshot_at FROM vote_snapshots
WHERE launch_id = ? AND snapshot_at >= ?
ORDER BY snapshot_at ASC
""", (launch_id, since))
snapshots = cursor.fetchall()
if len(snapshots) < 2:
return {"launch_id": launch_id, "velocity": None, "message": "Not enough data"}
first_votes, first_time = snapshots[0]
last_votes, last_time = snapshots[-1]
votes_gained = last_votes - first_votes
# SQLite timestamps ("YYYY-MM-DD HH:MM:SS") parse directly with fromisoformat
t1 = datetime.fromisoformat(first_time)
t2 = datetime.fromisoformat(last_time)
hours_elapsed = (t2 - t1).total_seconds() / 3600
velocity = votes_gained / hours_elapsed if hours_elapsed > 0 else 0
return {
"launch_id": launch_id,
"votes_gained": votes_gained,
"hours_tracked": round(hours_elapsed, 2),
"votes_per_hour": round(velocity, 1),
"current_votes": last_votes,
}
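Velocity usually decays as launch day wears on, so a straight-line projection overshoots. One way to roughen in that decay is a geometric discount on the hourly rate. The 0.9 default below is a guess, not a measured constant; calibrate it against your own snapshots.

```python
def project_day_end_votes(
    current_votes: int,
    votes_per_hour: float,
    hours_remaining: float,
    hourly_decay: float = 0.9,
) -> int:
    """Rough end-of-day vote projection from compute_vote_velocity output.

    Assumes the hourly rate decays geometrically each hour; the 0.9
    default is an assumption, not an observed Product Hunt constant.
    """
    projected = float(current_votes)
    rate = votes_per_hour
    for _ in range(int(hours_remaining)):
        projected += rate
        rate *= hourly_decay
    return round(projected)
```

With `hourly_decay=1.0` this reduces to a plain linear extrapolation, which is a useful upper bound.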
Historical Collection: Week in Review
Collect launches for the past week and build a trend dataset:
def collect_weekly_launches(
days: int = 7,
token: str = None,
proxy: str = None,
db_path: str = "producthunt.db",
):
"""Collect and store launches from the past N days."""
conn = init_ph_db(db_path)
today = datetime.now()
total_launches = 0
for i in range(days):
date = (today - timedelta(days=i)).strftime("%Y-%m-%d")
print(f"Collecting {date}...")
if token:
launches = fetch_launches_authenticated(date, token=token, proxy=proxy)
else:
launches = fetch_daily_launches(date, proxy=proxy)
for launch in launches:
save_launch(conn, launch)
total_launches += len(launches)
print(f" {len(launches)} launches (top: {launches[0]['name']}, {launches[0]['votes']} votes)" if launches else " No data")
if i < days - 1:
time.sleep(random.uniform(5, 10))
conn.close()
print(f"\nCollected {total_launches} total launches over {days} days")
def analyze_topic_trends(db_path: str = "producthunt.db") -> list:
"""Rank topics by average votes to identify what the community is excited about."""
conn = sqlite3.connect(db_path)
cursor = conn.execute("""
SELECT topics, votes FROM launches
WHERE topics != '[]' AND votes > 10
""")
rows = cursor.fetchall()
conn.close()
from collections import defaultdict
topic_stats = defaultdict(lambda: {"count": 0, "total_votes": 0})
for row in rows:
try:
topics = json.loads(row[0])
votes = row[1]
for topic in topics:
topic_stats[topic]["count"] += 1
topic_stats[topic]["total_votes"] += votes
except (json.JSONDecodeError, TypeError):
continue
results = [
{
"topic": topic,
"launch_count": stats["count"],
"avg_votes": round(stats["total_votes"] / stats["count"], 1),
"total_votes": stats["total_votes"],
}
for topic, stats in topic_stats.items()
if stats["count"] >= 5
]
return sorted(results, key=lambda x: x["avg_votes"], reverse=True)
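Average votes are easily skewed by a single viral launch. A median variant of the same aggregation, operating on the same (topics, votes) rows the SELECT above returns, is more robust for small topic samples:

```python
import json
from collections import defaultdict

def topic_median_votes(rows: list) -> dict:
    """Median votes per topic from (topics_json, votes) tuples.

    Medians resist the skew a single viral launch adds to a topic's mean.
    """
    by_topic = defaultdict(list)
    for topics_json, votes in rows:
        try:
            topics = json.loads(topics_json)
        except (json.JSONDecodeError, TypeError):
            continue
        for topic in topics:
            by_topic[topic].append(votes)
    medians = {}
    for topic, vals in by_topic.items():
        vals.sort()
        n = len(vals)
        mid = n // 2
        # Even-length lists take the mean of the middle pair
        medians[topic] = vals[mid] if n % 2 else (vals[mid - 1] + vals[mid]) / 2
    return medians
```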
Playwright Fallback for Website Scraping
When the API limits are exhausted and you need to scrape the website directly:
import asyncio
from playwright.async_api import async_playwright
async def scrape_daily_page(
date: str = None,
proxy: dict = None,
) -> list:
"""
Scrape the Product Hunt daily page via Playwright.
proxy: dict with 'server', 'username', 'password'
"""
async with async_playwright() as p:
launch_kwargs = {
"headless": True,
"args": [
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
]
}
if proxy:
launch_kwargs["proxy"] = proxy
browser = await p.chromium.launch(**launch_kwargs)
context = await browser.new_context(
viewport={"width": 1440, "height": 900},
user_agent=get_ua(),
locale="en-US",
timezone_id="America/New_York",
)
await context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
""")
page = await context.new_page()
url = f"https://www.producthunt.com?day={date}" if date else "https://www.producthunt.com"
await page.goto(url, wait_until="networkidle", timeout=30000)
await page.wait_for_timeout(3000)
products = []
for _ in range(3):
await page.keyboard.press("End")
await page.wait_for_timeout(1500)
cards = await page.query_selector_all("[data-test='post-item'], [class*='post-item']")
for card in cards:
try:
name_el = await card.query_selector("h3")
tagline_el = await card.query_selector("[data-test='post-tagline'], [class*='tagline']")
vote_el = await card.query_selector("[data-test='vote-button'], [aria-label*='vote']")
link_el = await card.query_selector("a[href*='/posts/']")
name = await name_el.inner_text() if name_el else ""
tagline = await tagline_el.inner_text() if tagline_el else ""
vote_text = await vote_el.inner_text() if vote_el else "0"
href = await link_el.get_attribute("href") if link_el else ""
# Parse vote count (could be "1.2K")
def parse_votes(raw: str) -> int:
raw = raw.strip()
if raw.endswith("K"):
return int(float(raw[:-1]) * 1000)
try:
return int(raw)
except ValueError:
return 0
votes = parse_votes(vote_text)
products.append({
"name": name.strip(),
"tagline": tagline.strip(),
"votes": votes,
"url": f"https://www.producthunt.com{href}" if href.startswith("/") else href,
})
except Exception:
continue
await browser.close()
return sorted(products, key=lambda x: x["votes"], reverse=True)
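The inline parse_votes above handles "1.2K" but not lowercase suffixes, thousands separators, or non-numeric placeholders the page sometimes renders. A more defensive standalone version:

```python
def parse_vote_count(raw: str) -> int:
    """Normalize scraped vote strings: '842', '1,204', '1.2K', '3.4k', '1M'.

    Returns 0 for anything unparseable rather than raising.
    """
    raw = raw.strip().replace(",", "")
    if not raw:
        return 0
    multiplier = 1
    if raw[-1] in "Kk":
        multiplier, raw = 1000, raw[:-1]
    elif raw[-1] in "Mm":
        multiplier, raw = 1_000_000, raw[:-1]
    try:
        return int(float(raw) * multiplier)
    except ValueError:
        return 0
```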
Proxy Considerations
The Product Hunt GraphQL API is more lenient than most targets — it accepts requests from clean datacenter IPs if you stay within rate limits. But if you need sustained scraping beyond the free tier limits, residential proxies help distribute your requests.
ThorData's proxy network lets you rotate IPs between requests, staying under the per-IP rate limits while maintaining throughput. For Product Hunt specifically, you don't always need residential — datacenter proxies work for the API, and you only need residential for scraping the website directly.
PROXY = "http://YOUR_USER:YOUR_PASS@proxy.thordata.com:9000"
# Playwright proxy config
playwright_proxy = {
"server": "http://proxy.thordata.com:9000",
"username": "YOUR_USER",
"password": "YOUR_PASS",
}
# Full weekly run
if __name__ == "__main__":
TOKEN = "your_ph_developer_token"
collect_weekly_launches(days=7, token=TOKEN, proxy=PROXY)
# Analyze trends
topics = analyze_topic_trends()
print("\nTop performing topics:")
for t in topics[:10]:
print(f" {t['topic']}: avg {t['avg_votes']:.0f} votes across {t['launch_count']} launches")
Comment Analysis: Market Research Intelligence
The real value in Product Hunt comments is qualitative market intelligence:
import re
from collections import Counter
def analyze_comments_for_insights(comments: list) -> dict:
"""
Extract market research signals from Product Hunt comment threads.
Looks for feature requests and pain points via regex patterns.
"""
feature_requests = []
pain_points = []
FEATURE_PATTERNS = [
r"(?:would (?:love|like|appreciate)|wish (?:you|it) (?:had|could|would)|please add|need a|missing a?|looking for)\s+([^.!?]{10,60})",
r"(?:feature request|suggestion):\s*([^.!?]{10,80})",
]
PAIN_PATTERNS = [
r"(?:problem with|issue with|frustrated with|annoying that|hate (?:that|when)|doesn't work)\s+([^.!?]{10,60})",
]
all_text = "\n".join(c.get("body", "") for c in comments).lower()
# Feature requests
for pattern in FEATURE_PATTERNS:
for match in re.finditer(pattern, all_text, re.IGNORECASE):
feature_requests.append(match.group(1).strip())
# Pain points
for pattern in PAIN_PATTERNS:
for match in re.finditer(pattern, all_text, re.IGNORECASE):
pain_points.append(match.group(1).strip())
# Top commented issues (by vote count)
top_comments = sorted(comments, key=lambda c: c.get("votes", 0), reverse=True)[:10]
return {
"total_comments": len(comments),
"feature_requests": feature_requests[:20],
"pain_points": pain_points[:15],
"top_voted_comments": [
{"body": c["body"][:200], "votes": c["votes"], "author": c["author"]}
for c in top_comments
],
}
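The extractor above covers feature requests and pain points; competitor tracking needs a name list only you can supply. A deliberately naive substring counter works as a starting point (the competitor names in the usage test below are illustrative, not drawn from any real dataset):

```python
from collections import Counter

def count_competitor_mentions(comments: list, competitors: list) -> Counter:
    """Count comments mentioning each known competitor (case-insensitive).

    Matching is naive substring search: supply distinctive names and
    expect false positives for short or common words.
    """
    counts = Counter()
    for c in comments:
        body = c.get("body", "").lower()
        for name in competitors:
            if name.lower() in body:
                counts[name] += 1
    return counts
```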
Legal Notes
Product Hunt's API Terms of Service allow data access for personal and non-commercial use. Don't use scraped data to build a competing product directory. Respect rate limits — Product Hunt's community team actively monitors API abuse and will revoke tokens. If you need bulk historical data, reach out to their partnerships team directly.
Key Takeaways
- Product Hunt's GraphQL API is the best entry point — structured, well-documented, and works without auth for basic queries.
- Free developer tokens unlock 450 requests per 15 minutes — enough for daily monitoring.
- Comments contain the richest qualitative data for market research. Paginate through them and run pattern extraction.
- Vote snapshots over time reveal launch momentum — track hourly during launch day for the full picture.
- For heavy API usage, ThorData proxies help distribute rate limits across IPs.
- GraphQL query complexity matters — keep queries focused to avoid server-side rejections.
- The data is most valuable in aggregate: weekly topic trends, maker velocity, and vote distribution are more actionable than any single launch's data.