Scraping Semantic Scholar Paper Metadata and Citations with Python (2026)

Semantic Scholar is one of the best-structured sources of academic data on the public internet. The Graph API is documented, public, and returns real structured data about papers, authors, citation networks, and more — without scraping HTML. The main challenge is rate limiting, which becomes painful fast at scale.

This guide covers the full API surface: what data it exposes, how to pull it efficiently with Python async, how to store it in SQLite, and how to scale across large citation graphs without hitting throttle walls.

What Data Is Available

The Semantic Scholar Graph API at api.semanticscholar.org/graph/v1 exposes:

Paper records:
- Title, abstract, year, venue, journal name
- Publication types (JournalArticle, Conference, Review, etc.)
- Citation count, reference count, influential citation count
- TLDR summaries: auto-generated one-sentence abstracts (not available for all papers)
- Open access PDF links with status (GREEN, GOLD, BRONZE, CLOSED)
- External IDs: DOI, ArXiv ID, PubMed ID, MAG ID, CorpusId (integer)

Citation and reference graphs:
- Full list of papers that cite a given paper
- Full reference list of papers a given paper cites
- isInfluential flag per citation: whether Semantic Scholar judged it foundational

Author profiles:
- Name, h-index, total citations, paper count
- Affiliations (institutional)
- Homepage URL
- Full paper list with pagination

Bulk endpoints:
- POST /paper/batch: fetch up to 500 papers in a single request
- POST /author/batch: fetch up to 1000 authors in a single request

The influentialCitationCount metric deserves special attention. It is Semantic Scholar's internal signal for papers that are explicitly cited as foundational work rather than just mentioned. A paper with 500 citations and 200 influential citations reads very differently from one with 500 citations and 10 influential ones.
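
To make that comparison concrete, here is a minimal sketch that turns the two counts into a ratio. The helper name influential_ratio is hypothetical and not part of the API; the numbers are the two illustrative papers from the paragraph above.

```python
def influential_ratio(citation_count: int, influential_count: int) -> float:
    """Share of citations flagged influential; 0.0 when there are no citations."""
    if citation_count <= 0:
        return 0.0
    return influential_count / citation_count


# The two hypothetical papers from the paragraph above.
print(f"{influential_ratio(500, 200):.0%}")  # 40%
print(f"{influential_ratio(500, 10):.0%}")   # 2%
```

Guarding against a zero citation count matters in practice, because freshly indexed papers often have no citations yet.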

Rate Limits and API Keys

Without an API key: 100 requests per minute, shared per IP. This sounds reasonable until you start paginating citation lists for dozens of papers simultaneously.

With a free API key: 1 request per second for most endpoints. The batch endpoints have their own separate limits. Register at semanticscholar.org/product/api — approval is typically fast for research use.

Bulk operation strategy: The POST /paper/batch endpoint accepts up to 500 paper IDs per request and is the correct approach for enriching a list of known papers. 500 papers in one POST is far more efficient than 500 individual GETs.
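
The arithmetic behind that claim can be sketched with a small chunking helper. The name chunked and the made-up IDs are assumptions for illustration; only the 500-per-request cap comes from the text above.

```python
from typing import Iterator


def chunked(ids: list[str], size: int = 500) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` IDs."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


# 1,200 made-up IDs become three POST bodies instead of 1,200 GETs.
fake_ids = [f"paper-{n}" for n in range(1200)]
batches = list(chunked(fake_ids))
print([len(b) for b in batches])  # [500, 500, 200]
```

At one request per second, that is roughly three seconds of waiting instead of twenty minutes.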

At scale: For large citation graph traversals across multiple machines, ThorData's residential proxies let you distribute requests across IPs so the per-IP rate counters reset cleanly for each worker. Each worker node gets a different exit IP. For single-machine work with an API key and an asyncio semaphore, proxies are unnecessary.

Setup

pip install httpx

import httpx
import asyncio
import sqlite3
import json
import time
from typing import Optional

API_KEY = "your_api_key_here"  # or None for unauthenticated
BASE_URL = "https://api.semanticscholar.org/graph/v1"

HEADERS = {}
if API_KEY:
    HEADERS["x-api-key"] = API_KEY

# 1 req/sec with key; use 1.1s to give a safety margin
REQUEST_SEMAPHORE = asyncio.Semaphore(1)
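
The semaphore-plus-fixed-sleep pattern used throughout this guide is simple, but it always waits the full 1.1 seconds even when the previous request finished long ago. One possible alternative, sketched here under the assumption that spacing request start times is enough, is a limiter that sleeps only for the remaining part of the interval. The class name MinIntervalLimiter is hypothetical, and the demo uses a short interval so it runs quickly.

```python
import asyncio
import time


class MinIntervalLimiter:
    """Allow one request at a time, spaced at least `interval` seconds apart."""

    def __init__(self, interval: float = 1.1):
        self.interval = interval
        self._lock = asyncio.Lock()
        self._last_start = None  # monotonic time of the previous request

    async def wait(self) -> None:
        async with self._lock:
            if self._last_start is not None:
                remaining = self.interval - (time.monotonic() - self._last_start)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last_start = time.monotonic()


async def demo() -> float:
    # Create the limiter inside the running loop so the lock binds to it.
    limiter = MinIntervalLimiter(interval=0.05)
    start = time.monotonic()
    for _ in range(3):
        await limiter.wait()  # a real caller would issue its HTTP request here
    return time.monotonic() - start


elapsed = asyncio.run(demo())
print(f"3 slots took {elapsed:.2f}s")
```

The first call passes immediately and later calls wait only as long as needed, so slow responses do not add a second on top of their own latency.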

Searching Papers by Keyword

async def search_papers(
    query: str,
    fields: str = "paperId,title,year,citationCount,authors,abstract,externalIds",
    limit: int = 10,
    offset: int = 0,
) -> dict:
    """
    Search papers by keyword.

    Returns:
        {"total": int, "offset": int, "next": int, "data": [...]}

    Note: pagination caps at offset 9999. For more results, split by date range.
    """
    async with REQUEST_SEMAPHORE:
        await asyncio.sleep(1.1)
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            resp = await client.get(
                f"{BASE_URL}/paper/search",
                params={
                    "query": query,
                    "fields": fields,
                    "limit": limit,
                    "offset": offset,
                },
            )
            resp.raise_for_status()
            return resp.json()


async def search_papers_paginated(
    query: str, max_results: int = 100
) -> list[dict]:
    """Fetch multiple pages of search results."""
    all_papers = []
    offset = 0
    limit = min(100, max_results)

    while len(all_papers) < max_results:
        page = await search_papers(query, limit=limit, offset=offset)
        batch = page.get("data", [])
        all_papers.extend(batch)

        if "next" not in page or len(batch) < limit:
            break
        offset = page["next"]
        if offset >= 10000:  # API hard limit
            break

    return all_papers[:max_results]

Fetching Full Paper Details

PAPER_FIELDS = ",".join([
    "paperId",
    "title",
    "abstract",
    "year",
    "citationCount",
    "referenceCount",
    "influentialCitationCount",
    "tldr",
    "openAccessPdf",
    "authors",
    "venue",
    "publicationVenue",
    "externalIds",
    "publicationTypes",
    "journal",
    "fieldsOfStudy",
    "s2FieldsOfStudy",
])

async def get_paper(paper_id: str) -> Optional[dict]:
    """
    Fetch full paper details.

    paper_id formats accepted:
    - Semantic Scholar ID: "649def34f8be52c8b66281af98ae884c09aef38b"
    - DOI: "DOI:10.18653/v1/N18-3011"
    - ArXiv: "ARXIV:2106.15928"
    - PubMed: "PMID:12345678"
    - CorpusId: "CorpusId:215416146"

    Returns None if paper not found.
    """
    async with REQUEST_SEMAPHORE:
        await asyncio.sleep(1.1)
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            resp = await client.get(
                f"{BASE_URL}/paper/{paper_id}",
                params={"fields": PAPER_FIELDS},
            )
            if resp.status_code == 404:
                return None
            resp.raise_for_status()
            return resp.json()
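
When IDs arrive from mixed sources such as a spreadsheet or a BibTeX export, they usually lack the prefixes listed in the docstring above. The following sketch guesses the prefix from the shape of the string. The function name to_s2_id and the regular expressions are assumptions and deliberately conservative; anything unrecognized raises instead of being guessed.

```python
import re


def to_s2_id(raw: str) -> str:
    """Best-effort mapping of a raw identifier to a prefixed lookup ID."""
    value = raw.strip()
    if re.fullmatch(r"[0-9a-f]{40}", value):
        return value  # already a 40-character Semantic Scholar paperId
    if re.fullmatch(r"10\.\d{4,9}/\S+", value):
        return f"DOI:{value}"
    if re.fullmatch(r"\d{4}\.\d{4,5}", value):
        return f"ARXIV:{value}"  # new-style arXiv ID without a version suffix
    raise ValueError(f"Unrecognized identifier: {raw!r}")


print(to_s2_id("10.18653/v1/N18-3011"))  # DOI:10.18653/v1/N18-3011
print(to_s2_id("2106.15928"))            # ARXIV:2106.15928
```

Bare PubMed and CorpusId values are both plain integers, so they cannot be told apart by shape and should be prefixed at the point where their source is known.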

Fetching Citation Graphs

async def get_citations(
    paper_id: str,
    fields: str = "paperId,title,year,citationCount,isInfluential",
    limit: int = 500,
    offset: int = 0,
) -> dict:
    """
    Get papers that cite the given paper.

    Returns:
        {"offset": int, "next": int, "data": [
            {"citingPaper": {...}, "isInfluential": bool},
            ...
        ]}

    Max limit per request: 1000. Paginate using offset.
    isInfluential: True if Semantic Scholar flagged this as a foundational citation.
    """
    async with REQUEST_SEMAPHORE:
        await asyncio.sleep(1.1)
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            resp = await client.get(
                f"{BASE_URL}/paper/{paper_id}/citations",
                params={"fields": fields, "limit": limit, "offset": offset},
            )
            resp.raise_for_status()
            return resp.json()


async def get_all_citations(
    paper_id: str,
    fields: str = "paperId,title,year,citationCount,isInfluential",
) -> list[dict]:
    """Paginate through all citations for a paper."""
    all_citations = []
    offset = 0
    limit = 500

    while True:
        page = await get_citations(paper_id, fields=fields, limit=limit, offset=offset)
        batch = page.get("data", [])
        all_citations.extend(batch)

        if "next" not in page or len(batch) < limit:
            break
        offset = page["next"]

    return all_citations


async def get_references(
    paper_id: str,
    fields: str = "paperId,title,year,citationCount",
    limit: int = 500,
) -> list[dict]:
    """Get all papers referenced by the given paper."""
    all_refs = []
    offset = 0

    while True:
        async with REQUEST_SEMAPHORE:
            await asyncio.sleep(1.1)
            async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
                resp = await client.get(
                    f"{BASE_URL}/paper/{paper_id}/references",
                    params={"fields": fields, "limit": limit, "offset": offset},
                )
                resp.raise_for_status()
                data = resp.json()

        batch = data.get("data", [])
        all_refs.extend(batch)

        if "next" not in data or len(batch) < limit:
            break
        offset = data["next"]

    return all_refs
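
The citation response nests each citing paper under a citingPaper key, with isInfluential sitting next to it rather than inside it. As a rough illustration of that shape, the sketch below flattens entries into edge tuples. The helper name citation_edges and the sample IDs are made up for the example.

```python
def citation_edges(cited_id: str, entries: list[dict]) -> list[tuple[str, str, bool]]:
    """Turn API citation entries into (citing_id, cited_id, is_influential) rows."""
    edges = []
    for entry in entries:
        citing = entry.get("citingPaper") or {}
        citing_id = citing.get("paperId")
        if citing_id:  # skip entries with no usable paperId
            edges.append((citing_id, cited_id, bool(entry.get("isInfluential"))))
    return edges


# Hand-written sample in the documented response shape (IDs are invented).
sample = [
    {"citingPaper": {"paperId": "p1", "title": "First"}, "isInfluential": True},
    {"citingPaper": {"paperId": None, "title": "No ID"}, "isInfluential": False},
    {"citingPaper": {"paperId": "p2", "title": "Second"}},
]
print(citation_edges("seed", sample))
# [('p1', 'seed', True), ('p2', 'seed', False)]
```

Note that isInfluential is only present when it is included in the requested fields, which is why the missing key is treated as False here.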

Author Profiles with H-Index

AUTHOR_FIELDS = "name,hIndex,citationCount,paperCount,affiliations,homepage,externalIds"

async def get_author(author_id: str) -> Optional[dict]:
    """
    Fetch author profile.

    Response shape:
    {
        "authorId": "1741101",
        "name": "Ashish Vaswani",
        "hIndex": 42,
        "citationCount": 98234,
        "paperCount": 37,
        "affiliations": ["Google Brain"],
        "homepage": "https://...",
        "externalIds": {"DBLP": "...", "ORCID": "..."}
    }
    """
    async with REQUEST_SEMAPHORE:
        await asyncio.sleep(1.1)
        async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
            resp = await client.get(
                f"{BASE_URL}/author/{author_id}",
                params={"fields": AUTHOR_FIELDS},
            )
            if resp.status_code == 404:
                return None
            resp.raise_for_status()
            return resp.json()


async def get_author_papers(
    author_id: str,
    fields: str = "paperId,title,year,citationCount,influentialCitationCount",
) -> list[dict]:
    """Get all papers by an author with pagination."""
    all_papers = []
    offset = 0
    limit = 500

    while True:
        async with REQUEST_SEMAPHORE:
            await asyncio.sleep(1.1)
            async with httpx.AsyncClient(headers=HEADERS, timeout=30) as client:
                resp = await client.get(
                    f"{BASE_URL}/author/{author_id}/papers",
                    params={"fields": fields, "limit": limit, "offset": offset},
                )
                resp.raise_for_status()
                data = resp.json()

        batch = data.get("data", [])
        all_papers.extend(batch)

        if "next" not in data or len(batch) < limit:
            break
        offset = data["next"]

    return all_papers
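
The hIndex field comes precomputed from the API, but it helps to see how the number relates to a paper list such as the one get_author_papers returns. This is a sketch of the standard h-index definition, not Semantic Scholar's own implementation, and the sample papers are invented.

```python
def h_index(citation_counts: list[int]) -> int:
    """Largest h such that h papers have at least h citations each."""
    h = 0
    for rank, count in enumerate(sorted(citation_counts, reverse=True), start=1):
        if count < rank:
            break
        h = rank
    return h


# Invented paper list in the shape returned by the author papers endpoint.
papers = [
    {"paperId": "a", "citationCount": 10},
    {"paperId": "b", "citationCount": 8},
    {"paperId": "c", "citationCount": 5},
    {"paperId": "d", "citationCount": 4},
    {"paperId": "e", "citationCount": None},  # counts can be missing
]
print(h_index([p.get("citationCount") or 0 for p in papers]))  # 4
```

A value computed this way on a filtered subset, for example one research area or one decade, can differ from the profile-wide hIndex, which is often the more interesting comparison.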

Bulk Paper Lookup

The batch endpoint is critical for efficiency. One POST for 500 papers instead of 500 individual GETs:

async def batch_get_papers(
    paper_ids: list[str],
    fields: str = "paperId,title,year,citationCount,influentialCitationCount,authors,abstract",
) -> list[Optional[dict]]:
    """
    Fetch up to 500 papers per request via POST.

    paper_ids can be any mix of Semantic Scholar IDs, DOIs, ArXiv IDs.
    Returns list in same order as input; None entries for not-found papers.
    """
    results = []

    for i in range(0, len(paper_ids), 500):
        chunk = paper_ids[i:i + 500]

        async with REQUEST_SEMAPHORE:
            await asyncio.sleep(1.1)
            async with httpx.AsyncClient(headers=HEADERS, timeout=60) as client:
                resp = await client.post(
                    f"{BASE_URL}/paper/batch",
                    params={"fields": fields},
                    json={"ids": chunk},
                )
                resp.raise_for_status()
                results.extend(resp.json())

    return results


async def batch_get_authors(
    author_ids: list[str],
    fields: str = "authorId,name,hIndex,citationCount,paperCount,affiliations",
) -> list[Optional[dict]]:
    """Fetch up to 1000 authors per request via POST."""
    results = []

    for i in range(0, len(author_ids), 1000):
        chunk = author_ids[i:i + 1000]

        async with REQUEST_SEMAPHORE:
            await asyncio.sleep(1.1)
            async with httpx.AsyncClient(headers=HEADERS, timeout=60) as client:
                resp = await client.post(
                    f"{BASE_URL}/author/batch",
                    params={"fields": fields},
                    json={"ids": chunk},
                )
                resp.raise_for_status()
                results.extend(resp.json())

    return results
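
Because the batch response keeps input order and uses None for papers that were not found, requested IDs can be matched to results by position. A minimal sketch of that alignment, with a hypothetical helper name and invented sample data:

```python
from typing import Optional


def split_found_missing(
    requested_ids: list[str], results: list[Optional[dict]]
) -> tuple[dict[str, dict], list[str]]:
    """Pair each requested ID with its result; collect IDs that came back as None."""
    found: dict[str, dict] = {}
    missing: list[str] = []
    for requested, paper in zip(requested_ids, results):
        if paper is None:
            missing.append(requested)
        else:
            found[requested] = paper
    return found, missing


# Invented request and response of equal length, as the batch endpoint returns.
ids = ["DOI:10.1000/example", "ARXIV:0000.00000", "CorpusId:1"]
results = [{"paperId": "aaa"}, None, {"paperId": "bbb"}]
found, missing = split_found_missing(ids, results)
print(missing)  # ['ARXIV:0000.00000']
```

Keying the found papers by the ID you asked for, rather than by paperId, makes it easy to write the resolved paperId back next to the original DOI or arXiv ID.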

SQLite Schema

def init_db(db_path: str = "semantic_scholar.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS papers (
            paper_id TEXT PRIMARY KEY,
            title TEXT,
            abstract TEXT,
            year INTEGER,
            citation_count INTEGER DEFAULT 0,
            reference_count INTEGER DEFAULT 0,
            influential_citation_count INTEGER DEFAULT 0,
            tldr TEXT,
            open_access_url TEXT,
            open_access_status TEXT,
            venue TEXT,
            journal TEXT,
            publication_types TEXT,
            fields_of_study TEXT,
            external_ids TEXT,
            authors TEXT,
            corpus_id INTEGER,
            doi TEXT,
            arxiv_id TEXT,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS citations (
            citing_paper_id TEXT NOT NULL,
            cited_paper_id TEXT NOT NULL,
            is_influential INTEGER DEFAULT 0,
            PRIMARY KEY (citing_paper_id, cited_paper_id)
        );

        CREATE TABLE IF NOT EXISTS authors (
            author_id TEXT PRIMARY KEY,
            name TEXT,
            h_index INTEGER,
            citation_count INTEGER,
            paper_count INTEGER,
            affiliations TEXT,
            homepage TEXT,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_citations_cited
            ON citations(cited_paper_id);

        CREATE INDEX IF NOT EXISTS idx_papers_year
            ON papers(year);

        CREATE INDEX IF NOT EXISTS idx_papers_citations
            ON papers(citation_count DESC);

        CREATE INDEX IF NOT EXISTS idx_papers_corpus_id
            ON papers(corpus_id);
    """)
    conn.commit()
    return conn


def save_paper(paper: dict, conn: sqlite3.Connection):
    if not paper or not paper.get("paperId"):
        return

    tldr_text = None
    if paper.get("tldr"):
        tldr_text = paper["tldr"].get("text")

    open_access_url = None
    open_access_status = None
    if paper.get("openAccessPdf"):
        open_access_url = paper["openAccessPdf"].get("url")
        open_access_status = paper["openAccessPdf"].get("status")

    external_ids = paper.get("externalIds") or {}

    conn.execute(
        """INSERT OR REPLACE INTO papers
           (paper_id, title, abstract, year, citation_count, reference_count,
            influential_citation_count, tldr, open_access_url, open_access_status,
            venue, journal, publication_types, fields_of_study, external_ids,
            authors, corpus_id, doi, arxiv_id)
           VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
        (
            paper.get("paperId"),
            paper.get("title"),
            paper.get("abstract"),
            paper.get("year"),
            paper.get("citationCount", 0),
            paper.get("referenceCount", 0),
            paper.get("influentialCitationCount", 0),
            tldr_text,
            open_access_url,
            open_access_status,
            paper.get("venue"),
            paper.get("journal", {}).get("name") if isinstance(paper.get("journal"), dict) else None,
            json.dumps(paper.get("publicationTypes") or []),
            json.dumps([f.get("category") for f in (paper.get("s2FieldsOfStudy") or [])]),
            json.dumps(external_ids),
            json.dumps([a.get("name") for a in (paper.get("authors") or [])]),
            external_ids.get("CorpusId"),
            external_ids.get("DOI"),
            external_ids.get("ArXiv"),
        ),
    )
    conn.commit()


def save_citations(
    cited_paper_id: str,
    citations: list[dict],
    conn: sqlite3.Connection,
):
    conn.executemany(
        """INSERT OR IGNORE INTO citations (citing_paper_id, cited_paper_id, is_influential)
           VALUES (?,?,?)""",
        [
            (
                (entry.get("citingPaper") or {}).get("paperId"),
                cited_paper_id,
                int(bool(entry.get("isInfluential"))),
            )
            for entry in citations
            # Skip entries without a usable paperId (key missing or null)
            if (entry.get("citingPaper") or {}).get("paperId")
        ],
    )
    conn.commit()
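
The composite primary key on the citations table, combined with INSERT OR IGNORE, is what makes repeated crawls safe. The sketch below reproduces just that table in an in-memory database to show the effect; the paper IDs are placeholders.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE citations (
        citing_paper_id TEXT NOT NULL,
        cited_paper_id TEXT NOT NULL,
        is_influential INTEGER DEFAULT 0,
        PRIMARY KEY (citing_paper_id, cited_paper_id)
    )
""")

# The third row repeats the A -> seed edge, as a second crawl would.
rows = [("A", "seed", 1), ("B", "seed", 0), ("A", "seed", 0)]
conn.executemany("INSERT OR IGNORE INTO citations VALUES (?,?,?)", rows)

total, influential = conn.execute(
    "SELECT COUNT(*), SUM(is_influential) FROM citations WHERE cited_paper_id = ?",
    ("seed",),
).fetchone()
print(total, influential)  # 2 1
conn.close()
```

The duplicate edge is dropped silently and the first stored is_influential value wins. If the flag should be refreshed on re-crawl, an upsert would be needed instead of INSERT OR IGNORE.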

Error Handling

The Semantic Scholar API returns structured errors that need different handling:

import asyncio

class SemanticScholarClient:
    """Client with retry logic and rate limit handling."""

    def __init__(self, api_key: Optional[str] = None, requests_per_second: float = 0.9):
        self.headers = {"x-api-key": api_key} if api_key else {}
        self.semaphore = asyncio.Semaphore(1)
        self.interval = 1.0 / requests_per_second

    async def get(self, path: str, **kwargs) -> dict:
        """GET request with exponential backoff retry."""
        url = f"{BASE_URL}{path}"
        max_retries = 5

        for attempt in range(max_retries):
            async with self.semaphore:
                await asyncio.sleep(self.interval)
                try:
                    async with httpx.AsyncClient(
                        headers=self.headers, timeout=30
                    ) as client:
                        resp = await client.get(url, **kwargs)

                        if resp.status_code == 429:
                            # Rate limited: honor Retry-After when it is a plain
                            # number of seconds, otherwise fall back to 60s
                            header = resp.headers.get("Retry-After", "")
                            retry_after = int(header) if header.isdigit() else 60
                            print(f"  Rate limited. Waiting {retry_after}s...")
                            await asyncio.sleep(retry_after)
                            continue

                        if resp.status_code == 404:
                            return {}

                        resp.raise_for_status()
                        return resp.json()

                except httpx.HTTPStatusError as e:
                    if e.response.status_code in (500, 502, 503) and attempt < max_retries - 1:
                        wait = 2 ** attempt * 5
                        print(f"  Server error {e.response.status_code}, retry in {wait}s")
                        await asyncio.sleep(wait)
                        continue
                    raise

        return {}
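
HTTP allows Retry-After to carry either a number of seconds or an HTTP-date, so a parser that accepts both is a little more defensive than a bare int() call. The sketch below pairs such a parser with the backoff schedule used by the client above. Both function names are hypothetical, and whether Semantic Scholar sends the header on every 429 is not something this guide verifies.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(header: Optional[str], default: float = 60.0) -> float:
    """Parse Retry-After as delta-seconds or an HTTP-date; fall back to default."""
    if not header:
        return default
    try:
        return max(0.0, float(header))
    except ValueError:
        pass  # not a number, try the date form next
    try:
        when = parsedate_to_datetime(header)
    except (TypeError, ValueError):
        return default  # unparseable value
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())


def backoff_delay(attempt: int, base: float = 5.0) -> float:
    """Exponential schedule matching the client above: 5s, 10s, 20s, ..."""
    return base * 2 ** attempt


print(retry_after_seconds("120"))              # 120.0
print([backoff_delay(a) for a in range(5)])    # [5.0, 10.0, 20.0, 40.0, 80.0]
```

With five attempts the worst-case wait on server errors adds up to a few minutes, which is worth knowing before launching a long unattended crawl.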

Citation Graph Traversal

A common use case: map the citation network for an entire research area, starting from a set of seed papers:

async def build_citation_graph(
    seed_paper_ids: list[str],
    max_depth: int = 2,
    min_citations: int = 10,
    db_path: str = "semantic_scholar.db",
):
    """
    BFS traversal of the citation graph starting from seed papers.
    Fetches all papers that cite the seeds, then papers that cite those, etc.
    Filters by min_citations to avoid pulling in obscure papers.
    """
    conn = init_db(db_path)

    visited = set()
    current_frontier = set(seed_paper_ids)

    for depth in range(max_depth + 1):
        print(f"\nDepth {depth}: {len(current_frontier)} papers to process")
        next_frontier = set()

        # Batch fetch paper details
        ids_list = list(current_frontier - visited)
        papers = await batch_get_papers(ids_list)

        for paper in papers:
            if not paper:
                continue
            save_paper(paper, conn)
            visited.add(paper["paperId"])

        # Fetch citations for each paper
        for paper_id in ids_list:
            if depth < max_depth:
                citations = await get_all_citations(paper_id)
                save_citations(paper_id, citations, conn)

                # Add highly-cited papers to next frontier
                for entry in citations:
                    citing_paper = entry.get("citingPaper", {})
                    pid = citing_paper.get("paperId")
                    if pid and pid not in visited:
                        if (citing_paper.get("citationCount") or 0) >= min_citations:
                            next_frontier.add(pid)

        current_frontier = next_frontier

    conn.close()
    print(f"\nCitation graph built. Visited {len(visited)} papers.")
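
The frontier logic is easier to reason about without the network in the way. Here is a sketch of the same breadth-first expansion over a toy in-memory graph. The function name crawl_levels and the cited_by mapping (paper ID to a list of citing ID and citation count pairs) are stand-ins for the API calls, not part of the code above.

```python
def crawl_levels(
    seeds: list[str],
    cited_by: dict[str, list[tuple[str, int]]],
    max_depth: int = 2,
    min_citations: int = 10,
) -> list[list[str]]:
    """Return the papers processed at each depth of the BFS."""
    visited: set[str] = set()
    frontier = set(seeds)
    levels = []
    for depth in range(max_depth + 1):
        frontier -= visited
        if not frontier:
            break
        levels.append(sorted(frontier))
        visited |= frontier
        if depth == max_depth:
            break  # deepest level: record papers but do not expand further
        next_frontier = set()
        for paper_id in frontier:
            for citing_id, citation_count in cited_by.get(paper_id, []):
                if citing_id not in visited and citation_count >= min_citations:
                    next_frontier.add(citing_id)
        frontier = next_frontier
    return levels


toy_graph = {
    "seed": [("a", 50), ("b", 3)],      # b falls below min_citations
    "a": [("c", 12), ("seed", 99)],     # seed is already visited
    "c": [("d", 40)],                   # d would be depth 3, beyond max_depth
}
print(crawl_levels(["seed"], toy_graph))  # [['seed'], ['a'], ['c']]
```

The min_citations filter is what keeps the frontier from exploding: each extra level multiplies the number of papers, so pruning low-cited papers early saves most of the request budget.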

Practical Tips

Use CorpusId for stable references. The paperId hex string is stable, but CorpusId (integer) is shorter and more convenient for join-heavy SQL queries. It lives in externalIds.CorpusId.

Search pagination caps at 10,000. The /paper/search endpoint lets you paginate up to offset 9,999. Beyond that, split the query into narrower slices: restrict each request to a year range such as 2020-2023 with the endpoint's year parameter, or use more specific search terms.
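
Splitting by date range can be automated with a small generator of year slices. This sketch assumes the year filter accepts a single year or a range written as start-end, as in the 2020-2023 example above; the helper name year_slices is hypothetical.

```python
def year_slices(first_year: int, last_year: int, span: int = 3) -> list[str]:
    """Cut an inclusive year range into consecutive slices of up to `span` years."""
    slices = []
    start = first_year
    while start <= last_year:
        end = min(start + span - 1, last_year)
        slices.append(str(start) if start == end else f"{start}-{end}")
        start = end + 1
    return slices


print(year_slices(2015, 2024))
# ['2015-2017', '2018-2020', '2021-2023', '2024']
```

Each slice then gets its own paginated search. If a slice still reports a total above the cap, it can be cut again with a smaller span.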

TLDR coverage is patchy. The TLDR field is model-generated and only exists for a subset of papers. Do not build pipeline logic that assumes it will be present.

Influential citation count is the interesting metric. citationCount tells you how often a paper is cited. influentialCitationCount tells you how often it was cited as foundational work. The ratio between them signals whether a paper is a cornerstone or just frequently mentioned in passing.

Scaling horizontally. For citation graph traversals across thousands of seed papers — say, mapping an entire subfield like "transformer architectures" — you will exhaust the per-key rate limit quickly. The right approach is multiple workers, each with its own API key (Semantic Scholar allows this for research use), running behind ThorData residential proxies to avoid IP-level throttling across workers. One key plus one IP per worker keeps you within limits without building exponential backoff into every call.

Open access PDFs. The openAccessPdf.url field gives you direct download links for papers with GREEN or GOLD status. These are hosted on ArXiv, PubMed Central, or publisher sites. You can download these directly without going through any paywall.

Complete Example

async def main():
    """Pull papers about a topic and their citation network."""
    conn = init_db("ml_papers.db")

    # Search for papers
    results = await search_papers_paginated(
        "attention mechanism transformer", max_results=50
    )
    print(f"Found {len(results)} papers")

    # Batch enrich with full details
    paper_ids = [p["paperId"] for p in results]
    papers = await batch_get_papers(paper_ids, fields=PAPER_FIELDS)

    for paper in papers:
        if paper:
            save_paper(paper, conn)

    # Fetch citations for top papers
    top_papers = sorted(
        [p for p in papers if p],
        key=lambda x: x.get("citationCount") or 0,  # treat null counts as 0
        reverse=True,
    )[:10]

    for paper in top_papers:
        print(f"\nFetching citations for: {paper['title'][:60]}")
        citations = await get_all_citations(paper["paperId"])
        save_citations(paper["paperId"], citations, conn)
        print(f"  {len(citations)} citations saved")

    conn.close()


asyncio.run(main())

Analyzing the Data with SQL

With papers, citations, and authors in SQLite, you can run meaningful research analysis:

conn = sqlite3.connect("semantic_scholar.db")

# Most cited papers in the dataset
most_cited = conn.execute("""
    SELECT title, year, citation_count, influential_citation_count,
           ROUND(influential_citation_count * 1.0 / citation_count * 100, 1) AS influential_pct
    FROM papers
    WHERE citation_count > 0
    ORDER BY citation_count DESC
    LIMIT 20
""").fetchall()

# Papers with highest ratio of influential citations (foundational work)
most_influential = conn.execute("""
    SELECT title, year, citation_count, influential_citation_count,
           ROUND(influential_citation_count * 1.0 / citation_count * 100, 1) AS influential_pct
    FROM papers
    WHERE citation_count >= 50 AND influential_citation_count > 0
    ORDER BY influential_pct DESC
    LIMIT 20
""").fetchall()

# Citation velocity by year (how quickly papers accumulate citations)
velocity_by_year = conn.execute("""
    SELECT year,
           COUNT(*) AS paper_count,
           AVG(citation_count) AS avg_citations,
           MAX(citation_count) AS max_citations
    FROM papers
    WHERE year >= 2015 AND year <= 2024
    GROUP BY year
    ORDER BY year
""").fetchall()

# Open access coverage
oa_stats = conn.execute("""
    SELECT
        COALESCE(open_access_status, 'CLOSED') AS status,
        COUNT(*) AS count,
        ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM papers), 1) AS pct
    FROM papers
    GROUP BY status
    ORDER BY count DESC
""").fetchall()

# Papers per field of study
field_counts = conn.execute("""
    SELECT
        json_each.value AS field,
        COUNT(*) AS paper_count,
        AVG(citation_count) AS avg_citations
    FROM papers, json_each(fields_of_study)
    WHERE fields_of_study IS NOT NULL AND fields_of_study != '[]'
    GROUP BY field
    ORDER BY paper_count DESC
    LIMIT 15
""").fetchall()

for field, count, avg_cit in field_counts:
    print(f"  {field:<30} {count:>5} papers  avg {avg_cit:.0f} citations")

Finding Author Collaboration Networks

async def build_author_coauthorship(
    paper_ids: list[str],
    db_path: str = "semantic_scholar.db",
):
    """
    Build a coauthorship network from a set of papers.
    Stores author pairs who have co-authored at least one paper.
    """
    conn = sqlite3.connect(db_path)

    # Create coauthorship table
    conn.execute("""
        CREATE TABLE IF NOT EXISTS coauthorships (
            author1_id TEXT NOT NULL,
            author2_id TEXT NOT NULL,
            paper_count INTEGER DEFAULT 1,
            PRIMARY KEY (author1_id, author2_id)
        )
    """)

    # Fetch full author data for all papers
    papers = await batch_get_papers(
        paper_ids,
        fields="paperId,authors"
    )

    for paper in papers:
        if not paper:
            continue
        authors = paper.get("authors") or []
        author_ids = [a.get("authorId") for a in authors if a.get("authorId")]

        # Create pairs
        for i, a1 in enumerate(author_ids):
            for a2 in author_ids[i+1:]:
                pair = tuple(sorted([a1, a2]))
                conn.execute(
                    """INSERT INTO coauthorships (author1_id, author2_id, paper_count)
                       VALUES (?, ?, 1)
                       ON CONFLICT(author1_id, author2_id)
                       DO UPDATE SET paper_count = paper_count + 1""",
                    pair,
                )

    conn.commit()

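    # Find most collaborative authors. Each pair is stored once in sorted
    # order, so count every author from both columns of the table.
    top_collaborators = conn.execute("""
        SELECT author_id, COUNT(DISTINCT other_id) AS collaborator_count,
               SUM(paper_count) AS total_papers
        FROM (
            SELECT author1_id AS author_id, author2_id AS other_id, paper_count
            FROM coauthorships
            UNION ALL
            SELECT author2_id, author1_id, paper_count
            FROM coauthorships
        )
        GROUP BY author_id
        ORDER BY collaborator_count DESC
        LIMIT 10
    """).fetchall()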

    conn.close()
    return top_collaborators
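
For quick experiments it can be handy to count coauthor pairs in memory before committing anything to SQLite. The following sketch does the same pairing with itertools.combinations and a Counter. The function name coauthor_pairs and the sample papers are invented; the input shape matches what a batch lookup with fields "paperId,authors" returns.

```python
from collections import Counter
from itertools import combinations


def coauthor_pairs(papers: list[dict]) -> Counter:
    """Count how many papers each unordered pair of author IDs shares."""
    pairs: Counter = Counter()
    for paper in papers:
        authors = paper.get("authors") or []
        # Sort so that (a, b) and (b, a) land on the same key.
        ids = sorted({a["authorId"] for a in authors if a.get("authorId")})
        pairs.update(combinations(ids, 2))
    return pairs


sample_papers = [
    {"paperId": "p1", "authors": [{"authorId": "1"}, {"authorId": "2"}, {"authorId": "3"}]},
    {"paperId": "p2", "authors": [{"authorId": "2"}, {"authorId": "1"}]},
    {"paperId": "p3", "authors": None},
]
print(coauthor_pairs(sample_papers).most_common(1))  # [(('1', '2'), 2)]
```

A paper with n authors contributes n(n-1)/2 pairs, so large consortium papers with hundreds of authors can dominate the table. Capping the author count per paper is a common way to keep the network readable.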