← Back to blog

How to Scrape Reddit for Subreddit Analytics in 2026: Posts, Comments & Trends

How to Scrape Reddit for Subreddit Analytics in 2026: Posts, Comments & Trends

Reddit is one of the few platforms that still exposes structured data through a semi-public API — if you know where to look. Subreddit analytics are useful for trend detection, content research, community health monitoring, and market research. This guide covers how to extract that data reliably in 2026, from the basic .json trick to historical data archives and proxy strategy.

What Subreddit Analytics Data Is Available

From Reddit's public endpoints you can extract: subreddit metadata (subscriber counts, currently-active users, descriptions), post listings with scores, upvote ratios, and comment counts, full nested comment threads, author activity, and flair/domain distributions.

For historical data beyond Reddit's own 1000-post limit, PullPush.io and Arctic Shift are the main options in 2026 after Pushshift went dark.

Dependencies and Setup

pip install httpx requests praw  # praw = Python Reddit API Wrapper

Reddit's official PRAW library handles OAuth for you. But the .json trick works without any auth setup.

Reddit's Public JSON API: The .json Trick

Append .json to almost any Reddit URL and you get machine-readable data instead of HTML:

https://www.reddit.com/r/datascience/.json
https://www.reddit.com/r/datascience/top/.json?t=week
https://www.reddit.com/r/datascience/comments/abc123/some_post/.json

Reddit's rate limit for unauthenticated requests is 60 requests per minute per IP. With OAuth (free app registration), it bumps to 100 per minute.

Subreddit Metadata via about.json

Every subreddit exposes its full metadata at about.json:

import httpx
import time
import random

# Shared request headers for all HTTP calls below.
HEADERS = {
    # Descriptive, contactable User-Agent — Reddit throttles generic/bot-like agents harder.
    "User-Agent": "SubredditAnalytics/1.0 (research project; [email protected])",
    "Accept": "application/json",
}


def fetch_subreddit_metadata(subreddit: str, proxy: str = None) -> dict:
    """Return key metadata fields for a subreddit via the public about.json endpoint.

    No authentication is required; an optional proxy URL may be supplied.

    Raises:
        RuntimeError: on rate limiting (429) or any other non-200 status.
        ValueError: when the subreddit does not exist or is private (404).
    """
    url = f"https://www.reddit.com/r/{subreddit}/about.json"

    cfg = {"headers": HEADERS, "follow_redirects": True, "timeout": 15}
    if proxy:
        cfg["proxies"] = {"all://": proxy}

    with httpx.Client(**cfg) as client:
        response = client.get(url)

    status = response.status_code
    if status == 429:
        raise RuntimeError("Rate limited — back off and retry")
    if status == 404:
        raise ValueError(f"r/{subreddit} does not exist or is private")
    if status != 200:
        raise RuntimeError(f"HTTP {status} for r/{subreddit}")

    info = response.json()["data"]

    # Pick out the analytics-relevant subset; .get() for fields Reddit
    # omits on some subreddits.
    return {
        "name": info["display_name"],
        "title": info["title"],
        "description": info.get("public_description", ""),
        "full_description": info.get("description", ""),
        "subscribers": info["subscribers"],
        "active_users": info["accounts_active"],
        "created_utc": info["created_utc"],
        "over18": info["over18"],
        "submission_type": info["submission_type"],  # link, self, any
        "lang": info.get("lang", "en"),
        "community_icon": info.get("community_icon", ""),
        "header_img": info.get("header_img", ""),
        "allowed_media_in_comments": info.get("allowed_media_in_comments", []),
        "spoilers_enabled": info.get("spoilers_enabled", False),
        "is_crosspostable_subreddit": info.get("is_crosspostable_subreddit"),
        "suggested_comment_sort": info.get("suggested_comment_sort"),
    }

accounts_active is the "currently browsing" count Reddit shows in the sidebar — useful as a real-time activity signal.

Paginating Posts

Reddit's listing endpoints paginate using the after parameter with fullnames (t3_postid). Reddit caps pagination at approximately 1000 posts per listing type.

def fetch_subreddit_posts(
    subreddit: str,
    sort: str = "hot",
    time_filter: str = "week",
    limit: int = 100,
    max_posts: int = 500,
    proxy: str = None,
) -> list:
    """
    Paginate through a subreddit's post feed.

    sort: hot | new | top | rising
    time_filter: hour | day | week | month | year | all (only applies to 'top')
    limit: page size per request (Reddit caps listings at 100 per page)
    max_posts: hard cap on the number of posts returned
    proxy: optional proxy URL routed through httpx

    Returns a list of post dicts of length at most max_posts.
    (Fix: the original appended every post of the final page, so it could
    overshoot max_posts by up to limit - 1 entries.)
    """
    posts = []
    after = None  # Reddit fullname cursor (t3_xxx) for pagination
    base_url = f"https://www.reddit.com/r/{subreddit}/{sort}.json"

    client_kwargs = {"headers": HEADERS, "follow_redirects": True, "timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        while len(posts) < max_posts:
            params = {"limit": min(limit, 100), "raw_json": 1}
            if after:
                params["after"] = after
            if sort == "top":
                params["t"] = time_filter

            resp = client.get(base_url, params=params)

            if resp.status_code == 429:
                # Back off for a full rate-limit window, then retry the same page.
                print("Rate limited, waiting 60s...")
                time.sleep(60)
                continue
            if resp.status_code != 200:
                print(f"Error: HTTP {resp.status_code}")
                break

            envelope = resp.json()["data"]
            children = envelope.get("children", [])

            if not children:
                break

            for child in children:
                if len(posts) >= max_posts:
                    break  # respect the cap even mid-page
                d = child["data"]
                posts.append({
                    "id": d["id"],
                    "fullname": d["name"],
                    "title": d["title"],
                    "author": d.get("author"),
                    "score": d["score"],
                    "upvote_ratio": d.get("upvote_ratio"),
                    "num_comments": d["num_comments"],
                    "url": d.get("url"),
                    "permalink": d["permalink"],
                    "created_utc": d["created_utc"],
                    "is_self": d["is_self"],
                    # Truncate selftext to keep payloads small for analytics use
                    "selftext": (d.get("selftext") or "")[:500] if d["is_self"] else None,
                    "flair": d.get("link_flair_text"),
                    "domain": d.get("domain"),
                    "is_video": d.get("is_video", False),
                    "spoiler": d.get("spoiler", False),
                    "gilded": d.get("gilded", 0),
                    "awards": len(d.get("all_awardings", [])),
                })

            after = envelope.get("after")
            if not after:
                break  # no more pages

            time.sleep(1.1)  # Stay under 60 req/min
    return posts

Extracting Comment Threads

Comments are nested under any post at {permalink}.json. Reddit returns a two-element array — the first element is the post, the second is the comment tree.

def fetch_comment_thread(permalink: str, proxy: str = None) -> list:
    """Fetch a post's comment tree and return it flattened as a list of dicts.

    Reddit returns a two-element array for {permalink}.json: the post
    listing first, then the comment listing. Returns [] on any non-200.
    """
    url = f"https://www.reddit.com{permalink}.json"

    cfg = {"headers": HEADERS, "follow_redirects": True, "timeout": 15}
    if proxy:
        cfg["proxies"] = {"all://": proxy}

    with httpx.Client(**cfg) as client:
        response = client.get(url, params={"raw_json": 1, "limit": 500})

    if response.status_code != 200:
        return []

    # First element is the post itself; we only need the comment tree.
    _post_listing, comment_listing = response.json()
    return _flatten_comments(comment_listing["data"]["children"])


def _flatten_comments(children: list, depth: int = 0) -> list:
    """Recursively flatten nested comment structure."""
    comments = []
    for child in children:
        if child["kind"] == "more":
            continue  # Skip "load more" placeholders
        d = child["data"]
        comments.append({
            "id": d["id"],
            "author": d.get("author"),
            "body": d.get("body", ""),
            "score": d.get("score", 0),
            "created_utc": d.get("created_utc"),
            "depth": depth,
            "parent_id": d.get("parent_id"),
            "edited": d.get("edited", False),
            "is_submitter": d.get("is_submitter", False),
            "gilded": d.get("gilded", 0),
            "distinguished": d.get("distinguished"),
        })
        replies = d.get("replies")
        if replies and isinstance(replies, dict):
            nested = replies["data"]["children"]
            comments.extend(_flatten_comments(nested, depth + 1))
    return comments


def fetch_top_comment_text(post_id: str, permalink: str, proxy: str = None) -> str:
    """Get the top-level comment bodies as a single string for text analysis.

    Returns the 20 highest-scoring top-level comments, joined by blank lines.
    Note: post_id is unused (the permalink alone identifies the thread) but
    kept for interface compatibility with existing callers.
    """
    comments = fetch_comment_thread(permalink, proxy=proxy)
    # Fix: also drop "[removed]" (moderator-removed) bodies — like
    # "[deleted]", they carry no analyzable text.
    top_level = [
        c for c in comments
        if c["depth"] == 0 and c["body"] not in ("[deleted]", "[removed]")
    ]
    top_level_sorted = sorted(top_level, key=lambda c: c["score"], reverse=True)
    return "\n\n".join(c["body"] for c in top_level_sorted[:20])

Using PRAW for OAuth Access

For higher rate limits (100 req/min), use PRAW with an official Reddit app:

import praw
import time

def create_reddit_client(
    client_id: str,
    client_secret: str,
    user_agent: str = "SubredditAnalyzer/1.0",
) -> praw.Reddit:
    """Create an authenticated, read-only Reddit client with PRAW.

    Only the app credentials are needed for read-only access — no
    username/password, so this never acts on behalf of a user account.
    """
    client = praw.Reddit(
        client_id=client_id,
        client_secret=client_secret,
        user_agent=user_agent,
    )
    return client


def fetch_posts_praw(reddit: praw.Reddit, subreddit: str, sort: str = "top", limit: int = 100, time_filter: str = "month") -> list:
    """Fetch posts via PRAW and normalize them into plain dicts.

    sort may be top | hot | new | rising; anything else falls back to hot.
    time_filter only applies to the 'top' listing.
    """
    sub = reddit.subreddit(subreddit)

    if sort == "top":
        # 'top' is the only listing that takes a time window.
        submissions = sub.top(time_filter=time_filter, limit=limit)
    else:
        listings = {"hot": sub.hot, "new": sub.new, "rising": sub.rising}
        submissions = listings.get(sort, sub.hot)(limit=limit)

    return [
        {
            "id": s.id,
            "title": s.title,
            "author": str(s.author) if s.author else "[deleted]",
            "score": s.score,
            "upvote_ratio": s.upvote_ratio,
            "num_comments": s.num_comments,
            "url": s.url,
            "permalink": s.permalink,
            "created_utc": s.created_utc,
            "is_self": s.is_self,
            "selftext": s.selftext[:500] if s.is_self else None,
            "flair": s.link_flair_text,
            "is_video": s.is_video,
        }
        for s in submissions
    ]

Historical Data: PullPush and Alternatives

Reddit's own API only returns the most recent ~1000 posts per listing. For historical analysis you need third-party archives:

def _pullpush_search(
    endpoint: str,
    subreddit: str,
    after_utc: int,
    before_utc: int,
    size: int,
    proxy: str,
) -> list:
    """Shared PullPush query for the submission/comment search endpoints.

    endpoint: "submission" or "comment"
    Returns the `data` list from the JSON response, or [] on any non-200
    (the status is printed so failures are visible from both callers —
    the original comment path failed silently while the post path logged).
    """
    url = f"https://api.pullpush.io/reddit/search/{endpoint}/"
    params = {
        "subreddit": subreddit,
        "after": after_utc,
        "before": before_utc,
        "size": size,
        "sort": "asc",
        "sort_type": "created_utc",
    }

    client_kwargs = {"headers": HEADERS, "timeout": 30}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url, params=params)

    if resp.status_code != 200:
        print(f"PullPush error: {resp.status_code}")
        return []

    return resp.json().get("data", [])


def fetch_pullpush_posts(
    subreddit: str,
    after_utc: int,
    before_utc: int,
    size: int = 100,
    proxy: str = None,
) -> list:
    """Query PullPush for historical subreddit posts in a time range."""
    return _pullpush_search("submission", subreddit, after_utc, before_utc, size, proxy)


def fetch_pullpush_comments(
    subreddit: str,
    after_utc: int,
    before_utc: int,
    size: int = 100,
    proxy: str = None,
) -> list:
    """Query PullPush for historical comments in a subreddit."""
    return _pullpush_search("comment", subreddit, after_utc, before_utc, size, proxy)


def collect_monthly_history(
    subreddit: str,
    months: int = 6,
    proxy: str = None,
) -> list:
    """
    Collect historical posts from the past N months via PullPush.

    Walks backwards from now in 30-day windows, one PullPush query each.

    Fixes vs. the original:
    - uses timezone-aware UTC datetimes: naive utcnow().timestamp() is
      interpreted in the machine's LOCAL timezone, skewing the epoch
      boundaries sent to PullPush by the local UTC offset;
    - removed the unused `import calendar`.
    """
    from datetime import datetime, timedelta, timezone

    all_posts = []
    now = datetime.now(timezone.utc)

    for month_offset in range(months):
        # Window [end - 30d, end], sliding back one window per iteration.
        end_dt = now - timedelta(days=30 * month_offset)
        start_dt = end_dt - timedelta(days=30)
        after_utc = int(start_dt.timestamp())
        before_utc = int(end_dt.timestamp())

        print(f"Fetching {start_dt.strftime('%Y-%m')} ({month_offset+1}/{months})...")
        posts = fetch_pullpush_posts(subreddit, after_utc, before_utc, size=100, proxy=proxy)
        all_posts.extend(posts)
        print(f"  Got {len(posts)} posts ({len(all_posts)} total)")

        # Jittered pause to be polite to the free PullPush service.
        time.sleep(random.uniform(2, 5))

    return all_posts

Calculating Subreddit Statistics

from datetime import datetime, timezone

def compute_subreddit_stats(posts: list) -> dict:
    """Calculate posting frequency and engagement metrics from a post list.

    Expects dicts shaped like fetch_subreddit_posts() output (score,
    num_comments, created_utc required; others optional). Returns {} for
    an empty list.

    Fix: median_score now uses statistics.median (true median — averages
    the two middle values on even-length input). The original indexed
    sorted(scores)[n // 2], which is the upper median, not the median.
    """
    import statistics
    from collections import Counter

    if not posts:
        return {}

    scores = [p["score"] for p in posts]
    comment_counts = [p["num_comments"] for p in posts]
    # Falsy check also drops ratio 0.0, matching the original behavior.
    ratios = [p["upvote_ratio"] for p in posts if p.get("upvote_ratio")]

    timestamps = sorted(p["created_utc"] for p in posts)
    span_days = (timestamps[-1] - timestamps[0]) / 86400
    posts_per_day = len(posts) / span_days if span_days > 0 else 0

    # Flair distribution (unflaired posts excluded)
    flair_counts = Counter(p.get("flair") for p in posts if p.get("flair"))

    # Domain distribution for external link posts only
    domain_counts = Counter(
        p.get("domain") for p in posts
        if not p.get("is_self") and p.get("domain") and not p["domain"].startswith("self.")
    )

    # Content mix: video vs text (self) vs external link
    video_count = sum(1 for p in posts if p.get("is_video"))
    self_count = sum(1 for p in posts if p.get("is_self"))
    link_count = len(posts) - video_count - self_count

    return {
        "total_posts": len(posts),
        "span_days": round(span_days, 1),
        "posts_per_day": round(posts_per_day, 2),
        "avg_score": round(sum(scores) / len(scores), 1),
        "median_score": statistics.median(scores),
        "score_stdev": round(statistics.stdev(scores), 1) if len(scores) > 1 else 0,
        "avg_comments": round(sum(comment_counts) / len(comment_counts), 1),
        "avg_upvote_ratio": round(sum(ratios) / len(ratios), 3) if ratios else None,
        "top_post_score": max(scores),
        "content_mix": {"video": video_count, "text": self_count, "link": link_count},
        "top_flairs": dict(flair_counts.most_common(10)),
        "top_domains": dict(domain_counts.most_common(10)),
    }


def find_top_authors(posts: list, top_n: int = 20) -> list:
    """Find the most active and highest-scoring authors, sorted by total score.

    Fix: the post feed stores author=None for deleted posts (d.get("author")),
    and dict.get's default never fires when a key exists with a None value —
    so the original aggregated all deleted posts under a None "author".
    Deleted/None authors are now skipped correctly.
    """
    from collections import defaultdict
    author_stats = defaultdict(lambda: {"posts": 0, "total_score": 0, "total_comments": 0})

    for post in posts:
        # `or` handles both a missing key and an explicit None value.
        author = post.get("author") or "[deleted]"
        if author == "[deleted]":
            continue
        author_stats[author]["posts"] += 1
        author_stats[author]["total_score"] += post["score"]
        author_stats[author]["total_comments"] += post["num_comments"]

    results = [
        {
            "author": author,
            "posts": stats["posts"],
            "total_score": stats["total_score"],
            "avg_score": round(stats["total_score"] / stats["posts"], 1),
            "total_comments_generated": stats["total_comments"],
        }
        for author, stats in author_stats.items()
    ]

    return sorted(results, key=lambda x: x["total_score"], reverse=True)[:top_n]

Anti-Bot Measures and Proxy Strategy

Reddit enforces rate limiting at 60 unauthenticated requests per minute per IP. That sounds like a lot until you realize a single subreddit analysis touching metadata + multiple feed pages + comment threads can burn through that in under two minutes.

For any meaningful data collection — multiple subreddits, historical pagination, comment extraction — you need to distribute requests across IPs. ThorData provides residential proxies that work well here because Reddit's filters look for datacenter ASNs. Residential IPs from real ISPs don't trigger the same blocks.

# ThorData gateway credentials — placeholders; substitute real account values.
PROXY_USER = "your_username"
PROXY_PASS = "your_password"
PROXY_HOST = "proxy.thordata.com"
PROXY_PORT = 9000


def build_proxy(session_id: str = None) -> str:
    """Build a ThorData proxy URL, optionally pinned to a sticky session.

    A "-session-<id>" suffix on the username asks the gateway to keep
    routing through the same exit IP for that identifier; without it each
    request may rotate to a new IP.
    """
    user = f"{PROXY_USER}-session-{session_id}" if session_id else PROXY_USER
    return f"http://{user}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"


def scrape_multiple_subreddits(subreddits: list, sort: str = "top", time_filter: str = "month") -> dict:
    """Scrape metadata, posts, stats, and top authors for multiple subreddits.

    Uses a sticky proxy session per subreddit so each one's requests appear
    to come from a single consistent IP. Failures are logged and skipped;
    the returned dict maps subreddit name -> scraped data for successes.

    Fix: compute_subreddit_stats returns {} when a subreddit yields no
    posts, and the original summary print then raised KeyError — which the
    broad except caught, mislabeling a successful scrape as a failure.
    The summary now uses .get() with numeric defaults.
    """
    results = {}

    for i, sub in enumerate(subreddits):
        # Use sticky session per subreddit to look like one user browsing
        proxy = build_proxy(session_id=f"sub{i}")

        print(f"Scraping r/{sub}...")
        try:
            meta = fetch_subreddit_metadata(sub, proxy=proxy)
            posts = fetch_subreddit_posts(
                sub, sort=sort, time_filter=time_filter,
                max_posts=200, proxy=proxy,
            )
            stats = compute_subreddit_stats(posts)
            top_authors = find_top_authors(posts)

            results[sub] = {
                "metadata": meta,
                "stats": stats,
                "top_authors": top_authors[:5],
                "posts": posts,
            }

            print(
                f"  r/{sub}: {meta['subscribers']:,} subscribers, "
                f"{meta['active_users']:,} active, "
                f"{stats.get('posts_per_day', 0):.1f} posts/day, "
                f"avg score {stats.get('avg_score', 0):.0f}"
            )

        except Exception as e:
            # Best-effort batch: log and move on to the next subreddit.
            print(f"  r/{sub} failed: {e}")

        # Jittered pause between subreddits to spread request timing.
        time.sleep(random.uniform(2, 5))

    return results

Building a Subreddit Analytics Dashboard

With the pieces above you can build a full analysis pipeline:

import sqlite3
import json
from datetime import datetime

def init_analytics_db(db_path: str = "reddit_analytics.db") -> sqlite3.Connection:
    """Create the analytics tables and indexes (if absent) and return the connection.

    Three tables: per-scrape subreddit snapshots, individual posts
    (deduplicated on id+subreddit), and per-snapshot author aggregates.
    """
    ddl_statements = [
        # Time-series of subreddit-level metrics, one row per scrape.
        """CREATE TABLE IF NOT EXISTS subreddit_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            subreddit TEXT,
            subscribers INTEGER,
            active_users INTEGER,
            posts_per_day REAL,
            avg_score REAL,
            avg_comments REAL,
            avg_upvote_ratio REAL,
            top_flairs TEXT,
            top_domains TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""",
        # Individual posts; composite key prevents duplicates across scrapes.
        """CREATE TABLE IF NOT EXISTS posts (
            id TEXT,
            subreddit TEXT,
            title TEXT,
            author TEXT,
            score INTEGER,
            num_comments INTEGER,
            upvote_ratio REAL,
            created_utc INTEGER,
            flair TEXT,
            is_self BOOLEAN,
            is_video BOOLEAN,
            domain TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id, subreddit)
        )""",
        # Aggregated author activity, one batch per snapshot date.
        """CREATE TABLE IF NOT EXISTS author_stats (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            subreddit TEXT,
            author TEXT,
            post_count INTEGER,
            total_score INTEGER,
            avg_score REAL,
            snapshot_date TEXT
        )""",
        "CREATE INDEX IF NOT EXISTS idx_posts_subreddit ON posts(subreddit)",
        "CREATE INDEX IF NOT EXISTS idx_posts_score ON posts(score DESC)",
        "CREATE INDEX IF NOT EXISTS idx_snapshots_sub ON subreddit_snapshots(subreddit)",
    ]

    conn = sqlite3.connect(db_path)
    for statement in ddl_statements:
        conn.execute(statement)
    conn.commit()
    return conn


def store_subreddit_data(conn: sqlite3.Connection, sub: str, data: dict):
    """Store one subreddit's scraped snapshot, posts, and author stats.

    data is the per-subreddit dict produced by scrape_multiple_subreddits
    (keys: metadata, stats, posts, top_authors). Commits once at the end.

    Fix: the per-post insert's bare `except Exception: pass` also hid
    genuine bugs (missing dict keys, type errors). It now skips only
    database-level failures (sqlite3.Error) so bad rows stay best-effort
    while programming errors surface.
    """
    meta = data["metadata"]
    stats = data["stats"]
    now = datetime.utcnow().isoformat()

    # One snapshot row per scrape; flair/domain breakdowns serialized as JSON text.
    conn.execute("""
        INSERT INTO subreddit_snapshots
        (subreddit, subscribers, active_users, posts_per_day,
         avg_score, avg_comments, avg_upvote_ratio, top_flairs, top_domains)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        sub, meta["subscribers"], meta["active_users"],
        stats.get("posts_per_day"), stats.get("avg_score"),
        stats.get("avg_comments"), stats.get("avg_upvote_ratio"),
        json.dumps(stats.get("top_flairs", {})),
        json.dumps(stats.get("top_domains", {})),
    ))

    for post in data.get("posts", []):
        try:
            # INSERT OR REPLACE keeps the latest scrape of each (id, subreddit).
            conn.execute(
                "INSERT OR REPLACE INTO posts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (
                    post["id"], sub, post["title"], post.get("author"),
                    post["score"], post["num_comments"],
                    post.get("upvote_ratio"), post["created_utc"],
                    post.get("flair"), int(post.get("is_self", False)),
                    int(post.get("is_video", False)), post.get("domain"), now,
                )
            )
        except sqlite3.Error:
            continue  # best-effort: skip rows the database rejects

    for author_stat in data.get("top_authors", []):
        conn.execute("""
            INSERT INTO author_stats (subreddit, author, post_count, total_score, avg_score, snapshot_date)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            sub, author_stat["author"], author_stat["posts"],
            author_stat["total_score"], author_stat["avg_score"],
            now[:10],  # date portion only
        ))

    conn.commit()


def build_dashboard_data(
    subreddits: list,
    proxy_base: str = None,
    db_path: str = "reddit_analytics.db",
):
    """Run full collection and storage for a list of subreddits.

    Note: proxy_base is currently unused — scrape_multiple_subreddits
    builds its own sticky proxies internally; the parameter is kept for
    interface compatibility. TODO: thread it through if per-run proxy
    configuration is needed.

    Fix: the connection is now closed in a finally block, so a failure
    during scraping or storage no longer leaks the SQLite handle.
    """
    conn = init_analytics_db(db_path)
    try:
        data = scrape_multiple_subreddits(subreddits)
        for sub, sub_data in data.items():
            store_subreddit_data(conn, sub, sub_data)
    finally:
        conn.close()

    print(f"\nStored analytics for {len(data)} subreddits to {db_path}")


def compare_subreddits(db_path: str = "reddit_analytics.db") -> list:
    """Compare subreddits by key engagement metrics from latest snapshots.

    Reads only the most recent snapshot row per subreddit (max id), prints
    a formatted comparison table, and returns the raw rows.
    """
    # Correlated subquery pins each subreddit to its newest snapshot.
    query = """
        SELECT s1.subreddit, s1.subscribers, s1.active_users,
               s1.posts_per_day, s1.avg_score, s1.avg_comments,
               ROUND(s1.active_users * 100.0 / NULLIF(s1.subscribers, 0), 3) as active_pct
        FROM subreddit_snapshots s1
        WHERE s1.id = (
            SELECT MAX(id) FROM subreddit_snapshots s2
            WHERE s2.subreddit = s1.subreddit
        )
        ORDER BY s1.subscribers DESC
    """
    conn = sqlite3.connect(db_path)
    rows = conn.execute(query).fetchall()
    conn.close()

    print(f"\n{'Subreddit':<25} {'Subscribers':>12} {'Active':>8} {'Posts/d':>8} {'AvgScore':>9}")
    print("-" * 70)
    for name, subscribers, active, per_day, avg_score, _avg_comments, _active_pct in rows:
        print(f"r/{name:<23} {subscribers:>12,} {active:>8,} {(per_day or 0):>8.1f} {(avg_score or 0):>9.0f}")

    return rows


# Example run
if __name__ == "__main__":
    build_dashboard_data([
        "MachineLearning", "datascience", "learnpython",
        "investing", "personalfinance", "startups",
    ])
    compare_subreddits()

Use the post titles and selftext to identify trending topics within subreddits:

import re
from collections import Counter

def extract_trending_keywords(posts: list, min_freq: int = 3) -> dict:
    """
    Extract frequently mentioned keywords from post titles.

    Single words are reported unconditionally (top 30); two-word phrases
    (bigrams) are only kept when they occur at least `min_freq` times.
    Common stop words are filtered to surface meaningful topics.
    """
    STOP_WORDS = {
        "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "from", "is", "are", "was", "were", "be", "been",
        "have", "has", "had", "do", "does", "did", "will", "would", "could",
        "should", "may", "might", "this", "that", "these", "those", "i", "you",
        "he", "she", "it", "we", "they", "what", "which", "who", "how", "when",
        "where", "why", "not", "no", "can", "my", "your", "his", "her", "its",
        "our", "their", "about", "any", "some", "all", "more", "most", "just",
    }

    word_counter = Counter()
    bigram_counter = Counter()

    for post in posts:
        # Tokens: lowercase words of 3+ chars starting with a letter.
        tokens = re.findall(r'\b[a-z][a-z0-9]{2,}\b', post.get("title", "").lower())
        keywords = [t for t in tokens if t not in STOP_WORDS]

        word_counter.update(keywords)
        # Adjacent-pair phrases from the stop-word-filtered token stream.
        bigram_counter.update(f"{a} {b}" for a, b in zip(keywords, keywords[1:]))

    return {
        "top_keywords": dict(word_counter.most_common(30)),
        "top_bigrams": {
            phrase: count
            for phrase, count in bigram_counter.most_common(20)
            if count >= min_freq
        },
    }


def find_viral_posts(posts: list, min_score: int = 100) -> list:
    """Find posts that went viral based on score and comment engagement.

    A post qualifies when score >= min_score AND it has more than 10
    comments. Each returned dict carries an added `engagement_efficiency`
    field: comments per 100 score points. Results are sorted by score
    descending.

    Fix: the original mutated the caller's dicts in place; qualifying
    posts are now shallow-copied before the extra field is added.
    """
    viral = []
    for p in posts:
        if p["score"] >= min_score and p["num_comments"] > 10:
            viral.append({
                **p,  # copy so the input list stays untouched
                "engagement_efficiency": round(
                    p["num_comments"] / max(p["score"], 1) * 100, 2
                ),
            })

    return sorted(viral, key=lambda p: p["score"], reverse=True)

Key Takeaways