← Back to blog

How to Scrape Hacker News: Firebase API, Algolia Search & Python (2026)

How to Scrape Hacker News: Firebase API, Algolia Search & Python (2026)

Hacker News is one of the cleanest sites to scrape because it actually wants you to access its data programmatically. There are two official APIs — the Firebase real-time API and Algolia's search API — and neither requires authentication.

No API key. No rate limit headers. No OAuth dance. You just hit the endpoints and get JSON back.

That said, there are still things you can get wrong. Fetching 500 stories one at a time will be painfully slow without concurrency. Algolia has undocumented rate limits. And if you want comment trees, you need to understand HN's data model.

Let's walk through both APIs and build something useful.

The Firebase API: Real-Time Data

HN's official API lives at hacker-news.firebaseio.com/v0/. It's a REST API that mirrors the Firebase real-time database.

Key endpoints:

Every story, comment, and poll on HN is an 'item' with a numeric ID. Stories have kids (comment IDs), comments have kids (reply IDs), and you walk the tree recursively.

Item Schema

A typical story item looks like:

{
  "by": "dhouston",
  "descendants": 71,
  "id": 8863,
  "kids": [8952, 9224, 8917],
  "score": 111,
  "time": 1175714200,
  "title": "My YC app: Dropbox - Throw away your USB drive",
  "type": "story",
  "url": "http://www.getdropbox.com/u/2/screencast.html"
}

Comments include the parent ID and text as HTML:

{
  "by": "norvig",
  "id": 2921983,
  "kids": [2922097, 2922429],
  "parent": 2921506,
  "text": "Agreed, but...",
  "time": 1314211127,
  "type": "comment"
}

Basic Fetcher: Async with Concurrency Control

Here's a solid foundation for all HN fetching. The semaphore prevents overwhelming the API:

import httpx
import asyncio
import json
from pathlib import Path

HN_API = "https://hacker-news.firebaseio.com/v0"
SEM = asyncio.Semaphore(20)  # max 20 concurrent requests

async def fetch_item(client: httpx.AsyncClient, item_id: int) -> dict | None:
    async with SEM:
        try:
            resp = await client.get(
                f"{HN_API}/item/{item_id}.json",
                timeout=10.0,
            )
            resp.raise_for_status()
            return resp.json()
        except (httpx.HTTPError, httpx.TimeoutException) as e:
            print(f"  Warning: failed to fetch item {item_id}: {e}")
            return None

async def fetch_top_stories(limit: int = 30) -> list[dict]:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{HN_API}/topstories.json")
        story_ids = resp.json()[:limit]
        print(f"Fetching {len(story_ids)} stories...")
        tasks = [fetch_item(client, sid) for sid in story_ids]
        stories = await asyncio.gather(*tasks)
    return [s for s in stories if s and s.get("type") == "story"]

if __name__ == "__main__":
    stories = asyncio.run(fetch_top_stories(30))
    for s in stories[:10]:
        print(f"  {s.get('score', 0):>5} pts | {s.get('title', 'N/A')[:70]}")
        print(f"          by {s.get('by')} | {s.get('descendants', 0)} comments")

The async approach matters enormously here. Fetching 30 stories sequentially takes 5-10 seconds. With asyncio.gather, it completes in under a second.

Fetching Comment Trees

Comments are where HN's real value lives. Each story has a kids field — an array of top-level comment IDs. Each comment can also have kids. You walk the tree recursively:

async def fetch_comment_tree(
    client: httpx.AsyncClient,
    item_id: int,
    depth: int = 0,
    max_depth: int = 10,
) -> dict | None:
    if depth > max_depth:
        return None

    item = await fetch_item(client, item_id)
    if not item:
        return None

    # Skip deleted and dead comments
    if item.get("deleted") or item.get("dead"):
        return None

    item["depth"] = depth

    # Recursively fetch children
    kid_tasks = [
        fetch_comment_tree(client, kid_id, depth + 1, max_depth)
        for kid_id in item.get("kids", [])
    ]
    results = await asyncio.gather(*kid_tasks)
    item["children"] = [r for r in results if r is not None]
    return item

async def get_story_with_comments(story_id: int) -> dict:
    async with httpx.AsyncClient() as client:
        story = await fetch_item(client, story_id)
        if not story:
            return {}
        print(f"Fetching comments for: {story.get('title', story_id)[:60]}")
        top_level_tasks = [
            fetch_comment_tree(client, kid_id)
            for kid_id in story.get("kids", [])
        ]
        top_comments = await asyncio.gather(*top_level_tasks)
        story["comment_tree"] = [c for c in top_comments if c]
    return story

A popular story can have 500+ comments. That's 500+ API calls. For bulk jobs — say, fetching all comments from the top 100 stories — you're looking at tens of thousands of requests.

This is where you need to be careful. The Firebase API doesn't publish rate limits, but hammer it too hard and you'll get 429s or temporary blocks. Keep the semaphore at 20 concurrent requests max, and consider routing through a proxy pool for large-scale work. ThorData's residential proxies work well for this since they rotate IPs automatically and handle connection pooling for you.

Algolia Search API: The Power Tool

The Algolia API at hn.algolia.com/api/v1/ powers HN's built-in search. It's faster for filtered queries and returns richer data than Firebase for many use cases.

Key endpoints:

Tags you can filter on: story, comment, poll, job, ask_hn, show_hn, front_page, author_USERNAME, story_STORY_ID

import httpx
import time

ALGOLIA = "https://hn.algolia.com/api/v1"

def search_stories(
    query: str,
    page: int = 0,
    hits_per_page: int = 50,
    min_points: int = 0,
    tags: str = "story",
) -> tuple[list, int]:
    params = {
        "query": query,
        "tags": tags,
        "page": page,
        "hitsPerPage": hits_per_page,
    }
    if min_points > 0:
        params["numericFilters"] = f"points>{min_points}"

    resp = httpx.get(f"{ALGOLIA}/search", params=params, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    return data["hits"], data["nbPages"]

def search_all_pages(
    query: str,
    min_points: int = 100,
    max_pages: int = 10,
) -> list[dict]:
    all_hits = []
    for page in range(max_pages):
        hits, total_pages = search_stories(query, page=page, min_points=min_points)
        if not hits or page >= total_pages:
            break
        all_hits.extend(hits)
        print(f"  Page {page+1}/{min(max_pages, total_pages)}: {len(hits)} hits")
        time.sleep(0.5)
    return all_hits

Date Range Queries

Algolia supports created_at_i (Unix timestamp) for time filtering:

import time

def get_stories_in_range(
    start_ts: int,
    end_ts: int,
    min_points: int = 50,
) -> list[dict]:
    all_hits = []
    page = 0
    while True:
        params = {
            "tags": "story",
            "numericFilters": (
                f"created_at_i>{start_ts},"
                f"created_at_i<{end_ts},"
                f"points>{min_points}"
            ),
            "hitsPerPage": 50,
            "page": page,
        }
        resp = httpx.get(f"{ALGOLIA}/search_by_date", params=params)
        resp.raise_for_status()
        data = resp.json()
        if not data["hits"]:
            break
        all_hits.extend(data["hits"])
        page += 1
        time.sleep(0.3)
    return all_hits

# Get stories from last 7 days with 100+ points
one_week_ago = int(time.time()) - 7 * 86400
recent = get_stories_in_range(one_week_ago, int(time.time()), min_points=100)
print(f"Found {len(recent)} quality stories from the past week")

Full Comment Tree in One Call

Algolia's /items/{id} endpoint returns a story with its entire nested comment tree — no recursive Firebase calls needed:

def get_story_full_algolia(story_id: int) -> dict:
    resp = httpx.get(f"{ALGOLIA}/items/{story_id}", timeout=30)
    resp.raise_for_status()
    data = resp.json()

    def count_comments(node: dict) -> int:
        count = 0
        for child in node.get("children", []):
            count += 1 + count_comments(child)
        return count

    total = count_comments(data)
    title = data.get("title", "")[:60]
    print(f"  Loaded '{title}' with {total} comments")
    return data

Bulk Dataset Building

If you want to build a dataset — every story above 100 points from the last year — here's a production-grade approach:

import json
import time
import httpx
from pathlib import Path
from datetime import datetime

ALGOLIA = "https://hn.algolia.com/api/v1"
OUTPUT = Path("hn_dataset")
OUTPUT.mkdir(exist_ok=True)

def scrape_top_stories_dataset(
    min_points: int = 100,
    days_back: int = 365,
    max_pages: int = 200,
) -> list[dict]:
    cutoff = int(time.time()) - days_back * 86400
    dataset = []
    seen_ids = set()
    page = 0

    while page < max_pages:
        try:
            resp = httpx.get(
                f"{ALGOLIA}/search_by_date",
                params={
                    "tags": "story",
                    "numericFilters": f"points>{min_points},created_at_i>{cutoff}",
                    "hitsPerPage": 50,
                    "page": page,
                },
                timeout=15,
            )
            resp.raise_for_status()
            data = resp.json()
        except httpx.HTTPError as e:
            print(f"  HTTP error on page {page}: {e}")
            time.sleep(10)
            continue

        hits = data.get("hits", [])
        if not hits:
            break

        for hit in hits:
            hit_id = hit.get("objectID")
            if hit_id in seen_ids:
                continue
            seen_ids.add(hit_id)
            dataset.append({
                "id": hit_id,
                "title": hit.get("title"),
                "url": hit.get("url"),
                "points": hit.get("points", 0),
                "comments": hit.get("num_comments", 0),
                "author": hit.get("author"),
                "created_at": hit.get("created_at"),
                "tags": hit.get("_tags", []),
                "story_text": hit.get("story_text"),
            })

        page += 1
        print(f"  Page {page}: {len(dataset)} stories collected")
        time.sleep(1.0)

        if page % 10 == 0:
            checkpoint = OUTPUT / f"checkpoint_page_{page}.json"
            with open(checkpoint, "w") as f:
                json.dump(dataset, f, indent=2)

    return dataset

Ask HN and Show HN Mining

Ask HN and Show HN threads are particularly valuable — they contain curated expert opinion and product launches:

def get_ask_hn_threads(query: str = "", min_points: int = 50) -> list[dict]:
    params = {"tags": "ask_hn", "hitsPerPage": 50}
    if query:
        params["query"] = query
    if min_points:
        params["numericFilters"] = f"points>{min_points}"

    resp = httpx.get(f"{ALGOLIA}/search_by_date", params=params)
    resp.raise_for_status()
    return resp.json()["hits"]

def get_show_hn_launches(days_back: int = 30) -> list[dict]:
    cutoff = int(time.time()) - days_back * 86400
    resp = httpx.get(
        f"{ALGOLIA}/search_by_date",
        params={"tags": "show_hn", "numericFilters": f"created_at_i>{cutoff}", "hitsPerPage": 50},
    )
    resp.raise_for_status()
    return resp.json()["hits"]

ask_threads = get_ask_hn_threads("what are you building", min_points=100)
for t in ask_threads[:5]:
    print(f"  {t.get('points', 0):>4} pts | {t['title'][:65]}")

launches = get_show_hn_launches(days_back=7)
print(f"{len(launches)} Show HN launches this week")

Who Is Hiring? Mining Job Threads

HN's monthly 'Who is hiring?' megathreads are one of the best sources of tech job data. Each thread has thousands of top-level comments, each being a job posting:

import re

def find_hiring_threads(year: int = 2026) -> list[dict]:
    resp = httpx.get(
        f"{ALGOLIA}/search",
        params={"query": "Ask HN: Who is hiring?", "tags": "story", "hitsPerPage": 20},
    )
    resp.raise_for_status()
    hits = resp.json()["hits"]
    return [h for h in hits if str(year) in h.get("title", "")]

def scrape_hiring_thread(story_id: int) -> list[dict]:
    story = get_story_full_algolia(story_id)
    jobs = []
    for comment in story.get("children", []):
        text = comment.get("text", "")
        if not text or comment.get("deleted"):
            continue

        job = {
            "comment_id": comment.get("id"),
            "author": comment.get("author"),
            "text": text,
            "created_at": comment.get("created_at"),
        }

        # Extract salary range
        salary_match = re.search(r'\$\d{2,3}[Kk]?\s*[-]\s*\$?\d{2,3}[Kk]', text)
        if salary_match:
            nums = re.findall(r'\d{2,3}', salary_match.group())
            if len(nums) >= 2:
                low, high = int(nums[0]) * 1000, int(nums[1]) * 1000
                job["salary_low"] = low
                job["salary_high"] = high

        job["remote"] = bool(re.search(r'\bremote\b', text, re.IGNORECASE))
        job["visa_sponsorship"] = bool(re.search(r'visa', text, re.IGNORECASE))
        jobs.append(job)
    return jobs

User Profile Analysis

User data from Firebase reveals engagement patterns and karma history:

def get_user_profile(username: str) -> dict:
    # Firebase gives karma, created date, about, submission IDs
    fb_resp = httpx.get(f"{HN_API}/user/{username}.json", timeout=10)
    fb_resp.raise_for_status()
    user = fb_resp.json()
    if not user:
        return {}

    # Algolia gives submission history with engagement metrics
    algo_resp = httpx.get(f"{ALGOLIA}/users/{username}", timeout=10)
    algo_data = algo_resp.json() if algo_resp.status_code == 200 else {}

    return {
        "id": user["id"],
        "karma": user["karma"],
        "created": user["created"],
        "about": user.get("about", ""),
        "submitted_count": len(user.get("submitted", [])),
        "avg_story_score": algo_data.get("avg", 0),
    }

def get_user_best_posts(username: str, limit: int = 10) -> list[dict]:
    resp = httpx.get(
        f"{ALGOLIA}/search",
        params={"tags": f"story,author_{username}", "hitsPerPage": limit},
        timeout=10,
    )
    resp.raise_for_status()
    return [
        {
            "id": h["objectID"],
            "title": h.get("title"),
            "points": h.get("points", 0),
            "comments": h.get("num_comments", 0),
            "url": h.get("url"),
        }
        for h in resp.json()["hits"]
    ]

Trend Analysis

Building a trend detector over HN data reveals what topics the developer community cares about:

def analyze_topic_trends(queries: list[str], days_back: int = 30, min_points: int = 50) -> dict:
    cutoff = int(time.time()) - days_back * 86400
    results = {}
    for query in queries:
        resp = httpx.get(
            f"{ALGOLIA}/search_by_date",
            params={
                "query": query,
                "tags": "story",
                "numericFilters": f"points>{min_points},created_at_i>{cutoff}",
                "hitsPerPage": 50,
            },
            timeout=10,
        )
        resp.raise_for_status()
        hits = resp.json()["hits"]
        results[query] = {
            "count": len(hits),
            "avg_points": sum(h.get("points", 0) for h in hits) / len(hits) if hits else 0,
            "avg_comments": sum(h.get("num_comments", 0) for h in hits) / len(hits) if hits else 0,
        }
        time.sleep(0.5)
    return results

topics = ["rust", "python", "golang", "llm", "ai agent", "webassembly"]
trends = analyze_topic_trends(topics, days_back=30)
print(f"{'Topic':<15} {'Posts':>5} {'Avg pts':>8}")
for topic, data in sorted(trends.items(), key=lambda x: x[1]['count'], reverse=True):
    print(f"{topic:<15} {data['count']:>5} {data['avg_points']:>8.0f}")

Using Proxies for High-Volume Scraping

While HN's APIs are generous, large-scale operations — scanning all items since a given ID, bulk comment extraction for ML training data, running many parallel trend queries — benefit from proxy rotation.

The Firebase API is rate-limited per IP. Heavy concurrent use without rotating IPs hits 429s after a few thousand requests. ThorData's residential proxies distribute requests across real residential IPs, preventing rate limiting:

import httpx
import asyncio

# ThorData residential proxy — https://thordata.partnerstack.com/partner/0a0x4nzb (or [Oxylabs](https://oxylabs.go2cloud.org/aff_c?offer_id=7&aff_id=2066&url_id=174))
PROXY_URL = "http://USER:[email protected]:9000"

async def fetch_items_bulk_proxied(
    item_ids: list[int],
    proxy_url: str,
) -> list[dict]:
    transport = httpx.AsyncHTTPTransport(proxy=proxy_url)
    async with httpx.AsyncClient(transport=transport) as client:
        sem = asyncio.Semaphore(30)

        async def fetch_one(iid):
            async with sem:
                try:
                    r = await client.get(
                        f"https://hacker-news.firebaseio.com/v0/item/{iid}.json",
                        timeout=15,
                    )
                    return r.json()
                except Exception:
                    return None

        results = await asyncio.gather(*[fetch_one(i) for i in item_ids])
        return [r for r in results if r]

Error Handling and Retry Logic

The Firebase API occasionally returns null items. A production scraper needs robust handling:

import asyncio
import httpx
from typing import Optional

HN_API = "https://hacker-news.firebaseio.com/v0"

async def fetch_item_with_retry(
    client: httpx.AsyncClient,
    item_id: int,
    max_retries: int = 3,
    backoff_base: float = 2.0,
) -> Optional[dict]:
    for attempt in range(max_retries):
        try:
            resp = await client.get(
                f"{HN_API}/item/{item_id}.json",
                timeout=10.0,
            )

            if resp.status_code == 429:
                wait = backoff_base ** attempt
                print(f"  Rate limited on item {item_id}, waiting {wait:.1f}s")
                await asyncio.sleep(wait)
                continue

            if resp.status_code == 404:
                return None

            resp.raise_for_status()
            data = resp.json()
            return data  # may be None for deleted items

        except httpx.TimeoutException:
            if attempt < max_retries - 1:
                await asyncio.sleep(backoff_base ** attempt)
            else:
                print(f"  Timeout on item {item_id} after {max_retries} attempts")
                return None

        except httpx.HTTPStatusError as e:
            if e.response.status_code >= 500:
                await asyncio.sleep(backoff_base ** attempt)
            else:
                return None
    return None

Historical Archive Scanning

HN's maxitem endpoint lets you scan the entire historical record:

async def scan_items_from(
    start_id: int,
    end_id: int | None = None,
    filter_type: str = "story",
    min_score: int = 0,
    batch_size: int = 500,
) -> list[dict]:
    if end_id is None:
        resp = httpx.get(f"{HN_API}/maxitem.json")
        end_id = resp.json()

    print(f"Scanning items {start_id} to {end_id} ({end_id - start_id:,} total)")
    all_items = []

    for batch_start in range(start_id, end_id, batch_size):
        batch_end = min(batch_start + batch_size, end_id)
        batch_ids = list(range(batch_start, batch_end))
        items = await fetch_items_bulk_proxied(batch_ids, PROXY_URL)
        matching = [
            i for i in items
            if i and i.get("type") == filter_type and i.get("score", 0) >= min_score
        ]
        all_items.extend(matching)
        progress = (batch_start - start_id) / (end_id - start_id) * 100
        print(f"  {progress:.1f}% — {len(all_items)} matching items")

    return all_items

Common Pitfalls

Firebase returns null for deleted items. Always check for None before accessing fields. About 3-5% of items in any large batch will be null.

Algolia has a 10,000 hit limit. You cannot paginate past page 200 at 50 hits/page. Use created_at_i numeric filters to window your queries across time ranges.

Comment trees can be deep. Some HN threads go 20+ levels deep. Set a max_depth parameter or you will burn through API calls on deeply nested threads.

The text field contains HTML. Comments come back with <p> tags, <a> links, and <code> blocks. Use BeautifulSoup to parse:

from bs4 import BeautifulSoup

def clean_comment_text(html_text: str) -> str:
    if not html_text:
        return ""
    soup = BeautifulSoup(html_text, "html.parser")
    for p in soup.find_all("p"):
        p.replace_with("\n\n" + p.get_text())
    return soup.get_text().strip()

Algolia timestamps are Unix epoch. The created_at_i field is seconds since epoch. Convert with datetime.fromtimestamp(ts).

The url field is absent for Ask HN posts. Ask HN, Show HN, and polls do not have external URLs — only text. Always check before accessing.

Scores change over time. HN applies time decay to story scores. A story score when you fetch it may differ from when it was first submitted.

Wrapping Up

Hacker News is a scraper's dream — two well-maintained APIs, no authentication, and clean JSON responses. The Firebase API gives you real-time access to individual items, while Algolia gives you full-text search, filtering, and entire comment trees in single requests.

Start with Algolia for filtered queries and dataset building. Use Firebase when you need specific items or real-time data. Keep your concurrency at 20 or fewer simultaneous requests to stay within rate limits, and use a proxy pool for bulk historical scanning. ThorData provides residential IP infrastructure needed when scaling beyond a few thousand requests. The data is rich, the APIs are free, and the signal-to-noise ratio is among the best of any public dataset.

Building a News Intelligence System

The most powerful application of HN scraping is building a continuously-updated news intelligence system — a private dashboard showing what matters to the technical community right now.

Here is a production-ready implementation that runs on a schedule, deduplicates across runs, and surfaces trends:

#!/usr/bin/env python3
"""
HN news intelligence system.
Runs periodically to track trending topics, emerging technologies,
and high-signal discussions in the developer community.
"""

import asyncio
import json
import httpx
import time
from pathlib import Path
from datetime import datetime, timedelta
from collections import Counter, defaultdict
import re

HN_API = "https://hacker-news.firebaseio.com/v0"
ALGOLIA = "https://hn.algolia.com/api/v1"
DATA_DIR = Path("hn_intelligence")
DATA_DIR.mkdir(exist_ok=True)
SEM = asyncio.Semaphore(20)


async def fetch_item(client: httpx.AsyncClient, item_id: int) -> dict | None:
    async with SEM:
        try:
            r = await client.get(f"{HN_API}/item/{item_id}.json", timeout=10)
            r.raise_for_status()
            return r.json()
        except Exception:
            return None


def get_trending_stories(hours_back: int = 24, min_points: int = 100) -> list[dict]:
    cutoff = int(time.time()) - hours_back * 3600
    all_stories = []
    page = 0
    while page < 10:
        resp = httpx.get(
            f"{ALGOLIA}/search_by_date",
            params={
                "tags": "story",
                "numericFilters": f"points>{min_points},created_at_i>{cutoff}",
                "hitsPerPage": 50,
                "page": page,
            },
            timeout=15,
        )
        resp.raise_for_status()
        hits = resp.json()["hits"]
        if not hits:
            break
        all_stories.extend(hits)
        page += 1
        time.sleep(0.3)
    return all_stories


def extract_topics(stories: list[dict]) -> dict:
    """Extract topic signals from story titles and text."""
    tech_patterns = {
        "ai_ml": r"\b(llm|gpt|claude|gemini|ai|machine learning|neural|transformer|rag|embedding)\b",
        "web_dev": r"\b(react|vue|svelte|nextjs|tailwind|typescript|javascript|frontend)\b",
        "systems": r"\b(rust|c\+\+|golang|zig|kernel|os|low.level|memory|performance)\b",
        "cloud": r"\b(aws|gcp|azure|kubernetes|docker|serverless|terraform|cloud)\b",
        "security": r"\b(cve|vulnerability|exploit|zero.day|hack|breach|ransomware|security)\b",
        "startups": r"\b(yc|ycombinator|series.a|funding|startup|launch|saas|indie)\b",
        "databases": r"\b(postgres|mysql|sqlite|mongodb|redis|database|sql|nosql)\b",
        "open_source": r"\b(open source|github|gitlab|fork|contributor|pull request)\b",
    }

    topic_counts = defaultdict(int)
    topic_stories = defaultdict(list)

    for story in stories:
        text = (story.get("title", "") + " " + (story.get("story_text") or "")).lower()
        for topic, pattern in tech_patterns.items():
            if re.search(pattern, text, re.IGNORECASE):
                topic_counts[topic] += 1
                topic_stories[topic].append({
                    "title": story.get("title"),
                    "points": story.get("points", 0),
                    "url": story.get("url"),
                    "objectID": story.get("objectID"),
                })

    return {
        "counts": dict(topic_counts),
        "top_stories_by_topic": {
            topic: sorted(stories_list, key=lambda x: x["points"], reverse=True)[:3]
            for topic, stories_list in topic_stories.items()
        },
    }


async def get_top_comment_excerpts(story_ids: list[int]) -> list[dict]:
    """Fetch top comments from high-signal stories."""
    excerpts = []
    async with httpx.AsyncClient() as client:
        for story_id in story_ids[:10]:  # limit to avoid too many requests
            resp = httpx.get(f"{ALGOLIA}/items/{story_id}", timeout=20)
            if resp.status_code != 200:
                continue
            data = resp.json()
            top_comments = sorted(
                [c for c in data.get("children", []) if c and c.get("text") and not c.get("deleted")],
                key=lambda c: len(c.get("children", [])),
                reverse=True,
            )[:3]
            for comment in top_comments:
                from bs4 import BeautifulSoup
                text = BeautifulSoup(comment.get("text", ""), "html.parser").get_text()
                excerpts.append({
                    "story_id": story_id,
                    "story_title": data.get("title"),
                    "comment_id": comment.get("id"),
                    "author": comment.get("author"),
                    "text_preview": text[:300],
                    "reply_count": len(comment.get("children", [])),
                })
            time.sleep(0.5)
    return excerpts


def generate_daily_digest(stories: list[dict], topics: dict) -> str:
    """Generate a markdown digest of the day's HN highlights."""
    lines = []
    lines.append(f"# HN Daily Digest — {datetime.now().strftime('%B %d, %Y')}")
    lines.append("")
    lines.append(f"Total qualifying stories: {len(stories)}")
    lines.append("")
    lines.append("## Top Stories")
    lines.append("")
    top_stories = sorted(stories, key=lambda s: s.get("points", 0), reverse=True)[:20]
    for i, story in enumerate(top_stories, 1):
        pts = story.get("points", 0)
        cmts = story.get("num_comments", 0)
        title = story.get("title", "N/A")
        url = story.get("url", f"https://news.ycombinator.com/item?id={story.get('objectID')}")
        lines.append(f"{i}. **{title}**")
        lines.append(f"   - {pts} points | {cmts} comments | [Link]({url})")
        lines.append("")

    lines.append("## Topic Trends")
    lines.append("")
    for topic, count in sorted(topics["counts"].items(), key=lambda x: x[1], reverse=True):
        lines.append(f"**{topic.replace('_', ' ').title()}**: {count} stories")
        top = topics["top_stories_by_topic"].get(topic, [])[:2]
        for s in top:
            lines.append(f"  - {s['title']} ({s['points']} pts)")
        lines.append("")

    return "\n".join(lines)


async def run_daily_collection():
    print("Starting HN intelligence collection...")

    # Fetch trending stories
    stories_24h = get_trending_stories(hours_back=24, min_points=100)
    print(f"  Found {len(stories_24h)} stories in the past 24h with 100+ points")

    # Extract topics
    topics = extract_topics(stories_24h)
    print(f"  Topic analysis: {topics['counts']}")

    # Fetch comment highlights from top 10 stories
    top_ids = [int(s["objectID"]) for s in sorted(
        stories_24h, key=lambda x: x.get("points", 0), reverse=True
    )[:10]]

    comment_highlights = await get_top_comment_excerpts(top_ids)
    print(f"  Extracted {len(comment_highlights)} comment highlights")

    # Generate digest
    digest = generate_daily_digest(stories_24h, topics)

    # Save outputs
    timestamp = datetime.now().strftime("%Y%m%d")
    stories_file = DATA_DIR / f"stories_{timestamp}.json"
    stories_file.write_text(json.dumps(stories_24h, indent=2))

    topics_file = DATA_DIR / f"topics_{timestamp}.json"
    topics_file.write_text(json.dumps(topics, indent=2))

    digest_file = DATA_DIR / f"digest_{timestamp}.md"
    digest_file.write_text(digest)

    print(f"\nCollection complete!")
    print(f"  Stories: {stories_file}")
    print(f"  Topics: {topics_file}")
    print(f"  Digest: {digest_file}")


if __name__ == "__main__":
    asyncio.run(run_daily_collection())

Monitoring Show HN for Product Launches

Show HN threads are the closest thing to a curated product launch feed for the technical community. Every major technical product launch attempts a Show HN. Here is a specialized collector:

import httpx
import json
import time
from pathlib import Path
from datetime import datetime, timedelta

ALGOLIA = "https://hn.algolia.com/api/v1"


def collect_show_hn_launches(
    days_back: int = 7,
    min_points: int = 20,
    output_dir: str = "show_hn_launches",
) -> list[dict]:
    out = Path(output_dir)
    out.mkdir(exist_ok=True)
    cutoff = int(time.time()) - days_back * 86400
    launches = []
    page = 0

    while True:
        resp = httpx.get(
            f"{ALGOLIA}/search_by_date",
            params={
                "tags": "show_hn",
                "numericFilters": f"points>{min_points},created_at_i>{cutoff}",
                "hitsPerPage": 50,
                "page": page,
            },
            timeout=15,
        )
        resp.raise_for_status()
        hits = resp.json()["hits"]
        if not hits:
            break
        for hit in hits:
            launches.append({
                "id": hit.get("objectID"),
                "title": hit.get("title"),
                "url": hit.get("url"),
                "points": hit.get("points", 0),
                "comments": hit.get("num_comments", 0),
                "author": hit.get("author"),
                "created_at": hit.get("created_at"),
                "hn_url": f"https://news.ycombinator.com/item?id={hit.get('objectID')}",
            })
        page += 1
        time.sleep(0.5)

    # Sort by engagement (points + comments)
    launches.sort(key=lambda x: x["points"] + x["comments"], reverse=True)

    timestamp = datetime.now().strftime("%Y%m%d")
    out_file = out / f"launches_{timestamp}.json"
    out_file.write_text(json.dumps(launches, indent=2))

    print(f"Collected {len(launches)} Show HN launches in past {days_back} days")
    print(f"  Saved to: {out_file}")
    print()
    print("Top launches:")
    for l in launches[:10]:
        print(f"  {l['points']:>4} pts | {l['title'][:65]}")

    return launches

Scraping HN Jobs Board

The HN Jobs board (news.ycombinator.com/jobs) is a curated list of YC-backed and high-quality tech jobs. It uses a different mechanism than regular HN stories:

import httpx
from bs4 import BeautifulSoup
import json
import time
from pathlib import Path
from datetime import datetime

def scrape_hn_jobs_board(max_pages: int = 3) -> list[dict]:
    """Scrape the HN jobs board at news.ycombinator.com/jobs."""
    base_url = "https://news.ycombinator.com/jobs"
    jobs = []

    for page_num in range(max_pages):
        params = {}
        if page_num > 0:
            params["next"] = jobs[-1].get("_next_token") if jobs else None
            if not params["next"]:
                break

        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
        }
        resp = httpx.get(base_url, params=params, headers=headers, timeout=15)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        # HN jobs board uses tr.athing rows
        job_rows = soup.select("tr.athing")
        for row in job_rows:
            item_id = row.get("id")
            title_link = row.select_one(".titleline a")
            site_el = row.select_one(".sitestr")
            age_el = row.select_one(".age a")

            if title_link:
                jobs.append({
                    "id": item_id,
                    "title": title_link.get_text(strip=True),
                    "url": title_link.get("href"),
                    "site": site_el.get_text(strip=True) if site_el else None,
                    "age": age_el.get_text(strip=True) if age_el else None,
                    "hn_url": f"https://news.ycombinator.com/item?id={item_id}",
                })

        # Find next page token
        next_link = soup.select_one("a.morelink")
        if next_link:
            next_href = next_link.get("href", "")
            next_token = next_href.split("next=")[-1] if "next=" in next_href else None
            if jobs:
                jobs[-1]["_next_token"] = next_token
        else:
            break

        time.sleep(2)

    # Clean internal tokens
    for job in jobs:
        job.pop("_next_token", None)

    print(f"Scraped {len(jobs)} jobs from HN Jobs board")
    return jobs

Competitor and Technology Monitoring

Track mentions of specific companies or technologies over time to spot trends before they appear in mainstream tech news:

import httpx
import json
import time
from pathlib import Path
from datetime import datetime

ALGOLIA = "https://hn.algolia.com/api/v1"
MONITOR_DIR = Path("hn_monitoring")
MONITOR_DIR.mkdir(exist_ok=True)


def get_mentions(
    query: str,
    days_back: int = 30,
    min_points: int = 0,
) -> list[dict]:
    cutoff = int(time.time()) - days_back * 86400
    params = {
        "query": query,
        "tags": "story",
        "numericFilters": f"created_at_i>{cutoff}",
        "hitsPerPage": 50,
    }
    if min_points > 0:
        params["numericFilters"] += f",points>{min_points}"

    resp = httpx.get(f"{ALGOLIA}/search_by_date", params=params, timeout=15)
    resp.raise_for_status()
    return resp.json()["hits"]


def compare_technologies(
    tech_list: list[str],
    days_back: int = 90,
) -> dict:
    results = {}
    for tech in tech_list:
        mentions = get_mentions(tech, days_back=days_back)
        results[tech] = {
            "mention_count": len(mentions),
            "avg_points": sum(m.get("points", 0) for m in mentions) / len(mentions) if mentions else 0,
            "avg_comments": sum(m.get("num_comments", 0) for m in mentions) / len(mentions) if mentions else 0,
            "recent_stories": [
                {"title": m.get("title"), "points": m.get("points", 0)}
                for m in sorted(mentions, key=lambda x: x.get("points", 0), reverse=True)[:3]
            ],
        }
        time.sleep(0.5)

    # Rank by mention count
    ranked = sorted(results.items(), key=lambda x: x[1]["mention_count"], reverse=True)

    print(f"\nTechnology mentions on HN (past {days_back} days):")
    print(f"{'Technology':<20} {'Mentions':>8} {'Avg pts':>8} {'Avg cmts':>10}")
    print("-" * 50)
    for tech, data in ranked:
        print(f"{tech:<20} {data['mention_count']:>8} {data['avg_points']:>8.0f} {data['avg_comments']:>10.0f}")

    return dict(ranked)


# Example: compare competing frameworks
compare_technologies(
    ["react", "vue", "svelte", "htmx", "datastar"],
    days_back=90,
)

Exporting for Analysis and Visualization

HN data is well-suited for analysis in pandas, visualization in matplotlib, or feeding into LLMs for summarization:

import json
from pathlib import Path
from datetime import datetime

def export_for_analysis(
    stories: list[dict],
    output_path: str = "hn_analysis_export.json",
) -> None:
    """Export stories in a format ready for data analysis."""
    export = []
    for s in stories:
        # Parse timestamp
        created_ts = None
        if s.get("created_at"):
            try:
                dt = datetime.fromisoformat(s["created_at"].replace("Z", "+00:00"))
                created_ts = dt.isoformat()
            except Exception:
                pass

        export.append({
            "id": s.get("objectID"),
            "title": s.get("title"),
            "url": s.get("url"),
            "domain": s.get("url", "").split("/")[2] if s.get("url") and len(s["url"].split("/")) > 2 else None,
            "author": s.get("author"),
            "points": s.get("points", 0),
            "comments": s.get("num_comments", 0),
            "created_at": created_ts,
            "is_ask_hn": "Ask HN" in (s.get("title") or ""),
            "is_show_hn": "Show HN" in (s.get("title") or ""),
            "tags": s.get("_tags", []),
        })

    Path(output_path).write_text(json.dumps(export, indent=2))
    print(f"Exported {len(export)} stories to {output_path}")

Rate Limit Reference and Best Practices

Summary of rate limits and recommended practices across both APIs:

API Endpoint Rate Limit Recommended Delay
Firebase /item/{id}.json ~1000/min/IP 20 concurrent max
Firebase /topstories.json Generous No delay needed
Algolia /search ~100/min 0.5s between calls
Algolia /search_by_date ~100/min 0.5s between calls
Algolia /items/{id} ~50/min 1s between calls

For sustained scraping beyond these limits, use a proxy pool. ThorData's residential proxies let you safely increase throughput by distributing requests across many IPs:

import httpx
import asyncio

PROXY_URL = "http://USER:[email protected]:9000"

async def create_proxied_client() -> httpx.AsyncClient:
    transport = httpx.AsyncHTTPTransport(proxy=PROXY_URL)
    return httpx.AsyncClient(
        transport=transport,
        timeout=15.0,
        limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
    )

# Use as context manager
async def bulk_fetch_with_proxy(item_ids: list[int]) -> list[dict]:
    transport = httpx.AsyncHTTPTransport(proxy=PROXY_URL)
    async with httpx.AsyncClient(transport=transport) as client:
        sem = asyncio.Semaphore(30)
        async def fetch_one(iid):
            async with sem:
                try:
                    r = await client.get(
                        f"https://hacker-news.firebaseio.com/v0/item/{iid}.json",
                        timeout=15,
                    )
                    return r.json()
                except Exception:
                    return None
        results = await asyncio.gather(*[fetch_one(i) for i in item_ids])
        return [r for r in results if r]

The combination of async I/O, reasonable concurrency (20-30 simultaneous requests), and residential proxy rotation allows sustained collection rates of several thousand items per minute — fast enough to process HN's full historical archive of 40+ million items in a matter of days.

Exporting HN Data for LLM Processing

HN discussions are exceptionally valuable for LLM training, retrieval-augmented generation (RAG), and prompt testing. The combination of technical depth and community curation makes HN one of the highest-quality text corpora available:

import asyncio
import json
import httpx
from pathlib import Path
from datetime import datetime
from bs4 import BeautifulSoup

ALGOLIA = "https://hn.algolia.com/api/v1"
HN_API = "https://hacker-news.firebaseio.com/v0"


def clean_comment_html(html_text: str) -> str:
    if not html_text:
        return ""
    soup = BeautifulSoup(html_text, "html.parser")
    for a in soup.find_all("a"):
        url = a.get("href", "")
        text = a.get_text()
        a.replace_with(f"{text} ({url})" if url else text)
    text = soup.get_text(separator="
")
    lines = [line.strip() for line in text.splitlines()]
    return "
".join(line for line in lines if line)


def flatten_comment_tree(comment: dict, depth: int = 0) -> list[dict]:
    if not comment or comment.get("deleted"):
        return []
    flat = [{
        "id": comment.get("id"),
        "author": comment.get("author"),
        "text": clean_comment_html(comment.get("text", "")),
        "depth": depth,
        "reply_count": len(comment.get("children", [])),
        "created_at": comment.get("created_at"),
    }]
    for child in comment.get("children", []):
        flat.extend(flatten_comment_tree(child, depth + 1))
    return flat


def export_story_for_rag(story_id: int, max_comments: int = 100) -> dict:
    resp = httpx.get(f"{ALGOLIA}/items/{story_id}", timeout=30)
    resp.raise_for_status()
    data = resp.json()

    all_comments = []
    for child in data.get("children", []):
        all_comments.extend(flatten_comment_tree(child, depth=0))

    all_comments = [c for c in all_comments if c.get("text") and len(c["text"]) > 20]
    top_comments = sorted(all_comments, key=lambda c: c["reply_count"], reverse=True)[:max_comments]

    return {
        "id": data.get("id"),
        "title": data.get("title"),
        "url": data.get("url"),
        "author": data.get("author"),
        "points": data.get("points", 0),
        "created_at": data.get("created_at"),
        "hn_url": f"https://news.ycombinator.com/item?id={data.get('id')}",
        "story_text": clean_comment_html(data.get("text", "")),
        "comment_count": len(all_comments),
        "top_comments": top_comments,
        "all_text": data.get("title", "") + "\n\n" + clean_comment_html(data.get("text", "")) + "\n\n" + "\n\n".join(c["text"] for c in top_comments[:50]),
    }


def build_rag_corpus(
    min_points: int = 200,
    days_back: int = 180,
    max_stories: int = 500,
    output_file: str = "hn_rag_corpus.jsonl",
) -> int:
    import time
    cutoff = int(time.time()) - days_back * 86400

    resp = httpx.get(
        f"{ALGOLIA}/search_by_date",
        params={
            "tags": "story",
            "numericFilters": f"points>{min_points},created_at_i>{cutoff}",
            "hitsPerPage": 50,
        },
        timeout=15,
    )
    story_ids = [int(h["objectID"]) for h in resp.json()["hits"][:max_stories]]
    print(f"Exporting {len(story_ids)} stories for RAG corpus")

    exported = 0
    with open(output_file, "w", encoding="utf-8") as f:
        for i, sid in enumerate(story_ids):
            try:
                story = export_story_for_rag(sid)
                if story.get("all_text"):
                    f.write(json.dumps(story, ensure_ascii=False) + "\n")
                    exported += 1
            except Exception as e:
                print(f"  Failed {sid}: {e}")
            if (i + 1) % 20 == 0:
                print(f"  Exported {exported}/{i+1}")
            time.sleep(0.5)

    print(f"RAG corpus saved to {output_file}: {exported} stories")
    return exported

Tracking which domains get submitted to HN and how they perform reveals domain reputation — useful for SEO, content strategy, and competitive intelligence:

import time
import httpx
from collections import Counter
from urllib.parse import urlparse

ALGOLIA = "https://hn.algolia.com/api/v1"


def track_domain_performance(
    domain: str,
    days_back: int = 365,
) -> dict:
    cutoff = int(time.time()) - days_back * 86400
    all_stories = []
    page = 0

    while page < 20:
        resp = httpx.get(
            f"{ALGOLIA}/search",
            params={
                "query": domain,
                "tags": "story",
                "numericFilters": f"created_at_i>{cutoff}",
                "hitsPerPage": 50,
                "page": page,
            },
            timeout=15,
        )
        resp.raise_for_status()
        hits = resp.json()["hits"]
        if not hits:
            break
        for hit in hits:
            if domain in (hit.get("url") or ""):
                all_stories.append(hit)
        page += 1
        time.sleep(0.3)

    if not all_stories:
        return {"domain": domain, "submissions": 0}

    points = [s.get("points", 0) for s in all_stories]
    comments = [s.get("num_comments", 0) for s in all_stories]

    import statistics
    return {
        "domain": domain,
        "submissions": len(all_stories),
        "avg_points": statistics.mean(points) if points else 0,
        "median_points": statistics.median(points) if points else 0,
        "max_points": max(points) if points else 0,
        "avg_comments": statistics.mean(comments) if comments else 0,
        "top_story": max(all_stories, key=lambda s: s.get("points", 0), default={}).get("title"),
        "total_points": sum(points),
    }

Summary and Quick Reference

Here is a quick reference for HN scraping in 2026:

Task API Endpoint Auth
Top story IDs Firebase /topstories.json None
Story detail Firebase /item/{id}.json None
Search stories Algolia /search?query=X None
Date-filtered stories Algolia /search_by_date None
Full story + comments Algolia /items/{id} None
User profile Firebase /user/{username}.json None
Historical scan Firebase /maxitem.json + item loop None

All endpoints are free, unauthenticated, and well-maintained. ThorData provides residential proxy rotation for bulk operations where IP-level rate limiting becomes a concern.