Scrape Stack Exchange Network: Q&A Data Across 170+ Sites via API (2026)
The Stack Exchange network hosts over 170 Q&A communities — Stack Overflow, Server Fault, Ask Ubuntu, Mathematics, and dozens of niche sites. Together, they contain hundreds of millions of questions and answers, all accessible through a single API.
The Stack Exchange API v2.3 is surprisingly generous. You get 300 requests/day without a key, and 10,000 requests/day with a free registered app key. Each request can return up to 100 items with filtering, so you can extract a lot of data within those limits.
What Makes Stack Exchange Data Valuable
Stack Exchange is one of the highest-quality structured knowledge datasets on the internet. Unlike social media, content is voted, curated, and filtered over years. The network's data is used extensively for:
- Training datasets for AI — High-quality question/answer pairs with known quality signals (votes, accepted status)
- Technology trend analysis — Tag volumes over time reveal adoption curves for programming languages and frameworks
- Developer tooling — Build search, recommendation, or documentation tools using the API as a backend
- Academic research — Hundreds of published papers use Stack Exchange data to study collaborative knowledge production
- Documentation gap analysis — Unanswered questions with high views identify where official docs are failing
The quarterly data dumps on archive.org contain the full dataset for bulk analysis, while the API supports real-time and incremental queries.
Getting Started with the API
Register a free app at stackapps.com to get an API key. No OAuth needed for read-only access to public data.
import time
from datetime import datetime, timedelta, timezone
from typing import Generator

import httpx
class StackExchangeClient:
    """Client for Stack Exchange API v2.3 with quota management.

    Handles API-key injection, quota tracking from response metadata, and
    the API's cooperative ``backoff`` throttling signal.
    """

    BASE = "https://api.stackexchange.com/2.3"

    def __init__(self, api_key: str | None = None):
        self.api_key = api_key
        self.client = httpx.Client(timeout=15)
        self._quota_remaining: int | None = None  # from last response
        self._backoff_until: float = 0  # epoch seconds; 0 = no backoff pending

    def _request(self, endpoint: str, params: dict | None = None) -> dict:
        """Make an API request with automatic key injection and quota tracking.

        The caller's ``params`` dict is copied, never mutated.

        Raises:
            RuntimeError: if the API response carries an ``error_id``.
        """
        # Respect any backoff the API previously requested.
        now = time.time()
        if self._backoff_until > now:
            wait = self._backoff_until - now
            print(f"Backoff: waiting {wait:.1f}s")
            time.sleep(wait)
        # Copy so the API key is not leaked back into the caller's dict.
        params = dict(params or {})
        if self.api_key:
            params["key"] = self.api_key
        response = self.client.get(f"{self.BASE}{endpoint}", params=params)
        data = response.json()
        self._quota_remaining = data.get("quota_remaining")
        if data.get("backoff"):
            # The API asks us to wait this many seconds before the next call.
            self._backoff_until = time.time() + int(data["backoff"])
            print(f"API backoff: {data['backoff']}s (quota: {self._quota_remaining})")
        if "error_id" in data:
            raise RuntimeError(
                f"API error {data['error_id']}: {data.get('error_message', '')}"
            )
        return data

    def _paginate(self, endpoint: str, params: dict | None = None,
                  max_pages: int = 10) -> Generator[dict, None, None]:
        """Auto-paginate through API results.

        Copies ``params`` once so the caller's dict is not mutated with
        ``page``/``pagesize`` keys.
        """
        params = dict(params or {})
        params.setdefault("pagesize", 100)
        for page in range(1, max_pages + 1):
            params["page"] = page
            data = self._request(endpoint, params)
            yield from data.get("items", [])
            if not data.get("has_more", False):
                break
            # Respect API throttling — 1 request per 100ms is safe.
            time.sleep(0.1)

    @property
    def quota(self) -> int | None:
        """Requests remaining today, per the last API response (None if unknown)."""
        return self._quota_remaining

    def quota_remaining_pct(self) -> float | None:
        """Remaining quota as a percentage of the daily limit (None if unknown)."""
        if self._quota_remaining is None:
            return None
        daily = 10000 if self.api_key else 300
        return (self._quota_remaining / daily) * 100
Searching Questions and Answers
The search endpoints are the bread and butter. You can search by tags, keywords, date ranges, and score thresholds:
def search_questions(client: StackExchangeClient, site: str = "stackoverflow",
                     tagged: str | None = None, query: str | None = None,
                     min_score: int = 0, days: int = 30,
                     max_results: int = 500) -> list[dict]:
    """Search for questions with flexible filtering.

    Args:
        client: API client used for the paginated requests.
        site: API site parameter (e.g. "stackoverflow").
        tagged: optional semicolon-separated tag filter.
        query: free-text query; switches to the /search/advanced endpoint.
        min_score: minimum score ("min" applies to the sort field, votes).
        days: restrict to questions created in the last N days.
        max_results: stop after collecting this many questions.

    Returns:
        Flattened question dicts with body truncated to 500 chars.
    """
    params = {
        "site": site,
        "sort": "votes",
        "order": "desc",
        "filter": "withbody",  # Include question body text
        "min": min_score,
    }
    if tagged:
        params["tagged"] = tagged
    # Use an aware UTC datetime: naive utcnow().timestamp() is interpreted
    # in *local* time, skewing the fromdate cutoff by the UTC offset.
    from_date = int(
        (datetime.now(timezone.utc) - timedelta(days=days)).timestamp()
    )
    params["fromdate"] = from_date
    if query:
        endpoint = "/search/advanced"
        params["q"] = query
    else:
        endpoint = "/questions"
    results = []
    for item in client._paginate(endpoint, params):
        owner = item.get("owner", {})
        results.append({
            "question_id": item["question_id"],
            "title": item["title"],
            "score": item["score"],
            "view_count": item["view_count"],
            "answer_count": item["answer_count"],
            "is_answered": item["is_answered"],
            "accepted_answer_id": item.get("accepted_answer_id"),
            "tags": item["tags"],
            "creation_date": datetime.fromtimestamp(
                item["creation_date"]
            ).isoformat(),
            "last_activity": datetime.fromtimestamp(
                item.get("last_activity_date", item["creation_date"])
            ).isoformat(),
            "link": item["link"],
            "body_excerpt": item.get("body", "")[:500],
            "owner": owner.get("display_name"),
            "owner_reputation": owner.get("reputation", 0),
        })
        if len(results) >= max_results:
            break
    return results
# Demo: top-scoring "python"-tagged questions from the last 30 days
# (min score 5); quota is read from the last API response.
se = StackExchangeClient(api_key="YOUR_KEY")
python_qs = search_questions(se, tagged="python", min_score=5, days=30)
print(f"Found {len(python_qs)} questions (quota remaining: {se.quota})")
for q in python_qs[:5]:
    print(f"  [{q['score']:>4}] {q['title'][:70]}")
    print(f"        {q['view_count']:,} views, {q['answer_count']} answers")
Fetching Answers with Full Content
Questions are useful, but the answers are where the real knowledge lives:
def get_answers(client: StackExchangeClient, question_ids: list[int],
                site: str = "stackoverflow") -> dict[int, list[dict]]:
    """Fetch answers for a batch of questions, grouped by question_id.

    The API accepts up to 100 semicolon-joined IDs per request, so the
    input is chunked accordingly.
    """
    grouped: dict[int, list[dict]] = {}
    batch_size = 100  # API maximum for ID-list endpoints
    for start in range(0, len(question_ids), batch_size):
        chunk = question_ids[start:start + batch_size]
        joined_ids = ";".join(map(str, chunk))
        payload = client._request(f"/questions/{joined_ids}/answers", {
            "site": site,
            "sort": "votes",
            "order": "desc",
            "filter": "withbody",
            "pagesize": 100,
        })
        for entry in payload.get("items", []):
            owner = entry.get("owner", {})
            record = {
                "answer_id": entry["answer_id"],
                "score": entry["score"],
                "is_accepted": entry["is_accepted"],
                "body": entry.get("body", ""),
                "creation_date": datetime.fromtimestamp(
                    entry["creation_date"]
                ).isoformat(),
                "owner": owner.get("display_name"),
                "owner_reputation": owner.get("reputation", 0),
            }
            grouped.setdefault(entry["question_id"], []).append(record)
        time.sleep(0.1)
    return grouped
def get_full_qa_pairs(client: StackExchangeClient, query: str,
                      site: str = "stackoverflow",
                      min_answer_score: int = 5) -> list[dict]:
    """Get questions paired with their accepted (or else top-voted) answer."""
    questions = search_questions(client, site=site, query=query,
                                 max_results=200)
    qids = [question["question_id"] for question in questions]
    print(f"Fetching answers for {len(qids)} questions...")
    answers_map = get_answers(client, qids, site=site)
    pairs = []
    for question in questions:
        candidates = answers_map.get(question["question_id"], [])
        # Prefer the accepted answer; otherwise fall back to the top-voted one.
        best = None
        for answer in candidates:
            if answer["is_accepted"]:
                best = answer
                break
        if best is None and candidates:
            best = max(candidates, key=lambda a: a["score"])
        if best is not None and best["score"] >= min_answer_score:
            pairs.append({
                "question": question["title"],
                "question_body": question["body_excerpt"],
                "tags": question["tags"],
                "question_score": question["score"],
                "answer_body": best["body"],
                "answer_score": best["score"],
                "is_accepted": best["is_accepted"],
                "url": question["link"],
            })
    return pairs
Cross-Site Data: Exploring All 170+ Communities
One of the unique things about the Stack Exchange API is that the same endpoints work across all sites in the network. You just change the site parameter:
def list_all_sites(client: StackExchangeClient) -> list[dict]:
    """Get metadata for all Stack Exchange sites sorted by question count."""
    collected = []
    for entry in client._paginate("/sites", {"pagesize": 100}):
        # Stats may be nested under "site_statistics" or "statistics";
        # accept either key.
        stats = entry.get("site_statistics", entry.get("statistics", {}))
        collected.append({
            "name": entry["name"],
            "api_site_parameter": entry["api_site_parameter"],
            "site_url": entry["site_url"],
            "audience": entry.get("audience", ""),
            "total_questions": stats.get("total_questions", 0),
            "total_answers": stats.get("total_answers", 0),
            "total_users": stats.get("total_users", 0),
            "questions_per_day": stats.get("questions_per_day", 0),
            "answer_ratio": stats.get("answer_ratio", 0),
        })
    collected.sort(key=lambda s: s["total_questions"], reverse=True)
    return collected
def cross_site_search(client: StackExchangeClient, query: str,
                      sites: list[str] | None = None,
                      days: int = 30) -> dict[str, list[dict]]:
    """Search a query across multiple Stack Exchange sites.

    Args:
        client: API client.
        query: free-text query for /search/advanced.
        sites: API site parameters to hit; defaults to ten large sites.
        days: only include questions created in the last N days.

    Returns:
        Mapping of site -> top matches; sites with no hits are omitted.
    """
    if not sites:
        sites = [
            "stackoverflow", "serverfault", "superuser",
            "askubuntu", "unix", "dba", "security",
            "softwareengineering", "datascience", "ai",
        ]
    results: dict[str, list[dict]] = {}
    # Aware UTC datetime: naive utcnow().timestamp() is interpreted in
    # *local* time and would skew the fromdate cutoff by the UTC offset.
    from_date = int(
        (datetime.now(timezone.utc) - timedelta(days=days)).timestamp()
    )
    for site in sites:
        try:
            data = client._request("/search/advanced", {
                "site": site,
                "q": query,
                "fromdate": from_date,
                "sort": "votes",
                "order": "desc",
                "pagesize": 10,
                "min": 1,
            })
            items = data.get("items", [])
            if items:
                results[site] = [{
                    "title": item["title"],
                    "score": item["score"],
                    "answers": item["answer_count"],
                    "link": item["link"],
                } for item in items]
        except Exception as e:
            # Best-effort sweep: one failing site shouldn't abort the rest.
            print(f"Error querying {site}: {e}")
        time.sleep(0.15)
    return results
Tag Trend Analysis
Tags are a goldmine for tracking technology adoption. Here's how to analyze tag trends over time:
def get_tag_volume_by_month(client: StackExchangeClient, tag: str,
                            site: str = "stackoverflow",
                            months: int = 12) -> list[dict]:
    """Get monthly question volume for a specific tag.

    "Month" is approximated as a 30-day window stepping back from now.
    Uses filter=total with pagesize=0 so each request transfers almost
    no data while still consuming one quota unit.

    Returns:
        One dict per window, oldest first: {"month", "total", "tag"}.
    """
    monthly_data = []
    # Aware UTC anchor: naive utcnow().timestamp() is interpreted in
    # local time and would shift every window by the UTC offset.
    now = datetime.now(timezone.utc)
    for offset in range(months):
        end = now - timedelta(days=30 * offset)
        start = end - timedelta(days=30)
        data = client._request("/search/advanced", {
            "site": site,
            "tagged": tag,
            "fromdate": int(start.timestamp()),
            "todate": int(end.timestamp()),
            "filter": "total",  # response carries only the total count
            "pagesize": 0,
        })
        monthly_data.append({
            "month": start.strftime("%Y-%m"),
            "total": data.get("total", 0),
            "tag": tag,
        })
        time.sleep(0.15)
    return list(reversed(monthly_data))  # Oldest first
def compare_tag_trends(client: StackExchangeClient, tags: list[str],
                       site: str = "stackoverflow",
                       months: int = 6) -> dict:
    """Compare question volume over time for several tags.

    Returns a mapping of tag -> monthly series (oldest first), pausing
    between tags to stay friendly to the API throttle.
    """
    comparison: dict = {}
    for tag in tags:
        print(f"Fetching trend data for '{tag}'...")
        comparison[tag] = get_tag_volume_by_month(client, tag, site, months)
        time.sleep(0.5)
    return comparison
# Demo: compare Python web framework trends over the last 6 months.
trends = compare_tag_trends(se, ["fastapi", "flask", "django"], months=6)
print("\nFramework question volume (last 6 months):\n")
for framework, data in trends.items():
    total = sum(m["total"] for m in data)
    recent = data[-1]["total"]
    print(f"  {framework:12s}: {total:5,} total, {recent:4} last month")
    for month in data:
        # One bar glyph per 20 questions gives a compact terminal sparkline.
        bar = "█" * (month["total"] // 20)
        print(f"    {month['month']}: {month['total']:4,} {bar}")
The API Filter System
Stack Exchange's filter system is powerful and often overlooked. Instead of getting bloated responses with fields you don't need, create a custom filter that returns only what you want:
def create_custom_filter(client: StackExchangeClient, include: list[str],
                         base: str = "default") -> str:
    """Create a reusable API filter id for slimmer responses.

    The returned id can be passed as the `filter` parameter on later
    requests so the API returns only the listed fields.
    """
    joined = ";".join(include)
    response = client._request("/filters/create", {
        "include": joined,
        "base": base,
        "unsafe": "false",
    })
    first_item = response["items"][0]
    return first_item["filter"]
# Create a lightweight filter for bulk question collection.
# This reduces response size by ~70% vs the default filter.
# NOTE: filter ids are reusable — once created, the returned id can be
# hard-coded to avoid repeating this request on every run.
light_filter = create_custom_filter(se, [
    "question.title",
    "question.score",
    "question.view_count",
    "question.tags",
    "question.creation_date",
    "question.is_answered",
    "question.accepted_answer_id",
    "question.link",
])
def bulk_collect_questions(client: StackExchangeClient,
                           tag: str, site: str = "stackoverflow",
                           days: int = 365,
                           filter_id: str | None = None) -> list[dict]:
    """Efficiently collect large volumes of questions for a tag.

    Args:
        client: API client.
        tag: tag to collect questions for.
        site: API site parameter.
        days: look-back window in days.
        filter_id: optional custom API filter id (see create_custom_filter)
            to shrink response payloads.

    Returns:
        Flattened question dicts, newest first (sort=creation desc).
    """
    # Aware UTC datetime: naive utcnow().timestamp() is interpreted in
    # local time, skewing the fromdate cutoff by the UTC offset.
    from_ts = int((datetime.now(timezone.utc) - timedelta(days=days)).timestamp())
    params = {
        "site": site,
        "tagged": tag,
        "fromdate": from_ts,
        "sort": "creation",
        "order": "desc",
        "pagesize": 100,
    }
    if filter_id:
        params["filter"] = filter_id
    questions = []
    for item in client._paginate("/questions", params, max_pages=50):
        questions.append({
            "id": item["question_id"],
            "title": item["title"],
            "score": item["score"],
            "views": item["view_count"],
            "answered": item["is_answered"],
            "tags": item["tags"],
            "created": item["creation_date"],
            "link": item["link"],
        })
    return questions
Custom filters reduce response size by 60-80%, which means faster responses and more effective use of your daily quota.
Rate Limits and Quota Management
The API has a backoff mechanism — if you're requesting too fast, the response includes a backoff field telling you how many seconds to wait. The client above handles this automatically.
| Tier | Daily Requests | Items/Request | Daily Items |
|---|---|---|---|
| No key | 300 | 100 | 30,000 |
| With key | 10,000 | 100 | 1,000,000 |
def monitor_quota(client: StackExchangeClient) -> None:
    """Print current API quota status.

    Makes one cheap /info request so the client refreshes its cached
    quota from the response metadata, then reports it. Handles the case
    where the response carried no quota data instead of crashing on the
    numeric format specs.
    """
    client._request("/info", {"site": "stackoverflow"})
    pct = client.quota_remaining_pct()
    if client.quota is None or pct is None:
        print("Quota remaining: unknown (no quota data in response)")
        return
    print(f"Quota remaining: {client.quota:,} requests "
          f"({pct:.1f}% of daily limit)")
def quota_aware_batch(client: StackExchangeClient,
                      items: list, process_fn,
                      min_quota: int = 100) -> list:
    """Apply process_fn to each item, stopping early if quota runs low.

    Per-item failures are logged and skipped; progress is reported every
    50 items.
    """
    processed = []
    total = len(items)
    for index, element in enumerate(items):
        remaining = client.quota
        if remaining is not None and remaining < min_quota:
            print(f"Quota low ({remaining}), stopping at {index}/{total}")
            break
        try:
            processed.append(process_fn(element))
        except Exception as exc:
            print(f"Error processing item {index}: {exc}")
        if index > 0 and index % 50 == 0:
            print(f" Progress: {index}/{total}, quota: {client.quota}")
    return processed
Storing Data in SQLite
import sqlite3
import json
def init_db(db_path: str = "stackexchange.db") -> sqlite3.Connection:
    """Initialize database for Stack Exchange data.

    Creates the questions/answers/tags tables and supporting indexes.
    All DDL uses IF NOT EXISTS, so calling this repeatedly (or against an
    existing database) is safe.

    Args:
        db_path: SQLite file path; pass ":memory:" for an in-memory DB.

    Returns:
        An open connection with the schema committed.
    """
    conn = sqlite3.connect(db_path)
    # executescript runs the whole multi-statement DDL batch in one call.
    conn.executescript("""
    CREATE TABLE IF NOT EXISTS questions (
        question_id INTEGER PRIMARY KEY,
        site TEXT NOT NULL,
        title TEXT,
        score INTEGER DEFAULT 0,
        view_count INTEGER DEFAULT 0,
        answer_count INTEGER DEFAULT 0,
        is_answered INTEGER DEFAULT 0,
        accepted_answer_id INTEGER,
        tags TEXT,
        owner TEXT,
        owner_reputation INTEGER DEFAULT 0,
        creation_date TEXT,
        body TEXT,
        link TEXT,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    CREATE TABLE IF NOT EXISTS answers (
        answer_id INTEGER PRIMARY KEY,
        question_id INTEGER NOT NULL,
        site TEXT NOT NULL,
        score INTEGER DEFAULT 0,
        is_accepted INTEGER DEFAULT 0,
        owner TEXT,
        owner_reputation INTEGER DEFAULT 0,
        creation_date TEXT,
        body TEXT,
        FOREIGN KEY (question_id) REFERENCES questions(question_id)
    );
    CREATE TABLE IF NOT EXISTS tags (
        site TEXT,
        tag_name TEXT,
        count INTEGER,
        scraped_date TEXT,
        PRIMARY KEY (site, tag_name, scraped_date)
    );
    CREATE INDEX IF NOT EXISTS idx_q_site ON questions(site);
    CREATE INDEX IF NOT EXISTS idx_q_score ON questions(score);
    CREATE INDEX IF NOT EXISTS idx_q_created ON questions(creation_date);
    CREATE INDEX IF NOT EXISTS idx_a_question ON answers(question_id);
    """)
    conn.commit()
    return conn
def save_questions(conn: sqlite3.Connection, questions: list[dict],
                   site: str) -> int:
    """Batch save questions. Returns count actually inserted.

    Uses INSERT OR IGNORE so rows whose question_id already exists are
    skipped; those skipped rows are NOT counted (measured via the
    connection's total_changes counter). Rows that fail with a database
    error are logged and skipped rather than silently dropped.
    """
    inserted = 0
    for q in questions:
        before = conn.total_changes
        try:
            conn.execute("""
                INSERT OR IGNORE INTO questions
                    (question_id, site, title, score, view_count,
                     answer_count, is_answered, accepted_answer_id,
                     tags, owner, owner_reputation, creation_date,
                     body, link)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """, (
                q["question_id"], site, q.get("title"),
                q.get("score", 0), q.get("view_count", 0),
                q.get("answer_count", 0),
                1 if q.get("is_answered") else 0,
                q.get("accepted_answer_id"),
                json.dumps(q.get("tags", [])),
                q.get("owner"), q.get("owner_reputation", 0),
                q.get("creation_date"),
                q.get("body_excerpt", ""), q.get("link"),
            ))
            # total_changes only advances when the row was really written,
            # so ignored duplicates don't inflate the count.
            inserted += conn.total_changes - before
        except sqlite3.Error as exc:
            # Surface bad rows instead of swallowing them silently.
            print(f"Skipping question {q.get('question_id')}: {exc}")
    conn.commit()
    return inserted
Bulk Data: Stack Exchange Data Dumps
For large-scale analysis, the API isn't the right tool. Stack Exchange publishes quarterly data dumps on archive.org containing the complete dataset in XML format:
import xml.etree.ElementTree as ET
def parse_posts_xml(xml_path: str, max_rows: int = 100_000,
                    min_score: int = 0) -> list[dict]:
    """
    Parse a Stack Exchange data dump Posts.xml file.
    Download from: https://archive.org/details/stackexchange
    Each site has its own archive, e.g., stackoverflow.com-Posts.7z

    Streams the file with iterparse and clears the tree as it goes, so
    memory stays flat even on multi-GB dumps.
    """
    posts: list[dict] = []
    context = ET.iterparse(xml_path, events=("start", "end"))
    # Grab the root element so processed <row> children can be dropped:
    # elem.clear() alone leaves the (emptied) elements attached to the
    # root, and memory still grows linearly over a large file.
    _, root = next(context)
    for event, elem in context:
        if event != "end" or elem.tag != "row":
            continue
        score = int(elem.get("Score", 0))
        if score >= min_score:
            post_type = int(elem.get("PostTypeId", 0))
            posts.append({
                "id": int(elem.get("Id", 0)),
                "post_type": post_type,  # 1=question, 2=answer
                "score": score,
                "view_count": int(elem.get("ViewCount", 0)) if post_type == 1 else 0,
                "title": elem.get("Title", ""),
                # "<a><b>" -> "a,b"
                "tags": elem.get("Tags", "").strip("<>").replace("><", ","),
                "answer_count": int(elem.get("AnswerCount", 0)) if post_type == 1 else 0,
                "accepted_answer_id": elem.get("AcceptedAnswerId"),
                "parent_id": elem.get("ParentId"),  # set for answers
                "creation_date": elem.get("CreationDate", ""),
                "body": elem.get("Body", "")[:1000],  # truncate for storage
            })
        # Drop all processed children (including filtered-out rows).
        root.clear()
        if len(posts) >= max_rows:
            break
    return posts
def load_dump_to_db(xml_path: str, db_path: str = "stackexchange.db",
                    site: str = "stackoverflow") -> None:
    """Load a data dump XML file into SQLite."""
    conn = init_db(db_path)
    print(f"Parsing {xml_path}...")
    posts = parse_posts_xml(xml_path, max_rows=500_000, min_score=1)
    questions, answers = [], []
    for post in posts:
        if post["post_type"] == 1:
            questions.append(post)
        elif post["post_type"] == 2:
            answers.append(post)
    print(f"Loaded {len(questions)} questions, {len(answers)} answers")
    # Save in batches
    saved_q = save_questions(conn, questions, site)
    print(f"Saved {saved_q} questions")
    conn.close()
Practical Applications and Analytical Queries
Once data is in SQLite, you can run complex analyses:
def analyze_tag_ecosystem(conn: sqlite3.Connection,
                          site: str = "stackoverflow") -> None:
    """Analyze the tag ecosystem for a Stack Exchange site.

    Prints two reports: top tags by question volume, and high-traffic
    questions without an accepted answer.
    """
    print(f"=== Tag Analysis for {site} ===\n")
    # Top tags by question count
    print("Top 10 tags by question volume:")
    # NOTE(review): the REPLACE expression below splits `tags` by wrapping
    # it in brackets and quoting around commas, which assumes the column
    # holds a quoted comma-separated string (e.g. '"a,b"'). Rows whose
    # tags were stored as a JSON array of strings will not split cleanly —
    # verify which writer populated this table before trusting the counts.
    for row in conn.execute("""
        SELECT value as tag, COUNT(*) as count,
               AVG(score) as avg_score,
               AVG(view_count) as avg_views
        FROM questions,
             json_each('[' || REPLACE(REPLACE(tags, ',', '","'), ' ', '') || ']')
        WHERE site = ?
        GROUP BY tag
        ORDER BY count DESC
        LIMIT 10
    """, (site,)):
        print(f"  {row[0]:20s}: {row[1]:5,} questions, "
              f"avg score {row[2]:.1f}, avg {row[3]:,.0f} views")
    # Questions with high views but no accepted answer
    print("\nHigh-traffic unanswered questions (documentation gaps):")
    for row in conn.execute("""
        SELECT title, view_count, score, tags, link
        FROM questions
        WHERE site = ?
          AND is_answered = 0
          AND accepted_answer_id IS NULL
          AND view_count > 10000
        ORDER BY view_count DESC
        LIMIT 10
    """, (site,)):
        print(f"  {row[1]:8,} views | {row[0][:60]}")
When you need to scrape the Stack Exchange website directly (not the API) — for user profile analytics, certain mod tools data, or real-time websocket feeds — you'll face standard anti-bot protections including Cloudflare. In that scenario, residential proxies from a provider like ThorData help maintain access without triggering rate limits on the web frontend.
Key Takeaways
- The Stack Exchange API v2.3 is one of the most developer-friendly public APIs available — generous quotas, comprehensive documentation, and consistent endpoint design.
- With a free API key, you get 10,000 requests/day and up to 100 items per request, giving you up to 1 million data points daily.
- Use custom filters to reduce response payload size by 60-80% — critical for staying under quota on large collection tasks.
- The quarterly data dumps on archive.org are the right tool for bulk analysis; the API is better for real-time and incremental queries.
- Tag trend analysis over time is one of the most valuable use cases — monthly question volumes for technology tags are leading indicators of adoption trends.
- Cross-site queries work seamlessly — the same code with a changed `site` parameter gives you data from any of the 170+ communities in the network.