← Back to blog

Scrape arXiv Research Papers: Metadata, Abstracts & Citation Data (2026)

Scrape arXiv Research Papers: Metadata, Abstracts & Citation Data (2026)

arXiv hosts over 2.5 million research papers across physics, mathematics, computer science, quantitative biology, economics, and statistics. If you're building a research tool, training models on academic text, tracking publication trends in a field, mapping author collaboration networks, or doing literature review automation — arXiv is the primary open data source.

The good news: arXiv provides an official API and bulk data access, and they actively encourage programmatic use. The data is open science by design. The bad news: rate limits are tighter than you'd expect, the XML responses require careful parsing, and for PDF downloads at scale you still need proxy rotation to avoid hitting bandwidth limits.

This guide covers the full stack: the Atom search API, the OAI-PMH bulk harvest protocol, author network analysis, citation data via Semantic Scholar, PDF download pipelines, and proxy setup for large-scale collection.


What Data arXiv Exposes

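Each paper record contains: the arXiv ID, title, abstract, authors (with affiliations when supplied), the full category list and primary category, published and updated timestamps, the optional journal reference, DOI and author comment, and links to the PDF and HTML pages.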

The API also supports field-specific search across title, author, abstract, comment, journal_reference, subject_category, and all.


arXiv Category Reference

Common categories you'll search across:

Category Field
cs.AI Artificial Intelligence
cs.LG Machine Learning
cs.CL Computation and Language (NLP)
cs.CV Computer Vision
cs.RO Robotics
cs.SE Software Engineering
cs.CR Cryptography and Security
stat.ML Machine Learning (Statistics)
math.OC Optimization and Control
q-bio.NC Neurons and Cognition
econ.GN General Economics
physics.comp-ph Computational Physics

arXiv Search API

The API uses Atom XML and supports complex boolean queries:

import requests
import xml.etree.ElementTree as ET
import time
import json
from datetime import datetime, timedelta
from typing import Optional, List
from dataclasses import dataclass, field

# Endpoint of the arXiv Atom search API (the URL search_arxiv() requests).
ARXIV_API = "http://export.arxiv.org/api/query"

# XML namespace URIs used to build qualified tag names when parsing the feed.
ATOM_NS = "http://www.w3.org/2005/Atom"
ARXIV_NS = "http://arxiv.org/schemas/atom"
OPENSEARCH_NS = "http://a9.com/-/spec/opensearch/1.1/"

@dataclass
class ArxivPaper:
    """Metadata for one paper, as filled in by parse_atom_entry()."""
    arxiv_id: str  # identifier taken from the entry's <id> URL, version suffix removed
    title: str  # whitespace-collapsed title text
    abstract: str  # whitespace-collapsed <summary> text
    authors: List[str] = field(default_factory=list)  # author names in feed order
    affiliations: List[str] = field(default_factory=list)  # affiliations found on author elements
    categories: List[str] = field(default_factory=list)  # "term" of every <category> element
    primary_category: str = ""  # "term" of the arXiv primary_category element
    published: str = ""  # <published> text, stored as the raw string
    updated: str = ""  # <updated> text, stored as the raw string
    journal_ref: Optional[str] = None  # None when the entry has no journal_ref element
    doi: Optional[str] = None  # None when the entry has no doi element
    comment: Optional[str] = None  # None when the entry has no comment element
    pdf_url: Optional[str] = None  # href of the link titled "pdf" (or a related link containing "pdf")
    html_url: Optional[str] = None  # href of the link titled "html"

def parse_atom_entry(entry, ns):
    """Parse a single Atom entry element into an ArxivPaper.

    Args:
        entry: An ``entry`` element from an arXiv Atom feed.
        ns: Prefix-to-URI namespace mapping forwarded to ``Element.find``
            for the whitespace-normalised text fields.

    Returns:
        ArxivPaper populated from the entry. Optional fields (journal_ref,
        doi, comment, pdf_url, html_url) are None when absent.
    """
    import re  # local import: ``re`` is not part of the file's import header

    def get_text(tag, default=""):
        # Collapse runs of whitespace/newlines that arXiv puts in titles and abstracts.
        el = entry.find(tag, ns)
        return " ".join(el.text.split()) if el is not None and el.text else default

    def optional_text(tag):
        # Stripped text of an optional element, or None when it is missing/empty.
        el = entry.find(tag)
        return el.text.strip() if el is not None and el.text else None

    # arXiv ID from the <id> field (URL like http://arxiv.org/abs/2301.13688v2).
    id_el = entry.find(f"{{{ATOM_NS}}}id")
    raw_id = (id_el.text or "").strip() if id_el is not None else ""
    # Strip only a trailing version suffix ("v2"). Splitting on the first "v"
    # would truncate old-style IDs whose archive name contains a "v",
    # e.g. "solv-int/9901001v1".
    arxiv_id = re.sub(r"v\d+$", "", raw_id.split("/abs/")[-1])

    # Authors (and the first affiliation listed on each author element, if any)
    authors = []
    affiliations = []
    for author_el in entry.findall(f"{{{ATOM_NS}}}author"):
        name_el = author_el.find(f"{{{ATOM_NS}}}name")
        if name_el is not None and name_el.text:
            authors.append(name_el.text.strip())
        affil_el = author_el.find(f"{{{ARXIV_NS}}}affiliation")
        if affil_el is not None and affil_el.text:
            affiliations.append(affil_el.text.strip())

    # Categories
    categories = [
        c.get("term", "") for c in entry.findall(f"{{{ATOM_NS}}}category")
    ]
    primary_cat_el = entry.find(f"{{{ARXIV_NS}}}primary_category")
    primary_category = primary_cat_el.get("term", "") if primary_cat_el is not None else ""

    # Links: prefer the explicit title attribute, fall back to a related link
    # whose href mentions "pdf".
    pdf_url = None
    html_url = None
    for link in entry.findall(f"{{{ATOM_NS}}}link"):
        href = link.get("href", "")
        title = link.get("title", "")
        rel = link.get("rel", "")
        if title == "pdf":
            pdf_url = href
        elif title == "html":
            html_url = href
        elif rel == "related" and "pdf" in href:
            pdf_url = href

    return ArxivPaper(
        arxiv_id=arxiv_id,
        title=get_text(f"{{{ATOM_NS}}}title"),
        abstract=get_text(f"{{{ATOM_NS}}}summary"),
        authors=authors,
        affiliations=affiliations,
        categories=categories,
        primary_category=primary_category,
        published=get_text(f"{{{ATOM_NS}}}published"),
        updated=get_text(f"{{{ATOM_NS}}}updated"),
        journal_ref=optional_text(f"{{{ARXIV_NS}}}journal_ref"),
        doi=optional_text(f"{{{ARXIV_NS}}}doi"),
        comment=optional_text(f"{{{ARXIV_NS}}}comment"),
        pdf_url=pdf_url,
        html_url=html_url,
    )

def search_arxiv(
    query: str,
    start: int = 0,
    max_results: int = 25,
    sort_by: str = "submittedDate",
    sort_order: str = "descending",
    proxy_url: str = None,
) -> tuple:
    """
    Run one query against the arXiv Atom API and parse the returned feed.

    Args:
        query: arXiv search expression, sent as ``search_query``.
        start: Offset of the first result to return.
        max_results: Requested page size; capped at 2000 before sending.
        sort_by: submittedDate, lastUpdatedDate, or relevance.
        sort_order: ascending or descending.
        proxy_url: Optional proxy used for both http and https.

    Returns:
        (papers, total_results): the parsed ArxivPaper list for this page and
        the feed's totalResults count (0 when the element is missing).

    Raises:
        requests.HTTPError: when the API answers with an error status.
    """
    request_kwargs = {
        "params": {
            "search_query": query,
            "start": start,
            "max_results": min(max_results, 2000),
            "sortBy": sort_by,
            "sortOrder": sort_order,
        },
        "headers": {
            "User-Agent": "ArxivResearchBot/1.0 (research data collection; [email protected])"
        },
        "proxies": None,
        "timeout": 60,
    }
    if proxy_url:
        request_kwargs["proxies"] = {"http": proxy_url, "https": proxy_url}

    response = requests.get(ARXIV_API, **request_kwargs)
    response.raise_for_status()

    feed = ET.fromstring(response.content)
    namespaces = {"atom": ATOM_NS, "arxiv": ARXIV_NS}

    # Total number of matches reported by the feed, independent of page size.
    count_el = feed.find(f"{{{OPENSEARCH_NS}}}totalResults")
    if count_el is None:
        total = 0
    else:
        total = int(count_el.text)

    # A malformed entry is reported and skipped rather than aborting the page.
    papers = []
    for entry_el in feed.findall(f"{{{ATOM_NS}}}entry"):
        try:
            parsed = parse_atom_entry(entry_el, namespaces)
        except Exception as e:
            print(f"Parse error: {e}")
        else:
            papers.append(parsed)

    return papers, total

# Example: recent LLM papers
# Runs at import time: issues one API request and binds the module-level
# names ``papers`` and ``total``.
papers, total = search_arxiv(
    query="ti:large language model OR abs:large language model",
    max_results=10,
    sort_by="submittedDate",
)
print(f"Total papers matching query: {total:,}")
for p in papers:
    # Title is cut to 80 characters; at most three authors/categories are shown.
    print(f"\n[{p.arxiv_id}] {p.title[:80]}")
    print(f"  Authors: {', '.join(p.authors[:3])}{'...' if len(p.authors) > 3 else ''}")
    print(f"  Categories: {', '.join(p.categories[:3])}")
    if p.journal_ref:
        print(f"  Published in: {p.journal_ref}")

Query Syntax

arXiv supports rich boolean query syntax:

# Field-specific searches
QUERY_EXAMPLES = {
    # By field
    "by_title": "ti:transformer attention",
    "by_abstract": "abs:reinforcement learning reward",
    "by_author": "au:Bengio_Y",
    "by_category": "cat:cs.LG",
    "by_all": "all:diffusion model image generation",

    # Boolean combinations (the API's operators are AND, OR and ANDNOT)
    "combined": "ti:BERT AND abs:fine-tuning",
    "multiple_authors": "au:Lecun AND au:Bengio",
    "recent_category": "cat:cs.CL AND submittedDate:[202501010000 TO 202506302359]",

    # Date range (YYYYMMDDHHMM format, GMT; June has 30 days)
    "date_range": "cat:cs.AI AND submittedDate:[202501010000 TO 202506302359]",

    # Exclude terms (ANDNOT is a single operator, not "AND NOT")
    "exclude": "ti:neural network ANDNOT ti:deep",
}

Paginated Batch Collection

For collecting more than 2,000 papers, paginate with the start parameter:

def fetch_papers_paginated(
    query: str,
    total_target: int = 5000,
    batch_size: int = 100,
    proxy_url: str = None,
    save_path: str = None,
) -> List["ArxivPaper"]:
    """
    Fetch papers in batches, respecting arXiv's 3-second rate limit.

    Args:
        query: arXiv search expression passed to search_arxiv().
        total_target: Stop once this many distinct papers are known
            (previously saved ones included).
        batch_size: Page size requested per API call.
        proxy_url: Optional proxy forwarded to search_arxiv().
        save_path: Optional JSONL file. Existing lines are read to resume,
            and every newly fetched paper is appended to it.

    Returns:
        The papers fetched during THIS call (previously saved papers are
        only counted, not reloaded).
    """
    max_consecutive_errors = 5  # give up instead of retrying forever

    all_papers = []
    seen_ids = set()

    # Load previously saved IDs if resuming
    if save_path:
        from pathlib import Path
        path = Path(save_path)
        if path.exists():
            for line in path.read_text().splitlines():
                try:
                    paper_dict = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if isinstance(paper_dict, dict):
                    seen_ids.add(paper_dict.get("arxiv_id", ""))
            print(f"Resuming: {len(seen_ids)} papers already collected")

    # Resume offset assumes the saved file holds the first len(seen_ids)
    # results of the same query in the same sort order.
    start = len(seen_ids)
    consecutive_errors = 0

    # seen_ids already contains every paper in all_papers, so it alone is the
    # progress counter (adding len(all_papers) would count new papers twice).
    while len(seen_ids) < total_target:
        try:
            batch, total = search_arxiv(
                query, start=start, max_results=batch_size, proxy_url=proxy_url
            )
        except Exception as e:
            consecutive_errors += 1
            print(f"Fetch error at start={start}: {e}")
            if consecutive_errors >= max_consecutive_errors:
                print(f"Giving up after {consecutive_errors} consecutive errors.")
                break
            time.sleep(10)
            continue
        consecutive_errors = 0

        if not batch:
            print("Empty batch, done.")
            break

        # Keep only unseen papers, also de-duplicating within the batch itself.
        new_papers = []
        for p in batch:
            if p.arxiv_id not in seen_ids:
                seen_ids.add(p.arxiv_id)
                new_papers.append(p)

        if save_path:
            with open(save_path, "a") as f:
                for p in new_papers:
                    f.write(json.dumps(vars(p)) + "\n")

        all_papers.extend(new_papers)
        print(f"Collected {len(seen_ids)}/{total} total | batch: {len(new_papers)} new papers")

        # Advance first, then test: avoids one extra request past the last page.
        start += batch_size
        if start >= total:
            print(f"Reached end of results ({total} total).")
            break

        time.sleep(3.5)  # arXiv requires >= 3s between requests

    return all_papers

OAI-PMH Bulk Harvest

For entire categories or date ranges — much faster than the search API for bulk work:

OAI_BASE = "http://export.arxiv.org/oai2"

def parse_oai_record(record, ns):
    """
    Convert one OAI-PMH record element into a plain metadata dict.

    Args:
        record: Record element whose metadata contains an ``arxiv:arXiv`` node.
        ns: Namespace mapping that defines the ``arxiv`` prefix.

    Returns:
        Dict with arxiv_id, title, abstract, authors, categories, created,
        updated, doi, journal_ref and msc_class, or None when the record has
        no ``arxiv:arXiv`` metadata node. Missing doi / journal_ref /
        msc_class become None; other missing text fields become "".
    """
    meta = record.find(".//arxiv:arXiv", ns)
    if meta is None:
        return None

    def collapsed(tag):
        # Text of a direct child with all whitespace runs squeezed to one space.
        node = meta.find(tag, ns)
        if node is None or not node.text:
            return ""
        return " ".join(node.text.split())

    # Author names are assembled as "<forenames> <keyname>", skipping absent parts.
    authors = []
    authors_node = meta.find("arxiv:authors", ns)
    author_nodes = authors_node.findall("arxiv:author", ns) if authors_node is not None else []
    for author_node in author_nodes:
        pieces = [
            part.text.strip()
            for part in (
                author_node.find("arxiv:forenames", ns),
                author_node.find("arxiv:keyname", ns),
            )
            if part is not None and part.text
        ]
        if pieces:
            authors.append(" ".join(pieces))

    # The categories element is one space-separated string.
    categories = []
    cats_node = meta.find("arxiv:categories", ns)
    if cats_node is not None and cats_node.text:
        categories = cats_node.text.split()

    result = {
        "arxiv_id": collapsed("arxiv:id"),
        "title": collapsed("arxiv:title"),
        "abstract": collapsed("arxiv:abstract"),
        "authors": authors,
        "categories": categories,
        "created": collapsed("arxiv:created"),
        "updated": collapsed("arxiv:updated"),
    }
    for key, tag in (
        ("doi", "arxiv:doi"),
        ("journal_ref", "arxiv:journal-ref"),
        ("msc_class", "arxiv:msc-class"),
    ):
        result[key] = collapsed(tag) or None
    return result

def harvest_oai(
    category: str,
    from_date: str = None,
    until_date: str = None,
    proxy_url: str = None,
    save_path: str = None,
) -> List[dict]:
    """
    Harvest all papers from an arXiv category using OAI-PMH.

    Args:
        category: e.g., "cs.AI", "cs.LG", "math.OC"; sent as the OAI ``set``.
        from_date: YYYY-MM-DD lower bound; defaults to 30 days ago.
        until_date: Optional YYYY-MM-DD upper bound.
        proxy_url: Optional proxy used for both http and https.
        save_path: Optional JSONL file; each page's records are appended as
            soon as the page is parsed.

    Returns:
        List of record dicts produced by parse_oai_record(). If the server
        keeps failing, the records collected so far are returned.
    """
    max_consecutive_failures = 5  # bound retries instead of looping forever

    if from_date is None:
        from_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")

    ns = {
        "oai": "http://www.openarchives.org/OAI/2.0/",
        "arxiv": "http://arxiv.org/OAI/arXiv/",
    }

    params = {
        "verb": "ListRecords",
        "metadataPrefix": "arXiv",
        "set": category,
        "from": from_date,
    }
    if until_date:
        params["until"] = until_date

    proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
    headers = {"User-Agent": "ArxivHarvester/1.0 (academic research)"}

    all_records = []
    page_num = 1
    failures = 0
    resumption_token = None

    while True:
        # Follow-up pages are requested with the resumption token only.
        if resumption_token:
            fetch_params = {"verb": "ListRecords", "resumptionToken": resumption_token}
        else:
            fetch_params = params

        try:
            resp = requests.get(OAI_BASE, params=fetch_params, headers=headers,
                                proxies=proxies, timeout=120)
            resp.raise_for_status()
            root = ET.fromstring(resp.content)
        except (requests.RequestException, ET.ParseError) as e:
            failures += 1
            print(f"OAI fetch error on page {page_num}: {e}")
            if failures >= max_consecutive_failures:
                print(f"Giving up after {failures} consecutive failures; "
                      f"returning {len(all_records)} records collected so far.")
                break
            time.sleep(15)
            continue
        failures = 0

        # Check for OAI errors
        error_el = root.find(".//oai:error", ns)
        if error_el is not None:
            error_code = error_el.get("code", "unknown")
            if error_code == "noRecordsMatch":
                print("No records match the query.")
                break
            print(f"OAI error: {error_code} — {error_el.text}")
            break

        # Parse records, skipping those flagged as deleted in their header
        page_records = []
        for record in root.findall(".//oai:record", ns):
            header = record.find("oai:header", ns)
            if header is not None and header.get("status") == "deleted":
                continue

            parsed = parse_oai_record(record, ns)
            if parsed:
                page_records.append(parsed)

        all_records.extend(page_records)

        # Save incrementally so an interrupted harvest keeps finished pages
        if save_path and page_records:
            with open(save_path, "a") as f:
                for r in page_records:
                    f.write(json.dumps(r, default=str) + "\n")

        # Get resumption token for next page; an empty token ends the list
        token_el = root.find(".//oai:resumptionToken", ns)
        total_records = 0
        resumption_token = None
        if token_el is not None:
            size_attr = token_el.get("completeListSize", "")
            total_records = int(size_attr) if size_attr.isdigit() else 0
            resumption_token = (token_el.text or "").strip() or None

        print(f"Page {page_num}: {len(page_records)} records | "
              f"Total: {len(all_records)}/{total_records or '?'}")

        if not resumption_token:
            print("Harvest complete.")
            break

        page_num += 1
        time.sleep(3.5)

    return all_records

# Harvest all cs.AI papers from the last month
# NOTE: this rebinds the module-level ``papers`` name; harvest_oai returns
# plain dicts, and each harvested page is also appended to cs_ai_papers.jsonl.
papers = harvest_oai(
    "cs.AI",
    from_date=(datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d"),
    save_path="cs_ai_papers.jsonl",
)
print(f"Harvested {len(papers)} papers from cs.AI")

Author Network Analysis

Build co-authorship and citation networks from harvested data:

from collections import defaultdict
from itertools import combinations
import json

def build_coauthor_network(papers):
    """
    Build a weighted co-authorship graph.

    Args:
        papers: Iterable-with-len of paper dicts (keys "authors", "arxiv_id")
            or objects exposing ``authors`` / ``arxiv_id`` attributes.

    Returns:
        Dict with:
          - "edges": {(author_a, author_b): shared paper count}, each pair
            stored once with the names in sorted order
          - "author_stats": {author: {"paper_count", "collaborator_count"}}
          - "total_authors", "total_edges", "total_papers"
    """
    edges = defaultdict(int)
    author_papers = defaultdict(list)
    collaborators = defaultdict(set)

    for paper in papers:
        if isinstance(paper, dict):
            raw_authors = paper.get("authors") or []
            paper_id = paper.get("arxiv_id", "")
        else:
            raw_authors = paper.authors
            paper_id = paper.arxiv_id

        # A name listed twice on one paper must not create a self-edge or
        # count the paper twice; dict.fromkeys de-duplicates, keeping order.
        authors = list(dict.fromkeys(raw_authors))

        for author in authors:
            author_papers[author].append(paper_id)

        # Create weighted edges for all co-author pairs, and record each
        # author's neighbours as we go so the stats below need no edge scan.
        for a1, a2 in combinations(sorted(authors), 2):
            edges[(a1, a2)] += 1
            collaborators[a1].add(a2)
            collaborators[a2].add(a1)

    # Author prolificacy stats
    author_stats = {
        author: {
            "paper_count": len(pids),
            "collaborator_count": len(collaborators.get(author, ())),
        }
        for author, pids in author_papers.items()
    }

    return {
        "edges": dict(edges),
        "author_stats": author_stats,
        "total_authors": len(author_papers),
        "total_edges": len(edges),
        "total_papers": len(papers),
    }

def find_key_authors(network, top_n=20):
    """
    Rank authors from a co-author network by output and by connectivity.

    Args:
        network: Dict with an "author_stats" mapping of
            author -> {"paper_count": int, "collaborator_count": int}.
        top_n: Maximum number of authors kept in each ranking.

    Returns:
        {"most_prolific": [...], "most_collaborative": [...]}, each a list of
        (author, stats) tuples in descending order of the relevant count.
    """
    author_items = network["author_stats"].items()

    def top_by(metric):
        # Negated key gives a descending sort that keeps ties in input order.
        ranked = sorted(author_items, key=lambda item: -item[1][metric])
        return ranked[:top_n]

    rankings = {}
    rankings["most_prolific"] = top_by("paper_count")
    rankings["most_collaborative"] = top_by("collaborator_count")
    return rankings

def find_research_clusters(network, min_edge_weight=3):
    """
    Group authors into clusters linked by repeated collaboration.

    Only edges whose weight is at least ``min_edge_weight`` are considered.
    Authors connected through such edges form one cluster; clusters with two
    or fewer members are discarded.

    Args:
        network: Dict with an "edges" mapping of (author_a, author_b) -> weight.
        min_edge_weight: Minimum shared-paper count for an edge to count.

    Returns:
        List of clusters, each a sorted list of author names, largest first.
    """
    # Undirected adjacency restricted to the strong edges.
    neighbours = defaultdict(set)
    for pair, weight in network["edges"].items():
        if weight >= min_edge_weight:
            first, second = pair
            neighbours[first].add(second)
            neighbours[second].add(first)

    # Connected components via an explicit-stack traversal.
    seen = set()
    components = []
    for origin in neighbours:
        if origin in seen:
            continue
        members = set()
        pending = [origin]
        while pending:
            author = pending.pop()
            if author in seen:
                continue
            seen.add(author)
            members.add(author)
            pending.extend(neighbours[author] - seen)
        if len(members) > 2:
            components.append(sorted(members))

    components.sort(key=len, reverse=True)
    return components

# Usage
# ``papers`` may hold ArxivPaper objects (search_arxiv) or plain dicts
# (harvest_oai). vars() raises TypeError on a dict, so dicts pass through as-is.
papers_data = [p if isinstance(p, dict) else vars(p) for p in papers]
network = build_coauthor_network(papers_data)
print(f"Network: {network['total_authors']} authors, {network['total_edges']} co-author pairs")

key_authors = find_key_authors(network)
print("\nMost prolific authors:")
for author, stats in key_authors["most_prolific"][:10]:
    print(f"  {author}: {stats['paper_count']} papers")

clusters = find_research_clusters(network)
print(f"\nFound {len(clusters)} research clusters")
for i, cluster in enumerate(clusters[:3]):
    print(f"  Cluster {i+1} ({len(cluster)} authors): {cluster[:3]}...")

Citation Data via Semantic Scholar

arXiv doesn't provide citation counts. Use Semantic Scholar's free API:

S2_BASE = "https://api.semanticscholar.org/graph/v1"

def get_citations(
    arxiv_id: str,
    s2_api_key: str = None,
    proxy_url: str = None,
) -> dict:
    """
    Get citation data from Semantic Scholar for an arXiv paper.
    Free tier: 100 req/5min unauthenticated, higher with API key.

    Args:
        arxiv_id: arXiv identifier, looked up as ``ARXIV:<id>``.
        s2_api_key: Optional key sent in the ``x-api-key`` header.
        proxy_url: Optional proxy used for both http and https.

    Returns:
        Dict with citation_count, influential_citations,
        recent_citation_count (citing papers dated 2024 or later),
        up to 10 recent_citations, reference_count and the h-index of the
        first five authors. None when the paper is unknown (404), the API
        is still rate limiting after one retry, or the request fails.
    """
    url = f"{S2_BASE}/paper/ARXIV:{arxiv_id}"
    params = {
        "fields": (
            "citationCount,influentialCitationCount,"
            "citations.title,citations.year,citations.externalIds,"
            "references.title,references.year,"
            "authors.name,authors.hIndex"
        )
    }

    headers = {"User-Agent": "ResearchTool/1.0"}
    if s2_api_key:
        headers["x-api-key"] = s2_api_key

    proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

    data = None
    for attempt in range(2):
        try:
            resp = requests.get(url, params=params, headers=headers,
                                proxies=proxies, timeout=30)
            if resp.status_code == 404:
                return None
            if resp.status_code == 429:
                if attempt == 0:
                    # Back off once and retry; sleeping without retrying
                    # would just throw the wait away.
                    print("S2 rate limited, waiting...")
                    time.sleep(30)
                    continue
                return None
            resp.raise_for_status()
            data = resp.json()
            break
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on older requests versions.
            print(f"S2 error for {arxiv_id}: {e}")
            return None

    if not isinstance(data, dict):
        return None

    # The API may send explicit nulls for list fields; treat them as empty.
    citations = data.get("citations") or []
    references = data.get("references") or []
    authors = data.get("authors") or []

    recent_citations = [
        c for c in citations
        if isinstance(c, dict) and c.get("year") and c["year"] >= 2024
    ]

    return {
        "arxiv_id": arxiv_id,
        "s2_paper_id": data.get("paperId"),
        "citation_count": data.get("citationCount", 0),
        "influential_citations": data.get("influentialCitationCount", 0),
        "recent_citation_count": len(recent_citations),
        "recent_citations": recent_citations[:10],
        "reference_count": len(references),
        "author_h_indices": [
            {"name": a.get("name"), "h_index": a.get("hIndex")}
            for a in authors[:5]
            if isinstance(a, dict)
        ],
    }

def enrich_with_citations(papers, max_papers=100, s2_api_key=None):
    """
    Add citation data to a list of papers.

    Args:
        papers: List of paper dicts or objects with an ``arxiv_id`` attribute.
        max_papers: Only the first ``max_papers`` entries are processed.
        s2_api_key: Optional Semantic Scholar API key.

    Returns:
        List of dicts, one per processed paper. Dict inputs are updated in
        place with a "citations" key when data was found. Object inputs are
        converted to a new dict so the result is uniformly dicts and the
        original object is not modified.
    """
    enriched = []
    selected = papers[:max_papers]

    for i, paper in enumerate(selected):
        if not isinstance(paper, dict):
            # Copy: vars() returns the live attribute dict of the object.
            paper = dict(vars(paper))
        arxiv_id = paper.get("arxiv_id")

        cite_data = get_citations(arxiv_id, s2_api_key)
        if cite_data:
            paper["citations"] = cite_data
        enriched.append(paper)

        print(f"  [{i+1}/{min(len(papers), max_papers)}] {arxiv_id}: "
              f"{cite_data.get('citation_count', 0) if cite_data else 'N/A'} citations")
        if i + 1 < len(selected):
            time.sleep(1.5)  # pacing between Semantic Scholar requests

    return enriched

PDF Download Pipeline

Download PDFs at scale with proxy rotation:

import hashlib
from pathlib import Path

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"

def get_proxy(session_id=None):
    """
    Build the ThorData residential proxy URL.

    Args:
        session_id: Optional session identifier appended to the username as
            ``-session-<id>``. Any value other than None or "" is used, so a
            session id of 0 is honoured.

    Returns:
        Proxy URL of the form ``http://user:pass@proxy.thordata.com:9000``
        with the credentials percent-encoded.
    """
    from urllib.parse import quote  # local import: not in the file's import header

    user = THORDATA_USER
    if session_id is not None and session_id != "":
        user = f"{THORDATA_USER}-session-{session_id}"

    # Escape characters such as "@", ":" or "/" that would otherwise break
    # the URL's userinfo section.
    safe_user = quote(user, safe="")
    safe_pass = quote(THORDATA_PASS, safe="")
    return f"http://{safe_user}:{safe_pass}@proxy.thordata.com:9000"

def download_pdf(
    arxiv_id: str,
    output_dir: str = "papers",
    proxy_url: str = None,
    skip_existing: bool = True,
) -> dict:
    """
    Download an arXiv PDF.

    Args:
        arxiv_id: arXiv identifier; "/" is replaced by "_" in the file name.
        output_dir: Directory for the PDF, created if missing.
        proxy_url: Optional proxy used for both http and https.
        skip_existing: When True, a file already on disk that is larger than
            1000 bytes is kept and no request is made.

    Returns:
        {"arxiv_id": ..., "path": ..., "size_bytes": ..., "status": ...}
        where status is "skipped", "downloaded", "rate_limited" (an HTML
        page came back instead of a PDF) or "error: <details>".
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    output_path = out_dir / f"{arxiv_id.replace('/', '_')}.pdf"

    if skip_existing and output_path.exists() and output_path.stat().st_size > 1000:
        return {"arxiv_id": arxiv_id, "path": str(output_path),
                "size_bytes": output_path.stat().st_size, "status": "skipped"}

    url = f"https://arxiv.org/pdf/{arxiv_id}.pdf"
    headers = {"User-Agent": "ArxivPDFDownloader/1.0 (academic research)"}
    proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

    # Stream into a sibling ".part" file and rename on success, so an
    # interrupted transfer never leaves a truncated PDF that a later run
    # would mistake for a finished download.
    partial_path = output_path.with_name(output_path.name + ".part")

    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, headers=headers, proxies=proxies,
                          timeout=120, stream=True) as resp:
            resp.raise_for_status()

            # Check it's actually a PDF
            content_type = resp.headers.get("Content-Type", "")
            if "pdf" not in content_type.lower() and "html" in content_type.lower():
                return {"arxiv_id": arxiv_id, "status": "rate_limited",
                        "path": None, "size_bytes": 0}

            with open(partial_path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=65536):
                    if chunk:
                        f.write(chunk)

        partial_path.replace(output_path)
        size = output_path.stat().st_size
        return {"arxiv_id": arxiv_id, "path": str(output_path),
                "size_bytes": size, "status": "downloaded"}

    except requests.RequestException as e:
        return {"arxiv_id": arxiv_id, "status": f"error: {e}",
                "path": None, "size_bytes": 0}
    finally:
        # Only present if the transfer did not complete.
        if partial_path.exists():
            partial_path.unlink()

def download_pdf_batch(
    arxiv_ids: list,
    output_dir: str = "papers",
    proxy_url: str = None,
    requests_per_minute: int = 10,
) -> list:
    """
    Download PDFs for a list of arXiv IDs with rate limiting.

    Args:
        arxiv_ids: Identifiers to download, processed in order.
        output_dir: Directory handed to download_pdf().
        proxy_url: Optional initial proxy. When given, a fresh proxy session
            is requested from get_proxy() every 20 downloads; when None,
            all downloads stay direct.
        requests_per_minute: Target request rate; the pause between
            downloads is 60 / requests_per_minute seconds plus up to 30%
            random jitter.

    Returns:
        List of the result dicts returned by download_pdf(), one per ID.
    """
    # Imported unconditionally at the top: a function-level import that only
    # runs inside a branch makes ``random`` an unbound local on the first pass.
    import random

    results = []
    delay = 60.0 / requests_per_minute
    rotate_proxy = proxy_url is not None

    for i, arxiv_id in enumerate(arxiv_ids):
        # Rotate proxy every 20 downloads to avoid bandwidth-based blocks
        if rotate_proxy and i % 20 == 0 and i > 0:
            proxy_url = get_proxy(session_id=random.randint(10000, 99999))

        result = download_pdf(arxiv_id, output_dir, proxy_url)
        results.append(result)
        print(f"[{i+1}/{len(arxiv_ids)}] {arxiv_id}: {result['status']} "
              f"({result.get('size_bytes', 0) // 1024} KB)")
        time.sleep(delay + random.uniform(0, delay * 0.3))

    return results

# Download PDFs for the top 50 cited papers
# Only entries carrying a "citations" dict with a positive "citation_count"
# are kept. enrich_with_citations() is the function in this file that attaches
# that key, so top_papers is empty unless papers_data has been enriched first.
top_papers = sorted(
    [p for p in papers_data if p.get("citations", {}).get("citation_count", 0) > 0],
    key=lambda x: -x.get("citations", {}).get("citation_count", 0),
)[:50]

pdf_ids = [p["arxiv_id"] for p in top_papers]
proxy = get_proxy()
download_results = download_pdf_batch(pdf_ids, proxy_url=proxy, requests_per_minute=8)

Data Storage

import sqlite3
import json

def init_db(db_path="arxiv_papers.db"):
    """
    Open the SQLite database and make sure the schema exists.

    Creates (if missing) the ``papers``, ``citations`` and ``pdf_downloads``
    tables plus indexes on primary category, publication date and citation
    count.

    Args:
        db_path: Path of the database file.

    Returns:
        The open sqlite3 connection; the caller is responsible for closing it.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS papers (
            arxiv_id TEXT PRIMARY KEY,
            title TEXT,
            abstract TEXT,
            authors TEXT,
            affiliations TEXT,
            categories TEXT,
            primary_category TEXT,
            published TEXT,
            updated TEXT,
            journal_ref TEXT,
            doi TEXT,
            comment TEXT,
            pdf_url TEXT,
            html_url TEXT,
            scraped_at TEXT DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS citations (
            arxiv_id TEXT PRIMARY KEY,
            citation_count INTEGER DEFAULT 0,
            influential_citations INTEGER DEFAULT 0,
            recent_citation_count INTEGER DEFAULT 0,
            reference_count INTEGER DEFAULT 0,
            fetched_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (arxiv_id) REFERENCES papers(arxiv_id)
        );

        CREATE TABLE IF NOT EXISTS pdf_downloads (
            arxiv_id TEXT PRIMARY KEY,
            file_path TEXT,
            size_bytes INTEGER,
            downloaded_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (arxiv_id) REFERENCES papers(arxiv_id)
        );

        CREATE INDEX IF NOT EXISTS idx_category ON papers(primary_category);
        CREATE INDEX IF NOT EXISTS idx_published ON papers(published);
        CREATE INDEX IF NOT EXISTS idx_citations ON citations(citation_count);
    """
    connection = sqlite3.connect(db_path)
    connection.executescript(schema)
    connection.commit()
    return connection

def save_papers(conn, papers):
    """
    Save a batch of papers to SQLite.

    Accepts both record shapes produced in this file: search-API papers
    (objects, converted with vars()) and OAI-PMH dicts. OAI-PMH dicts carry
    "created" instead of "published" and no "primary_category", so those
    columns fall back to "created" and to the first listed category;
    otherwise they would be stored as NULL and date-based queries on
    ``published`` would never match harvested papers.

    Args:
        conn: Open sqlite3 connection with the ``papers`` table present.
        papers: Iterable of paper dicts or objects with a ``__dict__``.

    Existing rows with the same arxiv_id are replaced. Commits before
    returning.
    """
    rows = []
    for p in papers:
        if not isinstance(p, dict):
            p = vars(p)

        categories = p.get("categories") or []

        published = p.get("published")
        if not published:
            published = p.get("created") or published

        primary_category = p.get("primary_category")
        if not primary_category and categories:
            primary_category = categories[0]

        rows.append((
            p.get("arxiv_id"), p.get("title"), p.get("abstract"),
            json.dumps(p.get("authors", [])),
            json.dumps(p.get("affiliations", [])),
            json.dumps(p.get("categories", [])),
            primary_category, published, p.get("updated"),
            p.get("journal_ref"), p.get("doi"), p.get("comment"),
            p.get("pdf_url"), p.get("html_url"),
        ))

    conn.executemany("""
        INSERT OR REPLACE INTO papers
        (arxiv_id, title, abstract, authors, affiliations, categories,
         primary_category, published, updated, journal_ref, doi,
         comment, pdf_url, html_url)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, rows)
    conn.commit()
    print(f"Saved {len(rows)} papers")

def save_citations(conn, citations_list):
    """
    Upsert a batch of citation summaries into the ``citations`` table.

    Args:
        conn: Open sqlite3 connection with the ``citations`` table present.
        citations_list: Iterable of dicts holding "arxiv_id" plus optional
            count fields (missing counts default to 0). Falsy entries such
            as None are skipped.

    Commits before returning.
    """
    count_fields = (
        "citation_count",
        "influential_citations",
        "recent_citation_count",
        "reference_count",
    )

    rows = []
    for entry in citations_list:
        if not entry:
            continue
        counts = [entry.get(name, 0) for name in count_fields]
        rows.append((entry["arxiv_id"], *counts))

    conn.executemany("""
        INSERT OR REPLACE INTO citations
        (arxiv_id, citation_count, influential_citations, recent_citation_count, reference_count)
        VALUES (?,?,?,?,?)
    """, rows)
    conn.commit()

def query_trending_papers(conn, category=None, days_back=7, min_citations=5):
    """
    List recently published papers that already have citations.

    Args:
        conn: Open sqlite3 connection with ``papers`` and ``citations`` tables.
        category: Optional primary category to restrict the result to.
        days_back: Only papers whose ``published`` value is later than now
            minus this many days are considered.
        min_citations: Minimum citation_count a paper must have.

    Returns:
        Up to 20 rows of (arxiv_id, title, authors, published,
        citation_count, influential_citations), most cited first.
    """
    # Placeholder order in the SQL: days, minimum citations, then category.
    if category:
        cat_filter = "AND p.primary_category = ?"
        params = [days_back, min_citations, category]
    else:
        cat_filter = ""
        params = [days_back, min_citations]

    sql = f"""
        SELECT p.arxiv_id, p.title, p.authors, p.published,
               c.citation_count, c.influential_citations
        FROM papers p
        LEFT JOIN citations c ON p.arxiv_id = c.arxiv_id
        WHERE p.published > datetime('now', '-' || ? || ' days')
          AND c.citation_count >= ?
          {cat_filter}
        ORDER BY c.citation_count DESC
        LIMIT 20
    """
    return conn.execute(sql, params).fetchall()

Complete Research Pipeline

def run_research_pipeline(
    categories: list = None,
    days_back: int = 30,
    db_path: str = "arxiv_research.db",
    download_pdfs: bool = False,
    proxy_url: str = None,
):
    """
    Full arXiv data collection pipeline.
    Harvests papers, fetches citations, optionally downloads PDFs.

    Args:
        categories: arXiv categories to harvest; defaults to
            ["cs.AI", "cs.LG", "cs.CL"].
        days_back: Harvest window in days, also used for the trending query.
        db_path: SQLite database file.
        download_pdfs: When True, download up to 100 PDFs not yet recorded
            in ``pdf_downloads``.
        proxy_url: Optional proxy forwarded to every network helper.

    The database connection is closed before returning, including when a
    phase raises.
    """
    if categories is None:
        categories = ["cs.AI", "cs.LG", "cs.CL"]

    conn = init_db(db_path)
    try:
        from_date = (datetime.now() - timedelta(days=days_back)).strftime("%Y-%m-%d")

        # Phase 1: Harvest metadata via OAI-PMH
        for category in categories:
            print(f"\n=== Harvesting {category} ===")
            papers = harvest_oai(category, from_date=from_date, proxy_url=proxy_url)
            save_papers(conn, papers)
            print(f"  Saved {len(papers)} papers for {category}")
            time.sleep(3)

        # Phase 2: Enrich with citation data (at most 500 papers per run)
        print(f"\n=== Fetching citation data ===")
        cursor = conn.execute("""
            SELECT arxiv_id FROM papers
            WHERE arxiv_id NOT IN (SELECT arxiv_id FROM citations)
            LIMIT 500
        """)
        ids_to_enrich = [row[0] for row in cursor.fetchall()]
        print(f"Papers to enrich: {len(ids_to_enrich)}")

        citation_data = []
        for i, arxiv_id in enumerate(ids_to_enrich):
            cites = get_citations(arxiv_id, proxy_url=proxy_url)
            if cites:
                citation_data.append(cites)
            # Flush every 50 lookups so an interruption loses little work.
            if (i + 1) % 50 == 0:
                save_citations(conn, citation_data)
                citation_data = []
                print(f"  Progress: {i+1}/{len(ids_to_enrich)}")
            time.sleep(1.5)

        if citation_data:
            save_citations(conn, citation_data)

        # Phase 3: PDF downloads (optional)
        if download_pdfs:
            print("\n=== Downloading PDFs ===")
            cursor = conn.execute("""
                SELECT arxiv_id FROM papers
                WHERE arxiv_id NOT IN (SELECT arxiv_id FROM pdf_downloads)
                ORDER BY published DESC LIMIT 100
            """)
            pdf_ids = [row[0] for row in cursor.fetchall()]
            results = download_pdf_batch(pdf_ids, proxy_url=proxy_url)
            for r in results:
                if r["status"] == "downloaded":
                    conn.execute("""
                        INSERT OR REPLACE INTO pdf_downloads (arxiv_id, file_path, size_bytes)
                        VALUES (?, ?, ?)
                    """, (r["arxiv_id"], r["path"], r["size_bytes"]))
            conn.commit()

        # Summary
        total_db = conn.execute("SELECT COUNT(*) FROM papers").fetchone()[0]
        total_cites = conn.execute("SELECT COUNT(*) FROM citations").fetchone()[0]
        print(f"\nPipeline complete: {total_db} papers, {total_cites} with citation data")

        # Print trending papers
        print("\nTop trending recent papers:")
        trending = query_trending_papers(conn, days_back=days_back)
        for arxiv_id, title, authors_json, published, cites, influential in trending[:5]:
            all_authors = json.loads(authors_json or "[]")
            author_str = ", ".join(all_authors[:2]) + ("..." if len(all_authors) > 2 else "")
            print(f"  [{arxiv_id}] {title[:60]}...")
            print(f"    By: {author_str} | {cites} citations ({influential} influential)")
    finally:
        conn.close()

# Script entry point: run the full pipeline for three CS categories over the
# last 30 days, routing requests through the proxy returned by get_proxy().
if __name__ == "__main__":
    proxy = get_proxy()
    run_research_pipeline(
        categories=["cs.AI", "cs.LG", "cs.CL"],
        days_back=30,
        proxy_url=proxy,
    )

Rate Limit Strategy

arXiv is explicit about their rate limits:

import time
import random
from functools import wraps

class ArxivRateLimiter:
    """
    Enforces minimum gaps between successive arXiv requests.

    Each request type has its own timer:
    - API: 3 seconds between requests (plus up to 1 s random jitter)
    - PDF downloads: 0.25 seconds between requests
    - OAI-PMH: 3 seconds between requests (plus up to 1 s random jitter)

    Call the matching ``*_wait()`` method before (or after) each request; it
    sleeps only for whatever part of the gap has not already elapsed.
    """

    API_MIN_DELAY = 3.0
    PDF_MIN_DELAY = 0.25
    OAI_MIN_DELAY = 3.0

    def __init__(self):
        # -inf means "never called": the first wait of each kind returns at once.
        self._last_api = float("-inf")
        self._last_pdf = float("-inf")
        self._last_oai = float("-inf")

    @staticmethod
    def _wait_since(last, min_delay, jitter=0.0):
        """Sleep until ``min_delay`` (+ random jitter) has passed since ``last``.

        Returns the new timestamp to store. Uses the monotonic clock so that
        system clock adjustments cannot shorten or stretch the gap.
        """
        required = min_delay + (random.uniform(0, jitter) if jitter else 0.0)
        elapsed = time.monotonic() - last
        if elapsed < required:
            time.sleep(required - elapsed)
        return time.monotonic()

    def api_wait(self):
        """Block until the next search-API request is allowed."""
        self._last_api = self._wait_since(self._last_api, self.API_MIN_DELAY, jitter=1.0)

    def pdf_wait(self):
        """Block until the next PDF download is allowed."""
        self._last_pdf = self._wait_since(self._last_pdf, self.PDF_MIN_DELAY)

    def oai_wait(self):
        """Block until the next OAI-PMH request is allowed."""
        self._last_oai = self._wait_since(self._last_oai, self.OAI_MIN_DELAY, jitter=1.0)

# Shared module-level limiter instance.
rate_limiter = ArxivRateLimiter()

ThorData Proxy Integration

For PDF downloads and large-scale API usage, ThorData's residential proxies prevent arXiv from rate-limiting based on your IP:

def get_proxy_for_arxiv(session_id=None):
    """
    ThorData proxy for arXiv.
    Use sticky sessions for sustained PDF download sessions.
    Rotate between papers to distribute bandwidth.

    Args:
        session_id: Optional sticky-session identifier. Any value other than
            None or "" is used, so a session id of 0 is honoured.

    Returns:
        Proxy URL whose username carries the ``-country-us`` suffix (and
        ``-session-<id>`` when a session id is given), with the credentials
        percent-encoded.
    """
    from urllib.parse import quote  # local import: not in the file's import header

    if session_id is not None and session_id != "":
        user = f"{THORDATA_USER}-session-{session_id}-country-us"
    else:
        user = f"{THORDATA_USER}-country-us"

    # Escape characters that would otherwise break the URL's userinfo section.
    safe_user = quote(user, safe="")
    safe_pass = quote(THORDATA_PASS, safe="")
    return f"http://{safe_user}:{safe_pass}@proxy.thordata.com:9000"

# Download large batches without hitting bandwidth limits
import random
def download_with_rotation(arxiv_ids, output_dir="papers"):
    """
    Download PDFs with automatic proxy rotation.

    A sticky proxy session is created for each group of 15 consecutive
    downloads and reused within the group.

    Args:
        arxiv_ids: Identifiers to download, processed in order.
        output_dir: Directory handed to download_pdf().

    Returns:
        List of the result dicts returned by download_pdf(), one per ID.
    """
    results = []
    proxy = None
    for i, arxiv_id in enumerate(arxiv_ids):
        # New sticky session every 15 downloads. The random part is drawn
        # once per group; drawing it per download would change the session
        # on every request and defeat stickiness.
        if i % 15 == 0:
            session_id = (i // 15) * 1000 + random.randint(1, 999)
            proxy = get_proxy_for_arxiv(session_id=session_id)
        result = download_pdf(arxiv_id, output_dir, proxy)
        results.append(result)
        rate_limiter.pdf_wait()
    return results

Summary

arXiv offers two access paths:

  1. Search API — targeted queries, up to 2,000 results per request (paginate with the start parameter for more), 3-second delay between requests
  2. OAI-PMH harvest — entire categories or date ranges, resumption token pagination, best for bulk collection

Pair it with Semantic Scholar for citation data (free API; an API key raises the rate limits) and you have a complete research intelligence stack.

Key rules:

- Respect the 3-second API delay — arXiv will ban your IP if you ignore it
- Use the OAI-PMH protocol for anything over a few hundred papers
- For PDF downloads at scale, ThorData residential proxies distribute bandwidth across a pool of IPs
- Store everything in SQLite from the start — paper metadata, citation data, and download logs
- Build resumable pipelines — OAI-PMH harvests can run for hours; save after every page

The data is legitimately open. arXiv explicitly supports bulk access for research purposes. Use it responsibly and you have access to one of the richest academic datasets in existence.