← Back to blog

Scrape Stack Exchange Network: Q&A Data Across 170+ Sites via API (2026)

Scrape Stack Exchange Network: Q&A Data Across 170+ Sites via API (2026)

The Stack Exchange network hosts over 170 Q&A communities — Stack Overflow, Server Fault, Ask Ubuntu, Mathematics, and dozens of niche sites. Together, they contain hundreds of millions of questions and answers, all accessible through a single API.

The Stack Exchange API v2.3 is surprisingly generous. You get 300 requests/day without a key, and 10,000 requests/day with a free registered app key. Each request can return up to 100 items with filtering, so you can extract a lot of data within those limits.

What Makes Stack Exchange Data Valuable

Stack Exchange is one of the highest-quality structured knowledge datasets on the internet. Unlike social media, content is voted, curated, and filtered over years. The network's data is used extensively for:

The quarterly data dumps on archive.org contain the full dataset for bulk analysis, while the API supports real-time and incremental queries.

Getting Started with the API

Register a free app at stackapps.com to get an API key. No OAuth needed for read-only access to public data.

import httpx
import time
from datetime import datetime, timedelta
from typing import Generator

class StackExchangeClient:
    """Client for Stack Exchange API v2.3 with quota management."""

    BASE = "https://api.stackexchange.com/2.3"

    def __init__(self, api_key: str | None = None):
        self.api_key = api_key
        self.client = httpx.Client(timeout=15)
        self._quota_remaining = None
        self._backoff_until = 0

    def _request(self, endpoint: str, params: dict = None) -> dict:
        """Make an API request with automatic key injection and quota tracking."""
        # Respect any requested backoff
        now = time.time()
        if self._backoff_until > now:
            wait = self._backoff_until - now
            print(f"Backoff: waiting {wait:.1f}s")
            time.sleep(wait)

        params = params or {}
        if self.api_key:
            params["key"] = self.api_key

        response = self.client.get(f"{self.BASE}{endpoint}", params=params)
        data = response.json()

        self._quota_remaining = data.get("quota_remaining")

        if data.get("backoff"):
            self._backoff_until = time.time() + int(data["backoff"])
            print(f"API backoff: {data['backoff']}s (quota: {self._quota_remaining})")

        if "error_id" in data:
            raise Exception(
                f"API error {data['error_id']}: {data.get('error_message', '')}"
            )

        return data

    def _paginate(self, endpoint: str, params: dict = None,
                   max_pages: int = 10) -> Generator[dict, None, None]:
        """Auto-paginate through API results."""
        params = params or {}
        params.setdefault("pagesize", 100)

        for page in range(1, max_pages + 1):
            params["page"] = page
            data = self._request(endpoint, params)

            for item in data.get("items", []):
                yield item

            if not data.get("has_more", False):
                break

            # Respect API throttling — 1 request per 100ms safe
            time.sleep(0.1)

    @property
    def quota(self) -> int | None:
        return self._quota_remaining

    def quota_remaining_pct(self) -> float | None:
        if self._quota_remaining is None:
            return None
        daily = 10000 if self.api_key else 300
        return (self._quota_remaining / daily) * 100

Searching Questions and Answers

The search endpoints are the bread and butter. You can search by tags, keywords, date ranges, and score thresholds:

def search_questions(client: StackExchangeClient, site: str = "stackoverflow",
                      tagged: str | None = None, query: str | None = None,
                      min_score: int = 0, days: int = 30,
                      max_results: int = 500) -> list[dict]:
    """Search for questions with flexible filtering."""
    params = {
        "site": site,
        "sort": "votes",
        "order": "desc",
        "filter": "withbody",  # Include question body text
        "min": min_score,
    }

    if tagged:
        params["tagged"] = tagged

    from_date = int(
        (datetime.utcnow() - timedelta(days=days)).timestamp()
    )
    params["fromdate"] = from_date

    if query:
        endpoint = "/search/advanced"
        params["q"] = query
    else:
        endpoint = "/questions"

    results = []
    for item in client._paginate(endpoint, params):
        results.append({
            "question_id": item["question_id"],
            "title": item["title"],
            "score": item["score"],
            "view_count": item["view_count"],
            "answer_count": item["answer_count"],
            "is_answered": item["is_answered"],
            "accepted_answer_id": item.get("accepted_answer_id"),
            "tags": item["tags"],
            "creation_date": datetime.fromtimestamp(
                item["creation_date"]
            ).isoformat(),
            "last_activity": datetime.fromtimestamp(
                item.get("last_activity_date", item["creation_date"])
            ).isoformat(),
            "link": item["link"],
            "body_excerpt": item.get("body", "")[:500],
            "owner": item.get("owner", {}).get("display_name"),
            "owner_reputation": item.get("owner", {}).get("reputation", 0),
        })

        if len(results) >= max_results:
            break

    return results


# Top Python async questions from the last 30 days
se = StackExchangeClient(api_key="YOUR_KEY")
python_qs = search_questions(se, tagged="python", min_score=5, days=30)
print(f"Found {len(python_qs)} questions (quota remaining: {se.quota})")

for q in python_qs[:5]:
    print(f"  [{q['score']:>4}] {q['title'][:70]}")
    print(f"         {q['view_count']:,} views, {q['answer_count']} answers")

Fetching Answers with Full Content

Questions are useful, but the answers are where the real knowledge lives:

def get_answers(client: StackExchangeClient, question_ids: list[int],
                site: str = "stackoverflow") -> dict[int, list[dict]]:
    """Fetch answers for a batch of questions (up to 100 IDs)."""
    answers_by_question = {}

    # API supports up to 100 IDs per request
    for i in range(0, len(question_ids), 100):
        batch = question_ids[i:i + 100]
        ids_str = ";".join(str(qid) for qid in batch)

        data = client._request(f"/questions/{ids_str}/answers", {
            "site": site,
            "sort": "votes",
            "order": "desc",
            "filter": "withbody",
            "pagesize": 100,
        })

        for answer in data.get("items", []):
            qid = answer["question_id"]
            if qid not in answers_by_question:
                answers_by_question[qid] = []

            answers_by_question[qid].append({
                "answer_id": answer["answer_id"],
                "score": answer["score"],
                "is_accepted": answer["is_accepted"],
                "body": answer.get("body", ""),
                "creation_date": datetime.fromtimestamp(
                    answer["creation_date"]
                ).isoformat(),
                "owner": answer.get("owner", {}).get("display_name"),
                "owner_reputation": answer.get("owner", {}).get("reputation", 0),
            })

        time.sleep(0.1)

    return answers_by_question


def get_full_qa_pairs(client: StackExchangeClient, query: str,
                       site: str = "stackoverflow",
                       min_answer_score: int = 5) -> list[dict]:
    """Get questions with their accepted or top answers."""
    questions = search_questions(client, site=site, query=query,
                                  max_results=200)
    qids = [q["question_id"] for q in questions]

    print(f"Fetching answers for {len(qids)} questions...")
    answers_map = get_answers(client, qids, site=site)

    qa_pairs = []
    for q in questions:
        qid = q["question_id"]
        q_answers = answers_map.get(qid, [])

        # Get accepted answer or top-voted answer
        accepted = next(
            (a for a in q_answers if a["is_accepted"]), None
        )
        top = max(q_answers, key=lambda a: a["score"], default=None)
        best_answer = accepted or top

        if best_answer and best_answer["score"] >= min_answer_score:
            qa_pairs.append({
                "question": q["title"],
                "question_body": q["body_excerpt"],
                "tags": q["tags"],
                "question_score": q["score"],
                "answer_body": best_answer["body"],
                "answer_score": best_answer["score"],
                "is_accepted": best_answer["is_accepted"],
                "url": q["link"],
            })

    return qa_pairs

Cross-Site Data: Exploring All 170+ Communities

One of the unique things about the Stack Exchange API is that the same endpoints work across all sites in the network. You just change the site parameter:

def list_all_sites(client: StackExchangeClient) -> list[dict]:
    """Get metadata for all Stack Exchange sites sorted by question count."""
    sites = []
    for item in client._paginate("/sites", {"pagesize": 100}):
        stats = item.get("site_statistics", item.get("statistics", {}))
        sites.append({
            "name": item["name"],
            "api_site_parameter": item["api_site_parameter"],
            "site_url": item["site_url"],
            "audience": item.get("audience", ""),
            "total_questions": stats.get("total_questions", 0),
            "total_answers": stats.get("total_answers", 0),
            "total_users": stats.get("total_users", 0),
            "questions_per_day": stats.get("questions_per_day", 0),
            "answer_ratio": stats.get("answer_ratio", 0),
        })

    return sorted(sites, key=lambda s: -s["total_questions"])


def cross_site_search(client: StackExchangeClient, query: str,
                       sites: list[str] | None = None,
                       days: int = 30) -> dict[str, list[dict]]:
    """Search a query across multiple Stack Exchange sites."""
    if not sites:
        sites = [
            "stackoverflow", "serverfault", "superuser",
            "askubuntu", "unix", "dba", "security",
            "softwareengineering", "datascience", "ai",
        ]

    results = {}
    from_date = int(
        (datetime.utcnow() - timedelta(days=days)).timestamp()
    )

    for site in sites:
        try:
            data = client._request("/search/advanced", {
                "site": site,
                "q": query,
                "fromdate": from_date,
                "sort": "votes",
                "order": "desc",
                "pagesize": 10,
                "min": 1,
            })
            items = data.get("items", [])
            if items:
                results[site] = [{
                    "title": item["title"],
                    "score": item["score"],
                    "answers": item["answer_count"],
                    "link": item["link"],
                } for item in items]
        except Exception as e:
            print(f"Error querying {site}: {e}")

        time.sleep(0.15)

    return results

Tag Trend Analysis

Tags are a goldmine for tracking technology adoption. Here's how to analyze tag trends over time:

def get_tag_volume_by_month(client: StackExchangeClient, tag: str,
                              site: str = "stackoverflow",
                              months: int = 12) -> list[dict]:
    """Get monthly question volume for a specific tag."""
    monthly_data = []
    now = datetime.utcnow()

    for offset in range(months):
        end = now - timedelta(days=30 * offset)
        start = end - timedelta(days=30)

        data = client._request("/search/advanced", {
            "site": site,
            "tagged": tag,
            "fromdate": int(start.timestamp()),
            "todate": int(end.timestamp()),
            "filter": "total",
            "pagesize": 0,
        })

        monthly_data.append({
            "month": start.strftime("%Y-%m"),
            "total": data.get("total", 0),
            "tag": tag,
        })

        time.sleep(0.15)

    return list(reversed(monthly_data))  # Oldest first


def compare_tag_trends(client: StackExchangeClient, tags: list[str],
                        site: str = "stackoverflow",
                        months: int = 6) -> dict:
    """Compare multiple tags' question volume over time."""
    trends = {}
    for tag in tags:
        print(f"Fetching trend data for '{tag}'...")
        trends[tag] = get_tag_volume_by_month(client, tag, site, months)
        time.sleep(0.5)

    return trends


# Compare Python web framework trends
trends = compare_tag_trends(se, ["fastapi", "flask", "django"], months=6)
print("\nFramework question volume (last 6 months):\n")
for framework, data in trends.items():
    total = sum(m["total"] for m in data)
    recent = data[-1]["total"]
    print(f"  {framework:12s}: {total:5,} total, {recent:4} last month")
    for month in data:
        bar = "█" * (month["total"] // 20)
        print(f"    {month['month']}: {month['total']:4,}  {bar}")

The API Filter System

Stack Exchange's filter system is powerful and often overlooked. Instead of getting bloated responses with fields you don't need, create a custom filter that returns only what you want:

def create_custom_filter(client: StackExchangeClient, include: list[str],
                          base: str = "default") -> str:
    """Create a reusable API filter for efficient responses."""
    data = client._request("/filters/create", {
        "include": ";".join(include),
        "base": base,
        "unsafe": "false",
    })
    return data["items"][0]["filter"]


# Create a lightweight filter for bulk question collection
# This reduces response size by ~70% vs the default filter
light_filter = create_custom_filter(se, [
    "question.title",
    "question.score",
    "question.view_count",
    "question.tags",
    "question.creation_date",
    "question.is_answered",
    "question.accepted_answer_id",
    "question.link",
])


def bulk_collect_questions(client: StackExchangeClient,
                             tag: str, site: str = "stackoverflow",
                             days: int = 365,
                             filter_id: str = None) -> list[dict]:
    """Efficiently collect large volumes of questions for a tag."""
    from_ts = int((datetime.utcnow() - timedelta(days=days)).timestamp())
    params = {
        "site": site,
        "tagged": tag,
        "fromdate": from_ts,
        "sort": "creation",
        "order": "desc",
        "pagesize": 100,
    }
    if filter_id:
        params["filter"] = filter_id

    questions = []
    for item in client._paginate("/questions", params, max_pages=50):
        questions.append({
            "id": item["question_id"],
            "title": item["title"],
            "score": item["score"],
            "views": item["view_count"],
            "answered": item["is_answered"],
            "tags": item["tags"],
            "created": item["creation_date"],
            "link": item["link"],
        })

    return questions

Custom filters reduce response size by 60-80%, which means faster responses and more effective use of your daily quota.

Rate Limits and Quota Management

The API has a backoff mechanism — if you're requesting too fast, the response includes a backoff field telling you how many seconds to wait. The client above handles this automatically.

Tier Daily Requests Items/Request Daily Items
No key 300 100 30,000
With key 10,000 100 1,000,000
def monitor_quota(client: StackExchangeClient) -> None:
    """Print current API quota status."""
    data = client._request("/info", {"site": "stackoverflow"})
    pct = client.quota_remaining_pct()
    print(f"Quota remaining: {client.quota:,} requests "
          f"({pct:.1f}% of daily limit)")


def quota_aware_batch(client: StackExchangeClient,
                       items: list, process_fn,
                       min_quota: int = 100) -> list:
    """Process items while monitoring quota."""
    results = []
    for i, item in enumerate(items):
        if client.quota is not None and client.quota < min_quota:
            print(f"Quota low ({client.quota}), stopping at {i}/{len(items)}")
            break

        try:
            result = process_fn(item)
            results.append(result)
        except Exception as e:
            print(f"Error processing item {i}: {e}")

        if i % 50 == 0 and i > 0:
            print(f"  Progress: {i}/{len(items)}, quota: {client.quota}")

    return results

Storing Data in SQLite

import sqlite3
import json

def init_db(db_path: str = "stackexchange.db") -> sqlite3.Connection:
    """Initialize database for Stack Exchange data."""
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS questions (
            question_id INTEGER PRIMARY KEY,
            site TEXT NOT NULL,
            title TEXT,
            score INTEGER DEFAULT 0,
            view_count INTEGER DEFAULT 0,
            answer_count INTEGER DEFAULT 0,
            is_answered INTEGER DEFAULT 0,
            accepted_answer_id INTEGER,
            tags TEXT,
            owner TEXT,
            owner_reputation INTEGER DEFAULT 0,
            creation_date TEXT,
            body TEXT,
            link TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS answers (
            answer_id INTEGER PRIMARY KEY,
            question_id INTEGER NOT NULL,
            site TEXT NOT NULL,
            score INTEGER DEFAULT 0,
            is_accepted INTEGER DEFAULT 0,
            owner TEXT,
            owner_reputation INTEGER DEFAULT 0,
            creation_date TEXT,
            body TEXT,
            FOREIGN KEY (question_id) REFERENCES questions(question_id)
        );

        CREATE TABLE IF NOT EXISTS tags (
            site TEXT,
            tag_name TEXT,
            count INTEGER,
            scraped_date TEXT,
            PRIMARY KEY (site, tag_name, scraped_date)
        );

        CREATE INDEX IF NOT EXISTS idx_q_site ON questions(site);
        CREATE INDEX IF NOT EXISTS idx_q_score ON questions(score);
        CREATE INDEX IF NOT EXISTS idx_q_created ON questions(creation_date);
        CREATE INDEX IF NOT EXISTS idx_a_question ON answers(question_id);
    """)
    conn.commit()
    return conn


def save_questions(conn: sqlite3.Connection, questions: list[dict],
                    site: str) -> int:
    """Batch save questions. Returns count inserted."""
    count = 0
    for q in questions:
        try:
            conn.execute("""
                INSERT OR IGNORE INTO questions
                (question_id, site, title, score, view_count,
                 answer_count, is_answered, accepted_answer_id,
                 tags, owner, owner_reputation, creation_date,
                 body, link)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """, (
                q["question_id"], site, q.get("title"),
                q.get("score", 0), q.get("view_count", 0),
                q.get("answer_count", 0),
                1 if q.get("is_answered") else 0,
                q.get("accepted_answer_id"),
                json.dumps(q.get("tags", [])),
                q.get("owner"), q.get("owner_reputation", 0),
                q.get("creation_date"),
                q.get("body_excerpt", ""), q.get("link"),
            ))
            count += 1
        except sqlite3.Error:
            pass

    conn.commit()
    return count

Bulk Data: Stack Exchange Data Dumps

For large-scale analysis, the API isn't the right tool. Stack Exchange publishes quarterly data dumps on archive.org containing the complete dataset in XML format:

import xml.etree.ElementTree as ET

def parse_posts_xml(xml_path: str, max_rows: int = 100_000,
                     min_score: int = 0) -> list[dict]:
    """
    Parse a Stack Exchange data dump Posts.xml file.

    Download from: https://archive.org/details/stackexchange
    Each site has its own archive, e.g., stackoverflow.com-Posts.7z
    """
    posts = []

    for event, elem in ET.iterparse(xml_path, events=("end",)):
        if elem.tag != "row":
            continue

        score = int(elem.get("Score", 0))
        if score < min_score:
            elem.clear()
            continue

        post_type = int(elem.get("PostTypeId", 0))
        post = {
            "id": int(elem.get("Id", 0)),
            "post_type": post_type,  # 1=question, 2=answer
            "score": score,
            "view_count": int(elem.get("ViewCount", 0)) if post_type == 1 else 0,
            "title": elem.get("Title", ""),
            "tags": elem.get("Tags", "").strip("<>").replace("><", ","),
            "answer_count": int(elem.get("AnswerCount", 0)) if post_type == 1 else 0,
            "accepted_answer_id": elem.get("AcceptedAnswerId"),
            "parent_id": elem.get("ParentId"),  # for answers
            "creation_date": elem.get("CreationDate", ""),
            "body": elem.get("Body", "")[:1000],  # truncate for storage
        }
        posts.append(post)
        elem.clear()

        if len(posts) >= max_rows:
            break

    return posts


def load_dump_to_db(xml_path: str, db_path: str = "stackexchange.db",
                     site: str = "stackoverflow") -> None:
    """Load a data dump XML file into SQLite."""
    conn = init_db(db_path)
    print(f"Parsing {xml_path}...")

    posts = parse_posts_xml(xml_path, max_rows=500_000, min_score=1)
    questions = [p for p in posts if p["post_type"] == 1]
    answers = [p for p in posts if p["post_type"] == 2]

    print(f"Loaded {len(questions)} questions, {len(answers)} answers")

    # Save in batches
    saved_q = save_questions(conn, questions, site)
    print(f"Saved {saved_q} questions")

    conn.close()

Practical Applications and Analytical Queries

Once data is in SQLite, you can run complex analyses:

def analyze_tag_ecosystem(conn: sqlite3.Connection,
                            site: str = "stackoverflow") -> None:
    """Analyze the tag ecosystem for a Stack Exchange site."""
    print(f"=== Tag Analysis for {site} ===\n")

    # Top tags by question count
    print("Top 10 tags by question volume:")
    for row in conn.execute("""
        SELECT value as tag, COUNT(*) as count,
               AVG(score) as avg_score,
               AVG(view_count) as avg_views
        FROM questions,
             json_each('[' || REPLACE(REPLACE(tags, ',', '","'), ' ', '') || ']')
        WHERE site = ?
        GROUP BY tag
        ORDER BY count DESC
        LIMIT 10
    """, (site,)):
        print(f"  {row[0]:20s}: {row[1]:5,} questions, "
              f"avg score {row[2]:.1f}, avg {row[3]:,.0f} views")

    # Questions with high views but no accepted answer
    print("\nHigh-traffic unanswered questions (documentation gaps):")
    for row in conn.execute("""
        SELECT title, view_count, score, tags, link
        FROM questions
        WHERE site = ?
          AND is_answered = 0
          AND accepted_answer_id IS NULL
          AND view_count > 10000
        ORDER BY view_count DESC
        LIMIT 10
    """, (site,)):
        print(f"  {row[1]:8,} views | {row[0][:60]}")

When you need to scrape the Stack Exchange website directly (not the API) — for user profile analytics, certain mod tools data, or real-time websocket feeds — you'll face standard anti-bot protections including Cloudflare. In that scenario, residential proxies from a provider like ThorData help maintain access without triggering rate limits on the web frontend.

Key Takeaways