Scraping Reddit Posts and Comments in 2026: API Changes, PRAW, and Direct Methods
Reddit used to be the easiest site on the internet to scrape. Free API, no auth required, just slap .json on the end of any URL and you had structured data. Those days are gone.
In June 2023, Reddit announced API pricing that would charge third-party apps up to $12,000/year for moderate usage. Apollo, Reddit is Fun, Sync -- all dead within weeks. The developer community went ballistic, subreddits went dark in protest, and Reddit didn't blink.
Now in 2026, the dust has settled. The API still exists, it's just different. Let's go through every method that works, from the official API down to archival services and raw HTML parsing.
Table of Contents
- The Official Reddit API: What You Actually Get
- Setting Up OAuth and a Reddit App
- PRAW: The Standard Python Approach
- Scraping Comments -- Including Nested Replies
- Handling Deleted, Removed, and Edited Content
- The old.reddit.com HTML Approach
- The .json URL Trick -- Still Alive in 2026
- PushShift and Pullpush: Historical Archives
- Rate Limits, Quotas, and Scaling Strategies
- Proxy Rotation for Multi-Token Scraping
- Storing Reddit Data: SQLite and JSON Schemas
- Filtering, Deduplication, and Data Quality
- Real Use Cases: Research, Monitoring, Analysis
- Common Errors and How to Fix Them
- Which Approach Should You Use?
1. The Official Reddit API: What You Actually Get {#official-api}
Reddit's Data API has a free tier that's surprisingly usable for small-to-medium projects. Here's the deal:
- 100 requests per minute (QPM) for OAuth-authenticated apps
- 10 requests per minute without OAuth (basically unusable)
- OAuth is mandatory for anything serious
- No per-request cost -- it's purely rate-limited
Compared to Twitter/X charging $100/month for 10K reads, Reddit's free tier is generous. 100 QPM means you can pull ~144,000 requests per day. That's enough for most research projects and personal tools.
The catch: you need to register an app and use OAuth. No more anonymous .json endpoints at scale.
Key endpoints you'll use:
| Endpoint | Description | Notes |
|---|---|---|
| `/r/{sub}/hot` | Hot posts | Sorted by the hot algorithm |
| `/r/{sub}/new` | New posts | Chronological |
| `/r/{sub}/top` | Top posts | Supports t=hour/day/week/month/year/all |
| `/r/{sub}/rising` | Rising posts | Early trending signals |
| `/r/{sub}/search` | Search within subreddit | |
| `/comments/{post_id}` | Post + comment tree | Core data |
| `/user/{username}/submitted` | User's post history | |
| `/user/{username}/comments` | User's comment history | |
2. Setting Up OAuth and a Reddit App {#setup}
Creating a Reddit App
- Go to reddit.com/prefs/apps
- Click "create another app" at the bottom
- Pick "script" for personal use or "web app" if building a service
- Set the redirect URI to `http://localhost:8080` (for script apps, this doesn't matter)
- Note your `client_id` (shown under the app name) and `client_secret`
No credit card, no approval process. The whole thing takes 30 seconds.
Dependency Installation
pip install praw requests beautifulsoup4
PRAW Configuration File
Instead of hardcoding credentials, use a praw.ini file in your project root:
[DEFAULT]
client_id=your_client_id
client_secret=your_client_secret
user_agent=my-scraper/1.0 (by u/your_username)
Or configure programmatically -- see next section.
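Before handing the file to PRAW, you can sanity-check it with the standard library. A quick sketch -- the set of required keys here assumes a script-type app:

```python
import configparser

REQUIRED_KEYS = {"client_id", "client_secret", "user_agent"}

def validate_praw_ini(path: str = "praw.ini", section: str = "DEFAULT") -> list[str]:
    """Return the required keys missing from the given praw.ini section."""
    cfg = configparser.ConfigParser()
    if not cfg.read(path):
        # File missing or unreadable -- everything is missing
        return sorted(REQUIRED_KEYS)
    present = set(cfg[section]) if section in cfg else set()
    return sorted(REQUIRED_KEYS - present)
```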
3. PRAW: The Standard Python Approach {#praw}
PRAW (Python Reddit API Wrapper) handles OAuth, rate limiting, and pagination automatically. You don't need to think about tokens or sleep timers.
pip install praw
Basic Setup
import praw
reddit = praw.Reddit(
client_id="your_client_id",
client_secret="your_client_secret",
user_agent="my-scraper/1.0 (by u/your_username)",
)
# Read-only mode -- no login needed for public data
print(reddit.read_only) # True
The user_agent matters. Reddit will throttle or block generic user agents. Include your app name and a contact username -- it's part of their API rules and keeps you from hitting silent rate limits.
Scraping Subreddit Posts
def scrape_subreddit(subreddit_name: str, sort: str = "hot", limit: int = 100,
time_filter: str = "week") -> list[dict]:
"""Pull posts from a subreddit with full metadata."""
subreddit = reddit.subreddit(subreddit_name)
posts = []
listing_fn = {
"hot": subreddit.hot,
"new": subreddit.new,
"top": lambda **kw: subreddit.top(time_filter=time_filter, **kw),
"rising": subreddit.rising,
"controversial": subreddit.controversial,
}.get(sort, subreddit.hot)
for post in listing_fn(limit=limit):
posts.append({
"id": post.id,
"title": post.title,
"author": str(post.author) if post.author else "[deleted]",
"score": post.score,
"upvote_ratio": post.upvote_ratio,
"url": post.url,
"domain": post.domain,
"selftext": post.selftext,
"selftext_html": post.selftext_html,
"created_utc": post.created_utc,
"num_comments": post.num_comments,
"permalink": f"https://reddit.com{post.permalink}",
"subreddit": post.subreddit.display_name,
"flair": post.link_flair_text,
"is_self": post.is_self,
"is_video": post.is_video,
"over_18": post.over_18,
"spoiler": post.spoiler,
"stickied": post.stickied,
"locked": post.locked,
"awards": post.total_awards_received,
"gilded": post.gilded,
})
return posts
# Usage examples
hot_posts = scrape_subreddit("python", sort="hot", limit=100)
top_of_week = scrape_subreddit("datascience", sort="top", limit=50, time_filter="week")
top_of_alltime = scrape_subreddit("worldnews", sort="top", limit=1000, time_filter="all")
PRAW handles pagination internally. You can set limit=None to pull everything Reddit will give you (roughly 1,000 posts per listing endpoint). For .hot(), .new(), .top(), and .controversial(), that's the ceiling.
Streaming New Posts in Real Time
PRAW supports streaming for monitoring subreddits as new posts appear:
def stream_subreddit(subreddit_name: str, callback):
"""Stream new posts from a subreddit in real time."""
subreddit = reddit.subreddit(subreddit_name)
for post in subreddit.stream.submissions(skip_existing=True):
callback({
"id": post.id,
"title": post.title,
"score": post.score,
"created_utc": post.created_utc,
"permalink": f"https://reddit.com{post.permalink}",
})
# Monitor multiple subreddits at once
def stream_multi(subreddits: list[str], callback):
"""Stream from multiple subreddits simultaneously."""
combined = reddit.subreddit("+".join(subreddits))
for post in combined.stream.submissions(skip_existing=True):
callback(post)
Streaming is perfect for real-time brand monitoring, keyword alerts, or building news aggregators.
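For keyword alerts, the callback you pass to the streaming helpers above can do the filtering itself. A sketch -- the keyword list and alert function are made up for illustration:

```python
def make_keyword_callback(keywords: list[str], on_match):
    """Return a callback that fires on_match only when a post title
    contains one of the keywords (case-insensitive)."""
    lowered = [k.lower() for k in keywords]

    def callback(post: dict):
        title = post.get("title", "").lower()
        hits = [k for k in lowered if k in title]
        if hits:
            on_match(post, hits)

    return callback
```

Wiring it in: `stream_subreddit("python", make_keyword_callback(["asyncio"], lambda post, hits: print(post["permalink"], hits)))`.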
4. Scraping Comments -- Including Nested Replies {#comments}
Comments are where it gets interesting -- and messy. Reddit's comment trees can be deeply nested, and large threads have "load more comments" nodes that require separate API requests.
Full Comment Tree Extraction
from praw.models import MoreComments
import json
def scrape_comments(post_id: str, max_depth: int = 10,
max_comments: int = 5000) -> list[dict]:
"""Scrape all comments from a post, expanding 'more comments' nodes."""
submission = reddit.submission(id=post_id)
submission.comments.replace_more(limit=max_depth)
comments = []
def process_comment_tree(comment_list, depth=0):
for comment in comment_list:
if isinstance(comment, MoreComments):
continue
if len(comments) >= max_comments:
return
comments.append({
"id": comment.id,
"author": str(comment.author) if comment.author else "[deleted]",
"body": comment.body,
"body_html": comment.body_html,
"score": comment.score,
"ups": comment.ups,
"created_utc": comment.created_utc,
"parent_id": comment.parent_id,
"link_id": comment.link_id,
"is_submitter": comment.is_submitter,
"distinguished": comment.distinguished,
"gilded": comment.gilded,
"depth": depth,
"awards": comment.total_awards_received,
"edited": comment.edited if comment.edited else False,
"stickied": comment.stickied,
})
if comment.replies:
process_comment_tree(comment.replies, depth=depth+1)
process_comment_tree(submission.comments)
return comments
# Usage
comments = scrape_comments("abc123")
print(f"Got {len(comments)} comments")
# Build comment tree structure
def build_tree(comments: list[dict]) -> list[dict]:
    """Reconstruct the nested comment tree from a flat list; returns the root comments."""
by_id = {c["id"]: {**c, "replies": []} for c in comments}
roots = []
for c in comments:
parent = c["parent_id"]
if parent.startswith("t1_"):
parent_id = parent[3:]
if parent_id in by_id:
by_id[parent_id]["replies"].append(by_id[c["id"]])
else:
roots.append(by_id[c["id"]])
else:
roots.append(by_id[c["id"]])
return roots
The replace_more(limit=max_depth) call is the expensive part. Each "more comments" expansion is a separate API request, so a viral post with thousands of comments can eat 20-30 requests just to fully expand the tree. Note that the limit argument caps the number of expansions, not the tree depth. Set limit=0 to drop all unexpanded "more comments" nodes without making any requests -- you keep whatever comments came with the initial fetch.
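The parent_id and link_id fields in the comment dicts above use Reddit's "fullname" convention: a type prefix (t1_ for comments, t3_ for posts) plus a base-36 ID. A small helper (hypothetical, but the prefixes themselves are Reddit's documented thing types) to split them:

```python
# Reddit "thing" type prefixes, per the API's fullname convention
THING_TYPES = {
    "t1": "comment",
    "t2": "account",
    "t3": "link",
    "t4": "message",
    "t5": "subreddit",
    "t6": "award",
}

def parse_fullname(fullname: str) -> tuple[str, str]:
    """Split 't1_abc123' into ('comment', 'abc123')."""
    prefix, _, raw_id = fullname.partition("_")
    return THING_TYPES.get(prefix, "unknown"), raw_id

print(parse_fullname("t1_kx9ab2"))  # ('comment', 'kx9ab2')
```

This is what the `parent.startswith("t1_")` check in build_tree is doing: a t1_ parent means "reply to another comment", a t3_ parent means "top-level comment on the post".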
Scraping Comments for Multiple Posts Efficiently
import time
from concurrent.futures import ThreadPoolExecutor
def batch_scrape_comments(post_ids: list[str],
delay: float = 0.7) -> dict[str, list]:
"""Scrape comments for multiple posts with rate limiting."""
results = {}
for i, post_id in enumerate(post_ids):
try:
comments = scrape_comments(post_id, max_depth=5)
results[post_id] = comments
print(f"[{i+1}/{len(post_ids)}] Post {post_id}: {len(comments)} comments")
except Exception as e:
print(f"Error on {post_id}: {e}")
results[post_id] = []
time.sleep(delay) # stay under 100 QPM
return results
5. Handling Deleted, Removed, and Edited Content {#deleted}
Reddit has two types of missing content that look similar but aren't:
- `[deleted]`: The user deleted their own post/comment. The `author` field becomes `None` and `body` becomes `[deleted]`.
- `[removed]`: A moderator removed the content. The `author` stays, but `body` becomes `[removed]`.
def classify_content(item) -> str:
"""Classify the visibility state of a Reddit post or comment."""
body = getattr(item, 'body', None) or getattr(item, 'selftext', None) or ''
author = item.author
if author is None and body == "[deleted]":
return "user_deleted"
elif body == "[removed]":
return "mod_removed"
elif getattr(item, 'banned_by', None):
return "admin_removed"
elif getattr(item, 'locked', False):
return "locked"
return "active"
def safe_get_body(comment) -> str:
"""Get comment body, handling deleted/removed states gracefully."""
if comment.body in ("[deleted]", "[removed]"):
return ""
return comment.body or ""
There's no way to recover deleted content through the official API. For that, you need archives -- see the PushShift section below.
6. The old.reddit.com HTML Approach {#old-reddit}
Reddit's old design at old.reddit.com still serves simpler HTML than the React-heavy new site. For quick scrapes without OAuth:
import requests
from bs4 import BeautifulSoup
import time
import random
def scrape_old_reddit(subreddit: str, pages: int = 5,
delay_range: tuple = (2, 4)) -> list[dict]:
"""Scrape posts from old.reddit.com - no API key needed."""
posts = []
after = None
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
}
for page_num in range(pages):
url = f"https://old.reddit.com/r/{subreddit}/"
params = {"after": after} if after else {}
resp = requests.get(url, headers=headers, params=params, timeout=10)
if resp.status_code == 429:
print(f"Rate limited on page {page_num}, backing off 30s")
time.sleep(30)
continue
if resp.status_code != 200:
print(f"Got {resp.status_code} on page {page_num}")
break
soup = BeautifulSoup(resp.text, "html.parser")
for thing in soup.select("div.thing[data-fullname]"):
title_el = thing.select_one("a.title")
score_el = thing.select_one("div.score.unvoted")
time_el = thing.select_one("time.live-timestamp")
author_el = thing.select_one("a.author")
comment_el = thing.select_one("a.comments")
flair_el = thing.select_one("span.flair")
posts.append({
"title": title_el.text.strip() if title_el else "",
"score": thing.get("data-score", "0"),
"fullname": thing.get("data-fullname", ""),
"url": thing.get("data-url", ""),
"subreddit": thing.get("data-subreddit", ""),
"domain": thing.get("data-domain", ""),
"author": author_el.text if author_el else "[deleted]",
"timestamp": time_el.get("datetime", "") if time_el else "",
"comments": comment_el.text.split()[0] if comment_el else "0",
"flair": flair_el.text.strip() if flair_el else "",
"nsfw": "nsfw" in (thing.get("class") or []),
"link": title_el["href"] if title_el else "",
})
# Pagination
next_btn = soup.select_one("span.next-button a")
if next_btn:
after = next_btn["href"].split("after=")[-1].split("&")[0]
else:
break
time.sleep(random.uniform(*delay_range))
return posts
This approach is fragile -- any HTML change breaks it -- but old.reddit.com has been remarkably stable, since it hasn't had a redesign in years and Reddit has so far kept it running.
The main downside: you're limited to about 25 posts per page, and aggressive scraping will get your IP blocked. Keep it under 1 request every 2-3 seconds.
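One parsing wrinkle: old.reddit abbreviates large counts (e.g. "52.3k") and hides some scores behind a non-numeric placeholder. Assuming that format, a parsing sketch you could drop into the scraper above:

```python
def parse_abbrev_count(text: str) -> int:
    """Parse old-reddit style counts like '523', '52.3k', or '1.2m'.
    Non-numeric placeholders (hidden scores) come back as 0."""
    text = text.strip().lower().replace(",", "")
    if not text or not text[0].isdigit():
        return 0
    multiplier = 1
    if text.endswith("k"):
        multiplier, text = 1_000, text[:-1]
    elif text.endswith("m"):
        multiplier, text = 1_000_000, text[:-1]
    # round before int() so 52.3 * 1000 doesn't truncate to 52299
    return int(round(float(text) * multiplier))
```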
Scraping Comments from old.reddit.com
Old Reddit also exposes comments in a cleaner HTML format:
def scrape_old_reddit_comments(post_url: str) -> list[dict]:
    """Scrape visible comments from an old.reddit.com thread."""
    comments = []
    if "old.reddit.com" not in post_url:
        # Normalize both www.reddit.com and reddit.com to old.reddit.com
        post_url = post_url.replace("www.reddit.com", "reddit.com")
        post_url = post_url.replace("reddit.com", "old.reddit.com", 1)
    headers = {"User-Agent": "Mozilla/5.0 (research bot, contact u/myuser)"}
    resp = requests.get(post_url, headers=headers, timeout=10)
soup = BeautifulSoup(resp.text, "html.parser")
for div in soup.select("div.comment"):
author_el = div.select_one("a.author")
body_el = div.select_one("div.md")
score_el = div.select_one("span.score.unvoted")
time_el = div.select_one("time.live-timestamp")
comments.append({
"author": author_el.text if author_el else "[deleted]",
"body": body_el.get_text("\n").strip() if body_el else "",
"score": score_el.text.strip() if score_el else "?",
"timestamp": time_el.get("datetime", "") if time_el else "",
"id": div.get("data-fullname", ""),
"parent": div.get("data-fullname-parent", ""),
})
return comments
7. The .json URL Trick -- Still Alive in 2026 {#json-trick}
Reddit still returns JSON for .json-suffixed URLs, though rate limiting is stricter without OAuth:
import requests
import time
def fetch_reddit_json(url: str, params: dict = None,
session_token: str = None) -> dict:
"""Fetch Reddit JSON directly -- works on any public URL."""
if not url.endswith(".json"):
url = url.rstrip("/") + ".json"
headers = {
"User-Agent": "Mozilla/5.0 (research project; contact u/myuser)",
}
if session_token:
headers["Authorization"] = f"Bearer {session_token}"
resp = requests.get(url, headers=headers, params=params or {}, timeout=10)
if resp.status_code == 429:
retry_after = int(resp.headers.get("Retry-After", 60))
time.sleep(retry_after)
return fetch_reddit_json(url, params, session_token)
resp.raise_for_status()
return resp.json()
# Fetch hot posts from a subreddit
data = fetch_reddit_json("https://www.reddit.com/r/python/hot.json", {"limit": 25})
posts = data["data"]["children"]
# Fetch a specific post with its comments
post_data = fetch_reddit_json("https://www.reddit.com/r/python/comments/abc123/post_title.json")
post = post_data[0]["data"]["children"][0]["data"]
comment_tree = post_data[1]["data"]["children"]
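Every listing in these responses uses the same envelope -- {"kind": "Listing", "data": {"children": [...]}} with each child wrapping its payload in a "data" key. A small unwrapping helper (sketch):

```python
def unwrap_listing(listing: dict) -> list[dict]:
    """Flatten a Reddit Listing envelope into a list of item dicts."""
    return [child["data"] for child in listing.get("data", {}).get("children", [])]

sample = {
    "kind": "Listing",
    "data": {
        "after": "t3_xyz",
        "children": [
            {"kind": "t3", "data": {"id": "abc", "title": "Hello"}},
        ],
    },
}
print(unwrap_listing(sample))  # [{'id': 'abc', 'title': 'Hello'}]
```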
The JSON API still supports the after cursor parameter for pagination:
def paginate_json_api(subreddit: str, sort: str = "new",
max_pages: int = 10) -> list[dict]:
"""Paginate through Reddit JSON API results."""
all_posts = []
after = None
for _ in range(max_pages):
params = {"limit": 100}
if after:
params["after"] = after
url = f"https://www.reddit.com/r/{subreddit}/{sort}.json"
data = fetch_reddit_json(url, params)
items = data["data"]["children"]
if not items:
break
all_posts.extend(i["data"] for i in items)
after = data["data"]["after"]
if not after:
break
time.sleep(1.2) # Respect rate limits
return all_posts
8. PushShift and Pullpush: Historical Archives {#pushshift}
PushShift was the gold standard for historical Reddit data. It archived every post and comment in near-real-time, and researchers relied on it for years. Then in 2023, Reddit cut off its access as part of the API changes.
The project partially recovered. Pullpush.io now serves as the community continuation, with data coverage that has gaps but is still useful for historical research.
Pullpush API -- Posts
import requests
def search_pullpush_posts(subreddit: str = None, query: str = None,
author: str = None, size: int = 100,
before: int = None, after: int = None) -> list[dict]:
"""Search historical Reddit posts via Pullpush API."""
url = "https://api.pullpush.io/reddit/search/submission/"
params = {"size": size, "sort": "desc", "sort_type": "score"}
if subreddit:
params["subreddit"] = subreddit
if query:
params["q"] = query
if author:
params["author"] = author
if before:
params["before"] = before
if after:
params["after"] = after
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json().get("data", [])
# Find top posts about a topic
results = search_pullpush_posts(subreddit="python", query="web scraping", size=25)
for r in results[:5]:
print(f"[{r.get('score', 0)}] {r['title']}")
Pullpush API -- Comments
def search_pullpush_comments(subreddit: str = None, query: str = None,
link_id: str = None, size: int = 100) -> list[dict]:
"""Search historical Reddit comments via Pullpush API."""
url = "https://api.pullpush.io/reddit/search/comment/"
params = {"size": size}
if subreddit:
params["subreddit"] = subreddit
if query:
params["q"] = query
if link_id:
params["link_id"] = link_id # filter by post
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json().get("data", [])
Important Limitations
The Pullpush API is free but has its own rate limits (around 1 request per second). Data coverage varies -- some months have complete archives, others have gaps where the ingestion pipeline went down. Don't rely on it for completeness, but it's invaluable for:
- Finding posts/comments that have since been deleted
- Historical trend analysis going back years
- Recovering data from banned subreddits
- Large-scale academic corpus building
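The before/after parameters in the search functions above take Unix timestamps in seconds, so date-window queries need a conversion step. A sketch:

```python
from datetime import datetime, timezone

def to_epoch(year: int, month: int, day: int) -> int:
    """Convert a calendar date (UTC) to the Unix timestamp pullpush expects."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp())

# Posts from January 2023, usable as before/after arguments:
window = {"after": to_epoch(2023, 1, 1), "before": to_epoch(2023, 2, 1)}
```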
9. Rate Limits, Quotas, and Scaling Strategies {#rate-limits}
Reddit's 100 QPM limit is per OAuth token. For larger throughput:
Token-Based Rate Limiter
import time
import threading
class RedditRateLimiter:
"""Thread-safe rate limiter for Reddit API calls."""
def __init__(self, requests_per_minute: int = 90):
self.interval = 60.0 / requests_per_minute
self.last_request = 0.0
self._lock = threading.Lock()
def wait(self):
with self._lock:
elapsed = time.time() - self.last_request
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
self.last_request = time.time()
limiter = RedditRateLimiter(90) # Stay under 100 QPM
# Wrap PRAW calls
def rate_limited_request(fn, *args, **kwargs):
limiter.wait()
return fn(*args, **kwargs)
Multi-Token Rotation
If you need more throughput, create multiple Reddit apps under different accounts (be aware that circumventing rate limits this way can run afoul of Reddit's developer terms):
import praw
from itertools import cycle
REDDIT_CREDENTIALS = [
{"client_id": "id1", "client_secret": "secret1", "user_agent": "app1/1.0"},
{"client_id": "id2", "client_secret": "secret2", "user_agent": "app2/1.0"},
{"client_id": "id3", "client_secret": "secret3", "user_agent": "app3/1.0"},
]
reddit_clients = [praw.Reddit(**creds) for creds in REDDIT_CREDENTIALS]
client_pool = cycle(reddit_clients)
def get_next_client():
return next(client_pool)
Exponential Backoff for API Errors
import time
import random
from functools import wraps
def with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0):
def decorator(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return fn(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
print(f"Attempt {attempt+1} failed: {e}. Retrying in {delay:.1f}s")
time.sleep(delay)
return wrapper
return decorator
@with_backoff(max_retries=5)
def safe_fetch_posts(subreddit_name, limit=100):
return scrape_subreddit(subreddit_name, limit=limit)
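Jitter aside, the decorator's delay schedule is deterministic, and computing it makes the total retry budget visible. A sketch mirroring the formula above:

```python
def backoff_schedule(max_retries: int = 5, base_delay: float = 1.0,
                     max_delay: float = 60.0) -> list[float]:
    """Delays the decorator would sleep between attempts, jitter excluded."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries - 1)]

print(backoff_schedule())  # [1.0, 2.0, 4.0, 8.0]
```

With the defaults, a fully failing call costs at most ~15 seconds of sleep before the final exception propagates.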
10. Proxy Rotation for Multi-Token Scraping {#proxies}
Reddit's rate limiting is IP-based in addition to token-based. When running multiple OAuth tokens and scraping at higher rates, rotating residential proxies help distribute the traffic.
ThorData provides rotating residential proxies with country targeting, sticky sessions, and HTTP/HTTPS/SOCKS5 support. Their residential pool makes each token's requests appear to originate from a different household IP.
import praw
import requests
# Using ThorData's rotating residential proxies
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000
def get_proxy_url(country: str = "US") -> str:
"""Build proxy URL with optional country targeting."""
user = f"{THORDATA_USER}-country-{country.lower()}"
return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"
# For raw requests (JSON API, HTML scraping)
def proxied_request(url: str, country: str = "US", **kwargs) -> requests.Response:
proxy_url = get_proxy_url(country)
proxies = {"http": proxy_url, "https": proxy_url}
headers = {"User-Agent": "Mozilla/5.0 (research project)"}
return requests.get(url, proxies=proxies, headers=headers, **kwargs)
# Test proxy
resp = proxied_request("https://httpbin.org/ip")
print(f"Outbound IP: {resp.json()['origin']}")
For the PRAW OAuth approach, proxies matter less since the rate limit is per-token. But for the raw .json URL approach and old.reddit.com scraping, residential proxies prevent IP bans when collecting at volume.
11. Storing Reddit Data: SQLite and JSON Schemas {#storage}
SQLite Schema for Posts + Comments
import sqlite3
import json
from pathlib import Path
def init_db(db_path: str = "reddit_data.db") -> sqlite3.Connection:
"""Create optimized SQLite database for Reddit data."""
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS posts (
id TEXT PRIMARY KEY,
subreddit TEXT NOT NULL,
title TEXT,
author TEXT,
score INTEGER,
upvote_ratio REAL,
num_comments INTEGER,
url TEXT,
domain TEXT,
selftext TEXT,
created_utc REAL,
permalink TEXT,
flair TEXT,
is_self INTEGER,
over_18 INTEGER,
locked INTEGER,
awards INTEGER,
scraped_at REAL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS comments (
id TEXT PRIMARY KEY,
post_id TEXT NOT NULL,
subreddit TEXT,
author TEXT,
body TEXT,
score INTEGER,
created_utc REAL,
parent_id TEXT,
depth INTEGER,
is_submitter INTEGER,
edited INTEGER,
awards INTEGER,
scraped_at REAL,
FOREIGN KEY (post_id) REFERENCES posts(id)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_posts_subreddit ON posts(subreddit)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_utc)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_comments_post ON comments(post_id)")
conn.commit()
return conn
def save_posts(conn: sqlite3.Connection, posts: list[dict]):
"""Bulk insert posts with upsert semantics."""
import time
now = time.time()
conn.executemany("""
INSERT OR REPLACE INTO posts VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", [
(p["id"], p.get("subreddit",""), p.get("title",""),
p.get("author",""), p.get("score",0), p.get("upvote_ratio",0),
p.get("num_comments",0), p.get("url",""), p.get("domain",""),
p.get("selftext",""), p.get("created_utc",0), p.get("permalink",""),
p.get("flair",""), int(p.get("is_self",False)),
int(p.get("over_18",False)), int(p.get("locked",False)),
p.get("awards",0), now)
for p in posts
])
conn.commit()
def save_comments(conn: sqlite3.Connection, post_id: str, comments: list[dict]):
"""Bulk insert comments with upsert semantics."""
import time
now = time.time()
conn.executemany("""
INSERT OR REPLACE INTO comments VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
""", [
(c["id"], post_id, c.get("subreddit",""), c.get("author",""),
c.get("body",""), c.get("score",0), c.get("created_utc",0),
c.get("parent_id",""), c.get("depth",0),
int(c.get("is_submitter",False)), int(c.get("edited",False)),
c.get("awards",0), now)
for c in comments
])
conn.commit()
Exporting to JSONL for Analysis
import json
def export_to_jsonl(conn: sqlite3.Connection, output_path: str,
subreddit: str = None):
"""Export posts and comments to JSONL format for analysis."""
query = "SELECT * FROM posts"
params = ()
if subreddit:
query += " WHERE subreddit = ?"
params = (subreddit,)
    cur = conn.execute(query, params)
    columns = [d[0] for d in cur.description]  # read once, not per row
    with open(output_path, "w") as f:
        for row in cur:
            post = dict(zip(columns, row))
            f.write(json.dumps(post) + "\n")
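Reading the JSONL back for analysis is one json.loads per line. A round-trip sketch:

```python
import json

def read_jsonl(path: str) -> list[dict]:
    """Load a JSONL file into a list of dicts, skipping blank lines."""
    records = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records
```

JSONL plays nicely with pandas (`pd.read_json(path, lines=True)`) and with streaming tools, which is why it beats one giant JSON array for large dumps.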
12. Filtering, Deduplication, and Data Quality {#data-quality}
When collecting posts across multiple runs and multiple methods, you'll encounter duplicates and noisy data:
import hashlib
def deduplicate_posts(posts: list[dict]) -> list[dict]:
"""Remove duplicate posts by ID."""
seen = set()
unique = []
for post in posts:
pid = post.get("id")
if pid and pid not in seen:
seen.add(pid)
unique.append(post)
return unique
def clean_post(post: dict) -> dict:
"""Normalize and clean a post dict."""
post = dict(post)
# Normalize author field
if post.get("author") in (None, "None", ""):
post["author"] = "[deleted]"
# Clean selftext
if post.get("selftext") in ("[removed]", "[deleted]", None):
post["selftext"] = ""
# Ensure numeric types
post["score"] = int(post.get("score") or 0)
post["num_comments"] = int(post.get("num_comments") or 0)
return post
def filter_posts(posts: list[dict], min_score: int = 0,
exclude_nsfw: bool = True,
exclude_removed: bool = True,
require_text: bool = False) -> list[dict]:
"""Filter posts by quality criteria."""
filtered = []
for post in posts:
if post.get("score", 0) < min_score:
continue
if exclude_nsfw and post.get("over_18"):
continue
if exclude_removed and post.get("selftext") == "[removed]":
continue
if require_text and not post.get("selftext", "").strip():
continue
filtered.append(post)
return filtered
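One more data-quality step worth having: comment bodies arrive as markdown, and for text analysis you usually want plain text. A minimal regex-based cleanup sketch -- lossy by design (it will also strip underscores inside snake_case words); a real markdown parser handles the edge cases better:

```python
import re

def strip_markdown(text: str) -> str:
    """Roughly convert Reddit markdown to plain text."""
    text = re.sub(r"\[([^\]]*)\]\([^)]*\)", r"\1", text)      # [label](url) -> label
    text = re.sub(r"(\*\*|\*|__|_|~~)", "", text)             # bold/italic/strike markers
    text = re.sub(r"^>.*$", "", text, flags=re.MULTILINE)     # drop quoted lines
    return re.sub(r"\n{3,}", "\n\n", text).strip()
```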
13. Real Use Cases {#use-cases}
Brand Monitoring
def monitor_brand(brand_name: str, subreddits: list[str] = None) -> list[dict]:
"""Monitor mentions of a brand across Reddit."""
target = reddit.subreddit("+".join(subreddits) if subreddits else "all")
mentions = []
for post in target.search(brand_name, sort="new", limit=100):
mentions.append({
"id": post.id,
"title": post.title,
"subreddit": post.subreddit.display_name,
"score": post.score,
"url": f"https://reddit.com{post.permalink}",
"created_utc": post.created_utc,
})
return mentions
Sentiment Analysis Dataset Builder
def build_sentiment_dataset(subreddits: list[str],
min_score: int = 10) -> list[dict]:
"""Build a labeled sentiment dataset from Reddit comments."""
dataset = []
for sub_name in subreddits:
posts = scrape_subreddit(sub_name, sort="top", limit=50, time_filter="month")
for post in posts:
if post["score"] < min_score:
continue
comments = scrape_comments(post["id"], max_depth=3)
for c in comments:
if len(c["body"]) > 20 and c["score"] != 0:
dataset.append({
"text": c["body"],
"score": c["score"],
"subreddit": sub_name,
"label": "positive" if c["score"] > 5 else
"negative" if c["score"] < 0 else "neutral",
})
return dataset
Trend Tracking Over Time
import sqlite3
from datetime import datetime
def track_keyword_trends(keywords: list[str], subreddits: list[str],
db_path: str = "trends.db"):
"""Track daily mention frequency of keywords across subreddits."""
conn = sqlite3.connect(db_path)
conn.execute("""CREATE TABLE IF NOT EXISTS mentions (
keyword TEXT, subreddit TEXT, date TEXT,
post_count INTEGER, total_score INTEGER,
PRIMARY KEY (keyword, subreddit, date))""")
for keyword in keywords:
for sub in subreddits:
posts = search_pullpush_posts(subreddit=sub, query=keyword, size=100)
today = datetime.utcnow().strftime("%Y-%m-%d")
count = len(posts)
total_score = sum(p.get("score", 0) for p in posts)
conn.execute("INSERT OR REPLACE INTO mentions VALUES (?,?,?,?,?)",
(keyword, sub, today, count, total_score))
conn.commit()
14. Common Errors and How to Fix Them {#errors}
| Error | Cause | Fix |
|---|---|---|
| `prawcore.exceptions.ResponseException: 401` | Invalid credentials | Re-check client_id and client_secret |
| `prawcore.exceptions.TooManyRequests` | Rate limit exceeded | PRAW backs off automatically; if persistent, reduce your QPM |
| `prawcore.exceptions.Forbidden` (403) | App revoked or wrong permissions | Recreate the app; check whether the account is banned |
| Empty selftext | Post deleted before scrape | Check selftext for [removed]/[deleted] |
| replace_more taking forever | Huge comment thread | Set limit=5 for faster partial expansion |
| Stale after cursor in pagination | Reddit invalidates cursors after ~1h | Restart pagination from scratch |
| `prawcore.exceptions.ServerError` (503) | Reddit temporarily unavailable | Retry with exponential backoff |
| All posts show score=1 | Score fuzzing on new posts | Normal -- scores stabilize after ~30 min |
15. Which Approach Should You Use? {#summary}
| Use Case | Recommended Approach |
|---|---|
| Small project, public data | PRAW with a single app (100 QPM free) |
| Historical research pre-2023 | Pullpush API for archived data, then PRAW for recent |
| Quick one-off scrape | old.reddit.com HTML parsing |
| Real-time monitoring | PRAW streaming (subreddit.stream.submissions) |
| Large-scale collection | Multiple OAuth tokens + rate limiting + SQLite caching |
| Deleted/removed content | Pullpush archives (incomplete coverage post-2023) |
| High-volume scraping | Multiple tokens + ThorData residential proxies |
Reddit's API situation could be worse. The free tier is functional, PRAW handles the complexity, and the data is still accessible if you follow the rules. Compared to what Twitter did to its ecosystem, Reddit developers got off relatively easy.
The main things to remember: always set a proper user agent, respect rate limits, cache your data locally, and use replace_more(limit=5) instead of full expansion on large threads unless you specifically need every reply. Reddit's API team does monitor usage patterns, and getting your app revoked means starting over.
For multi-account, high-throughput setups, routing your requests through ThorData's residential proxy network distributes the IP footprint across real residential addresses -- reducing the chance of IP-level throttling when you're pushing toward the QPM ceiling across multiple tokens.