
Scraping Reddit Posts and Comments in 2026: API Changes, PRAW, and Direct Methods


Reddit used to be the easiest site on the internet to scrape. Free API, no auth required, just slap .json on the end of any URL and you had structured data. Those days are gone.

In June 2023, Reddit announced API pricing that would charge third-party apps up to $12,000/year for moderate usage. Apollo, Reddit is Fun, Sync -- all dead within weeks. The developer community went ballistic, subreddits went dark in protest, and Reddit didn't blink.

Now in 2026, the dust has settled. The API still exists, it's just different. Let's go through every method that works, from the official API down to archival services and raw HTML parsing.


Table of Contents

  1. The Official Reddit API: What You Actually Get
  2. Setting Up OAuth and a Reddit App
  3. PRAW: The Standard Python Approach
  4. Scraping Comments -- Including Nested Replies
  5. Handling Deleted, Removed, and Edited Content
  6. The old.reddit.com HTML Approach
  7. The .json URL Trick -- Still Alive in 2026
  8. PushShift and Pullpush: Historical Archives
  9. Rate Limits, Quotas, and Scaling Strategies
  10. Proxy Rotation for Multi-Token Scraping
  11. Storing Reddit Data: SQLite and JSON Schemas
  12. Filtering, Deduplication, and Data Quality
  13. Real Use Cases: Research, Monitoring, Analysis
  14. Common Errors and How to Fix Them
  15. Which Approach Should You Use?

1. The Official Reddit API: What You Actually Get {#official-api}

Reddit's Data API has a free tier that's surprisingly usable for small-to-medium projects. Here's the deal: you get 100 queries per minute (QPM) per OAuth client, at no cost.

Compared to Twitter/X charging $100/month for 10K reads, Reddit's free tier is generous. 100 QPM works out to ~144,000 requests per day. That's enough for most research projects and personal tools.

The catch: you need to register an app and use OAuth. No more anonymous .json endpoints at scale.
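The daily-budget arithmetic is easy to sanity-check with a throwaway helper (my own convenience function, not part of any API):

```python
def daily_request_budget(qpm: int, utilization: float = 1.0) -> int:
    """How many API requests a token can make per day at a given QPM ceiling."""
    return int(qpm * 60 * 24 * utilization)

# 100 QPM around the clock
print(daily_request_budget(100))  # 144000
# More realistic: run at 90 QPM to leave headroom for bursts
print(daily_request_budget(90))   # 129600
```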

Key endpoints you'll use:

Endpoint | Description | Notes
/r/{sub}/hot | Hot posts | Sorted by the hot algorithm
/r/{sub}/new | New posts | Chronological
/r/{sub}/top | Top posts | Supports t=hour/day/week/month/year/all
/r/{sub}/rising | Rising posts | Early trending signals
/r/{sub}/search | Search within subreddit | Pass restrict_sr=1 to scope results
/comments/{post_id} | Post + comment tree | Core data endpoint
/user/{username}/submitted | User's post history | --
/user/{username}/comments | User's comment history | --

2. Setting Up OAuth and a Reddit App {#setup}

Creating a Reddit App

  1. Go to reddit.com/prefs/apps
  2. Click "create another app" at the bottom
  3. Pick "script" for personal use or "web app" if building a service
  4. Set the redirect URI to http://localhost:8080 (for script apps, this doesn't matter)
  5. Note your client_id (under the app name) and client_secret

No credit card, no approval process. The whole thing takes 30 seconds.
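If you want to see the raw OAuth handshake before handing things over to PRAW, a script app can exchange its credentials for an application-only bearer token with the standard client_credentials grant. A sketch (the user_agent string is a placeholder -- use your own app name and username):

```python
import requests

def get_app_token(client_id: str, client_secret: str, user_agent: str) -> str:
    """Exchange script-app credentials for an application-only bearer token."""
    resp = requests.post(
        "https://www.reddit.com/api/v1/access_token",
        auth=(client_id, client_secret),            # HTTP Basic auth with app creds
        data={"grant_type": "client_credentials"},  # app-only grant, no user login
        headers={"User-Agent": user_agent},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["access_token"]

# token = get_app_token("your_client_id", "your_client_secret",
#                       "my-scraper/1.0 (by u/your_username)")
```

Authenticated calls then go to oauth.reddit.com with an Authorization: Bearer header.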

Dependency Installation

pip install praw requests beautifulsoup4

PRAW Configuration File

Instead of hardcoding credentials, use a praw.ini file in your project root:

[DEFAULT]
client_id=your_client_id
client_secret=your_client_secret
user_agent=my-scraper/1.0 (by u/your_username)

Or configure programmatically -- see next section.
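Since praw.ini is plain INI, a quick configparser check (my own sketch, not part of PRAW) can catch a missing key before PRAW throws a vaguer error:

```python
import configparser

REQUIRED = ("client_id", "client_secret", "user_agent")

def check_praw_ini(path: str = "praw.ini") -> list[str]:
    """Return required credential keys missing from the [DEFAULT] section."""
    cfg = configparser.ConfigParser()
    cfg.read(path)  # silently yields an empty config if the file is absent
    return [key for key in REQUIRED if key not in cfg["DEFAULT"]]

missing = check_praw_ini()
if missing:
    print(f"praw.ini is missing: {', '.join(missing)}")
```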


3. PRAW: The Standard Python Approach {#praw}

PRAW (Python Reddit API Wrapper) handles OAuth, rate limiting, and pagination automatically. You don't need to think about tokens or sleep timers.

pip install praw

Basic Setup

import praw

reddit = praw.Reddit(
    client_id="your_client_id",
    client_secret="your_client_secret",
    user_agent="my-scraper/1.0 (by u/your_username)",
)

# Read-only mode -- no login needed for public data
print(reddit.read_only)  # True

The user_agent matters. Reddit will throttle or block generic user agents. Include your app name and a contact username -- it's part of their API rules and keeps you from hitting silent rate limits.
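Reddit's API rules ask for a descriptive user agent along the lines of platform:app-id:version (by /u/username). A tiny formatter (a convenience helper of my own) keeps it consistent across scripts:

```python
def make_user_agent(platform: str, app_id: str, version: str, username: str) -> str:
    """Build a user agent string in Reddit's recommended format."""
    return f"{platform}:{app_id}:v{version} (by /u/{username})"

ua = make_user_agent("python", "my-scraper", "1.0", "your_username")
print(ua)  # python:my-scraper:v1.0 (by /u/your_username)
```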

Scraping Subreddit Posts

def scrape_subreddit(subreddit_name: str, sort: str = "hot", limit: int = 100,
                     time_filter: str = "week") -> list[dict]:
    """Pull posts from a subreddit with full metadata."""
    subreddit = reddit.subreddit(subreddit_name)
    posts = []

    listing_fn = {
        "hot": subreddit.hot,
        "new": subreddit.new,
        "top": lambda **kw: subreddit.top(time_filter=time_filter, **kw),
        "rising": subreddit.rising,
        "controversial": subreddit.controversial,
    }.get(sort, subreddit.hot)

    for post in listing_fn(limit=limit):
        posts.append({
            "id": post.id,
            "title": post.title,
            "author": str(post.author) if post.author else "[deleted]",
            "score": post.score,
            "upvote_ratio": post.upvote_ratio,
            "url": post.url,
            "domain": post.domain,
            "selftext": post.selftext,
            "selftext_html": post.selftext_html,
            "created_utc": post.created_utc,
            "num_comments": post.num_comments,
            "permalink": f"https://reddit.com{post.permalink}",
            "subreddit": post.subreddit.display_name,
            "flair": post.link_flair_text,
            "is_self": post.is_self,
            "is_video": post.is_video,
            "over_18": post.over_18,
            "spoiler": post.spoiler,
            "stickied": post.stickied,
            "locked": post.locked,
            "awards": post.total_awards_received,
            "gilded": post.gilded,
        })

    return posts

# Usage examples
hot_posts = scrape_subreddit("python", sort="hot", limit=100)
top_of_week = scrape_subreddit("datascience", sort="top", limit=50, time_filter="week")
top_of_alltime = scrape_subreddit("worldnews", sort="top", limit=1000, time_filter="all")

PRAW handles pagination internally. You can set limit=None to pull everything Reddit will give you (roughly 1,000 posts per listing endpoint). For .hot(), .new(), .top(), and .controversial(), that's the ceiling.

Streaming New Posts in Real Time

PRAW supports streaming for monitoring subreddits as new posts appear:

def stream_subreddit(subreddit_name: str, callback):
    """Stream new posts from a subreddit in real time."""
    subreddit = reddit.subreddit(subreddit_name)
    for post in subreddit.stream.submissions(skip_existing=True):
        callback({
            "id": post.id,
            "title": post.title,
            "score": post.score,
            "created_utc": post.created_utc,
            "permalink": f"https://reddit.com{post.permalink}",
        })

# Monitor multiple subreddits at once
def stream_multi(subreddits: list[str], callback):
    """Stream from multiple subreddits simultaneously."""
    combined = reddit.subreddit("+".join(subreddits))
    for post in combined.stream.submissions(skip_existing=True):
        callback(post)

Streaming is perfect for real-time brand monitoring, keyword alerts, or building news aggregators.


4. Scraping Comments -- Including Nested Replies {#comments}

Comments are where it gets interesting -- and messy. Reddit's comment trees can be deeply nested, and large threads have "load more comments" nodes that require separate API requests.

Full Comment Tree Extraction

from praw.models import MoreComments
import json

def scrape_comments(post_id: str, max_depth: int = 10,
                    max_comments: int = 5000) -> list[dict]:
    """Scrape all comments from a post, expanding 'more comments' nodes."""
    submission = reddit.submission(id=post_id)
    # limit caps how many "load more comments" placeholders get expanded
    # (each expansion is one extra API request), not the tree depth
    submission.comments.replace_more(limit=max_depth)

    comments = []
    def process_comment_tree(comment_list, depth=0):
        for comment in comment_list:
            if isinstance(comment, MoreComments):
                continue
            if len(comments) >= max_comments:
                return

            comments.append({
                "id": comment.id,
                "author": str(comment.author) if comment.author else "[deleted]",
                "body": comment.body,
                "body_html": comment.body_html,
                "score": comment.score,
                "ups": comment.ups,
                "created_utc": comment.created_utc,
                "parent_id": comment.parent_id,
                "link_id": comment.link_id,
                "is_submitter": comment.is_submitter,
                "distinguished": comment.distinguished,
                "gilded": comment.gilded,
                "depth": depth,
                "awards": comment.total_awards_received,
                "edited": comment.edited if comment.edited else False,
                "stickied": comment.stickied,
            })

            if comment.replies:
                process_comment_tree(comment.replies, depth=depth+1)

    process_comment_tree(submission.comments)
    return comments

# Usage
comments = scrape_comments("abc123")
print(f"Got {len(comments)} comments")

# Build comment tree structure
def build_tree(comments: list[dict]) -> list[dict]:
    """Reconstruct nested comment tree from flat list."""
    by_id = {c["id"]: {**c, "replies": []} for c in comments}
    roots = []
    for c in comments:
        parent = c["parent_id"]
        if parent.startswith("t1_"):
            parent_id = parent[3:]
            if parent_id in by_id:
                by_id[parent_id]["replies"].append(by_id[c["id"]])
            else:
                roots.append(by_id[c["id"]])
        else:
            roots.append(by_id[c["id"]])
    return roots

The replace_more(limit=max_depth) call is the expensive part. Each "more comments" expansion is a separate API request, and a viral post with thousands of comments can eat 20-30 requests just to fully expand the tree. Set limit=0 to strip the unexpanded placeholders without making any extra requests -- you keep whatever comments arrived with the initial fetch.
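Once you have the flat list from scrape_comments, a quick shape check tells you whether the expansion went deep enough. Plain dict-counting, no API calls (my own helper):

```python
from collections import Counter

def comment_tree_stats(comments: list[dict]) -> dict:
    """Summarize the shape of a flat comment list with 'depth' fields."""
    depths = Counter(c["depth"] for c in comments)
    return {
        "total": len(comments),
        "max_depth": max(depths) if depths else 0,
        "top_level": depths.get(0, 0),
        "by_depth": dict(sorted(depths.items())),
    }

stats = comment_tree_stats([{"depth": 0}, {"depth": 0}, {"depth": 1}, {"depth": 2}])
print(stats)  # {'total': 4, 'max_depth': 2, 'top_level': 2, 'by_depth': {0: 2, 1: 1, 2: 1}}
```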

Scraping Comments for Multiple Posts Efficiently

import time

def batch_scrape_comments(post_ids: list[str],
                          delay: float = 0.7) -> dict[str, list]:
    """Scrape comments for multiple posts with rate limiting."""
    results = {}
    for i, post_id in enumerate(post_ids):
        try:
            comments = scrape_comments(post_id, max_depth=5)
            results[post_id] = comments
            print(f"[{i+1}/{len(post_ids)}] Post {post_id}: {len(comments)} comments")
        except Exception as e:
            print(f"Error on {post_id}: {e}")
            results[post_id] = []
        time.sleep(delay)  # stay under 100 QPM
    return results

5. Handling Deleted, Removed, and Edited Content {#deleted}

Reddit has two types of missing content that look similar but aren't: user deletions (the author shows as [deleted]) and moderator removals (the body shows [removed], often with the author intact). Telling them apart matters for analysis:

def classify_content(item) -> str:
    """Classify the visibility state of a Reddit post or comment."""
    body = getattr(item, 'body', None) or getattr(item, 'selftext', None) or ''
    author = item.author

    if author is None and body == "[deleted]":
        return "user_deleted"
    elif body == "[removed]":
        return "mod_removed"
    elif getattr(item, 'banned_by', None):
        return "admin_removed"
    elif getattr(item, 'locked', False):
        return "locked"
    return "active"

def safe_get_body(comment) -> str:
    """Get comment body, handling deleted/removed states gracefully."""
    if comment.body in ("[deleted]", "[removed]"):
        return ""
    return comment.body or ""

There's no way to recover deleted content through the official API. For that, you need archives -- see the PushShift section below.


6. The old.reddit.com HTML Approach {#old-reddit}

Reddit's old design at old.reddit.com still serves simpler HTML than the React-heavy new site. For quick scrapes without OAuth:

import requests
from bs4 import BeautifulSoup
import time
import random

def scrape_old_reddit(subreddit: str, pages: int = 5,
                      delay_range: tuple = (2, 4)) -> list[dict]:
    """Scrape posts from old.reddit.com - no API key needed."""
    posts = []
    after = None
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/131.0.0.0 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
    }

    for page_num in range(pages):
        url = f"https://old.reddit.com/r/{subreddit}/"
        params = {"after": after} if after else {}
        resp = requests.get(url, headers=headers, params=params, timeout=10)

        if resp.status_code == 429:
            print(f"Rate limited on page {page_num}, backing off 30s")
            time.sleep(30)
            continue
        if resp.status_code != 200:
            print(f"Got {resp.status_code} on page {page_num}")
            break

        soup = BeautifulSoup(resp.text, "html.parser")

        for thing in soup.select("div.thing[data-fullname]"):
            title_el = thing.select_one("a.title")
            score_el = thing.select_one("div.score.unvoted")
            time_el = thing.select_one("time.live-timestamp")
            author_el = thing.select_one("a.author")
            comment_el = thing.select_one("a.comments")
            flair_el = thing.select_one("span.flair")

            posts.append({
                "title": title_el.text.strip() if title_el else "",
                "score": thing.get("data-score", "0"),
                "fullname": thing.get("data-fullname", ""),
                "url": thing.get("data-url", ""),
                "subreddit": thing.get("data-subreddit", ""),
                "domain": thing.get("data-domain", ""),
                "author": author_el.text if author_el else "[deleted]",
                "timestamp": time_el.get("datetime", "") if time_el else "",
                "comments": comment_el.text.split()[0] if comment_el else "0",
                "flair": flair_el.text.strip() if flair_el else "",
                "nsfw": "nsfw" in (thing.get("class") or []),
                "link": title_el["href"] if title_el else "",
            })

        # Pagination
        next_btn = soup.select_one("span.next-button a")
        if next_btn:
            after = next_btn["href"].split("after=")[-1].split("&")[0]
        else:
            break

        time.sleep(random.uniform(*delay_range))

    return posts

This approach is fragile -- any HTML change breaks it -- but old.reddit.com has been remarkably stable since it hasn't had a redesign in years. Reddit has said they won't kill it.

The main downside: you're limited to about 25 posts per page, and aggressive scraping will get your IP blocked. Keep it under 1 request every 2-3 seconds.

Scraping Comments from old.reddit.com

Old Reddit also exposes comments in a cleaner HTML format:

def scrape_old_reddit_comments(post_url: str) -> list[dict]:
    """Scrape comments from an old.reddit.com thread."""
    comments = []
    # Normalize to the old design (handles www.reddit.com and bare reddit.com)
    if "old.reddit.com" not in post_url:
        post_url = post_url.replace("www.reddit.com", "reddit.com").replace(
            "reddit.com", "old.reddit.com")

    headers = {"User-Agent": "Mozilla/5.0 (research bot, contact u/myuser)"}
    resp = requests.get(post_url, headers=headers, timeout=10)
    soup = BeautifulSoup(resp.text, "html.parser")

    for div in soup.select("div.comment"):
        author_el = div.select_one("a.author")
        body_el = div.select_one("div.md")
        score_el = div.select_one("span.score.unvoted")
        time_el = div.select_one("time.live-timestamp")

        comments.append({
            "author": author_el.text if author_el else "[deleted]",
            "body": body_el.get_text("\n").strip() if body_el else "",
            "score": score_el.text.strip() if score_el else "?",
            "timestamp": time_el.get("datetime", "") if time_el else "",
            "id": div.get("data-fullname", ""),
            "parent": div.get("data-fullname-parent", ""),
        })

    return comments

7. The .json URL Trick -- Still Alive in 2026 {#json-trick}

Reddit still returns JSON for .json-suffixed URLs, though rate limiting is stricter without OAuth:

import requests
import time

def fetch_reddit_json(url: str, params: dict = None,
                      session_token: str = None) -> dict:
    """Fetch Reddit JSON directly -- works on any public URL."""
    if not url.endswith(".json"):
        url = url.rstrip("/") + ".json"

    headers = {
        "User-Agent": "Mozilla/5.0 (research project; contact u/myuser)",
    }
    if session_token:
        headers["Authorization"] = f"Bearer {session_token}"
        # OAuth-authenticated calls must go to oauth.reddit.com, not www
        url = url.replace("www.reddit.com", "oauth.reddit.com")

    resp = requests.get(url, headers=headers, params=params or {}, timeout=10)

    if resp.status_code == 429:
        retry_after = int(resp.headers.get("Retry-After", 60))
        time.sleep(retry_after)
        return fetch_reddit_json(url, params, session_token)

    resp.raise_for_status()
    return resp.json()

# Fetch hot posts from a subreddit
data = fetch_reddit_json("https://www.reddit.com/r/python/hot.json", {"limit": 25})
posts = data["data"]["children"]

# Fetch a specific post with its comments
post_data = fetch_reddit_json("https://www.reddit.com/r/python/comments/abc123/post_title.json")
post = post_data[0]["data"]["children"][0]["data"]
comment_tree = post_data[1]["data"]["children"]

The JSON API still supports the after cursor parameter for pagination:

def paginate_json_api(subreddit: str, sort: str = "new",
                      max_pages: int = 10) -> list[dict]:
    """Paginate through Reddit JSON API results."""
    all_posts = []
    after = None

    for _ in range(max_pages):
        params = {"limit": 100}
        if after:
            params["after"] = after

        url = f"https://www.reddit.com/r/{subreddit}/{sort}.json"
        data = fetch_reddit_json(url, params)
        items = data["data"]["children"]

        if not items:
            break

        all_posts.extend(i["data"] for i in items)
        after = data["data"]["after"]
        if not after:
            break
        time.sleep(1.2)  # Respect rate limits

    return all_posts

8. PushShift and Pullpush: Historical Archives {#pushshift}

PushShift was the gold standard for historical Reddit data. It archived every post and comment in near-real-time, and researchers relied on it for years. Then in 2023, Reddit cut off its access as part of the API changes.

The project partially recovered. Pullpush.io now serves as the community continuation, with data coverage that has gaps but is still useful for historical research.

Pullpush API -- Posts

import requests

def search_pullpush_posts(subreddit: str = None, query: str = None,
                           author: str = None, size: int = 100,
                           before: int = None, after: int = None) -> list[dict]:
    """Search historical Reddit posts via Pullpush API."""
    url = "https://api.pullpush.io/reddit/search/submission/"
    params = {"size": size, "sort": "desc", "sort_type": "score"}
    if subreddit:
        params["subreddit"] = subreddit
    if query:
        params["q"] = query
    if author:
        params["author"] = author
    if before:
        params["before"] = before
    if after:
        params["after"] = after

    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("data", [])

# Find top posts about a topic
results = search_pullpush_posts(subreddit="python", query="web scraping", size=25)
for r in results[:5]:
    print(f"[{r.get('score', 0)}] {r['title']}")
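Pullpush's size cap means deep pulls need cursor-walking: repeatedly move before down to the oldest created_utc you've seen. A generic sketch -- the fetch callable is whatever wrapper you use (e.g. search_pullpush_posts above, with its subreddit or query baked in):

```python
import time

def walk_pullpush(fetch, start_utc: int, end_utc: int,
                  page_size: int = 100, delay: float = 1.0):
    """Yield results backwards through [start_utc, end_utc] via the before cursor."""
    before = end_utc
    while True:
        batch = fetch(before=before, after=start_utc, size=page_size)
        if not batch:
            break
        yield from batch
        oldest = min(int(item["created_utc"]) for item in batch)
        if oldest >= before:  # no progress -- avoid an infinite loop
            break
        before = oldest
        time.sleep(delay)     # stay near Pullpush's ~1 req/s limit
```

Usage: `list(walk_pullpush(lambda **kw: search_pullpush_posts(subreddit="python", **kw), start_utc, end_utc))`. Note that items sharing the oldest timestamp in a batch can be dropped or duplicated at the cursor boundary; dedupe by id afterwards.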

Pullpush API -- Comments

def search_pullpush_comments(subreddit: str = None, query: str = None,
                              link_id: str = None, size: int = 100) -> list[dict]:
    """Search historical Reddit comments via Pullpush API."""
    url = "https://api.pullpush.io/reddit/search/comment/"
    params = {"size": size}
    if subreddit:
        params["subreddit"] = subreddit
    if query:
        params["q"] = query
    if link_id:
        params["link_id"] = link_id  # filter by post

    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("data", [])

Important Limitations

The Pullpush API is free but has its own rate limits (around 1 request per second). Data coverage varies -- some months have complete archives, others have gaps where the ingestion pipeline went down. Don't rely on it for completeness, but it's invaluable for recovering deleted or removed content, pre-2023 historical research, and long-range keyword trend analysis.


9. Rate Limits, Quotas, and Scaling Strategies {#rate-limits}

Reddit's 100 QPM limit is per OAuth token. For larger throughput:

Token-Based Rate Limiter

import time
import threading

class RedditRateLimiter:
    """Thread-safe rate limiter for Reddit API calls."""

    def __init__(self, requests_per_minute: int = 90):
        self.interval = 60.0 / requests_per_minute
        self.last_request = 0.0
        self._lock = threading.Lock()

    def wait(self):
        with self._lock:
            elapsed = time.time() - self.last_request
            if elapsed < self.interval:
                time.sleep(self.interval - elapsed)
            self.last_request = time.time()

limiter = RedditRateLimiter(90)  # Stay under 100 QPM

# Wrap PRAW calls
def rate_limited_request(fn, *args, **kwargs):
    limiter.wait()
    return fn(*args, **kwargs)

Multi-Token Rotation

If you need more throughput, create multiple Reddit apps under different accounts:

import praw
from itertools import cycle

REDDIT_CREDENTIALS = [
    {"client_id": "id1", "client_secret": "secret1", "user_agent": "app1/1.0"},
    {"client_id": "id2", "client_secret": "secret2", "user_agent": "app2/1.0"},
    {"client_id": "id3", "client_secret": "secret3", "user_agent": "app3/1.0"},
]

reddit_clients = [praw.Reddit(**creds) for creds in REDDIT_CREDENTIALS]
client_pool = cycle(reddit_clients)

def get_next_client():
    return next(client_pool)
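Round-robin alone isn't enough -- each token has its own QPM budget, so pair every client with its own last-request timestamp. A self-contained sketch (the clients can be the praw.Reddit instances above; TokenPool is my own construction):

```python
import time
from itertools import cycle

class TokenPool:
    """Rotate clients, enforcing a per-client minimum interval between requests."""

    def __init__(self, clients, requests_per_minute: int = 90):
        self.interval = 60.0 / requests_per_minute
        # each entry tracks [client, last_request_time]
        self._pool = cycle([[client, 0.0] for client in clients])

    def next_client(self):
        entry = next(self._pool)
        wait = self.interval - (time.monotonic() - entry[1])
        if wait > 0:
            time.sleep(wait)
        entry[1] = time.monotonic()
        return entry[0]

# pool = TokenPool(reddit_clients, requests_per_minute=90)
# posts = pool.next_client().subreddit("python").hot(limit=100)
```

With N tokens this approaches N x 90 requests per minute while no single token exceeds its own limit.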

Exponential Backoff for API Errors

import time
import random
from functools import wraps

def with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
                    print(f"Attempt {attempt+1} failed: {e}. Retrying in {delay:.1f}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@with_backoff(max_retries=5)
def safe_fetch_posts(subreddit_name, limit=100):
    return scrape_subreddit(subreddit_name, limit=limit)

10. Proxy Rotation for Multi-Token Scraping {#proxies}

Reddit's rate limiting is IP-based in addition to token-based. When running multiple OAuth tokens and scraping at higher rates, rotating residential proxies help distribute the traffic.

ThorData provides rotating residential proxies with country targeting, sticky sessions, and HTTP/HTTPS/SOCKS5 support. Their residential pool makes each token's requests appear to originate from a different household IP.

import praw
import requests

# Using ThorData's rotating residential proxies
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000

def get_proxy_url(country: str = "US") -> str:
    """Build proxy URL with optional country targeting."""
    user = f"{THORDATA_USER}-country-{country.lower()}"
    return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"

# For raw requests (JSON API, HTML scraping)
def proxied_request(url: str, country: str = "US", **kwargs) -> requests.Response:
    proxy_url = get_proxy_url(country)
    proxies = {"http": proxy_url, "https": proxy_url}
    headers = {"User-Agent": "Mozilla/5.0 (research project)"}
    return requests.get(url, proxies=proxies, headers=headers, **kwargs)

# Test proxy
resp = proxied_request("https://httpbin.org/ip")
print(f"Outbound IP: {resp.json()['origin']}")

For the PRAW OAuth approach, proxies matter less since the rate limit is per-token. But for the raw .json URL approach and old.reddit.com scraping, residential proxies prevent IP bans when collecting at volume.
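If you do want PRAW itself to ride a proxy, PRAW accepts a pre-configured requests session via requestor_kwargs. A sketch, reusing the proxy URL format from the ThorData example above:

```python
import requests

def make_proxied_session(proxy_url: str) -> requests.Session:
    """Build a requests session that routes all traffic through one proxy."""
    session = requests.Session()
    session.proxies = {"http": proxy_url, "https": proxy_url}
    return session

def make_proxied_reddit(client_id: str, client_secret: str,
                        user_agent: str, proxy_url: str):
    import praw  # imported here so the session helper stays praw-free
    return praw.Reddit(
        client_id=client_id,
        client_secret=client_secret,
        user_agent=user_agent,
        requestor_kwargs={"session": make_proxied_session(proxy_url)},
    )
```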


11. Storing Reddit Data: SQLite and JSON Schemas {#storage}

SQLite Schema for Posts + Comments

import sqlite3
import json
from pathlib import Path

def init_db(db_path: str = "reddit_data.db") -> sqlite3.Connection:
    """Create optimized SQLite database for Reddit data."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")

    conn.execute("""
        CREATE TABLE IF NOT EXISTS posts (
            id TEXT PRIMARY KEY,
            subreddit TEXT NOT NULL,
            title TEXT,
            author TEXT,
            score INTEGER,
            upvote_ratio REAL,
            num_comments INTEGER,
            url TEXT,
            domain TEXT,
            selftext TEXT,
            created_utc REAL,
            permalink TEXT,
            flair TEXT,
            is_self INTEGER,
            over_18 INTEGER,
            locked INTEGER,
            awards INTEGER,
            scraped_at REAL
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS comments (
            id TEXT PRIMARY KEY,
            post_id TEXT NOT NULL,
            subreddit TEXT,
            author TEXT,
            body TEXT,
            score INTEGER,
            created_utc REAL,
            parent_id TEXT,
            depth INTEGER,
            is_submitter INTEGER,
            edited INTEGER,
            awards INTEGER,
            scraped_at REAL,
            FOREIGN KEY (post_id) REFERENCES posts(id)
        )
    """)

    conn.execute("CREATE INDEX IF NOT EXISTS idx_posts_subreddit ON posts(subreddit)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_utc)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_comments_post ON comments(post_id)")

    conn.commit()
    return conn

def save_posts(conn: sqlite3.Connection, posts: list[dict]):
    """Bulk insert posts with upsert semantics."""
    import time
    now = time.time()
    conn.executemany("""
        INSERT OR REPLACE INTO posts VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, [
        (p["id"], p.get("subreddit",""), p.get("title",""),
         p.get("author",""), p.get("score",0), p.get("upvote_ratio",0),
         p.get("num_comments",0), p.get("url",""), p.get("domain",""),
         p.get("selftext",""), p.get("created_utc",0), p.get("permalink",""),
         p.get("flair",""), int(p.get("is_self",False)),
         int(p.get("over_18",False)), int(p.get("locked",False)),
         p.get("awards",0), now)
        for p in posts
    ])
    conn.commit()

def save_comments(conn: sqlite3.Connection, post_id: str, comments: list[dict]):
    """Bulk insert comments with upsert semantics."""
    import time
    now = time.time()
    conn.executemany("""
        INSERT OR REPLACE INTO comments VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, [
        (c["id"], post_id, c.get("subreddit",""), c.get("author",""),
         c.get("body",""), c.get("score",0), c.get("created_utc",0),
         c.get("parent_id",""), c.get("depth",0),
         int(c.get("is_submitter",False)), int(c.get("edited",False)),
         c.get("awards",0), now)
        for c in comments
    ])
    conn.commit()
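With the schema in place, analysis is plain SQL. For example, per-subreddit volume and average score from the posts table created by init_db (an illustrative query of my own):

```python
import sqlite3

def subreddit_summary(conn: sqlite3.Connection) -> list[tuple]:
    """Per-subreddit post counts and average score from the posts table."""
    return conn.execute("""
        SELECT subreddit, COUNT(*) AS posts, ROUND(AVG(score), 1) AS avg_score
        FROM posts
        GROUP BY subreddit
        ORDER BY posts DESC
    """).fetchall()

# for sub, n, avg in subreddit_summary(conn):
#     print(f"r/{sub}: {n} posts, avg score {avg}")
```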

Exporting to JSONL for Analysis

import json

def export_to_jsonl(conn: sqlite3.Connection, output_path: str,
                    subreddit: str = None):
    """Export posts and comments to JSONL format for analysis."""
    query = "SELECT * FROM posts"
    params = ()
    if subreddit:
        query += " WHERE subreddit = ?"
        params = (subreddit,)

    with open(output_path, "w") as f:
        cur = conn.execute(query, params)
        columns = [d[0] for d in cur.description]  # column names, fetched once
        for row in cur:
            post = dict(zip(columns, row))
            f.write(json.dumps(post) + "\n")

12. Filtering, Deduplication, and Data Quality {#data-quality}

When collecting posts across multiple runs and multiple methods, you'll encounter duplicates and noisy data:


def deduplicate_posts(posts: list[dict]) -> list[dict]:
    """Remove duplicate posts by ID."""
    seen = set()
    unique = []
    for post in posts:
        pid = post.get("id")
        if pid and pid not in seen:
            seen.add(pid)
            unique.append(post)
    return unique

def clean_post(post: dict) -> dict:
    """Normalize and clean a post dict."""
    post = dict(post)
    # Normalize author field
    if post.get("author") in (None, "None", ""):
        post["author"] = "[deleted]"
    # Clean selftext
    if post.get("selftext") in ("[removed]", "[deleted]", None):
        post["selftext"] = ""
    # Ensure numeric types
    post["score"] = int(post.get("score") or 0)
    post["num_comments"] = int(post.get("num_comments") or 0)
    return post

def filter_posts(posts: list[dict], min_score: int = 0,
                 exclude_nsfw: bool = True,
                 exclude_removed: bool = True,
                 require_text: bool = False) -> list[dict]:
    """Filter posts by quality criteria."""
    filtered = []
    for post in posts:
        if post.get("score", 0) < min_score:
            continue
        if exclude_nsfw and post.get("over_18"):
            continue
        if exclude_removed and post.get("selftext") == "[removed]":
            continue
        if require_text and not post.get("selftext", "").strip():
            continue
        filtered.append(post)
    return filtered

13. Real Use Cases: Research, Monitoring, Analysis {#use-cases}

Brand Monitoring

def monitor_brand(brand_name: str, subreddits: list[str] = None) -> list[dict]:
    """Monitor mentions of a brand across Reddit."""
    target = reddit.subreddit("+".join(subreddits) if subreddits else "all")
    mentions = []
    for post in target.search(brand_name, sort="new", limit=100):
        mentions.append({
            "id": post.id,
            "title": post.title,
            "subreddit": post.subreddit.display_name,
            "score": post.score,
            "url": f"https://reddit.com{post.permalink}",
            "created_utc": post.created_utc,
        })
    return mentions

Sentiment Analysis Dataset Builder

def build_sentiment_dataset(subreddits: list[str],
                            min_score: int = 10) -> list[dict]:
    """Build a labeled sentiment dataset from Reddit comments."""
    dataset = []
    for sub_name in subreddits:
        posts = scrape_subreddit(sub_name, sort="top", limit=50, time_filter="month")
        for post in posts:
            if post["score"] < min_score:
                continue
            comments = scrape_comments(post["id"], max_depth=3)
            for c in comments:
                if len(c["body"]) > 20 and c["score"] != 0:
                    dataset.append({
                        "text": c["body"],
                        "score": c["score"],
                        "subreddit": sub_name,
                        "label": "positive" if c["score"] > 5 else
                                 "negative" if c["score"] < 0 else "neutral",
                    })
    return dataset

Trend Tracking Over Time

import sqlite3
from datetime import datetime

def track_keyword_trends(keywords: list[str], subreddits: list[str],
                         db_path: str = "trends.db"):
    """Track daily mention frequency of keywords across subreddits."""
    conn = sqlite3.connect(db_path)
    conn.execute("""CREATE TABLE IF NOT EXISTS mentions (
        keyword TEXT, subreddit TEXT, date TEXT,
        post_count INTEGER, total_score INTEGER,
        PRIMARY KEY (keyword, subreddit, date))""")

    for keyword in keywords:
        for sub in subreddits:
            posts = search_pullpush_posts(subreddit=sub, query=keyword, size=100)
            today = datetime.utcnow().strftime("%Y-%m-%d")
            count = len(posts)
            total_score = sum(p.get("score", 0) for p in posts)
            conn.execute("INSERT OR REPLACE INTO mentions VALUES (?,?,?,?,?)",
                        (keyword, sub, today, count, total_score))
    conn.commit()

14. Common Errors and How to Fix Them {#errors}

Error | Cause | Fix
prawcore.exceptions.ResponseException: 401 | Invalid credentials | Re-check client_id and client_secret
prawcore.exceptions.TooManyRequests | Rate limit exceeded | PRAW handles this automatically; if persistent, reduce QPM
prawcore.exceptions.Forbidden (403) | App revoked or wrong permissions | Recreate the app; check if banned
Empty selftext | Post deleted before scrape | Check selftext for [removed]/[deleted]
replace_more taking forever | Huge comment thread | Set limit=5 for faster partial expansion
Stale after token in pagination | Reddit invalidates cursors after ~1h | Restart pagination from scratch
RequestException: received 503 | Reddit temporarily unavailable | Retry with exponential backoff
All posts show score=1 | Score fuzzing on new posts | Normal -- scores stabilize after ~30min

15. Which Approach Should You Use? {#summary}

Use Case | Recommended Approach
Small project, public data | PRAW with a single app (100 QPM free)
Historical research pre-2023 | Pullpush API for archived data, then PRAW for recent
Quick one-off scrape | old.reddit.com HTML parsing
Real-time monitoring | PRAW streaming (subreddit.stream.submissions)
Large-scale collection | Multiple OAuth tokens + rate limiting + SQLite caching
Deleted/removed content | Pullpush archives (incomplete coverage post-2023)
High-volume scraping | Multiple tokens + ThorData residential proxies

Reddit's API situation could be worse. The free tier is functional, PRAW handles the complexity, and the data is still accessible if you follow the rules. Compared to what Twitter did to its ecosystem, Reddit developers got off relatively easy.

The main things to remember: always set a proper user agent, respect rate limits, cache your data locally, and use replace_more(limit=5) instead of full expansion on large threads unless you specifically need every reply. Reddit's API team does monitor usage patterns, and getting your app revoked means starting over.

For multi-account, high-throughput setups, routing your requests through ThorData's residential proxy network distributes the IP footprint across real residential addresses -- reducing the chance of IP-level throttling when you're pushing toward the QPM ceiling across multiple tokens.