How to Scrape Reddit for Subreddit Analytics in 2026: Posts, Comments & Trends
Reddit is one of the few platforms that still exposes structured data through a semi-public API — if you know where to look. Subreddit analytics are useful for trend detection, content research, community health monitoring, and market research. This guide covers how to extract that data reliably in 2026, from the basic .json trick to historical data archives and proxy strategy.
What Subreddit Analytics Data Is Available
From Reddit's public endpoints you can extract:
- Subreddit metadata — subscriber count, active users, description, creation date, posting rules
- Post feeds — top, hot, new, and rising listings with scores, comment counts, author, flair
- Comment threads — full recursive tree structure with vote scores and timestamps
- Posting frequency — how often content is submitted across time windows
- Engagement rates — upvote ratios, comments-per-post averages, score distributions
- Historical posts — via third-party archives going back years
- Moderator info — moderator list and activity (public)
- Flair data — user and post flairs, useful for topic segmentation
For historical data beyond Reddit's own 1000-post limit, PullPush.io and Arctic Shift are the main options in 2026 after Pushshift went dark.
Dependencies and Setup
pip install httpx praw  # praw = Python Reddit API Wrapper
Reddit's official PRAW library handles OAuth for you. But the .json trick works without any auth setup.
Reddit's Public JSON API: The .json Trick
Append .json to almost any Reddit URL and you get machine-readable data instead of HTML:
https://www.reddit.com/r/datascience/.json
https://www.reddit.com/r/datascience/top/.json?t=week
https://www.reddit.com/r/datascience/comments/abc123/some_post/.json
Reddit's rate limit for unauthenticated requests is 60 requests per minute per IP. With OAuth (free app registration), it bumps to 100 per minute.
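At 60 requests per minute you need at least one second between requests, but a fixed sleep wastes time you have already spent doing parsing or I/O. A minimal pacing helper that only sleeps when you are actually ahead of budget (a sketch; the `RatePacer` name is my own, not a Reddit or httpx API):

```python
import time


class RatePacer:
    """Enforce a minimum interval between requests to stay under a per-minute cap."""

    def __init__(self, max_per_minute: int = 60):
        self.min_interval = 60.0 / max_per_minute
        self.last_request = float("-inf")  # so the first call never sleeps

    def wait(self) -> float:
        """Sleep just long enough to respect the cap; return the delay applied."""
        now = time.monotonic()
        delay = max(0.0, self.min_interval - (now - self.last_request))
        if delay:
            time.sleep(delay)
        self.last_request = time.monotonic()
        return delay
```

Call `pacer.wait()` before each request in place of a fixed `time.sleep(1.1)`; back-to-back calls get throttled, calls that were already slow pass through immediately.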
Subreddit Metadata via about.json
Every subreddit exposes its full metadata at about.json:
import httpx
import time
import random
HEADERS = {
"User-Agent": "SubredditAnalytics/1.0 (research project; [email protected])",
"Accept": "application/json",
}
def fetch_subreddit_metadata(subreddit: str, proxy: str = None) -> dict:
"""Fetch subreddit metadata from the about.json endpoint."""
url = f"https://www.reddit.com/r/{subreddit}/about.json"
client_kwargs = {"headers": HEADERS, "follow_redirects": True, "timeout": 15}
    if proxy:
        # httpx >= 0.26 takes proxy= directly; older releases used proxies={"all://": proxy}
        client_kwargs["proxy"] = proxy
with httpx.Client(**client_kwargs) as client:
resp = client.get(url)
if resp.status_code == 429:
raise RuntimeError("Rate limited — back off and retry")
if resp.status_code == 404:
raise ValueError(f"r/{subreddit} does not exist or is private")
if resp.status_code != 200:
raise RuntimeError(f"HTTP {resp.status_code} for r/{subreddit}")
data = resp.json()["data"]
return {
"name": data["display_name"],
"title": data["title"],
"description": data.get("public_description", ""),
"full_description": data.get("description", ""),
"subscribers": data["subscribers"],
"active_users": data["accounts_active"],
"created_utc": data["created_utc"],
"over18": data["over18"],
"submission_type": data["submission_type"], # link, self, any
"lang": data.get("lang", "en"),
"community_icon": data.get("community_icon", ""),
"header_img": data.get("header_img", ""),
"allowed_media_in_comments": data.get("allowed_media_in_comments", []),
"spoilers_enabled": data.get("spoilers_enabled", False),
"is_crosspostable_subreddit": data.get("is_crosspostable_subreddit"),
"suggested_comment_sort": data.get("suggested_comment_sort"),
}
accounts_active is the "currently browsing" count Reddit shows in the sidebar — useful as a real-time activity signal.
Paginating Posts
Reddit's listing endpoints paginate using the after parameter with fullnames (t3_postid). Reddit caps pagination at approximately 1000 posts per listing type.
def fetch_subreddit_posts(
subreddit: str,
sort: str = "hot",
time_filter: str = "week",
limit: int = 100,
max_posts: int = 500,
proxy: str = None,
) -> list:
"""
Paginate through a subreddit's post feed.
sort: hot | new | top | rising
time_filter: hour | day | week | month | year | all (only applies to 'top')
"""
posts = []
after = None
base_url = f"https://www.reddit.com/r/{subreddit}/{sort}.json"
client_kwargs = {"headers": HEADERS, "follow_redirects": True, "timeout": 15}
    if proxy:
        client_kwargs["proxy"] = proxy
with httpx.Client(**client_kwargs) as client:
while len(posts) < max_posts:
params = {"limit": min(limit, 100), "raw_json": 1}
if after:
params["after"] = after
if sort == "top":
params["t"] = time_filter
resp = client.get(base_url, params=params)
if resp.status_code == 429:
print("Rate limited, waiting 60s...")
time.sleep(60)
continue
if resp.status_code != 200:
print(f"Error: HTTP {resp.status_code}")
break
envelope = resp.json()["data"]
children = envelope.get("children", [])
if not children:
break
for child in children:
d = child["data"]
posts.append({
"id": d["id"],
"fullname": d["name"],
"title": d["title"],
"author": d.get("author"),
"score": d["score"],
"upvote_ratio": d.get("upvote_ratio"),
"num_comments": d["num_comments"],
"url": d.get("url"),
"permalink": d["permalink"],
"created_utc": d["created_utc"],
"is_self": d["is_self"],
"selftext": (d.get("selftext") or "")[:500] if d["is_self"] else None,
"flair": d.get("link_flair_text"),
"domain": d.get("domain"),
"is_video": d.get("is_video", False),
"spoiler": d.get("spoiler", False),
"gilded": d.get("gilded", 0),
"awards": len(d.get("all_awardings", [])),
})
after = envelope.get("after")
if not after:
break
time.sleep(1.1) # Stay under 60 req/min
return posts
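The listings overlap: a post climbing through rising shows up in hot, and later in top. If you combine multiple feeds for one subreddit, deduplicate by post id first. A small helper (a sketch over the post dicts produced above):

```python
def dedupe_posts(*feeds: list) -> list:
    """Merge multiple post feeds, keeping the first occurrence of each post id."""
    seen = set()
    merged = []
    for feed in feeds:
        for post in feed:
            if post["id"] not in seen:
                seen.add(post["id"])
                merged.append(post)
    return merged
```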
Extracting Comment Threads
Comments are nested under any post at {permalink}.json. Reddit returns a two-element array — the first element is the post, the second is the comment tree.
def fetch_comment_thread(permalink: str, proxy: str = None) -> list:
"""Flatten a Reddit comment tree into a list of comment dicts."""
url = f"https://www.reddit.com{permalink}.json"
client_kwargs = {"headers": HEADERS, "follow_redirects": True, "timeout": 15}
if proxy:
client_kwargs["proxies"] = {"all://": proxy}
with httpx.Client(**client_kwargs) as client:
resp = client.get(url, params={"raw_json": 1, "limit": 500})
if resp.status_code != 200:
return []
_, comment_listing = resp.json()
return _flatten_comments(comment_listing["data"]["children"])
def _flatten_comments(children: list, depth: int = 0) -> list:
"""Recursively flatten nested comment structure."""
comments = []
for child in children:
if child["kind"] == "more":
continue # Skip "load more" placeholders
d = child["data"]
comments.append({
"id": d["id"],
"author": d.get("author"),
"body": d.get("body", ""),
"score": d.get("score", 0),
"created_utc": d.get("created_utc"),
"depth": depth,
"parent_id": d.get("parent_id"),
"edited": d.get("edited", False),
"is_submitter": d.get("is_submitter", False),
"gilded": d.get("gilded", 0),
"distinguished": d.get("distinguished"),
})
replies = d.get("replies")
if replies and isinstance(replies, dict):
nested = replies["data"]["children"]
comments.extend(_flatten_comments(nested, depth + 1))
return comments
def fetch_top_comment_text(permalink: str, proxy: str = None) -> str:
    """Get the top-level comment bodies as a single string for text analysis."""
    comments = fetch_comment_thread(permalink, proxy=proxy)
    top_level = [
        c for c in comments
        if c["depth"] == 0 and c["body"] not in ("[deleted]", "[removed]")
    ]
    top_level_sorted = sorted(top_level, key=lambda c: c["score"], reverse=True)
    return "\n\n".join(c["body"] for c in top_level_sorted[:20])
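Because the flattened list preserves depth, thread-shape metrics fall out directly: how deep discussions go and what share of comments are replies rather than direct responses to the post. A sketch (the metric names are my own):

```python
def comment_thread_shape(comments: list) -> dict:
    """Summarize the shape of a flattened comment list (depth-annotated dicts)."""
    if not comments:
        return {"total": 0, "top_level": 0, "max_depth": 0, "reply_ratio": 0.0}
    top_level = sum(1 for c in comments if c["depth"] == 0)
    max_depth = max(c["depth"] for c in comments)
    # Share of comments that are replies to other comments, not to the post
    reply_ratio = round(1 - top_level / len(comments), 3)
    return {
        "total": len(comments),
        "top_level": top_level,
        "max_depth": max_depth,
        "reply_ratio": reply_ratio,
    }
```

A high reply ratio with deep nesting usually means debate; a flat thread of top-level comments means drive-by reactions.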
Using PRAW for OAuth Access
For higher rate limits (100 req/min), use PRAW with an official Reddit app:
import praw
import time
def create_reddit_client(
client_id: str,
client_secret: str,
user_agent: str = "SubredditAnalyzer/1.0",
) -> praw.Reddit:
"""Create an authenticated Reddit client with PRAW."""
return praw.Reddit(
client_id=client_id,
client_secret=client_secret,
user_agent=user_agent,
# For read-only access, no username/password needed
)
def fetch_posts_praw(
    reddit: praw.Reddit,
    subreddit: str,
    sort: str = "top",
    limit: int = 100,
    time_filter: str = "month",
) -> list:
"""Fetch posts via PRAW for clean structured data."""
sub = reddit.subreddit(subreddit)
if sort == "top":
submissions = sub.top(time_filter=time_filter, limit=limit)
elif sort == "hot":
submissions = sub.hot(limit=limit)
elif sort == "new":
submissions = sub.new(limit=limit)
elif sort == "rising":
submissions = sub.rising(limit=limit)
else:
submissions = sub.hot(limit=limit)
posts = []
for s in submissions:
posts.append({
"id": s.id,
"title": s.title,
"author": str(s.author) if s.author else "[deleted]",
"score": s.score,
"upvote_ratio": s.upvote_ratio,
"num_comments": s.num_comments,
"url": s.url,
"permalink": s.permalink,
"created_utc": s.created_utc,
"is_self": s.is_self,
"selftext": s.selftext[:500] if s.is_self else None,
"flair": s.link_flair_text,
"is_video": s.is_video,
})
return posts
Historical Data: PullPush and Alternatives
Reddit's own API only returns the most recent ~1000 posts per listing. For historical analysis you need third-party archives:
- PullPush.io — the main Pushshift replacement, community-operated
- Arctic Shift — full dataset dumps, useful for bulk analysis
- Academic Torrents — periodic Reddit dataset archives for research use
def fetch_pullpush_posts(
subreddit: str,
after_utc: int,
before_utc: int,
size: int = 100,
proxy: str = None,
) -> list:
"""Query PullPush for historical subreddit posts in a time range."""
url = "https://api.pullpush.io/reddit/search/submission/"
params = {
"subreddit": subreddit,
"after": after_utc,
"before": before_utc,
"size": size,
"sort": "asc",
"sort_type": "created_utc",
}
client_kwargs = {"headers": HEADERS, "timeout": 30}
    if proxy:
        client_kwargs["proxy"] = proxy
with httpx.Client(**client_kwargs) as client:
resp = client.get(url, params=params)
if resp.status_code != 200:
print(f"PullPush error: {resp.status_code}")
return []
return resp.json().get("data", [])
def fetch_pullpush_comments(
subreddit: str,
after_utc: int,
before_utc: int,
size: int = 100,
proxy: str = None,
) -> list:
"""Query PullPush for historical comments in a subreddit."""
url = "https://api.pullpush.io/reddit/search/comment/"
params = {
"subreddit": subreddit,
"after": after_utc,
"before": before_utc,
"size": size,
"sort": "asc",
"sort_type": "created_utc",
}
client_kwargs = {"headers": HEADERS, "timeout": 30}
    if proxy:
        client_kwargs["proxy"] = proxy
with httpx.Client(**client_kwargs) as client:
resp = client.get(url, params=params)
if resp.status_code != 200:
return []
return resp.json().get("data", [])
def collect_monthly_history(
subreddit: str,
months: int = 6,
proxy: str = None,
) -> list:
"""
Collect historical posts from the past N months via PullPush.
Handles pagination by looping through time windows.
"""
from datetime import datetime, timedelta
import calendar
all_posts = []
now = datetime.utcnow()
for month_offset in range(months):
end_dt = now - timedelta(days=30 * month_offset)
start_dt = end_dt - timedelta(days=30)
after_utc = int(start_dt.timestamp())
before_utc = int(end_dt.timestamp())
print(f"Fetching {start_dt.strftime('%Y-%m')} ({month_offset+1}/{months})...")
posts = fetch_pullpush_posts(subreddit, after_utc, before_utc, size=100, proxy=proxy)
all_posts.extend(posts)
print(f" Got {len(posts)} posts ({len(all_posts)} total)")
time.sleep(random.uniform(2, 5))
return all_posts
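PullPush caps each response at roughly 100 results, so a busy month overflows a single request. The usual fix is cursor pagination inside each window: keep advancing `after` to the last post's `created_utc` until a page comes back short. A sketch that takes the fetch function as a parameter so the loop itself stays testable (posts sharing an exact timestamp at a page boundary can be skipped or duplicated; acceptable for analytics):

```python
def paginate_window(fetch, subreddit: str, after_utc: int, before_utc: int,
                    size: int = 100, max_pages: int = 50) -> list:
    """Drain one time window by advancing `after` past each page's last post.

    `fetch` has the shape of fetch_pullpush_posts(subreddit, after, before, size).
    """
    collected = []
    cursor = after_utc
    for _ in range(max_pages):
        page = fetch(subreddit, cursor, before_utc, size)
        if not page:
            break
        collected.extend(page)
        cursor = int(page[-1]["created_utc"])  # results are sorted ascending
        if len(page) < size:
            break  # a short page means the window is exhausted
    return collected
```

Drop this into `collect_monthly_history` in place of the single `fetch_pullpush_posts` call to capture full months rather than the first 100 posts of each.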
Calculating Subreddit Statistics
import statistics
from collections import Counter

def compute_subreddit_stats(posts: list) -> dict:
    """Calculate posting frequency and engagement metrics from a post list."""
    if not posts:
        return {}
    scores = [p["score"] for p in posts]
    comment_counts = [p["num_comments"] for p in posts]
    ratios = [p["upvote_ratio"] for p in posts if p.get("upvote_ratio")]
    timestamps = sorted(p["created_utc"] for p in posts)
    span_days = (timestamps[-1] - timestamps[0]) / 86400
    posts_per_day = len(posts) / span_days if span_days > 0 else 0
    # Flair distribution
    flair_counts = Counter(p.get("flair") for p in posts if p.get("flair"))
    # Domain distribution (for link posts)
    domain_counts = Counter(
        p.get("domain") for p in posts
        if not p.get("is_self") and p.get("domain") and not p["domain"].startswith("self.")
    )
    # Content mix: video vs text vs link
    video_count = sum(1 for p in posts if p.get("is_video"))
    self_count = sum(1 for p in posts if p.get("is_self"))
    link_count = len(posts) - video_count - self_count
    return {
        "total_posts": len(posts),
        "span_days": round(span_days, 1),
        "posts_per_day": round(posts_per_day, 2),
        "avg_score": round(sum(scores) / len(scores), 1),
        "median_score": statistics.median(scores),
        "score_stdev": round(statistics.stdev(scores), 1) if len(scores) > 1 else 0,
        "avg_comments": round(sum(comment_counts) / len(comment_counts), 1),
        "avg_upvote_ratio": round(sum(ratios) / len(ratios), 3) if ratios else None,
        "top_post_score": max(scores),
        "content_mix": {"video": video_count, "text": self_count, "link": link_count},
        "top_flairs": dict(flair_counts.most_common(10)),
        "top_domains": dict(domain_counts.most_common(10)),
    }
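Posting-time distribution is another cheap signal from the same data: when a community is most active tells you when to watch it and when to post. A sketch bucketing posts by UTC hour:

```python
from collections import Counter
from datetime import datetime, timezone


def posting_hour_histogram(posts: list) -> dict:
    """Count posts per UTC hour of day from created_utc timestamps."""
    hours = Counter(
        datetime.fromtimestamp(p["created_utc"], tz=timezone.utc).hour
        for p in posts
    )
    # Return all 24 buckets so quiet hours show up as explicit zeros
    return {hour: hours.get(hour, 0) for hour in range(24)}
```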
def find_top_authors(posts: list, top_n: int = 20) -> list:
"""Find the most active and highest-scoring authors."""
from collections import defaultdict
author_stats = defaultdict(lambda: {"posts": 0, "total_score": 0, "total_comments": 0})
for post in posts:
        author = post.get("author") or "[deleted]"  # author can be None when deleted
        if author == "[deleted]":
continue
author_stats[author]["posts"] += 1
author_stats[author]["total_score"] += post["score"]
author_stats[author]["total_comments"] += post["num_comments"]
results = [
{
"author": author,
"posts": stats["posts"],
"total_score": stats["total_score"],
"avg_score": round(stats["total_score"] / stats["posts"], 1),
"total_comments_generated": stats["total_comments"],
}
for author, stats in author_stats.items()
]
return sorted(results, key=lambda x: x["total_score"], reverse=True)[:top_n]
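A related question is how concentrated posting is: a subreddit where five accounts produce half the content behaves very differently from one with a broad contributor base. A sketch computing the share of posts from the top N authors (the function name and threshold are my own choices):

```python
from collections import Counter


def author_concentration(posts: list, top_n: int = 5) -> float:
    """Fraction of attributable posts contributed by the top_n most active authors."""
    authors = [
        p.get("author") for p in posts
        if p.get("author") not in (None, "[deleted]")
    ]
    if not authors:
        return 0.0
    counts = Counter(authors)
    top_total = sum(count for _, count in counts.most_common(top_n))
    return round(top_total / len(authors), 3)
```

Values near 1.0 suggest a community driven by a handful of power users (or bots); values near `top_n / unique_authors` suggest broad participation.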
Anti-Bot Measures and Proxy Strategy
Reddit enforces rate limiting at 60 unauthenticated requests per minute per IP. That sounds like a lot until you realize a single subreddit analysis touching metadata + multiple feed pages + comment threads can burn through that in under two minutes.
For any meaningful data collection — multiple subreddits, historical pagination, comment extraction — you need to distribute requests across IPs. ThorData provides residential proxies that work well here because Reddit's filters look for datacenter ASNs. Residential IPs from real ISPs don't trigger the same blocks.
PROXY_USER = "your_username"
PROXY_PASS = "your_password"
PROXY_HOST = "proxy.thordata.com"
PROXY_PORT = 9000
def build_proxy(session_id: str = None) -> str:
"""Build a ThorData proxy URL with optional sticky session."""
if session_id:
# Sticky session keeps same IP for a sequence of requests
user = f"{PROXY_USER}-session-{session_id}"
else:
user = PROXY_USER
return f"http://{user}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
def scrape_multiple_subreddits(subreddits: list, sort: str = "top", time_filter: str = "month") -> dict:
"""Scrape metadata and posts for multiple subreddits with sticky proxies."""
results = {}
for i, sub in enumerate(subreddits):
# Use sticky session per subreddit to look like one user browsing
proxy = build_proxy(session_id=f"sub{i}")
print(f"Scraping r/{sub}...")
try:
meta = fetch_subreddit_metadata(sub, proxy=proxy)
posts = fetch_subreddit_posts(
sub, sort=sort, time_filter=time_filter,
max_posts=200, proxy=proxy,
)
stats = compute_subreddit_stats(posts)
top_authors = find_top_authors(posts)
results[sub] = {
"metadata": meta,
"stats": stats,
"top_authors": top_authors[:5],
"posts": posts,
}
print(
f" r/{sub}: {meta['subscribers']:,} subscribers, "
f"{meta['active_users']:,} active, "
f"{stats['posts_per_day']:.1f} posts/day, "
f"avg score {stats['avg_score']:.0f}"
)
except Exception as e:
print(f" r/{sub} failed: {e}")
time.sleep(random.uniform(2, 5))
return results
Building a Subreddit Analytics Dashboard
With the pieces above you can build a full analysis pipeline:
import sqlite3
import json
from datetime import datetime
def init_analytics_db(db_path: str = "reddit_analytics.db") -> sqlite3.Connection:
"""Initialize the Reddit analytics database."""
conn = sqlite3.connect(db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS subreddit_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subreddit TEXT,
subscribers INTEGER,
active_users INTEGER,
posts_per_day REAL,
avg_score REAL,
avg_comments REAL,
avg_upvote_ratio REAL,
top_flairs TEXT,
top_domains TEXT,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS posts (
id TEXT,
subreddit TEXT,
title TEXT,
author TEXT,
score INTEGER,
num_comments INTEGER,
upvote_ratio REAL,
created_utc INTEGER,
flair TEXT,
is_self BOOLEAN,
is_video BOOLEAN,
domain TEXT,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, subreddit)
);
CREATE TABLE IF NOT EXISTS author_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subreddit TEXT,
author TEXT,
post_count INTEGER,
total_score INTEGER,
avg_score REAL,
snapshot_date TEXT
);
CREATE INDEX IF NOT EXISTS idx_posts_subreddit ON posts(subreddit);
CREATE INDEX IF NOT EXISTS idx_posts_score ON posts(score DESC);
CREATE INDEX IF NOT EXISTS idx_snapshots_sub ON subreddit_snapshots(subreddit);
""")
conn.commit()
return conn
def store_subreddit_data(conn: sqlite3.Connection, sub: str, data: dict):
"""Store all scraped data for a subreddit."""
meta = data["metadata"]
stats = data["stats"]
now = datetime.utcnow().isoformat()
conn.execute("""
INSERT INTO subreddit_snapshots
(subreddit, subscribers, active_users, posts_per_day,
avg_score, avg_comments, avg_upvote_ratio, top_flairs, top_domains)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
sub, meta["subscribers"], meta["active_users"],
stats.get("posts_per_day"), stats.get("avg_score"),
stats.get("avg_comments"), stats.get("avg_upvote_ratio"),
json.dumps(stats.get("top_flairs", {})),
json.dumps(stats.get("top_domains", {})),
))
for post in data.get("posts", []):
try:
conn.execute(
"INSERT OR REPLACE INTO posts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
post["id"], sub, post["title"], post.get("author"),
post["score"], post["num_comments"],
post.get("upvote_ratio"), post["created_utc"],
post.get("flair"), int(post.get("is_self", False)),
int(post.get("is_video", False)), post.get("domain"), now,
)
)
        except sqlite3.Error as e:
            print(f"  Skipping post {post.get('id')}: {e}")
for author_stat in data.get("top_authors", []):
conn.execute("""
INSERT INTO author_stats (subreddit, author, post_count, total_score, avg_score, snapshot_date)
VALUES (?, ?, ?, ?, ?, ?)
""", (
sub, author_stat["author"], author_stat["posts"],
author_stat["total_score"], author_stat["avg_score"],
now[:10],
))
conn.commit()
def build_dashboard_data(
    subreddits: list,
    db_path: str = "reddit_analytics.db",
):
"""Run full collection and storage for a list of subreddits."""
conn = init_analytics_db(db_path)
data = scrape_multiple_subreddits(subreddits)
for sub, sub_data in data.items():
store_subreddit_data(conn, sub, sub_data)
conn.close()
print(f"\nStored analytics for {len(data)} subreddits to {db_path}")
def compare_subreddits(db_path: str = "reddit_analytics.db") -> list:
"""Compare subreddits by key engagement metrics from latest snapshots."""
conn = sqlite3.connect(db_path)
cursor = conn.execute("""
SELECT s1.subreddit, s1.subscribers, s1.active_users,
s1.posts_per_day, s1.avg_score, s1.avg_comments,
ROUND(s1.active_users * 100.0 / NULLIF(s1.subscribers, 0), 3) as active_pct
FROM subreddit_snapshots s1
WHERE s1.id = (
SELECT MAX(id) FROM subreddit_snapshots s2
WHERE s2.subreddit = s1.subreddit
)
ORDER BY s1.subscribers DESC
""")
rows = cursor.fetchall()
conn.close()
print(f"\n{'Subreddit':<25} {'Subscribers':>12} {'Active':>8} {'Posts/d':>8} {'AvgScore':>9}")
print("-" * 70)
for row in rows:
sub, subs, active, ppd, avg_s, avg_c, active_pct = row
print(f"r/{sub:<23} {subs:>12,} {active:>8,} {(ppd or 0):>8.1f} {(avg_s or 0):>9.0f}")
return rows
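Because each run appends a snapshot rather than overwriting, subscriber growth is a plain query over consecutive snapshots. A self-contained sketch against a minimal version of the `subreddit_snapshots` table (in-memory here purely for illustration):

```python
import sqlite3


def subscriber_growth(conn: sqlite3.Connection, subreddit: str) -> int:
    """Subscriber delta from the oldest to the newest snapshot of one subreddit."""
    rows = conn.execute(
        "SELECT subscribers FROM subreddit_snapshots WHERE subreddit = ? ORDER BY id",
        (subreddit,),
    ).fetchall()
    return rows[-1][0] - rows[0][0] if len(rows) >= 2 else 0


# In-memory demo with just the columns this query needs
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE subreddit_snapshots (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        subreddit TEXT,
        subscribers INTEGER
    )
""")
conn.executemany(
    "INSERT INTO subreddit_snapshots (subreddit, subscribers) VALUES (?, ?)",
    [("datascience", 1_500_000), ("datascience", 1_512_000)],
)
```

With daily snapshots, the same query windowed by `scraped_at` gives subscriber velocity, which the takeaways below argue is the metric worth tracking.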
# Example run
if __name__ == "__main__":
build_dashboard_data([
"MachineLearning", "datascience", "learnpython",
"investing", "personalfinance", "startups",
])
compare_subreddits()
Content Analysis: Finding Trending Topics
Use the post titles and selftext to identify trending topics within subreddits:
import re
from collections import Counter
def extract_trending_keywords(posts: list, min_freq: int = 3) -> dict:
"""
Extract frequently mentioned keywords from post titles.
Filters common stop words to surface meaningful topics.
"""
STOP_WORDS = {
"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
"of", "with", "by", "from", "is", "are", "was", "were", "be", "been",
"have", "has", "had", "do", "does", "did", "will", "would", "could",
"should", "may", "might", "this", "that", "these", "those", "i", "you",
"he", "she", "it", "we", "they", "what", "which", "who", "how", "when",
"where", "why", "not", "no", "can", "my", "your", "his", "her", "its",
"our", "their", "about", "any", "some", "all", "more", "most", "just",
}
word_counter = Counter()
bigram_counter = Counter()
for post in posts:
title = post.get("title", "").lower()
words = re.findall(r'\b[a-z][a-z0-9]{2,}\b', title)
clean_words = [w for w in words if w not in STOP_WORDS]
word_counter.update(clean_words)
# Bigrams (two-word phrases)
for i in range(len(clean_words) - 1):
bigram = f"{clean_words[i]} {clean_words[i+1]}"
bigram_counter[bigram] += 1
return {
"top_keywords": dict(word_counter.most_common(30)),
"top_bigrams": {k: v for k, v in bigram_counter.most_common(20) if v >= min_freq},
}
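A single keyword count can't separate evergreen vocabulary ("python" in r/learnpython) from what is actually rising. Comparing two periods can: run the extractor on an earlier and a later window of posts, then rank terms by growth. A sketch over two `Counter`s (the +1 smoothing constant is an arbitrary choice to avoid division by zero for brand-new terms):

```python
from collections import Counter


def rising_terms(old_counts: Counter, new_counts: Counter, top_n: int = 10) -> list:
    """Rank terms by period-over-period growth ratio, with +1 smoothing
    so terms absent from the earlier window don't divide by zero."""
    ratios = {
        term: (new_counts[term] + 1) / (old_counts[term] + 1)
        for term in new_counts
    }
    return sorted(ratios.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
```

Feed it the `top_keywords` output of `extract_trending_keywords` for two adjacent months; ratios well above 1 are the terms gaining ground.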
def find_viral_posts(posts: list, min_score: int = 100) -> list:
"""Find posts that went viral based on score and comment engagement."""
high_engagement = [
p for p in posts
if p["score"] >= min_score and p["num_comments"] > 10
]
# Compute engagement efficiency: comments per 100 score points
for post in high_engagement:
post["engagement_efficiency"] = round(
post["num_comments"] / max(post["score"], 1) * 100, 2
)
return sorted(high_engagement, key=lambda p: p["score"], reverse=True)
Key Takeaways
- Reddit's .json trick works without auth but rate limits at 60 req/min per IP — plan accordingly.
- about.json gives you clean subreddit metadata, including live active-user counts.
- Paginate using the after parameter; you can extract up to ~1000 posts per listing type directly from Reddit.
- PullPush.io is the best option for historical data, but treat it as best-effort — it has outages.
- Residential proxies through ThorData let you distribute load across IPs and avoid per-IP throttling when scraping at scale.
- Set a proper User-Agent header with contact info — Reddit is more lenient with identifiable bots than anonymous ones.
- PRAW with OAuth raises your limit to 100 req/min with almost no extra complexity.
- The most valuable analytics come from long-term tracking: subscriber velocity, engagement drift, and posting-frequency changes over time are more actionable than any single snapshot.