How to Scrape Hacker News: Firebase API, Algolia Search & Python (2026)
Hacker News is one of the cleanest sites to scrape because it actually wants you to access its data programmatically. There are two official APIs — the Firebase real-time API and Algolia's search API — and neither requires authentication.
No API key. No rate limit headers. No OAuth dance. You just hit the endpoints and get JSON back.
That said, there are still things you can get wrong. Fetching 500 stories one at a time is painfully slow, so you need concurrency. Algolia has undocumented rate limits. And if you want comment trees, you need to understand HN's data model.
Let's walk through both APIs and build something useful.
The Firebase API: Real-Time Data
HN's official API lives at hacker-news.firebaseio.com/v0/. It's a REST API that mirrors the Firebase real-time database.
Key endpoints:
- /v0/topstories.json — IDs of the top 500 stories
- /v0/newstories.json — IDs of the newest 500 stories
- /v0/beststories.json — IDs of the best 500 stories
- /v0/askstories.json — IDs of the latest Ask HN stories
- /v0/showstories.json — IDs of the latest Show HN stories
- /v0/jobstories.json — IDs of the latest job postings
- /v0/item/{id}.json — any item (story, comment, poll, job)
- /v0/user/{username}.json — user profile
- /v0/maxitem.json — current max item ID (useful for scanning all historical items)
- /v0/updates.json — most recently changed items and profiles
Every story, comment, and poll on HN is an 'item' with a numeric ID. Stories have kids (comment IDs), comments have kids (reply IDs), and you walk the tree recursively.
Item Schema
A typical story item looks like:
{
"by": "dhouston",
"descendants": 71,
"id": 8863,
"kids": [8952, 9224, 8917],
"score": 111,
"time": 1175714200,
"title": "My YC app: Dropbox - Throw away your USB drive",
"type": "story",
"url": "http://www.getdropbox.com/u/2/screencast.html"
}
Comments include the parent ID and text as HTML:
{
"by": "norvig",
"id": 2921983,
"kids": [2922097, 2922429],
"parent": 2921506,
"text": "Agreed, but...",
"time": 1314211127,
"type": "comment"
}
Basic Fetcher: Async with Concurrency Control
Here's a solid foundation for all HN fetching. The semaphore prevents overwhelming the API:
import httpx
import asyncio
import json
from pathlib import Path
HN_API = "https://hacker-news.firebaseio.com/v0"
SEM = asyncio.Semaphore(20) # max 20 concurrent requests
async def fetch_item(client: httpx.AsyncClient, item_id: int) -> dict | None:
async with SEM:
try:
resp = await client.get(
f"{HN_API}/item/{item_id}.json",
timeout=10.0,
)
resp.raise_for_status()
return resp.json()
except (httpx.HTTPError, httpx.TimeoutException) as e:
print(f" Warning: failed to fetch item {item_id}: {e}")
return None
async def fetch_top_stories(limit: int = 30) -> list[dict]:
async with httpx.AsyncClient() as client:
resp = await client.get(f"{HN_API}/topstories.json")
story_ids = resp.json()[:limit]
print(f"Fetching {len(story_ids)} stories...")
tasks = [fetch_item(client, sid) for sid in story_ids]
stories = await asyncio.gather(*tasks)
return [s for s in stories if s and s.get("type") == "story"]
if __name__ == "__main__":
stories = asyncio.run(fetch_top_stories(30))
for s in stories[:10]:
print(f" {s.get('score', 0):>5} pts | {s.get('title', 'N/A')[:70]}")
print(f" by {s.get('by')} | {s.get('descendants', 0)} comments")
The async approach matters enormously here. Fetching 30 stories sequentially takes 5-10 seconds. With asyncio.gather, it completes in under a second.
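That speedup is easy to demonstrate offline. The sketch below simulates the two strategies with asyncio.sleep standing in for network latency (fake_fetch and the timings are illustrative, not real HN calls):

```python
import asyncio
import time

async def fake_fetch(delay: float = 0.05) -> dict:
    # Stand-in for one HN API request; `delay` simulates network latency.
    await asyncio.sleep(delay)
    return {"ok": True}

async def sequential(n: int) -> float:
    # One request at a time: total time is roughly n * delay.
    start = time.perf_counter()
    for _ in range(n):
        await fake_fetch()
    return time.perf_counter() - start

async def concurrent(n: int) -> float:
    # All requests in flight at once: total time is roughly one delay.
    start = time.perf_counter()
    await asyncio.gather(*[fake_fetch() for _ in range(n)])
    return time.perf_counter() - start

async def main() -> None:
    n = 30
    print(f"sequential: {await sequential(n):.2f}s")
    print(f"gather:     {await concurrent(n):.2f}s")

if __name__ == "__main__":
    asyncio.run(main())
```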
Fetching Comment Trees
Comments are where HN's real value lives. Each story has a kids field — an array of top-level comment IDs. Each comment can also have kids. You walk the tree recursively:
async def fetch_comment_tree(
client: httpx.AsyncClient,
item_id: int,
depth: int = 0,
max_depth: int = 10,
) -> dict | None:
if depth > max_depth:
return None
item = await fetch_item(client, item_id)
if not item:
return None
# Skip deleted and dead comments
if item.get("deleted") or item.get("dead"):
return None
item["depth"] = depth
# Recursively fetch children
kid_tasks = [
fetch_comment_tree(client, kid_id, depth + 1, max_depth)
for kid_id in item.get("kids", [])
]
results = await asyncio.gather(*kid_tasks)
item["children"] = [r for r in results if r is not None]
return item
async def get_story_with_comments(story_id: int) -> dict:
async with httpx.AsyncClient() as client:
story = await fetch_item(client, story_id)
if not story:
return {}
        print(f"Fetching comments for: {str(story.get('title', story_id))[:60]}")
top_level_tasks = [
fetch_comment_tree(client, kid_id)
for kid_id in story.get("kids", [])
]
top_comments = await asyncio.gather(*top_level_tasks)
story["comment_tree"] = [c for c in top_comments if c]
return story
A popular story can have 500+ comments. That's 500+ API calls. For bulk jobs — say, fetching all comments from the top 100 stories — you're looking at tens of thousands of requests.
This is where you need to be careful. The Firebase API doesn't publish rate limits, but hammer it too hard and you'll get 429s or temporary blocks. Keep the semaphore at 20 concurrent requests max, and consider routing through a proxy pool for large-scale work. ThorData's residential proxies work well for this since they rotate IPs automatically and handle connection pooling for you.
Algolia Search API: The Power Tool
The Algolia API at hn.algolia.com/api/v1/ powers HN's built-in search. It's faster for filtered queries and returns richer data than Firebase for many use cases.
Key endpoints:
- /search?query=python&tags=story — full-text search of stories
- /search_by_date?tags=comment&numericFilters=created_at_i>1700000000 — results filtered by date
- /items/{id} — item with full comment tree in a single call
- /users/{username} — user profile with statistics
Tags you can filter on: story, comment, poll, job, ask_hn, show_hn, front_page, author_USERNAME, story_STORY_ID
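Tags compose: comma-separated tags are ANDed together, and a parenthesized group is ORed. A tiny helper makes that explicit (hn_tags is my own name, not part of the API):

```python
def hn_tags(*and_tags: str, any_of: tuple[str, ...] = ()) -> str:
    """Build an Algolia HN `tags` parameter.

    Comma-separated tags are ANDed; a parenthesized group is ORed.
    """
    parts = list(and_tags)
    if any_of:
        parts.append("(" + ",".join(any_of) + ")")
    return ",".join(parts)

# All comments on story 8863:
print(hn_tags("comment", "story_8863"))                # comment,story_8863
# Stories or polls submitted by pg:
print(hn_tags("author_pg", any_of=("story", "poll")))  # author_pg,(story,poll)
```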
import httpx
import time
ALGOLIA = "https://hn.algolia.com/api/v1"
def search_stories(
query: str,
page: int = 0,
hits_per_page: int = 50,
min_points: int = 0,
tags: str = "story",
) -> tuple[list, int]:
params = {
"query": query,
"tags": tags,
"page": page,
"hitsPerPage": hits_per_page,
}
if min_points > 0:
params["numericFilters"] = f"points>{min_points}"
resp = httpx.get(f"{ALGOLIA}/search", params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
return data["hits"], data["nbPages"]
def search_all_pages(
query: str,
min_points: int = 100,
max_pages: int = 10,
) -> list[dict]:
all_hits = []
for page in range(max_pages):
hits, total_pages = search_stories(query, page=page, min_points=min_points)
if not hits or page >= total_pages:
break
all_hits.extend(hits)
print(f" Page {page+1}/{min(max_pages, total_pages)}: {len(hits)} hits")
time.sleep(0.5)
return all_hits
Date Range Queries
Algolia supports created_at_i (Unix timestamp) for time filtering:
import time
def get_stories_in_range(
start_ts: int,
end_ts: int,
min_points: int = 50,
) -> list[dict]:
all_hits = []
page = 0
while True:
params = {
"tags": "story",
"numericFilters": (
f"created_at_i>{start_ts},"
f"created_at_i<{end_ts},"
f"points>{min_points}"
),
"hitsPerPage": 50,
"page": page,
}
resp = httpx.get(f"{ALGOLIA}/search_by_date", params=params)
resp.raise_for_status()
data = resp.json()
if not data["hits"]:
break
all_hits.extend(data["hits"])
page += 1
time.sleep(0.3)
return all_hits
# Get stories from last 7 days with 100+ points
one_week_ago = int(time.time()) - 7 * 86400
recent = get_stories_in_range(one_week_ago, int(time.time()), min_points=100)
print(f"Found {len(recent)} quality stories from the past week")
Full Comment Tree in One Call
Algolia's /items/{id} endpoint returns a story with its entire nested comment tree — no recursive Firebase calls needed:
def get_story_full_algolia(story_id: int) -> dict:
resp = httpx.get(f"{ALGOLIA}/items/{story_id}", timeout=30)
resp.raise_for_status()
data = resp.json()
def count_comments(node: dict) -> int:
count = 0
for child in node.get("children", []):
count += 1 + count_comments(child)
return count
total = count_comments(data)
title = data.get("title", "")[:60]
print(f" Loaded '{title}' with {total} comments")
return data
Bulk Dataset Building
If you want to build a dataset — every story above 100 points from the last year — here's a production-grade approach:
import json
import time
import httpx
from pathlib import Path
from datetime import datetime
ALGOLIA = "https://hn.algolia.com/api/v1"
OUTPUT = Path("hn_dataset")
OUTPUT.mkdir(exist_ok=True)
def scrape_top_stories_dataset(
min_points: int = 100,
days_back: int = 365,
max_pages: int = 200,
) -> list[dict]:
cutoff = int(time.time()) - days_back * 86400
dataset = []
seen_ids = set()
page = 0
while page < max_pages:
try:
resp = httpx.get(
f"{ALGOLIA}/search_by_date",
params={
"tags": "story",
"numericFilters": f"points>{min_points},created_at_i>{cutoff}",
"hitsPerPage": 50,
"page": page,
},
timeout=15,
)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as e:
print(f" HTTP error on page {page}: {e}")
time.sleep(10)
continue
hits = data.get("hits", [])
if not hits:
break
for hit in hits:
hit_id = hit.get("objectID")
if hit_id in seen_ids:
continue
seen_ids.add(hit_id)
dataset.append({
"id": hit_id,
"title": hit.get("title"),
"url": hit.get("url"),
"points": hit.get("points", 0),
"comments": hit.get("num_comments", 0),
"author": hit.get("author"),
"created_at": hit.get("created_at"),
"tags": hit.get("_tags", []),
"story_text": hit.get("story_text"),
})
page += 1
print(f" Page {page}: {len(dataset)} stories collected")
time.sleep(1.0)
if page % 10 == 0:
checkpoint = OUTPUT / f"checkpoint_page_{page}.json"
with open(checkpoint, "w") as f:
json.dump(dataset, f, indent=2)
return dataset
Ask HN and Show HN Mining
Ask HN and Show HN threads are particularly valuable — they contain curated expert opinion and product launches:
def get_ask_hn_threads(query: str = "", min_points: int = 50) -> list[dict]:
params = {"tags": "ask_hn", "hitsPerPage": 50}
if query:
params["query"] = query
if min_points:
params["numericFilters"] = f"points>{min_points}"
resp = httpx.get(f"{ALGOLIA}/search_by_date", params=params)
resp.raise_for_status()
return resp.json()["hits"]
def get_show_hn_launches(days_back: int = 30) -> list[dict]:
cutoff = int(time.time()) - days_back * 86400
resp = httpx.get(
f"{ALGOLIA}/search_by_date",
params={"tags": "show_hn", "numericFilters": f"created_at_i>{cutoff}", "hitsPerPage": 50},
)
resp.raise_for_status()
return resp.json()["hits"]
ask_threads = get_ask_hn_threads("what are you building", min_points=100)
for t in ask_threads[:5]:
print(f" {t.get('points', 0):>4} pts | {t['title'][:65]}")
launches = get_show_hn_launches(days_back=7)
print(f"{len(launches)} Show HN launches this week")
Who Is Hiring? Mining Job Threads
HN's monthly 'Who is hiring?' megathreads are one of the best sources of tech job data. Each thread has thousands of top-level comments, each being a job posting:
import re
def find_hiring_threads(year: int = 2026) -> list[dict]:
resp = httpx.get(
f"{ALGOLIA}/search",
params={"query": "Ask HN: Who is hiring?", "tags": "story", "hitsPerPage": 20},
)
resp.raise_for_status()
hits = resp.json()["hits"]
return [h for h in hits if str(year) in h.get("title", "")]
def scrape_hiring_thread(story_id: int) -> list[dict]:
story = get_story_full_algolia(story_id)
jobs = []
for comment in story.get("children", []):
text = comment.get("text", "")
if not text or comment.get("deleted"):
continue
job = {
"comment_id": comment.get("id"),
"author": comment.get("author"),
"text": text,
"created_at": comment.get("created_at"),
}
# Extract salary range
salary_match = re.search(r'\$\d{2,3}[Kk]?\s*[-]\s*\$?\d{2,3}[Kk]', text)
if salary_match:
nums = re.findall(r'\d{2,3}', salary_match.group())
if len(nums) >= 2:
low, high = int(nums[0]) * 1000, int(nums[1]) * 1000
job["salary_low"] = low
job["salary_high"] = high
job["remote"] = bool(re.search(r'\bremote\b', text, re.IGNORECASE))
job["visa_sponsorship"] = bool(re.search(r'visa', text, re.IGNORECASE))
jobs.append(job)
return jobs
User Profile Analysis
User data from Firebase reveals engagement patterns and karma history:
def get_user_profile(username: str) -> dict:
# Firebase gives karma, created date, about, submission IDs
fb_resp = httpx.get(f"{HN_API}/user/{username}.json", timeout=10)
fb_resp.raise_for_status()
user = fb_resp.json()
if not user:
return {}
# Algolia gives submission history with engagement metrics
algo_resp = httpx.get(f"{ALGOLIA}/users/{username}", timeout=10)
algo_data = algo_resp.json() if algo_resp.status_code == 200 else {}
return {
"id": user["id"],
"karma": user["karma"],
"created": user["created"],
"about": user.get("about", ""),
"submitted_count": len(user.get("submitted", [])),
"avg_story_score": algo_data.get("avg", 0),
}
def get_user_best_posts(username: str, limit: int = 10) -> list[dict]:
resp = httpx.get(
f"{ALGOLIA}/search",
params={"tags": f"story,author_{username}", "hitsPerPage": limit},
timeout=10,
)
resp.raise_for_status()
return [
{
"id": h["objectID"],
"title": h.get("title"),
"points": h.get("points", 0),
"comments": h.get("num_comments", 0),
"url": h.get("url"),
}
for h in resp.json()["hits"]
]
Trend Analysis
Building a trend detector over HN data reveals what topics the developer community cares about:
def analyze_topic_trends(queries: list[str], days_back: int = 30, min_points: int = 50) -> dict:
cutoff = int(time.time()) - days_back * 86400
results = {}
for query in queries:
resp = httpx.get(
f"{ALGOLIA}/search_by_date",
params={
"query": query,
"tags": "story",
"numericFilters": f"points>{min_points},created_at_i>{cutoff}",
"hitsPerPage": 50,
},
timeout=10,
)
resp.raise_for_status()
hits = resp.json()["hits"]
results[query] = {
"count": len(hits),
"avg_points": sum(h.get("points", 0) for h in hits) / len(hits) if hits else 0,
"avg_comments": sum(h.get("num_comments", 0) for h in hits) / len(hits) if hits else 0,
}
time.sleep(0.5)
return results
topics = ["rust", "python", "golang", "llm", "ai agent", "webassembly"]
trends = analyze_topic_trends(topics, days_back=30)
print(f"{'Topic':<15} {'Posts':>5} {'Avg pts':>8}")
for topic, data in sorted(trends.items(), key=lambda x: x[1]['count'], reverse=True):
print(f"{topic:<15} {data['count']:>5} {data['avg_points']:>8.0f}")
Using Proxies for High-Volume Scraping
While HN's APIs are generous, large-scale operations — scanning all items since a given ID, bulk comment extraction for ML training data, running many parallel trend queries — benefit from proxy rotation.
The Firebase API is rate-limited per IP. Heavy concurrent use without rotating IPs hits 429s after a few thousand requests. ThorData's residential proxies distribute requests across real residential IPs, preventing rate limiting:
import httpx
import asyncio
# ThorData residential proxy (example endpoint; swap in your own provider and credentials)
PROXY_URL = "http://USER:[email protected]:9000"
async def fetch_items_bulk_proxied(
item_ids: list[int],
proxy_url: str,
) -> list[dict]:
transport = httpx.AsyncHTTPTransport(proxy=proxy_url)
async with httpx.AsyncClient(transport=transport) as client:
sem = asyncio.Semaphore(30)
async def fetch_one(iid):
async with sem:
try:
r = await client.get(
f"https://hacker-news.firebaseio.com/v0/item/{iid}.json",
timeout=15,
)
return r.json()
except Exception:
return None
results = await asyncio.gather(*[fetch_one(i) for i in item_ids])
return [r for r in results if r]
Error Handling and Retry Logic
The Firebase API occasionally returns null items. A production scraper needs robust handling:
import asyncio
import httpx
from typing import Optional
HN_API = "https://hacker-news.firebaseio.com/v0"
async def fetch_item_with_retry(
client: httpx.AsyncClient,
item_id: int,
max_retries: int = 3,
backoff_base: float = 2.0,
) -> Optional[dict]:
for attempt in range(max_retries):
try:
resp = await client.get(
f"{HN_API}/item/{item_id}.json",
timeout=10.0,
)
if resp.status_code == 429:
wait = backoff_base ** attempt
print(f" Rate limited on item {item_id}, waiting {wait:.1f}s")
await asyncio.sleep(wait)
continue
if resp.status_code == 404:
return None
resp.raise_for_status()
data = resp.json()
return data # may be None for deleted items
except httpx.TimeoutException:
if attempt < max_retries - 1:
await asyncio.sleep(backoff_base ** attempt)
else:
print(f" Timeout on item {item_id} after {max_retries} attempts")
return None
except httpx.HTTPStatusError as e:
if e.response.status_code >= 500:
await asyncio.sleep(backoff_base ** attempt)
else:
return None
return None
Historical Archive Scanning
HN's maxitem endpoint lets you scan the entire historical record:
async def scan_items_from(
start_id: int,
end_id: int | None = None,
filter_type: str = "story",
min_score: int = 0,
batch_size: int = 500,
) -> list[dict]:
if end_id is None:
resp = httpx.get(f"{HN_API}/maxitem.json")
end_id = resp.json()
print(f"Scanning items {start_id} to {end_id} ({end_id - start_id:,} total)")
all_items = []
for batch_start in range(start_id, end_id, batch_size):
batch_end = min(batch_start + batch_size, end_id)
batch_ids = list(range(batch_start, batch_end))
items = await fetch_items_bulk_proxied(batch_ids, PROXY_URL)
matching = [
i for i in items
if i and i.get("type") == filter_type and i.get("score", 0) >= min_score
]
all_items.extend(matching)
progress = (batch_start - start_id) / (end_id - start_id) * 100
print(f" {progress:.1f}% — {len(all_items)} matching items")
return all_items
Common Pitfalls
Firebase returns null for deleted items. Always check for None before accessing fields. About 3-5% of items in any large batch will be null.
Algolia has a 10,000 hit limit. You cannot paginate past page 200 at 50 hits/page. Use created_at_i numeric filters to window your queries across time ranges.
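A sketch of that windowing strategy: walk newest-to-oldest, and whenever pagination would run too deep, shrink the window's upper bound to the oldest timestamp seen so far. Here `fetch_page` is a stand-in for a real search_by_date call:

```python
from typing import Callable

def windowed_search(
    fetch_page: Callable[[int, int, int], list[dict]],
    start_ts: int,
    end_ts: int,
    max_pages_per_window: int = 20,
) -> list[dict]:
    """Collect every hit between start_ts and end_ts without deep pagination.

    fetch_page(lo, hi, page) must return one page of hits filtered by
    lo < created_at_i < hi, newest first (as /search_by_date does).
    """
    all_hits: list[dict] = []
    hi = end_ts
    while hi > start_ts:
        oldest = hi
        for page in range(max_pages_per_window):
            hits = fetch_page(start_ts, hi, page)
            if not hits:
                break
            all_hits.extend(hits)
            oldest = min(h["created_at_i"] for h in hits)
        if oldest >= hi:  # no progress: the window is exhausted
            break
        hi = oldest  # next window ends where this one's oldest hit began
    return all_hits
```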
Comment trees can be deep. Some HN threads go 20+ levels deep. Set a max_depth parameter or you will burn through API calls on deeply nested threads.
The text field contains HTML. Comments come back with <p> tags, <a> links, and <code> blocks. Use BeautifulSoup to parse:
from bs4 import BeautifulSoup
def clean_comment_text(html_text: str) -> str:
if not html_text:
return ""
soup = BeautifulSoup(html_text, "html.parser")
for p in soup.find_all("p"):
p.replace_with("\n\n" + p.get_text())
return soup.get_text().strip()
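If you'd rather avoid the bs4 dependency, the stdlib html.parser module handles HN's simple comment markup too. A minimal sketch (clean_comment_stdlib is my own helper name):

```python
from html.parser import HTMLParser

class _CommentText(HTMLParser):
    """Collects text content, turning <p> boundaries into blank lines."""
    def __init__(self) -> None:
        super().__init__()
        self.parts: list[str] = []

    def handle_starttag(self, tag: str, attrs) -> None:
        if tag == "p":
            self.parts.append("\n\n")

    def handle_data(self, data: str) -> None:
        self.parts.append(data)  # entities are already decoded by the parser

def clean_comment_stdlib(html_text: str) -> str:
    parser = _CommentText()
    parser.feed(html_text or "")
    return "".join(parser.parts).strip()

print(clean_comment_stdlib("Agreed.<p>But consider <i>this</i>.</p>"))
```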
Algolia timestamps are Unix epoch. The created_at_i field is seconds since epoch. Convert with datetime.fromtimestamp(ts).
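For example, converting the time field from the Dropbox story shown earlier (prefer timezone-aware conversion so the result does not depend on the local machine):

```python
from datetime import datetime, timezone

ts = 1175714200  # the `time` field from item 8863 above
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
print(dt.isoformat())  # 2007-04-04T19:16:40+00:00
```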
The url field is absent for Ask HN posts. Ask HN, Show HN, and polls do not have external URLs — only text. Always check before accessing.
Scores change over time. Points keep accumulating (or get adjusted by moderation) after submission, and HN's front-page ranking applies time decay to a story's position rather than to its stored score, so the score you fetch is only a snapshot.
Wrapping Up
Hacker News is a scraper's dream — two well-maintained APIs, no authentication, and clean JSON responses. The Firebase API gives you real-time access to individual items, while Algolia gives you full-text search, filtering, and entire comment trees in single requests.
Start with Algolia for filtered queries and dataset building. Use Firebase when you need specific items or real-time data. Keep your concurrency at 20 or fewer simultaneous requests to stay within rate limits, and use a proxy pool for bulk historical scanning. ThorData provides the residential IP infrastructure needed when scaling beyond a few thousand requests. The data is rich, the APIs are free, and the signal-to-noise ratio is among the best of any public dataset.
Building a News Intelligence System
The most powerful application of HN scraping is building a continuously-updated news intelligence system — a private dashboard showing what matters to the technical community right now.
Here is a production-ready implementation that runs on a schedule, deduplicates across runs, and surfaces trends:
#!/usr/bin/env python3
"""
HN news intelligence system.
Runs periodically to track trending topics, emerging technologies,
and high-signal discussions in the developer community.
"""
import asyncio
import json
import httpx
import time
from pathlib import Path
from datetime import datetime, timedelta
from collections import Counter, defaultdict
import re
HN_API = "https://hacker-news.firebaseio.com/v0"
ALGOLIA = "https://hn.algolia.com/api/v1"
DATA_DIR = Path("hn_intelligence")
DATA_DIR.mkdir(exist_ok=True)
SEM = asyncio.Semaphore(20)
async def fetch_item(client: httpx.AsyncClient, item_id: int) -> dict | None:
async with SEM:
try:
r = await client.get(f"{HN_API}/item/{item_id}.json", timeout=10)
r.raise_for_status()
return r.json()
except Exception:
return None
def get_trending_stories(hours_back: int = 24, min_points: int = 100) -> list[dict]:
cutoff = int(time.time()) - hours_back * 3600
all_stories = []
page = 0
while page < 10:
resp = httpx.get(
f"{ALGOLIA}/search_by_date",
params={
"tags": "story",
"numericFilters": f"points>{min_points},created_at_i>{cutoff}",
"hitsPerPage": 50,
"page": page,
},
timeout=15,
)
resp.raise_for_status()
hits = resp.json()["hits"]
if not hits:
break
all_stories.extend(hits)
page += 1
time.sleep(0.3)
return all_stories
def extract_topics(stories: list[dict]) -> dict:
"""Extract topic signals from story titles and text."""
tech_patterns = {
"ai_ml": r"\b(llm|gpt|claude|gemini|ai|machine learning|neural|transformer|rag|embedding)\b",
"web_dev": r"\b(react|vue|svelte|nextjs|tailwind|typescript|javascript|frontend)\b",
"systems": r"\b(rust|c\+\+|golang|zig|kernel|os|low.level|memory|performance)\b",
"cloud": r"\b(aws|gcp|azure|kubernetes|docker|serverless|terraform|cloud)\b",
"security": r"\b(cve|vulnerability|exploit|zero.day|hack|breach|ransomware|security)\b",
"startups": r"\b(yc|ycombinator|series.a|funding|startup|launch|saas|indie)\b",
"databases": r"\b(postgres|mysql|sqlite|mongodb|redis|database|sql|nosql)\b",
"open_source": r"\b(open source|github|gitlab|fork|contributor|pull request)\b",
}
topic_counts = defaultdict(int)
topic_stories = defaultdict(list)
for story in stories:
text = (story.get("title", "") + " " + (story.get("story_text") or "")).lower()
for topic, pattern in tech_patterns.items():
if re.search(pattern, text, re.IGNORECASE):
topic_counts[topic] += 1
topic_stories[topic].append({
"title": story.get("title"),
"points": story.get("points", 0),
"url": story.get("url"),
"objectID": story.get("objectID"),
})
return {
"counts": dict(topic_counts),
"top_stories_by_topic": {
topic: sorted(stories_list, key=lambda x: x["points"], reverse=True)[:3]
for topic, stories_list in topic_stories.items()
},
}
async def get_top_comment_excerpts(story_ids: list[int]) -> list[dict]:
"""Fetch top comments from high-signal stories."""
excerpts = []
    async with httpx.AsyncClient() as client:
        for story_id in story_ids[:10]:  # limit to avoid too many requests
            resp = await client.get(f"{ALGOLIA}/items/{story_id}", timeout=20)
            if resp.status_code != 200:
                continue
            data = resp.json()
top_comments = sorted(
[c for c in data.get("children", []) if c and c.get("text") and not c.get("deleted")],
key=lambda c: len(c.get("children", [])),
reverse=True,
)[:3]
for comment in top_comments:
from bs4 import BeautifulSoup
text = BeautifulSoup(comment.get("text", ""), "html.parser").get_text()
excerpts.append({
"story_id": story_id,
"story_title": data.get("title"),
"comment_id": comment.get("id"),
"author": comment.get("author"),
"text_preview": text[:300],
"reply_count": len(comment.get("children", [])),
})
            await asyncio.sleep(0.5)
return excerpts
def generate_daily_digest(stories: list[dict], topics: dict) -> str:
"""Generate a markdown digest of the day's HN highlights."""
lines = []
lines.append(f"# HN Daily Digest — {datetime.now().strftime('%B %d, %Y')}")
lines.append("")
lines.append(f"Total qualifying stories: {len(stories)}")
lines.append("")
lines.append("## Top Stories")
lines.append("")
top_stories = sorted(stories, key=lambda s: s.get("points", 0), reverse=True)[:20]
for i, story in enumerate(top_stories, 1):
pts = story.get("points", 0)
cmts = story.get("num_comments", 0)
title = story.get("title", "N/A")
url = story.get("url", f"https://news.ycombinator.com/item?id={story.get('objectID')}")
lines.append(f"{i}. **{title}**")
lines.append(f" - {pts} points | {cmts} comments | [Link]({url})")
lines.append("")
lines.append("## Topic Trends")
lines.append("")
for topic, count in sorted(topics["counts"].items(), key=lambda x: x[1], reverse=True):
lines.append(f"**{topic.replace('_', ' ').title()}**: {count} stories")
top = topics["top_stories_by_topic"].get(topic, [])[:2]
for s in top:
lines.append(f" - {s['title']} ({s['points']} pts)")
lines.append("")
return "\n".join(lines)
async def run_daily_collection():
print("Starting HN intelligence collection...")
# Fetch trending stories
stories_24h = get_trending_stories(hours_back=24, min_points=100)
print(f" Found {len(stories_24h)} stories in the past 24h with 100+ points")
# Extract topics
topics = extract_topics(stories_24h)
print(f" Topic analysis: {topics['counts']}")
# Fetch comment highlights from top 10 stories
top_ids = [int(s["objectID"]) for s in sorted(
stories_24h, key=lambda x: x.get("points", 0), reverse=True
)[:10]]
comment_highlights = await get_top_comment_excerpts(top_ids)
print(f" Extracted {len(comment_highlights)} comment highlights")
# Generate digest
digest = generate_daily_digest(stories_24h, topics)
# Save outputs
timestamp = datetime.now().strftime("%Y%m%d")
stories_file = DATA_DIR / f"stories_{timestamp}.json"
stories_file.write_text(json.dumps(stories_24h, indent=2))
topics_file = DATA_DIR / f"topics_{timestamp}.json"
topics_file.write_text(json.dumps(topics, indent=2))
digest_file = DATA_DIR / f"digest_{timestamp}.md"
digest_file.write_text(digest)
print(f"\nCollection complete!")
print(f" Stories: {stories_file}")
print(f" Topics: {topics_file}")
print(f" Digest: {digest_file}")
if __name__ == "__main__":
asyncio.run(run_daily_collection())
Monitoring Show HN for Product Launches
Show HN threads are the closest thing to a curated product launch feed for the technical community. Every major technical product launch attempts a Show HN. Here is a specialized collector:
import httpx
import json
import time
from pathlib import Path
from datetime import datetime, timedelta
ALGOLIA = "https://hn.algolia.com/api/v1"
def collect_show_hn_launches(
days_back: int = 7,
min_points: int = 20,
output_dir: str = "show_hn_launches",
) -> list[dict]:
out = Path(output_dir)
out.mkdir(exist_ok=True)
cutoff = int(time.time()) - days_back * 86400
launches = []
page = 0
while True:
resp = httpx.get(
f"{ALGOLIA}/search_by_date",
params={
"tags": "show_hn",
"numericFilters": f"points>{min_points},created_at_i>{cutoff}",
"hitsPerPage": 50,
"page": page,
},
timeout=15,
)
resp.raise_for_status()
hits = resp.json()["hits"]
if not hits:
break
for hit in hits:
launches.append({
"id": hit.get("objectID"),
"title": hit.get("title"),
"url": hit.get("url"),
"points": hit.get("points", 0),
"comments": hit.get("num_comments", 0),
"author": hit.get("author"),
"created_at": hit.get("created_at"),
"hn_url": f"https://news.ycombinator.com/item?id={hit.get('objectID')}",
})
page += 1
time.sleep(0.5)
# Sort by engagement (points + comments)
launches.sort(key=lambda x: x["points"] + x["comments"], reverse=True)
timestamp = datetime.now().strftime("%Y%m%d")
out_file = out / f"launches_{timestamp}.json"
out_file.write_text(json.dumps(launches, indent=2))
print(f"Collected {len(launches)} Show HN launches in past {days_back} days")
print(f" Saved to: {out_file}")
print()
print("Top launches:")
for l in launches[:10]:
print(f" {l['points']:>4} pts | {l['title'][:65]}")
return launches
Scraping HN Jobs Board
The HN Jobs board (news.ycombinator.com/jobs) is a curated list of YC-backed and high-quality tech jobs. It uses a different mechanism than regular HN stories:
import httpx
from bs4 import BeautifulSoup
import json
import time
from pathlib import Path
from datetime import datetime
def scrape_hn_jobs_board(max_pages: int = 3) -> list[dict]:
"""Scrape the HN jobs board at news.ycombinator.com/jobs."""
base_url = "https://news.ycombinator.com/jobs"
jobs = []
for page_num in range(max_pages):
params = {}
if page_num > 0:
params["next"] = jobs[-1].get("_next_token") if jobs else None
if not params["next"]:
break
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
}
resp = httpx.get(base_url, params=params, headers=headers, timeout=15)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
# HN jobs board uses tr.athing rows
job_rows = soup.select("tr.athing")
for row in job_rows:
item_id = row.get("id")
title_link = row.select_one(".titleline a")
site_el = row.select_one(".sitestr")
age_el = row.select_one(".age a")
if title_link:
jobs.append({
"id": item_id,
"title": title_link.get_text(strip=True),
"url": title_link.get("href"),
"site": site_el.get_text(strip=True) if site_el else None,
"age": age_el.get_text(strip=True) if age_el else None,
"hn_url": f"https://news.ycombinator.com/item?id={item_id}",
})
# Find next page token
next_link = soup.select_one("a.morelink")
if next_link:
next_href = next_link.get("href", "")
next_token = next_href.split("next=")[-1] if "next=" in next_href else None
if jobs:
jobs[-1]["_next_token"] = next_token
else:
break
time.sleep(2)
# Clean internal tokens
for job in jobs:
job.pop("_next_token", None)
print(f"Scraped {len(jobs)} jobs from HN Jobs board")
return jobs
Competitor and Technology Monitoring
Track mentions of specific companies or technologies over time to spot trends before they appear in mainstream tech news:
import httpx
import json
import time
from pathlib import Path
from datetime import datetime
ALGOLIA = "https://hn.algolia.com/api/v1"
MONITOR_DIR = Path("hn_monitoring")
MONITOR_DIR.mkdir(exist_ok=True)
def get_mentions(
query: str,
days_back: int = 30,
min_points: int = 0,
) -> list[dict]:
cutoff = int(time.time()) - days_back * 86400
params = {
"query": query,
"tags": "story",
"numericFilters": f"created_at_i>{cutoff}",
"hitsPerPage": 50,
}
if min_points > 0:
params["numericFilters"] += f",points>{min_points}"
resp = httpx.get(f"{ALGOLIA}/search_by_date", params=params, timeout=15)
resp.raise_for_status()
return resp.json()["hits"]
def compare_technologies(
tech_list: list[str],
days_back: int = 90,
) -> dict:
results = {}
for tech in tech_list:
mentions = get_mentions(tech, days_back=days_back)
results[tech] = {
"mention_count": len(mentions),
"avg_points": sum(m.get("points", 0) for m in mentions) / len(mentions) if mentions else 0,
"avg_comments": sum(m.get("num_comments", 0) for m in mentions) / len(mentions) if mentions else 0,
"recent_stories": [
{"title": m.get("title"), "points": m.get("points", 0)}
for m in sorted(mentions, key=lambda x: x.get("points", 0), reverse=True)[:3]
],
}
time.sleep(0.5)
# Rank by mention count
ranked = sorted(results.items(), key=lambda x: x[1]["mention_count"], reverse=True)
print(f"\nTechnology mentions on HN (past {days_back} days):")
print(f"{'Technology':<20} {'Mentions':>8} {'Avg pts':>8} {'Avg cmts':>10}")
print("-" * 50)
for tech, data in ranked:
print(f"{tech:<20} {data['mention_count']:>8} {data['avg_points']:>8.0f} {data['avg_comments']:>10.0f}")
return dict(ranked)
# Example: compare competing frameworks
compare_technologies(
["react", "vue", "svelte", "htmx", "datastar"],
days_back=90,
)
Exporting for Analysis and Visualization
HN data is well-suited for analysis in pandas, visualization in matplotlib, or feeding into LLMs for summarization:
import json
from pathlib import Path
from datetime import datetime
def export_for_analysis(
stories: list[dict],
output_path: str = "hn_analysis_export.json",
) -> None:
"""Export stories in a format ready for data analysis."""
export = []
for s in stories:
# Parse timestamp
created_ts = None
if s.get("created_at"):
try:
dt = datetime.fromisoformat(s["created_at"].replace("Z", "+00:00"))
created_ts = dt.isoformat()
except Exception:
pass
export.append({
"id": s.get("objectID"),
"title": s.get("title"),
"url": s.get("url"),
"domain": s.get("url", "").split("/")[2] if s.get("url") and len(s["url"].split("/")) > 2 else None,
"author": s.get("author"),
"points": s.get("points", 0),
"comments": s.get("num_comments", 0),
"created_at": created_ts,
"is_ask_hn": "Ask HN" in (s.get("title") or ""),
"is_show_hn": "Show HN" in (s.get("title") or ""),
"tags": s.get("_tags", []),
})
Path(output_path).write_text(json.dumps(export, indent=2))
print(f"Exported {len(export)} stories to {output_path}")
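The exported file loads directly into pandas via `pd.read_json("hn_analysis_export.json")`, but for a quick dependency-free sanity check the stdlib is enough. A sketch that assumes the field names `export_for_analysis` writes:

```python
import json
from collections import Counter
from pathlib import Path

def summarize_rows(rows: list[dict]) -> dict:
    """Aggregate the export: story count, mean points, top submitting domains."""
    domains = Counter(r["domain"] for r in rows if r.get("domain"))
    avg_points = sum(r.get("points", 0) for r in rows) / len(rows) if rows else 0.0
    return {
        "stories": len(rows),
        "avg_points": avg_points,
        "top_domains": domains.most_common(5),
    }

def summarize_export(path: str = "hn_analysis_export.json") -> dict:
    return summarize_rows(json.loads(Path(path).read_text()))
```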
Rate Limit Reference and Best Practices
Summary of rate limits and recommended practices across both APIs:
| API | Endpoint | Rate Limit | Recommended Delay |
|---|---|---|---|
| Firebase | /item/{id}.json | ~1000/min/IP | 20 concurrent max |
| Firebase | /topstories.json | Generous | No delay needed |
| Algolia | /search | ~100/min | 0.5s between calls |
| Algolia | /search_by_date | ~100/min | 0.5s between calls |
| Algolia | /items/{id} | ~50/min | 1s between calls |
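Rather than sprinkling `time.sleep()` calls, a small sliding-window limiter enforces these budgets in one place. A sketch; the per-minute figures above are observed behavior, not a documented contract, so leave yourself headroom:

```python
import time

class MinuteRateLimiter:
    """Client-side pacing: allow at most max_calls per window seconds."""

    def __init__(self, max_calls: int, window: float = 60.0):
        self.max_calls = max_calls
        self.window = window
        self.calls: list[float] = []

    def wait(self) -> None:
        now = time.monotonic()
        # Drop timestamps that have aged out of the window
        self.calls = [t for t in self.calls if now - t < self.window]
        if len(self.calls) >= self.max_calls:
            # Sleep until the oldest recorded call leaves the window
            time.sleep(max(self.window - (now - self.calls[0]), 0))
        self.calls.append(time.monotonic())
```

Call `limiter.wait()` before each request; for Algolia, `MinuteRateLimiter(90)` stays comfortably under the ~100/min figure.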
For sustained scraping beyond these limits, use a proxy pool. ThorData's residential proxies let you safely increase throughput by distributing requests across many IPs:
import httpx
import asyncio
PROXY_URL = "http://USER:PASS@PROXY_HOST:9000"  # substitute your proxy credentials and gateway host
async def create_proxied_client() -> httpx.AsyncClient:
transport = httpx.AsyncHTTPTransport(proxy=PROXY_URL)
return httpx.AsyncClient(
transport=transport,
timeout=15.0,
limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
)
# Or build the client inline and manage it as a context manager:
async def bulk_fetch_with_proxy(item_ids: list[int]) -> list[dict]:
transport = httpx.AsyncHTTPTransport(proxy=PROXY_URL)
async with httpx.AsyncClient(transport=transport) as client:
sem = asyncio.Semaphore(30)
async def fetch_one(iid):
async with sem:
try:
r = await client.get(
f"https://hacker-news.firebaseio.com/v0/item/{iid}.json",
timeout=15,
)
return r.json()
except Exception:
return None
results = await asyncio.gather(*[fetch_one(i) for i in item_ids])
return [r for r in results if r]
The combination of async I/O, reasonable concurrency (20-30 simultaneous requests), and residential proxy rotation allows sustained collection rates of several thousand items per minute — fast enough to process HN's full historical archive of 40+ million items in a matter of days.
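A back-of-envelope check on that claim, taking 3,000 items/min as an assumed midpoint for "several thousand":

```python
# 40M+ historical items divided by sustained throughput gives the wall-clock time
TOTAL_ITEMS = 40_000_000
ITEMS_PER_MINUTE = 3_000

minutes_needed = TOTAL_ITEMS / ITEMS_PER_MINUTE
days_needed = minutes_needed / (60 * 24)
print(f"{days_needed:.1f} days")  # about 9.3 days of continuous fetching
```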
Exporting HN Data for LLM Processing
HN discussions are exceptionally valuable for LLM training, retrieval-augmented generation (RAG), and prompt testing. The combination of technical depth and community curation makes HN one of the highest-quality text corpora available:
import asyncio
import json
import httpx
from pathlib import Path
from datetime import datetime
from bs4 import BeautifulSoup
ALGOLIA = "https://hn.algolia.com/api/v1"
HN_API = "https://hacker-news.firebaseio.com/v0"
def clean_comment_html(html_text: str) -> str:
if not html_text:
return ""
soup = BeautifulSoup(html_text, "html.parser")
for a in soup.find_all("a"):
url = a.get("href", "")
text = a.get_text()
a.replace_with(f"{text} ({url})" if url else text)
text = soup.get_text(separator="\n")
lines = [line.strip() for line in text.splitlines()]
return "\n".join(line for line in lines if line)
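BeautifulSoup handles HN's comment HTML well, but if you would rather avoid the bs4 dependency, a minimal stdlib sketch covers the common case (HN separates comment paragraphs with `<p>` tags). Note it does not inline link URLs the way the version above does:

```python
from html.parser import HTMLParser

class _CommentText(HTMLParser):
    """Collect text content, inserting a newline at each <p> boundary."""

    def __init__(self):
        super().__init__(convert_charrefs=True)  # decodes &gt; etc. automatically
        self.parts: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "p":
            self.parts.append("\n")

    def handle_data(self, data):
        self.parts.append(data)

def strip_comment_html(html_text: str) -> str:
    parser = _CommentText()
    parser.feed(html_text)
    lines = [ln.strip() for ln in "".join(parser.parts).splitlines()]
    return "\n".join(ln for ln in lines if ln)

print(strip_comment_html("first paragraph<p>second &gt; paragraph"))
```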
def flatten_comment_tree(comment: dict, depth: int = 0) -> list[dict]:
if not comment or comment.get("deleted"):
return []
flat = [{
"id": comment.get("id"),
"author": comment.get("author"),
"text": clean_comment_html(comment.get("text", "")),
"depth": depth,
"reply_count": len(comment.get("children", [])),
"created_at": comment.get("created_at"),
}]
for child in comment.get("children", []):
flat.extend(flatten_comment_tree(child, depth + 1))
return flat
def export_story_for_rag(story_id: int, max_comments: int = 100) -> dict:
resp = httpx.get(f"{ALGOLIA}/items/{story_id}", timeout=30)
resp.raise_for_status()
data = resp.json()
all_comments = []
for child in data.get("children", []):
all_comments.extend(flatten_comment_tree(child, depth=0))
all_comments = [c for c in all_comments if c.get("text") and len(c["text"]) > 20]
top_comments = sorted(all_comments, key=lambda c: c["reply_count"], reverse=True)[:max_comments]
return {
"id": data.get("id"),
"title": data.get("title"),
"url": data.get("url"),
"author": data.get("author"),
"points": data.get("points", 0),
"created_at": data.get("created_at"),
"hn_url": f"https://news.ycombinator.com/item?id={data.get('id')}",
"story_text": clean_comment_html(data.get("text", "")),
"comment_count": len(all_comments),
"top_comments": top_comments,
"all_text": data.get("title", "") + "\n\n" + clean_comment_html(data.get("text", "")) + "\n\n" + "\n\n".join(c["text"] for c in top_comments[:50]),
}
def build_rag_corpus(
min_points: int = 200,
days_back: int = 180,
max_stories: int = 500,
output_file: str = "hn_rag_corpus.jsonl",
) -> int:
import time
cutoff = int(time.time()) - days_back * 86400
resp = httpx.get(
f"{ALGOLIA}/search_by_date",
params={
"tags": "story",
"numericFilters": f"points>{min_points},created_at_i>{cutoff}",
"hitsPerPage": min(max_stories, 1000),  # Algolia caps hitsPerPage at 1000; 50 would ignore max_stories
},
timeout=15,
)
resp.raise_for_status()
story_ids = [int(h["objectID"]) for h in resp.json()["hits"][:max_stories]]
print(f"Exporting {len(story_ids)} stories for RAG corpus")
exported = 0
with open(output_file, "w", encoding="utf-8") as f:
for i, sid in enumerate(story_ids):
try:
story = export_story_for_rag(sid)
if story.get("all_text"):
f.write(json.dumps(story, ensure_ascii=False) + "\n")
exported += 1
except Exception as e:
print(f" Failed {sid}: {e}")
if (i + 1) % 20 == 0:
print(f" Exported {exported}/{i+1}")
time.sleep(0.5)
print(f"RAG corpus saved to {output_file}: {exported} stories")
return exported
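For RAG ingestion, each story's `all_text` usually needs to be split into retrieval-sized pieces. This is a naive fixed-width sketch (field names match `build_rag_corpus`'s output; real pipelines usually prefer sentence or paragraph boundaries):

```python
import json

def chunk_text(text: str, max_chars: int = 1000, overlap: int = 100) -> list[str]:
    """Fixed-width chunking with overlap between consecutive chunks."""
    chunks = []
    start = 0
    step = max_chars - overlap
    while start < len(text):
        chunks.append(text[start:start + max_chars])
        start += step
    return chunks

def corpus_to_chunks(jsonl_path: str) -> list[dict]:
    """Expand each exported story into chunk records that keep provenance."""
    records = []
    with open(jsonl_path, encoding="utf-8") as f:
        for line in f:
            story = json.loads(line)
            for i, chunk in enumerate(chunk_text(story.get("all_text", ""))):
                records.append({
                    "story_id": story.get("id"),
                    "chunk_index": i,
                    "text": chunk,
                    "source": story.get("hn_url"),
                })
    return records
```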
Monitoring Domain Reputation via HN Link Tracking
Tracking which domains get submitted to HN and how they perform reveals domain reputation — useful for SEO, content strategy, and competitive intelligence:
import time
import httpx
from collections import Counter
from urllib.parse import urlparse
ALGOLIA = "https://hn.algolia.com/api/v1"
def track_domain_performance(
domain: str,
days_back: int = 365,
) -> dict:
cutoff = int(time.time()) - days_back * 86400
all_stories = []
page = 0
while page < 20:
resp = httpx.get(
f"{ALGOLIA}/search",
params={
"query": domain,
"tags": "story",
"numericFilters": f"created_at_i>{cutoff}",
"hitsPerPage": 50,
"page": page,
},
timeout=15,
)
resp.raise_for_status()
hits = resp.json()["hits"]
if not hits:
break
for hit in hits:
if domain in (hit.get("url") or ""):
all_stories.append(hit)
page += 1
time.sleep(0.3)
if not all_stories:
return {"domain": domain, "submissions": 0}
points = [s.get("points", 0) for s in all_stories]
comments = [s.get("num_comments", 0) for s in all_stories]
import statistics
return {
"domain": domain,
"submissions": len(all_stories),
"avg_points": statistics.mean(points) if points else 0,
"median_points": statistics.median(points) if points else 0,
"max_points": max(points) if points else 0,
"avg_comments": statistics.mean(comments) if comments else 0,
"top_story": max(all_stories, key=lambda s: s.get("points", 0), default={}).get("title"),
"total_points": sum(points),
}
Summary and Quick Reference
Here is a quick reference for HN scraping in 2026:
| Task | API | Endpoint | Auth |
|---|---|---|---|
| Top story IDs | Firebase | /topstories.json | None |
| Story detail | Firebase | /item/{id}.json | None |
| Search stories | Algolia | /search?query=X | None |
| Date-filtered stories | Algolia | /search_by_date | None |
| Full story + comments | Algolia | /items/{id} | None |
| User profile | Firebase | /user/{username}.json | None |
| Historical scan | Firebase | /maxitem.json + item loop | None |
All endpoints are free, unauthenticated, and well-maintained. ThorData provides residential proxy rotation for bulk operations where IP-level rate limiting becomes a concern.