Scraping Semantic Scholar Paper Metadata and Citations with Python (2026)
Semantic Scholar is one of the best-structured sources of academic data on the public internet. The Graph API is documented, public, and returns real structured data about papers, authors, citation networks, and more — without scraping HTML. The main challenge is rate limiting, which becomes painful fast at scale.
This guide covers the full API surface: what data it exposes, how to pull it efficiently with Python async, how to store it in SQLite, and how to scale across large citation graphs without hitting throttle walls.
What Data Is Available
The Semantic Scholar Graph API at api.semanticscholar.org/graph/v1 exposes:
Paper records: - Title, abstract, year, venue, journal name - Publication types (JournalArticle, Conference, Review, etc.) - Citation count, reference count, influential citation count - TLDR summaries — auto-generated one-sentence abstracts (not available for all papers) - Open access PDF links with status (GREEN, GOLD, BRONZE, CLOSED) - External IDs — DOI, ArXiv ID, PubMed ID, MAG ID, CorpusId (integer)
Citation and reference graphs:
- Full list of papers that cite a given paper
- Full reference list of papers a given paper cites
- isInfluential flag per citation — whether Semantic Scholar judged it foundational
Author profiles: - Name, h-index, total citations, paper count - Affiliations (institutional) - Homepage URL - Full paper list with pagination
Bulk endpoints:
- POST /paper/batch — fetch up to 500 papers in a single request
- POST /author/batch — fetch up to 1000 authors in a single request
The influentialCitationCount metric deserves special attention. It is Semantic Scholar's internal signal for papers that are explicitly cited as foundational work rather than just mentioned. A paper with 500 citations and 200 influential citations reads very differently from one with 500 citations and 10 influential ones.
Rate Limits and API Keys
Without an API key: 100 requests per minute, shared per IP. This sounds reasonable until you start paginating citation lists for dozens of papers simultaneously.
With a free API key: 1 request per second for most endpoints. The batch endpoints have their own separate limits. Register at semanticscholar.org/product/api — approval is typically fast for research use.
Bulk operation strategy: The POST /paper/batch endpoint accepts up to 500 paper IDs per request and is the correct approach for enriching a list of known papers. 500 papers in one POST is far more efficient than 500 individual GETs.
At scale: For large citation graph traversals across multiple machines, ThorData's residential proxies let you distribute requests across IPs so the per-IP rate counters reset cleanly for each worker. Each worker node gets a different exit IP. For single-machine work with an API key and an asyncio semaphore, proxies are unnecessary.
Setup
pip install httpx
import httpx
import asyncio
import sqlite3
import json
import time
from typing import Optional
# --- Module-level API configuration ---
API_KEY = "your_api_key_here"  # set to None to run unauthenticated
BASE_URL = "https://api.semanticscholar.org/graph/v1"

# Send the auth header only when a key is actually configured.
HEADERS = {"x-api-key": API_KEY} if API_KEY else {}

# 1 req/sec with key; every request sleeps 1.1s for a safety margin,
# and the semaphore serializes requests across all coroutines.
REQUEST_SEMAPHORE = asyncio.Semaphore(1)
Searching Papers by Keyword
async def search_papers(
    query: str,
    fields: str = "paperId,title,year,citationCount,authors,abstract,externalIds",
    limit: int = 10,
    offset: int = 0,
) -> dict:
    """
    Run a keyword search against the /paper/search endpoint.

    Returns the raw API payload:
        {"total": int, "offset": int, "next": int, "data": [...]}

    Note: pagination caps at offset 9999. For more results, split by date range.
    """
    query_params = {
        "query": query,
        "fields": fields,
        "limit": limit,
        "offset": offset,
    }
    async with REQUEST_SEMAPHORE:
        # Pace every call to stay under the 1 req/sec key limit.
        await asyncio.sleep(1.1)
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            response = await client.get(
                f"{BASE_URL}/paper/search", params=query_params
            )
    response.raise_for_status()
    return response.json()
async def search_papers_paginated(
    query: str, max_results: int = 100
) -> list[dict]:
    """Collect up to max_results search hits by walking successive pages."""
    collected: list[dict] = []
    page_size = min(100, max_results)
    cursor = 0
    while len(collected) < max_results:
        page = await search_papers(query, limit=page_size, offset=cursor)
        hits = page.get("data", [])
        collected.extend(hits)
        # Stop on a short page or when the API stops advertising a next offset.
        if len(hits) < page_size or "next" not in page:
            break
        cursor = page["next"]
        if cursor >= 10000:  # search pagination hard limit
            break
    return collected[:max_results]
Fetching Full Paper Details
# Full field projection requested when hydrating a paper record.
PAPER_FIELDS = (
    "paperId,title,abstract,year,citationCount,referenceCount,"
    "influentialCitationCount,tldr,openAccessPdf,authors,venue,"
    "publicationVenue,externalIds,publicationTypes,journal,"
    "fieldsOfStudy,s2FieldsOfStudy"
)
async def get_paper(paper_id: str) -> Optional[dict]:
    """
    Fetch a single paper record with the full PAPER_FIELDS projection.

    Accepted paper_id formats:
      - Semantic Scholar ID: "649def34f8be52c8b66281af98ae884c09aef38b"
      - DOI: "DOI:10.18653/v1/N18-3011"
      - ArXiv: "ARXIV:2106.15928"
      - PubMed: "PMID:12345678"
      - CorpusId: "CorpusId:215416146"

    Returns None when the API reports the paper as unknown (404).
    """
    async with REQUEST_SEMAPHORE:
        await asyncio.sleep(1.1)  # rate-limit pacing
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            response = await client.get(
                f"{BASE_URL}/paper/{paper_id}",
                params={"fields": PAPER_FIELDS},
            )
    if response.status_code == 404:
        return None
    response.raise_for_status()
    return response.json()
Fetching Citation Graphs
async def get_citations(
    paper_id: str,
    fields: str = "paperId,title,year,citationCount,isInfluential",
    limit: int = 500,
    offset: int = 0,
) -> dict:
    """
    Fetch one page of papers that cite the given paper.

    Returns the raw API payload:
        {"offset": int, "next": int, "data": [
            {"citingPaper": {...}, "isInfluential": bool},
            ...
        ]}

    Max limit per request: 1000. Paginate using offset.
    isInfluential: True if Semantic Scholar flagged this as a foundational citation.
    """
    request_params = {"fields": fields, "limit": limit, "offset": offset}
    async with REQUEST_SEMAPHORE:
        await asyncio.sleep(1.1)  # rate-limit pacing
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            response = await client.get(
                f"{BASE_URL}/paper/{paper_id}/citations", params=request_params
            )
    response.raise_for_status()
    return response.json()
async def get_all_citations(
    paper_id: str,
    fields: str = "paperId,title,year,citationCount",
) -> list[dict]:
    """Walk every page of the citations endpoint and return the combined list."""
    results: list[dict] = []
    cursor = 0
    page_size = 500
    while True:
        page = await get_citations(
            paper_id, fields=fields, limit=page_size, offset=cursor
        )
        entries = page.get("data", [])
        results.extend(entries)
        # A short page or missing "next" marks the end of the listing.
        if len(entries) < page_size or "next" not in page:
            break
        cursor = page["next"]
    return results
async def get_references(
    paper_id: str,
    fields: str = "paperId,title,year,citationCount",
    limit: int = 500,
) -> list[dict]:
    """Return every paper referenced by the given paper, paginating until exhausted."""
    references: list[dict] = []
    cursor = 0
    while True:
        async with REQUEST_SEMAPHORE:
            await asyncio.sleep(1.1)  # rate-limit pacing
            async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
                response = await client.get(
                    f"{BASE_URL}/paper/{paper_id}/references",
                    params={"fields": fields, "limit": limit, "offset": cursor},
                )
        response.raise_for_status()
        payload = response.json()
        page_entries = payload.get("data", [])
        references.extend(page_entries)
        if len(page_entries) < limit or "next" not in payload:
            break
        cursor = payload["next"]
    return references
Author Profiles with H-Index
# Field projection requested for author profile lookups.
AUTHOR_FIELDS = (
    "name,hIndex,citationCount,paperCount,"
    "affiliations,homepage,externalIds"
)
async def get_author(author_id: str) -> Optional[dict]:
    """
    Fetch an author profile, or None when the author is unknown (404).

    Example response shape:
        {
            "authorId": "1741101",
            "name": "Ashish Vaswani",
            "hIndex": 42,
            "citationCount": 98234,
            "paperCount": 37,
            "affiliations": ["Google Brain"],
            "homepage": "https://...",
            "externalIds": {"DBLP": "...", "ORCID": "..."}
        }
    """
    async with REQUEST_SEMAPHORE:
        await asyncio.sleep(1.1)  # rate-limit pacing
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            response = await client.get(
                f"{BASE_URL}/author/{author_id}",
                params={"fields": AUTHOR_FIELDS},
            )
    if response.status_code == 404:
        return None
    response.raise_for_status()
    return response.json()
async def get_author_papers(
    author_id: str,
    fields: str = "paperId,title,year,citationCount,influentialCitationCount",
) -> list[dict]:
    """Return every paper by an author, walking the paginated endpoint."""
    papers: list[dict] = []
    cursor = 0
    page_size = 500
    while True:
        async with REQUEST_SEMAPHORE:
            await asyncio.sleep(1.1)  # rate-limit pacing
            async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
                response = await client.get(
                    f"{BASE_URL}/author/{author_id}/papers",
                    params={"fields": fields, "limit": page_size, "offset": cursor},
                )
        response.raise_for_status()
        payload = response.json()
        page = payload.get("data", [])
        papers.extend(page)
        if len(page) < page_size or "next" not in payload:
            break
        cursor = payload["next"]
    return papers
Bulk Paper Lookup
The batch endpoint is critical for efficiency. One POST for 500 papers instead of 500 individual GETs:
async def batch_get_papers(
    paper_ids: list[str],
    fields: str = "paperId,title,year,citationCount,influentialCitationCount,authors,abstract",
) -> list[Optional[dict]]:
    """
    Resolve papers via POST /paper/batch, 500 IDs at a time.

    IDs may be any mix of Semantic Scholar IDs, DOIs, and ArXiv IDs. The
    returned list preserves input order; unresolvable IDs come back as None.
    """
    resolved: list[Optional[dict]] = []
    chunk_size = 500  # API maximum for /paper/batch
    for start in range(0, len(paper_ids), chunk_size):
        chunk = paper_ids[start:start + chunk_size]
        async with REQUEST_SEMAPHORE:
            await asyncio.sleep(1.1)  # rate-limit pacing
            async with httpx.AsyncClient(headers=HEADERS, timeout=60) as client:
                response = await client.post(
                    f"{BASE_URL}/paper/batch",
                    params={"fields": fields},
                    json={"ids": chunk},
                )
        response.raise_for_status()
        resolved.extend(response.json())
    return resolved
async def batch_get_authors(
    author_ids: list[str],
    fields: str = "authorId,name,hIndex,citationCount,paperCount,affiliations",
) -> list[Optional[dict]]:
    """Resolve authors via POST /author/batch, 1000 IDs at a time (input order kept)."""
    resolved: list[Optional[dict]] = []
    chunk_size = 1000  # API maximum for /author/batch
    for start in range(0, len(author_ids), chunk_size):
        chunk = author_ids[start:start + chunk_size]
        async with REQUEST_SEMAPHORE:
            await asyncio.sleep(1.1)  # rate-limit pacing
            async with httpx.AsyncClient(headers=HEADERS, timeout=60) as client:
                response = await client.post(
                    f"{BASE_URL}/author/batch",
                    params={"fields": fields},
                    json={"ids": chunk},
                )
        response.raise_for_status()
        resolved.extend(response.json())
    return resolved
SQLite Schema
def init_db(db_path: str = "semantic_scholar.db") -> sqlite3.Connection:
    """
    Open (or create) the SQLite database and ensure the schema exists.

    Tables:
      papers    -- one row per paper; list-valued fields stored as JSON text
      citations -- (citing, cited) edges with the isInfluential flag
      authors   -- author profiles with h-index and aggregate counts

    Safe to call repeatedly (all DDL uses IF NOT EXISTS). Returns an open
    connection; the caller is responsible for closing it.
    """
    conn = sqlite3.connect(db_path)
    # Run all DDL in one pass; executescript implicitly commits first.
    conn.executescript("""
    CREATE TABLE IF NOT EXISTS papers (
        paper_id TEXT PRIMARY KEY,
        title TEXT,
        abstract TEXT,
        year INTEGER,
        citation_count INTEGER DEFAULT 0,
        reference_count INTEGER DEFAULT 0,
        influential_citation_count INTEGER DEFAULT 0,
        tldr TEXT,
        open_access_url TEXT,
        open_access_status TEXT,
        venue TEXT,
        journal TEXT,
        publication_types TEXT,
        fields_of_study TEXT,
        external_ids TEXT,
        authors TEXT,
        corpus_id INTEGER,
        doi TEXT,
        arxiv_id TEXT,
        fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    CREATE TABLE IF NOT EXISTS citations (
        citing_paper_id TEXT NOT NULL,
        cited_paper_id TEXT NOT NULL,
        is_influential INTEGER DEFAULT 0,
        PRIMARY KEY (citing_paper_id, cited_paper_id)
    );
    CREATE TABLE IF NOT EXISTS authors (
        author_id TEXT PRIMARY KEY,
        name TEXT,
        h_index INTEGER,
        citation_count INTEGER,
        paper_count INTEGER,
        affiliations TEXT,
        homepage TEXT,
        fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    CREATE INDEX IF NOT EXISTS idx_citations_cited
        ON citations(cited_paper_id);
    CREATE INDEX IF NOT EXISTS idx_papers_year
        ON papers(year);
    CREATE INDEX IF NOT EXISTS idx_papers_citations
        ON papers(citation_count DESC);
    CREATE INDEX IF NOT EXISTS idx_papers_corpus_id
        ON papers(corpus_id);
    """)
    conn.commit()
    return conn
def save_paper(paper: dict, conn: sqlite3.Connection):
    """
    Upsert one API paper record into the papers table.

    Flattens nested fields (tldr, openAccessPdf, journal, externalIds) and
    JSON-encodes list-valued columns. Silently ignores empty records or
    records without a paperId (e.g. None entries from batch lookups).
    """
    if not paper or not paper.get("paperId"):
        return

    # Nested objects may be absent OR present-but-null in API payloads,
    # so coalesce to {} before drilling in.
    tldr_text = (paper.get("tldr") or {}).get("text")
    oa = paper.get("openAccessPdf") or {}
    open_access_url = oa.get("url")
    open_access_status = oa.get("status")
    external_ids = paper.get("externalIds") or {}
    journal = paper.get("journal")

    conn.execute(
        """INSERT OR REPLACE INTO papers
        (paper_id, title, abstract, year, citation_count, reference_count,
        influential_citation_count, tldr, open_access_url, open_access_status,
        venue, journal, publication_types, fields_of_study, external_ids,
        authors, corpus_id, doi, arxiv_id)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
        (
            paper.get("paperId"),
            paper.get("title"),
            paper.get("abstract"),
            paper.get("year"),
            # `or 0`: the API can return explicit nulls for count fields,
            # which .get(..., 0) would pass through as None.
            paper.get("citationCount") or 0,
            paper.get("referenceCount") or 0,
            paper.get("influentialCitationCount") or 0,
            tldr_text,
            open_access_url,
            open_access_status,
            paper.get("venue"),
            journal.get("name") if isinstance(journal, dict) else None,
            json.dumps(paper.get("publicationTypes") or []),
            json.dumps([f.get("category") for f in (paper.get("s2FieldsOfStudy") or [])]),
            json.dumps(external_ids),
            json.dumps([a.get("name") for a in (paper.get("authors") or [])]),
            external_ids.get("CorpusId"),
            external_ids.get("DOI"),
            external_ids.get("ArXiv"),
        ),
    )
    conn.commit()
def save_citations(
    cited_paper_id: str,
    citations: list[dict],
    conn: sqlite3.Connection,
):
    """Insert (citing, cited) edges for one cited paper; duplicate edges are skipped."""
    rows = []
    for entry in citations:
        citing_id = entry.get("citingPaper", {}).get("paperId")
        if not citing_id:
            continue  # no resolvable citing paper in this entry
        rows.append((citing_id, cited_paper_id, int(entry.get("isInfluential", False))))
    conn.executemany(
        """INSERT OR IGNORE INTO citations (citing_paper_id, cited_paper_id, is_influential)
        VALUES (?,?,?)""",
        rows,
    )
    conn.commit()
Error Handling
The Semantic Scholar API returns structured errors that need different handling:
import asyncio
class SemanticScholarClient:
    """
    Minimal Semantic Scholar GET client with pacing and retry handling.

    Serializes requests through a semaphore, sleeps `interval` seconds before
    each one, retries on 429 (honoring Retry-After) and on transient 5xx
    errors with exponential backoff.
    """

    def __init__(self, api_key: Optional[str] = None, requests_per_second: float = 0.9):
        """
        Args:
            api_key: Semantic Scholar API key; header is omitted when None.
            requests_per_second: target request rate (must be > 0).
        """
        self.headers = {"x-api-key": api_key} if api_key else {}
        self.semaphore = asyncio.Semaphore(1)  # one in-flight request at a time
        self.interval = 1.0 / requests_per_second

    async def get(self, path: str, **kwargs) -> dict:
        """
        GET `BASE_URL + path` and return the decoded JSON body.

        Returns {} on 404 or when all retries are exhausted.
        Raises httpx.HTTPStatusError for non-retryable HTTP errors.
        """
        url = f"{BASE_URL}{path}"
        max_retries = 5
        for attempt in range(max_retries):
            async with self.semaphore:
                await asyncio.sleep(self.interval)
                try:
                    async with httpx.AsyncClient(
                        headers=self.headers, timeout=30
                    ) as client:
                        resp = await client.get(url, **kwargs)
                        if resp.status_code == 429:
                            # Rate limited — honor Retry-After. The header may be
                            # an HTTP-date rather than seconds; fall back to 60s
                            # when it doesn't parse as an integer.
                            try:
                                retry_after = int(resp.headers.get("Retry-After", 60))
                            except ValueError:
                                retry_after = 60
                            print(f" Rate limited. Waiting {retry_after}s...")
                            await asyncio.sleep(retry_after)
                            continue
                        if resp.status_code == 404:
                            return {}
                        resp.raise_for_status()
                        return resp.json()
                except httpx.HTTPStatusError as e:
                    if e.response.status_code in (500, 502, 503) and attempt < max_retries - 1:
                        wait = 2 ** attempt * 5
                        print(f" Server error {e.response.status_code}, retry in {wait}s")
                        await asyncio.sleep(wait)
                        continue
                    raise
        return {}
Citation Graph Traversal
A common use case: map the citation network for an entire research area, starting from a set of seed papers:
async def build_citation_graph(
    seed_paper_ids: list[str],
    max_depth: int = 2,
    min_citations: int = 10,
    db_path: str = "semantic_scholar.db",
):
    """
    BFS traversal of the citation graph starting from seed papers.

    Fetches all papers that cite the seeds, then papers that cite those, etc.
    Filters by min_citations to avoid pulling in obscure papers.

    Args:
        seed_paper_ids: starting paper IDs (any format batch_get_papers accepts).
        max_depth: number of citation "hops" to expand beyond the seeds.
        min_citations: minimum citationCount for a citing paper to join the
            next frontier.
        db_path: SQLite file papers and citation edges are persisted to.
    """
    conn = init_db(db_path)
    client = SemanticScholarClient(api_key=API_KEY)  # NOTE(review): created but never used below
    visited = set()
    current_frontier = set(seed_paper_ids)
    # Depth 0 processes the seeds themselves; hence max_depth + 1 iterations.
    for depth in range(max_depth + 1):
        print(f"\nDepth {depth}: {len(current_frontier)} papers to process")
        next_frontier = set()
        # Batch fetch paper details (skip anything already visited)
        ids_list = list(current_frontier - visited)
        papers = await batch_get_papers(ids_list)
        for paper in papers:
            if not paper:
                continue  # ID the batch endpoint could not resolve
            save_paper(paper, conn)
            visited.add(paper["paperId"])
        # Fetch citations for each paper
        for paper_id in ids_list:
            if depth < max_depth:  # at the leaf depth: store papers but don't expand further
                citations = await get_all_citations(paper_id)
                save_citations(paper_id, citations, conn)
                # Add highly-cited papers to next frontier
                for entry in citations:
                    citing_paper = entry.get("citingPaper", {})
                    pid = citing_paper.get("paperId")
                    if pid and pid not in visited:
                        if citing_paper.get("citationCount", 0) >= min_citations:
                            next_frontier.add(pid)
        current_frontier = next_frontier
    conn.close()
    print(f"\nCitation graph built. Visited {len(visited)} papers.")
Practical Tips
Use CorpusId for stable references. The paperId hex string is stable, but CorpusId (integer) is shorter and more convenient for join-heavy SQL queries. It lives in externalIds.CorpusId.
Search pagination caps at 10,000. The /paper/search endpoint lets you paginate up to offset 9,999. Beyond that, split your query with tighter constraints — add date ranges like year:2020-2023 in the query string, or use more specific terms.
TLDR coverage is patchy. The TLDR field is model-generated and only exists for a subset of papers. Do not build pipeline logic that assumes it will be present.
Influential citation count is the interesting metric. citationCount tells you how often a paper is cited. influentialCitationCount tells you how often it was cited as foundational work. The ratio between them signals whether a paper is a cornerstone or just frequently mentioned in passing.
Scaling horizontally. For citation graph traversals across thousands of seed papers — say, mapping an entire subfield like "transformer architectures" — you will exhaust the per-key rate limit quickly. The right approach is multiple workers, each with its own API key (Semantic Scholar allows this for research use), running behind ThorData residential proxies to avoid IP-level throttling across workers. One key plus one IP per worker keeps you within limits without building exponential backoff into every call.
Open access PDFs. The openAccessPdf.url field gives you direct download links for papers with GREEN or GOLD status. These are hosted on ArXiv, PubMed Central, or publisher sites. You can download these directly without going through any paywall.
Complete Example
async def main():
    """Pull papers about a topic and their citation network."""
    conn = init_db("ml_papers.db")

    # Search for papers
    results = await search_papers_paginated(
        "attention mechanism transformer", max_results=50
    )
    print(f"Found {len(results)} papers")

    # Batch enrich with full details
    paper_ids = [p["paperId"] for p in results]
    papers = await batch_get_papers(paper_ids, fields=PAPER_FIELDS)
    for paper in papers:
        if paper:
            save_paper(paper, conn)

    # Fetch citations for the ten most-cited papers found
    top_papers = sorted(
        [p for p in papers if p],
        key=lambda x: x.get("citationCount", 0),
        reverse=True,
    )[:10]
    for paper in top_papers:
        print(f"\nFetching citations for: {paper['title'][:60]}")
        citations = await get_all_citations(paper["paperId"])
        save_citations(paper["paperId"], citations, conn)
        print(f" {len(citations)} citations saved")

    conn.close()


if __name__ == "__main__":
    # Guard the entry point so importing this module does not kick off
    # network requests and database writes as a side effect.
    asyncio.run(main())
Analyzing the Data with SQL
With papers, citations, and authors in SQLite, you can run meaningful research analysis:
# Ad-hoc analysis queries over a previously populated database.
conn = sqlite3.connect("semantic_scholar.db")

# Most cited papers in the dataset
most_cited = conn.execute("""
    SELECT title, year, citation_count, influential_citation_count,
        ROUND(influential_citation_count * 1.0 / citation_count * 100, 1) AS influential_pct
    FROM papers
    WHERE citation_count > 0
    ORDER BY citation_count DESC
    LIMIT 20
""").fetchall()

# Papers with highest ratio of influential citations (foundational work)
most_influential = conn.execute("""
    SELECT title, year, citation_count, influential_citation_count,
        ROUND(influential_citation_count * 1.0 / citation_count * 100, 1) AS influential_pct
    FROM papers
    WHERE citation_count >= 50 AND influential_citation_count > 0
    ORDER BY influential_pct DESC
    LIMIT 20
""").fetchall()

# Citation velocity by year (how quickly papers accumulate citations)
velocity_by_year = conn.execute("""
    SELECT year,
        COUNT(*) AS paper_count,
        AVG(citation_count) AS avg_citations,
        MAX(citation_count) AS max_citations
    FROM papers
    WHERE year >= 2015 AND year <= 2024
    GROUP BY year
    ORDER BY year
""").fetchall()

# Open access coverage (NULL status is treated as CLOSED)
oa_stats = conn.execute("""
    SELECT
        COALESCE(open_access_status, 'CLOSED') AS status,
        COUNT(*) AS count,
        ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM papers), 1) AS pct
    FROM papers
    GROUP BY status
    ORDER BY count DESC
""").fetchall()

# Papers per field of study — fields_of_study holds a JSON array (see
# save_paper), which json_each expands into one row per field value.
field_counts = conn.execute("""
    SELECT
        json_each.value AS field,
        COUNT(*) AS paper_count,
        AVG(citation_count) AS avg_citations
    FROM papers, json_each(fields_of_study)
    WHERE fields_of_study IS NOT NULL AND fields_of_study != '[]'
    GROUP BY field
    ORDER BY paper_count DESC
    LIMIT 15
""").fetchall()

for field, count, avg_cit in field_counts:
    print(f" {field:<30} {count:>5} papers avg {avg_cit:.0f} citations")
Finding Author Collaboration Networks
async def build_author_coauthorship(
    paper_ids: list[str],
    db_path: str = "semantic_scholar.db",
):
    """
    Build a coauthorship network from a set of papers.

    Records every unordered author pair that shares at least one paper,
    counting how many papers each pair co-authored, then returns the ten
    authors with the most distinct collaborators.
    """
    conn = sqlite3.connect(db_path)

    # Ensure the coauthorship table exists
    conn.execute("""
        CREATE TABLE IF NOT EXISTS coauthorships (
            author1_id TEXT NOT NULL,
            author2_id TEXT NOT NULL,
            paper_count INTEGER DEFAULT 1,
            PRIMARY KEY (author1_id, author2_id)
        )
    """)

    # Only authorship metadata is needed to build pairs.
    records = await batch_get_papers(
        paper_ids,
        fields="paperId,authors"
    )

    for record in records:
        if not record:
            continue
        ids = [
            author.get("authorId")
            for author in record.get("authors", [])
            if author.get("authorId")
        ]
        # Upsert each unordered pair; sorting makes (a, b) == (b, a).
        for position, first in enumerate(ids):
            for second in ids[position + 1:]:
                conn.execute(
                    """INSERT INTO coauthorships (author1_id, author2_id, paper_count)
                    VALUES (?, ?, 1)
                    ON CONFLICT(author1_id, author2_id)
                    DO UPDATE SET paper_count = paper_count + 1""",
                    tuple(sorted([first, second])),
                )
    conn.commit()

    # Rank authors by number of distinct collaborators
    leaderboard = conn.execute("""
        SELECT author1_id, COUNT(DISTINCT author2_id) AS collaborator_count,
            SUM(paper_count) AS total_papers
        FROM coauthorships
        GROUP BY author1_id
        ORDER BY collaborator_count DESC
        LIMIT 10
    """).fetchall()
    conn.close()
    return leaderboard