How to Scrape Stack Overflow with Python: API v2 Guide (2026)
Stack Overflow has a proper API. You don't need to scrape HTML — the API v2 gives you questions, answers, users, tags, and comments as structured JSON. It's well-documented, reasonably generous with quotas, and supports complex filtering.
The catch: without authentication you get 300 requests/day. With a free API key, you get 10,000. And the response format has some quirks that'll trip you up if you're not paying attention. This guide covers everything you need to go from zero to a working dataset, including quota management, batching, and what to do when 10,000 requests per day isn't enough.
Getting an API Key
Register an app at stackapps.com/apps/oauth/register. You need a Stack Exchange account. The key is free and instantly available — you don't need OAuth for read-only access.
Your key goes in the key query parameter on every request. Without it, you're capped at 300 requests per UTC day, which runs out in under an hour if you're doing any real work. With it, 10,000.
Installation and Base Setup
pip install httpx beautifulsoup4 lxml
import httpx
import time
import json
import html
from datetime import datetime
# Base URL for Stack Exchange API v2.3 and the per-request key.
API_BASE = "https://api.stackexchange.com/2.3"
API_KEY = "your_api_key_here"  # register free at stackapps.com

# Create a persistent client — stack overflow doesn't require special headers.
# Accept-Encoding: gzip lets the API compress responses; httpx decompresses
# transparently, cutting bandwidth substantially on large payloads.
client = httpx.Client(
    timeout=20,
    headers={
        "Accept-Encoding": "gzip, deflate",
        "User-Agent": "YourApp/1.0 ([email protected])",
    }
)
def stack_request(endpoint: str, **params) -> dict:
    """Call a Stack Exchange API endpoint and return the parsed JSON body.

    Injects the default site and API key, warns when the daily quota runs
    low, and honors any server-requested backoff by sleeping before the
    data is handed back. Returns an empty dict on a 400 (bad request) or
    429 (quota exhausted) response.
    """
    params.setdefault("site", "stackoverflow")
    params.setdefault("key", API_KEY)
    response = client.get(f"{API_BASE}{endpoint}", params=params)
    if response.status_code == 400:
        print(f"Bad request: {response.json().get('error_message', 'Unknown error')}")
        return {}
    if response.status_code == 429:
        print("Quota exceeded — wait until UTC midnight")
        return {}
    response.raise_for_status()
    payload = response.json()
    # Quota status rides along in every response body.
    quota_left = payload.get("quota_remaining")
    if quota_left is not None and quota_left < 500:
        print(f"WARNING: Only {quota_left} API calls remaining today")
    # The API embeds a "backoff" field (seconds) when callers must slow
    # down; ignoring it gets the key throttled, so sleep before returning.
    backoff_seconds = payload.get("backoff")
    if backoff_seconds is not None:
        print(f"API requested backoff: {backoff_seconds}s — sleeping")
        time.sleep(backoff_seconds)
    return payload
Critical detail about gzip: The API returns gzip-compressed responses by default when you include Accept-Encoding: gzip. httpx and requests handle this automatically, but if you're using urllib directly, you'll need to decompress manually. Always include the encoding header — it cuts bandwidth by 70-80% for large responses.
Fetching Questions
The simplest use case: get questions by tag, sorted by votes or activity:
def get_questions(
    tag: str | None = None,
    sort: str = "votes",
    page: int = 1,
    pagesize: int = 100,
    include_body: bool = False,
    from_date: int | None = None,
    to_date: int | None = None,
) -> tuple[list[dict], bool]:
    """
    Fetch questions from Stack Overflow.

    Args:
        tag: filter by tag (e.g., 'python', 'javascript')
        sort: votes, activity, creation, hot, week, month
        page: page number (1-indexed)
        pagesize: results per page (max 100)
        include_body: if True, include full question body (HTML)
        from_date: Unix timestamp for earliest creation date
        to_date: Unix timestamp for latest creation date

    Returns:
        (list of questions, has_more bool)
    """
    params = {
        "order": "desc",
        "sort": sort,
        "pagesize": pagesize,
        "page": page,
    }
    if tag:
        params["tagged"] = tag
    if include_body:
        params["filter"] = "withbody"
    # Bug fix: compare against None — the previous truthiness check dropped
    # the legitimate Unix timestamp 0 (the epoch itself).
    if from_date is not None:
        params["fromdate"] = from_date
    if to_date is not None:
        params["todate"] = to_date
    data = stack_request("/questions", **params)
    return data.get("items", []), data.get("has_more", False)
# Top Python questions by votes
questions, has_more = get_questions(tag="python", sort="votes")
for question in questions[:10]:
    # Titles arrive HTML-encoded (common gotcha) — decode before display.
    decoded_title = html.unescape(question['title'])
    print(f"[{question['score']:>5}] {decoded_title}")
    print(f" Views: {question['view_count']:,} | Answers: {question['answer_count']} | "
          f"Tags: {', '.join(question['tags'][:3])}")
Custom Filters
Default responses return minimal fields — no body text, no accepted answer indicator. Stack Overflow's filter system lets you specify exactly what fields you want per request type:
def create_filter(include_fields: list[str], base: str = "none") -> str:
    """Create a custom filter at the Stack Exchange API.

    Returns a filter ID you can reuse across requests.
    base options: 'none' (no default fields), 'default', 'withbody', 'total'
    """
    query = {
        "base": base,
        "include": ";".join(include_fields),
        "unsafe": "false",
        "key": API_KEY,
    }
    payload = client.get(f"{API_BASE}/filters/create", params=query).json()
    items = payload.get("items")
    if items:
        return items[0]["filter"]
    # Creation failed — fall back to the built-in 'withbody' filter.
    return "withbody"
# Pre-built filter for Q&A dataset building.
# This filter includes question body, accepted answer flag, and full answer body.
QA_FILTER = "!nNPvSNdWme"  # community-known filter for full content
def get_questions_with_body(tag: str, pages: int = 5) -> list[dict]:
    """Get questions with full body text for a tag.

    Args:
        tag: tag to filter by.
        pages: maximum number of 100-item pages to fetch.

    Returns:
        All fetched question dicts (body included via QA_FILTER).
    """
    all_questions: list[dict] = []
    for page in range(1, pages + 1):
        data = stack_request(
            "/questions",
            tagged=tag,
            order="desc",
            sort="votes",
            page=page,
            pagesize=100,
            filter=QA_FILTER,
        )
        all_questions.extend(data.get("items", []))
        if not data.get("has_more"):
            print(f"Reached end at page {page}")
            break
        # Bug fix: stack_request() already sleeps when the API requests a
        # backoff, so sleeping on data["backoff"] here doubled the wait.
        # Keep only a small courtesy delay between pages.
        time.sleep(0.1)
    return all_questions
# Fetch up to three pages (300 questions) of top-voted Python questions.
python_questions = get_questions_with_body("python", pages=3)
print(f"Fetched {len(python_questions)} questions with body text")
Advanced Search
The /search/advanced endpoint supports complex queries with multiple filters:
def search_questions(
    query: str | None = None,
    tag: str | None = None,
    accepted: bool | None = None,
    min_answers: int | None = None,
    min_score: int | None = None,
    max_score: int | None = None,
    has_wiki: bool | None = None,
    nottagged: str | None = None,
    from_date: datetime | None = None,
    to_date: datetime | None = None,
    page: int = 1,
) -> list[dict]:
    """
    Advanced question search with multiple filter options.

    Examples:
        # Highly voted asyncio questions with accepted answers
        search_questions("asyncio event loop", tag="python", accepted=True, min_score=10)
        # Recent unanswered questions (find gaps to fill)
        search_questions(tag="rust", min_answers=0, from_date=datetime(2026, 1, 1))
        # Performance questions excluding low-quality content
        search_questions("performance optimization", tag="python", min_score=5, min_answers=1)
    """
    params = {
        "order": "desc",
        # Relevance ordering only makes sense with a free-text query;
        # otherwise sort by votes. (The old code re-assigned "votes" a
        # second time further down — that redundancy is removed.)
        "sort": "relevance" if query else "votes",
        "filter": QA_FILTER,
        "page": page,
        "pagesize": 100,
    }
    if query:
        params["q"] = query
    if tag:
        params["tagged"] = tag
    if nottagged:
        params["nottagged"] = nottagged
    if accepted is not None:
        params["accepted"] = str(accepted).lower()
    if min_answers is not None:
        params["answers"] = min_answers
    if min_score is not None:
        params["min"] = min_score
    if max_score is not None:
        params["max"] = max_score
    if has_wiki is not None:
        params["wiki"] = str(has_wiki).lower()
    if from_date:
        params["fromdate"] = int(from_date.timestamp())
    if to_date:
        params["todate"] = int(to_date.timestamp())
    data = stack_request("/search/advanced", **params)
    return data.get("items", [])
# Find answered Python async questions with high scores
results = search_questions(
    "asyncio coroutine await", tag="python", accepted=True, min_score=15
)
for q in results[:5]:
    # Decode HTML entities in titles before printing.
    print(f"[{q['score']:>4}] {html.unescape(q['title'])}")
    print(f" {q['link']}")
Fetching Answers
Get answers for specific questions, or browse top answers by tag:
def get_answers(question_id: int, include_body: bool = True) -> list[dict]:
    """Get all answers for a question, sorted by votes (highest first)."""
    query = {
        "order": "desc",
        "sort": "votes",
        "pagesize": 100,
    }
    if include_body:
        # 'withbody' is a built-in filter that adds the HTML answer body.
        query["filter"] = "withbody"
    payload = stack_request(f"/questions/{question_id}/answers", **query)
    return payload.get("items", [])
def get_accepted_answer(question_id: int) -> dict | None:
    """Return the accepted answer for a question, or None if there is none."""
    return next(
        (answer for answer in get_answers(question_id) if answer.get("is_accepted")),
        None,
    )
def get_top_answers_by_tag(tag: str, pages: int = 3) -> list[dict]:
    """Get highest-voted answers in a tag.

    NOTE(review): the /answers list endpoint is not documented to accept a
    `tagged` parameter — verify against the Stack Exchange API reference.
    If the parameter is ignored, this returns the top answers site-wide,
    not per-tag; /search/advanced or per-question answer fetches would be
    the per-tag alternative.
    """
    answers = []
    for page in range(1, pages + 1):
        data = stack_request(
            "/answers",
            order="desc",
            sort="votes",
            page=page,
            pagesize=100,
            filter="withbody",
            tagged=tag,
        )
        answers.extend(data.get("items", []))
        # Stop paginating once the API reports no further pages.
        if not data.get("has_more"):
            break
    return answers
# Get answers for Python's most famous question
answers = get_answers(292357)  # "Hidden features of Python"
accepted = [a for a in answers if a.get("is_accepted")]
if accepted:
    print(f"Accepted answer score: {accepted[0]['score']}")
    # Body is HTML; show only a short preview.
    print(f"Answer body preview: {accepted[0].get('body', '')[:200]}...")
Quota Management — Critical to Get Right
This is where most people get burned. 10,000 requests/day sounds like a lot, but if you're paginating through questions and fetching answers for each one, the math is brutal: 100 pages of questions cost 100 requests, and the ~10,000 questions on those pages cost one answer fetch apiece — roughly 10,100 requests for a single 100-page job, more than your entire daily quota.
class QuotaTracker:
    """Track API quota usage and enforce spending limits.

    Feed every API response dict to update(); check can_continue() before
    issuing further requests so a reserve of quota is always kept back.
    """

    def __init__(self, daily_limit: int = 10000, reserve: int = 500):
        self.daily_limit = daily_limit
        self.reserve = reserve          # never spend below this many calls
        self.remaining = daily_limit
        self.requests_made = 0
        self.start_time = datetime.now()

    def update(self, api_response: dict):
        """Update quota tracking from an API response body."""
        if "quota_remaining" in api_response:
            self.remaining = api_response["quota_remaining"]
        if "quota_max" in api_response:
            self.daily_limit = api_response["quota_max"]
        self.requests_made += 1

    def can_continue(self) -> bool:
        """Return False if we should stop to preserve reserve quota."""
        return self.remaining > self.reserve

    def budget_remaining(self) -> int:
        """How many requests we can safely make before hitting the reserve."""
        return max(0, self.remaining - self.reserve)

    def status(self) -> str:
        """One-line human-readable summary of usage and request rate."""
        # Bug fix: timedelta.seconds wraps at 24h and ignores whole days;
        # total_seconds() gives the true elapsed time for the rate estimate.
        elapsed = (datetime.now() - self.start_time).total_seconds() / 3600
        rate = self.requests_made / max(elapsed, 0.001)
        return (f"Requests: {self.requests_made} made, "
                f"{self.remaining} remaining "
                f"({rate:.0f}/hr rate)")
# Module-level tracker shared by all tracked requests; keep 1000 in reserve.
tracker = QuotaTracker(reserve=1000)

def tracked_stack_request(endpoint: str, **params) -> dict:
    """Make a tracked request that stops on low quota.

    Raises:
        RuntimeError: when remaining quota has fallen to the reserve
            threshold. (More specific than the bare Exception previously
            raised; callers catching Exception still work.)
    """
    if not tracker.can_continue():
        raise RuntimeError(f"Quota too low to continue safely. {tracker.status()}")
    data = stack_request(endpoint, **params)
    tracker.update(data)
    return data
Batching: The Single Most Important Optimization
The biggest quota win is batching multiple IDs into one request. You can fetch up to 100 items in a single API call by joining IDs with semicolons:
def batch_fetch_questions(question_ids: list[int], include_body: bool = True) -> list[dict]:
    """
    Fetch multiple questions in one API call.

    Stack Exchange allows up to 100 IDs per request, joined with semicolons.
    This turns 100 requests into 1.
    """
    chunk_size = 100  # API maximum per request
    fetched: list[dict] = []
    for start in range(0, len(question_ids), chunk_size):
        joined_ids = ";".join(map(str, question_ids[start:start + chunk_size]))
        query: dict = {"pagesize": chunk_size}
        if include_body:
            query["filter"] = "withbody"
        response = tracked_stack_request(f"/questions/{joined_ids}", **query)
        fetched.extend(response.get("items", []))
    return fetched
def batch_fetch_answers(answer_ids: list[int], include_body: bool = True) -> list[dict]:
    """Fetch up to 100 answers by ID in one request (semicolon-joined IDs)."""
    fetched: list[dict] = []
    chunk_size = 100  # API maximum per request
    for start in range(0, len(answer_ids), chunk_size):
        joined_ids = ";".join(map(str, answer_ids[start:start + chunk_size]))
        query: dict = {"pagesize": chunk_size}
        if include_body:
            query["filter"] = "withbody"
        response = tracked_stack_request(f"/answers/{joined_ids}", **query)
        fetched.extend(response.get("items", []))
    return fetched
# Instead of 100 requests, this uses 1:
question_ids = [292357, 231767, 100003, 986006, 419163]  # + 95 more
questions = batch_fetch_questions(question_ids)
# Bug fix: ceiling division — the old `len // 100 + 1` reported one request
# too many whenever the ID count was an exact multiple of 100.
request_count = (len(question_ids) + 99) // 100
print(f"Fetched {len(questions)} questions in {request_count} request(s)")
Building a Q&A Training Dataset
Here's a complete example that builds a high-quality Q&A dataset suitable for machine learning:
import json
import re
def html_to_markdown(html_text: str) -> str:
    """Convert Stack Overflow HTML to clean text with code blocks preserved."""
    from bs4 import BeautifulSoup
    soup = BeautifulSoup(html_text, "lxml")
    # Fenced blocks must come first so their <code> children aren't
    # re-wrapped as inline code below.
    for block in soup.find_all("pre"):
        block.replace_with(f"\n```\n{block.get_text()}\n```\n")
    # Inline code spans.
    for inline in soup.find_all("code"):
        inline.replace_with(f"`{inline.get_text()}`")
    # Links become markdown [text](href).
    for anchor in soup.find_all("a"):
        anchor.replace_with(f"[{anchor.get_text()}]({anchor.get('href', '')})")
    # Headers become markdown #/##/### at the matching level.
    for heading in soup.find_all(["h1", "h2", "h3"]):
        depth = int(heading.name[1])
        heading.replace_with(f"\n{'#' * depth} {heading.get_text()}\n")
    # Flatten the remaining tree and collapse runs of blank lines.
    flattened = soup.get_text(separator="\n")
    return re.sub(r"\n{3,}", "\n\n", flattened).strip()
def build_qa_dataset(
    tag: str,
    min_question_score: int = 5,
    min_answer_score: int = 3,
    max_items: int = 1000,
    output_file: str | None = None,
) -> list[dict]:
    """
    Build a Q&A dataset of accepted-answer pairs for a given tag.

    This is the pattern for creating training data for code assistants,
    FAQ systems, or RAG knowledge bases.

    Args:
        tag: tag to search within.
        min_question_score: skip questions scoring below this.
        min_answer_score: skip accepted answers scoring below this.
        max_items: stop after collecting this many pairs.
        output_file: optional path; when set, the dataset is saved as JSON.

    Returns:
        List of dicts with question/answer text, scores, and metadata.
    """
    dataset: list[dict] = []
    page = 1
    while len(dataset) < max_items:
        # Stop before dipping into the reserved quota.
        if not tracker.can_continue():
            print(f"Stopping: quota budget exhausted. {tracker.status()}")
            break
        # Search for accepted questions above the score threshold.
        data = tracked_stack_request(
            "/search/advanced",
            tagged=tag,
            accepted=True,
            sort="votes",
            order="desc",
            min=min_question_score,
            page=page,
            pagesize=100,
            filter=QA_FILTER,
        )
        questions = data.get("items", [])
        if not questions:
            print(f"No more questions at page {page}")
            break
        for q in questions:
            if len(dataset) >= max_items:
                break
            # Skip questions without an accepted answer.
            accepted_answer_id = q.get("accepted_answer_id")
            if not accepted_answer_id:
                continue
            # Fetch the question's answers. NOTE: one API call per
            # question — batch_fetch_answers() is cheaper at scale.
            # (The original also set a dead `answer_body = None` local
            # here; removed.)
            answers = get_answers(q["question_id"])
            accepted = [a for a in answers if a.get("is_accepted")]
            if not accepted or accepted[0].get("score", 0) < min_answer_score:
                continue
            answer = accepted[0]
            # Convert HTML bodies to clean, code-preserving text.
            q_text = html_to_markdown(q.get("body", ""))
            a_text = html_to_markdown(answer.get("body", ""))
            if not q_text or not a_text:
                continue
            dataset.append({
                "question_id": q["question_id"],
                "title": html.unescape(q["title"]),
                "question": q_text,
                "answer": a_text,
                "question_score": q["score"],
                "answer_score": answer["score"],
                "tags": q.get("tags", []),
                "link": q.get("link"),
                "answer_count": q.get("answer_count", 0),
                "view_count": q.get("view_count", 0),
                "creation_date": datetime.fromtimestamp(q["creation_date"]).isoformat(),
            })
        if not data.get("has_more"):
            break
        page += 1
        print(f"Progress: {len(dataset)}/{max_items} items "
              f"(quota: {tracker.remaining} remaining)")
    if output_file:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(dataset, f, indent=2, ensure_ascii=False)
        print(f"Saved {len(dataset)} Q&A pairs to {output_file}")
    return dataset[:max_items]
# Build Python Q&A dataset
dataset = build_qa_dataset(
    tag="python",
    min_question_score=10,
    min_answer_score=5,
    max_items=500,
    output_file="python_qa_500.json",
)
print(f"\nDataset stats: {len(dataset)} items")
# Bug fix: guard the averages — an empty dataset (e.g. quota exhausted on
# the first page) previously crashed with ZeroDivisionError.
if dataset:
    print(f"Avg question score: {sum(d['question_score'] for d in dataset)/len(dataset):.1f}")
    print(f"Avg view count: {sum(d['view_count'] for d in dataset)/len(dataset):,.0f}")
Fetching User Data and Tag Statistics
Beyond questions and answers, the API exposes user profiles and tag metadata:
def get_tag_info(tags: list[str]) -> list[dict]:
    """Get statistics for a list of tags via /tags/{names}/info."""
    data = stack_request(f"/tags/{';'.join(tags)}/info", pagesize=len(tags))
    summaries = []
    for item in data.get("items", []):
        summaries.append({
            "tag": item["name"],
            "question_count": item["count"],
            "has_synonyms": item.get("has_synonyms", False),
            "is_moderator_only": item.get("is_moderator_only", False),
            "is_required": item.get("is_required", False),
            # Unix timestamp → ISO string; the field is absent on some tags.
            "last_activity_date": (
                datetime.fromtimestamp(item["last_activity_date"]).isoformat()
                if "last_activity_date" in item
                else None
            ),
        })
    return summaries
def get_related_tags(tag: str) -> list[dict]:
    """Get tags that frequently co-occur with the given tag."""
    payload = stack_request(f"/tags/{tag}/related", pagesize=20)
    return [
        {"tag": entry["name"], "count": entry["count"]}
        for entry in payload.get("items", [])
    ]
def get_top_users(tag: str, page: int = 1) -> list[dict]:
    """Get top answerers for a tag.

    Bug fix: the original queried the global /users endpoint and never used
    `tag` at all, returning the same site-wide reputation list for every
    tag. /tags/{tag}/top-answerers/all_time returns per-tag answerers; each
    item wraps a shallow user object under "user" plus per-tag score and
    post counts.
    """
    data = stack_request(
        f"/tags/{tag}/top-answerers/all_time",
        page=page,
        pagesize=50,
    )
    users = []
    for item in data.get("items", []):
        u = item.get("user", {})
        users.append({
            "user_id": u.get("user_id"),
            "name": u.get("display_name"),
            "reputation": u.get("reputation"),
            "badge_gold": u.get("badge_counts", {}).get("gold", 0),
            "badge_silver": u.get("badge_counts", {}).get("silver", 0),
            # Per-tag answer count comes from the wrapper, not the user.
            "answer_count": item.get("post_count", 0),
            # Total question count isn't exposed by this endpoint.
            "question_count": u.get("question_count", 0),
            # Additive field: the user's cumulative answer score in this tag.
            "tag_score": item.get("score", 0),
        })
    return users
# Get info on top Python-adjacent tags
py_tags = ["python", "python-3.x", "pandas", "numpy", "django", "flask", "fastapi"]
tag_info = get_tag_info(py_tags)
# Display sorted by question volume, largest first.
for t in sorted(tag_info, key=lambda x: x["question_count"], reverse=True):
    print(f"{t['tag']:<20} {t['question_count']:>10,} questions")
Scaling Beyond the 10,000 Daily Quota
When 10,000 requests/day isn't enough, here are your options:
Option 1: Stack Exchange Data Dump
Stack Overflow releases quarterly data dumps on archive.org. For historical analysis, this is the best option: - Complete data, no API calls needed - Full post history and revision history - Data is in XML format (Posts.xml, Users.xml, Comments.xml) - Last dump: several GB uncompressed
import xml.etree.ElementTree as ET
def parse_posts_xml(xml_file: str, max_items: int = None):
"""Parse Stack Exchange data dump Posts.xml."""
posts = []
for event, elem in ET.iterparse(xml_file, events=("end",)):
if elem.tag == "row":
post_type = elem.get("PostTypeId")
if post_type == "1": # Questions
posts.append({
"id": int(elem.get("Id")),
"title": elem.get("Title"),
"body": elem.get("Body"),
"score": int(elem.get("Score", 0)),
"tags": elem.get("Tags", ""),
"accepted_answer_id": elem.get("AcceptedAnswerId"),
"view_count": int(elem.get("ViewCount", 0)),
"creation_date": elem.get("CreationDate"),
})
elif post_type == "2": # Answers
posts.append({
"id": int(elem.get("Id")),
"parent_id": int(elem.get("ParentId", 0)),
"body": elem.get("Body"),
"score": int(elem.get("Score", 0)),
"is_accepted": elem.get("AcceptedAnswerId") is not None,
"creation_date": elem.get("CreationDate"),
})
elem.clear() # Free memory
if max_items and len(posts) >= max_items:
break
return posts
Option 2: Proxy Rotation for IP-Based Limits
The API quota is per API key, not per IP. But Akamai-protected Stack Overflow infrastructure may also apply IP-level throttling at very high volumes. For distributed scraping across multiple services, routing through different IPs helps:
import os
# Route API requests through residential proxies to avoid IP throttling
# ThorData residential proxies work well for Stack Exchange API traffic
# https://thordata.partnerstack.com/partner/0a0x4nzq (or [Oxylabs](https://oxylabs.go2cloud.org/aff_c?offer_id=7&aff_id=2066&url_id=174))
def create_proxied_client(proxy_url: str) -> httpx.Client:
    """Build an httpx client whose traffic is routed through `proxy_url`."""
    proxied_headers = {
        "Accept-Encoding": "gzip, deflate",
        "User-Agent": "StackOverflowScraper/1.0",
    }
    return httpx.Client(
        timeout=20,
        proxy=proxy_url,
        headers=proxied_headers,
    )
# Pool of proxied clients — rotate between entries when distributing load.
# Replace the placeholder credentials with your provider's details.
proxy_clients = [
    create_proxied_client("http://user:[email protected]:7777"),
]
Option 3: Register Multiple Keys
You can register multiple Stack Apps (each tied to a different Stack Exchange account) and distribute requests across keys. This is within the ToS for legitimate use cases.
Handling Response Quirks
HTML encoding in titles: Question titles come HTML-encoded. Always decode:
import html
title = html.unescape(question["title"])
Unix timestamps: All dates are Unix timestamps (seconds since epoch):
created = datetime.fromtimestamp(question["creation_date"])
Missing fields: Questions without accepted answers won't have accepted_answer_id. Questions with 0 answers won't have answers in the response at all.
Deleted content: The API won't return deleted questions/answers — some historical IDs return empty.
error_id 502: "Throttle violation" — you're making requests too fast without respecting backoff. Add delays.
Body encoding: Answer bodies contain HTML with entities like `&lt;` and `&amp;`. Use BeautifulSoup or html.unescape to decode.
Storing Data Efficiently
import sqlite3
def setup_so_database(db_path: str) -> sqlite3.Connection:
    """Create optimized SQLite schema for Stack Overflow data."""
    conn = sqlite3.connect(db_path)
    # WAL + NORMAL: better concurrent writes, faster commits, still safe.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    ddl = (
        """
        CREATE TABLE IF NOT EXISTS questions (
            question_id INTEGER PRIMARY KEY,
            title TEXT,
            body TEXT,
            score INTEGER,
            view_count INTEGER,
            answer_count INTEGER,
            accepted_answer_id INTEGER,
            tags TEXT,
            link TEXT,
            creation_date INTEGER,
            last_activity_date INTEGER,
            is_answered INTEGER
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS answers (
            answer_id INTEGER PRIMARY KEY,
            question_id INTEGER,
            body TEXT,
            score INTEGER,
            is_accepted INTEGER,
            creation_date INTEGER,
            owner_reputation INTEGER,
            FOREIGN KEY (question_id) REFERENCES questions(question_id)
        )
        """,
        # Indexes for the common access paths: answers-by-question,
        # questions-by-score, questions-by-tag-string.
        "CREATE INDEX IF NOT EXISTS idx_answers_question ON answers(question_id)",
        "CREATE INDEX IF NOT EXISTS idx_questions_score ON questions(score)",
        "CREATE INDEX IF NOT EXISTS idx_questions_tags ON questions(tags)",
    )
    for statement in ddl:
        conn.execute(statement)
    conn.commit()
    return conn
def insert_question(conn: sqlite3.Connection, q: dict):
    """Upsert one API question dict into the questions table.

    Title is HTML-unescaped and tags are stored as a JSON array string.
    """
    row = (
        q["question_id"],
        html.unescape(q.get("title", "")),
        q.get("body", ""),
        q.get("score", 0),
        q.get("view_count", 0),
        q.get("answer_count", 0),
        q.get("accepted_answer_id"),
        json.dumps(q.get("tags", [])),
        q.get("link"),
        q.get("creation_date"),
        q.get("last_activity_date"),
        int(q.get("is_answered", False)),
    )
    conn.execute(
        """
        INSERT OR REPLACE INTO questions
        (question_id, title, body, score, view_count, answer_count,
        accepted_answer_id, tags, link, creation_date, last_activity_date, is_answered)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
        """,
        row,
    )
def insert_answer(conn: sqlite3.Connection, a: dict):
    """Upsert one API answer dict into the answers table.

    Owner reputation is pulled from the nested "owner" object when present.
    """
    row = (
        a["answer_id"],
        a.get("question_id"),
        a.get("body", ""),
        a.get("score", 0),
        int(a.get("is_accepted", False)),
        a.get("creation_date"),
        a.get("owner", {}).get("reputation", 0),
    )
    conn.execute(
        """
        INSERT OR REPLACE INTO answers
        (answer_id, question_id, body, score, is_accepted, creation_date, owner_reputation)
        VALUES (?,?,?,?,?,?,?)
        """,
        row,
    )
Wrapping Up
Stack Overflow's API is genuinely well-designed — consistent structure, good documentation, proper error handling, and quota tracking built into every response. Key points:
- Always use an API key — the difference between 300 and 10,000 requests/day is critical
- Batch requests aggressively — `/questions/{ids}` with 100 semicolon-separated IDs is the biggest single optimization
- Respect `backoff` — it's in the response body, not a header; ignoring it gets your key throttled
- Use the data dump for historical analysis — quarterly releases on archive.org, no quota needed
- Custom filters dramatically reduce response size by requesting only the fields you need
- For distributed scraping or very high volumes, ThorData residential proxies help avoid IP-level throttling on top of API quota limits
- HTML decode titles and dates (Unix timestamps) before storing