Scrape arXiv Research Papers: Metadata, Abstracts & Citation Data (2026)
arXiv hosts over 2.5 million research papers across physics, mathematics, computer science, quantitative biology, economics, and statistics. If you're building a research tool, training models on academic text, tracking publication trends in a field, mapping author collaboration networks, or doing literature review automation — arXiv is the primary open data source.
The good news: arXiv provides an official API and bulk data access, and they actively encourage programmatic use. The data is open science by design. The bad news: rate limits are tighter than you'd expect, the XML responses require careful parsing, and for PDF downloads at scale you still need proxy rotation to avoid hitting bandwidth limits.
This guide covers the full stack: the Atom search API, the OAI-PMH bulk harvest protocol, author network analysis, citation data via Semantic Scholar, PDF download pipelines, and proxy setup for large-scale collection.
What Data arXiv Exposes
Each paper record contains:
- arXiv ID: unique identifier like 2301.13688 (format: YYMM.NNNNN)
- Title: cleaned, whitespace-normalized
- Abstract: full summary text
- Authors: names and optional affiliations
- Submission date: original submission
- Last updated: latest version date
- Categories: primary and cross-listed (e.g., cs.LG, stat.ML, cs.AI)
- Journal reference: if published in a journal
- DOI: Digital Object Identifier if available
- PDF URL: direct link to PDF
- HTML URL: for recent papers with HTML versions
The API also supports field-specific search across title, author, abstract, comment, journal_reference, subject_category, and all.
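Identifiers come back from the API as full URLs with a version suffix, and papers from before April 2007 use an older archive/number format instead of YYMM.NNNNN. The sketch below is one way to normalize both forms. The names parse_arxiv_id and ParsedId are illustrative rather than part of any arXiv library, and the regular expressions are an approximation of the published identifier scheme, not an official grammar.

```python
import re
from typing import NamedTuple, Optional

# New-style IDs: YYMM.NNNN (2007-2014) or YYMM.NNNNN (2015 onward)
NEW_STYLE = re.compile(r"^(\d{4}\.\d{4,5})(?:v(\d+))?$")
# Old-style IDs: archive[.SUBJECT]/YYMMNNN, e.g. hep-th/9901001 or math.GT/0309136
OLD_STYLE = re.compile(r"^([a-z-]+(?:\.[A-Z]{2})?/\d{7})(?:v(\d+))?$")


class ParsedId(NamedTuple):
    base_id: str            # identifier without the version suffix
    version: Optional[int]  # None when no version was given


def parse_arxiv_id(raw: str) -> Optional[ParsedId]:
    """Split an arXiv identifier or abs URL into base ID and version."""
    # Accept either a bare ID or a URL such as http://arxiv.org/abs/<id>
    candidate = raw.strip().split("/abs/")[-1]
    for pattern in (NEW_STYLE, OLD_STYLE):
        match = pattern.match(candidate)
        if match:
            version = int(match.group(2)) if match.group(2) else None
            return ParsedId(match.group(1), version)
    return None  # not recognizable as an arXiv ID
```

Keeping the version separate from the base ID makes deduplication straightforward: two versions of the same paper share a base_id.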
arXiv Category Reference
Common categories you'll search across:
| Category | Field |
|---|---|
| cs.AI | Artificial Intelligence |
| cs.LG | Machine Learning |
| cs.CL | Computation and Language (NLP) |
| cs.CV | Computer Vision |
| cs.RO | Robotics |
| cs.SE | Software Engineering |
| cs.CR | Cryptography and Security |
| stat.ML | Machine Learning (Statistics) |
| math.OC | Optimization and Control |
| q-bio.NC | Neurons and Cognition |
| econ.GN | General Economics |
| physics.comp-ph | Computational Physics |
arXiv Search API
The API uses Atom XML and supports complex boolean queries:
import requests
import xml.etree.ElementTree as ET
import time
import json
from datetime import datetime, timedelta
from typing import Optional, List
from dataclasses import dataclass, field
ARXIV_API = "http://export.arxiv.org/api/query"
ATOM_NS = "http://www.w3.org/2005/Atom"
ARXIV_NS = "http://arxiv.org/schemas/atom"
OPENSEARCH_NS = "http://a9.com/-/spec/opensearch/1.1/"
@dataclass
class ArxivPaper:
    arxiv_id: str
    title: str
    abstract: str
    authors: List[str] = field(default_factory=list)
    affiliations: List[str] = field(default_factory=list)
    categories: List[str] = field(default_factory=list)
    primary_category: str = ""
    published: str = ""
    updated: str = ""
    journal_ref: Optional[str] = None
    doi: Optional[str] = None
    comment: Optional[str] = None
    pdf_url: Optional[str] = None
    html_url: Optional[str] = None
import re

def parse_atom_entry(entry, ns):
    """Parse a single Atom entry into an ArxivPaper."""
    def get_text(tag, default=""):
        el = entry.find(tag, ns)
        return " ".join(el.text.split()) if el is not None and el.text else default

    # arXiv ID from the <id> field (URL like http://arxiv.org/abs/2301.13688v2)
    id_el = entry.find(f"{{{ATOM_NS}}}id")
    raw_id = id_el.text.strip() if id_el is not None and id_el.text else ""
    # Strip only a trailing version suffix. Splitting on "v" would mangle
    # old-style IDs that contain the letter, such as solv-int/9901001v1.
    arxiv_id = re.sub(r"v\d+$", "", raw_id.split("/abs/")[-1])

    # Authors (affiliations are optional and often missing)
    authors = []
    affiliations = []
    for author_el in entry.findall(f"{{{ATOM_NS}}}author"):
        name_el = author_el.find(f"{{{ATOM_NS}}}name")
        if name_el is not None and name_el.text:
            authors.append(name_el.text.strip())
        affil_el = author_el.find(f"{{{ARXIV_NS}}}affiliation")
        if affil_el is not None and affil_el.text:
            affiliations.append(affil_el.text.strip())

    # Categories
    categories = [
        c.get("term", "") for c in entry.findall(f"{{{ATOM_NS}}}category")
    ]
    primary_cat_el = entry.find(f"{{{ARXIV_NS}}}primary_category")
    primary_category = primary_cat_el.get("term", "") if primary_cat_el is not None else ""

    # Links: the PDF link is identified by its title attribute
    pdf_url = None
    html_url = None
    for link in entry.findall(f"{{{ATOM_NS}}}link"):
        href = link.get("href", "")
        title = link.get("title", "")
        rel = link.get("rel", "")
        if title == "pdf":
            pdf_url = href
        elif title == "html":
            html_url = href
        elif rel == "related" and "pdf" in href:
            pdf_url = href

    # Optional fields
    journal_ref_el = entry.find(f"{{{ARXIV_NS}}}journal_ref")
    doi_el = entry.find(f"{{{ARXIV_NS}}}doi")
    comment_el = entry.find(f"{{{ARXIV_NS}}}comment")

    return ArxivPaper(
        arxiv_id=arxiv_id,
        title=get_text(f"{{{ATOM_NS}}}title"),
        abstract=get_text(f"{{{ATOM_NS}}}summary"),
        authors=authors,
        affiliations=affiliations,
        categories=categories,
        primary_category=primary_category,
        published=get_text(f"{{{ATOM_NS}}}published"),
        updated=get_text(f"{{{ATOM_NS}}}updated"),
        journal_ref=journal_ref_el.text.strip() if journal_ref_el is not None and journal_ref_el.text else None,
        doi=doi_el.text.strip() if doi_el is not None and doi_el.text else None,
        comment=comment_el.text.strip() if comment_el is not None and comment_el.text else None,
        pdf_url=pdf_url,
        html_url=html_url,
    )
def search_arxiv(
    query: str,
    start: int = 0,
    max_results: int = 25,
    sort_by: str = "submittedDate",
    sort_order: str = "descending",
    proxy_url: Optional[str] = None,
) -> tuple:
    """
    Search the arXiv API.
    Returns (papers, total_results).
    sort_by: submittedDate, lastUpdatedDate, relevance
    sort_order: ascending, descending
    """
    params = {
        "search_query": query,
        "start": start,
        "max_results": min(max_results, 2000),  # cap per request
        "sortBy": sort_by,
        "sortOrder": sort_order,
    }
    proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
    headers = {
        # Placeholder contact address; replace it with your own
        "User-Agent": "ArxivResearchBot/1.0 (research data collection; you@example.com)"
    }
    resp = requests.get(ARXIV_API, params=params, headers=headers,
                        proxies=proxies, timeout=60)
    resp.raise_for_status()
    root = ET.fromstring(resp.content)
    ns = {"atom": ATOM_NS, "arxiv": ARXIV_NS}

    # Total results count for the whole query, not just this page
    total_el = root.find(f"{{{OPENSEARCH_NS}}}totalResults")
    total = int(total_el.text) if total_el is not None and total_el.text else 0

    papers = []
    for entry in root.findall(f"{{{ATOM_NS}}}entry"):
        try:
            papers.append(parse_atom_entry(entry, ns))
        except Exception as e:
            # Skip malformed entries instead of failing the whole page
            print(f"Parse error: {e}")
    return papers, total
# Example: recent LLM papers (quotes make each term a phrase match)
papers, total = search_arxiv(
    query='ti:"large language model" OR abs:"large language model"',
    max_results=10,
    sort_by="submittedDate",
)
print(f"Total papers matching query: {total:,}")
for p in papers:
    print(f"\n[{p.arxiv_id}] {p.title[:80]}")
    print(f"  Authors: {', '.join(p.authors[:3])}{'...' if len(p.authors) > 3 else ''}")
    print(f"  Categories: {', '.join(p.categories[:3])}")
    if p.journal_ref:
        print(f"  Published in: {p.journal_ref}")
Query Syntax
arXiv supports rich boolean query syntax:
# Field-specific searches
QUERY_EXAMPLES = {
    # By field
    "by_title": "ti:transformer attention",
    "by_abstract": "abs:reinforcement learning reward",
    "by_author": "au:Bengio_Y",
    "by_category": "cat:cs.LG",
    "by_all": "all:diffusion model image generation",
    # Boolean combinations
    "combined": "ti:BERT AND abs:fine-tuning",
    "multiple_authors": "au:Lecun AND au:Bengio",
    "recent_category": "cat:cs.CL AND submittedDate:[202501010000 TO 202506010000]",
    # Date range (YYYYMMDDHHMM format, times in GMT); June has 30 days
    "date_range": "cat:cs.AI AND submittedDate:[202501010000 TO 202506302359]",
    # Exclude terms
    "exclude": "ti:neural network AND NOT ti:deep",
}
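Hand-writing these strings gets error-prone once several filters and a date range are involved. The helper below is a minimal sketch of composing them programmatically. The function build_query and its parameters are hypothetical names introduced here, and the date formatting assumes the YYYYMMDDHHMM (GMT) form described in the arXiv API manual.

```python
from datetime import datetime
from typing import Optional


def build_query(
    title: Optional[str] = None,
    author: Optional[str] = None,
    category: Optional[str] = None,
    submitted_from: Optional[datetime] = None,
    submitted_to: Optional[datetime] = None,
) -> str:
    """Combine optional filters into one AND-joined search_query string."""
    clauses = []
    if title:
        clauses.append(f'ti:"{title}"')  # quotes request a phrase match
    if author:
        clauses.append(f"au:{author}")
    if category:
        clauses.append(f"cat:{category}")
    if submitted_from and submitted_to:
        fmt = "%Y%m%d%H%M"  # assumed format: YYYYMMDDHHMM, interpreted as GMT
        clauses.append(
            f"submittedDate:[{submitted_from.strftime(fmt)} "
            f"TO {submitted_to.strftime(fmt)}]"
        )
    if not clauses:
        raise ValueError("at least one filter is required")
    return " AND ".join(clauses)
```

Building the range from datetime objects also rules out impossible dates such as June 31, because datetime rejects them at construction time.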
Paginated Batch Collection
For collecting more than 2,000 papers, paginate with the start parameter:
def fetch_papers_paginated(
    query: str,
    total_target: int = 5000,
    batch_size: int = 100,
    proxy_url: str = None,
    save_path: str = None,
    max_retries: int = 5,
) -> List[ArxivPaper]:
    """
    Fetch papers in batches, respecting arXiv's 3-second rate limit.
    Saves progress to a JSONL file to allow resumption.
    Returns only the papers fetched during this run.
    """
    from pathlib import Path

    all_papers = []
    seen_ids = set()
    # Load previously saved IDs if resuming
    if save_path:
        path = Path(save_path)
        if path.exists():
            for line in path.read_text().splitlines():
                try:
                    paper_dict = json.loads(line)
                    seen_ids.add(paper_dict.get("arxiv_id", ""))
                except json.JSONDecodeError:
                    continue
            print(f"Resuming: {len(seen_ids)} papers already collected")

    # The resume offset assumes the same query and sort order as before
    start = len(seen_ids)
    retries = 0
    # seen_ids already includes this run's papers, so count it only once
    while len(seen_ids) < total_target:
        try:
            batch, total = search_arxiv(
                query, start=start, max_results=batch_size, proxy_url=proxy_url
            )
        except Exception as e:
            retries += 1
            print(f"Fetch error at start={start}: {e}")
            if retries > max_retries:
                print("Too many consecutive errors, stopping.")
                break
            time.sleep(10)
            continue
        retries = 0
        if not batch:
            print("Empty batch, done.")
            break

        new_papers = [p for p in batch if p.arxiv_id not in seen_ids]
        if save_path:
            with open(save_path, "a") as f:
                for p in new_papers:
                    f.write(json.dumps(vars(p)) + "\n")
        all_papers.extend(new_papers)
        seen_ids.update(p.arxiv_id for p in new_papers)
        print(f"Collected {len(seen_ids)}/{total} total | batch: {len(new_papers)} new papers")

        # Advance first, then check whether the next page would be past the end
        start += batch_size
        if start >= total:
            print(f"Reached end of results ({total} total).")
            break
        time.sleep(3.5)  # arXiv asks for >= 3s between requests
    return all_papers
OAI-PMH Bulk Harvest
For entire categories or date ranges — much faster than the search API for bulk work:
OAI_BASE = "http://export.arxiv.org/oai2"

def parse_oai_record(record, ns):
    """Parse an OAI-PMH arXiv record."""
    meta = record.find(".//arxiv:arXiv", ns)
    if meta is None:
        return None

    def get_text(tag):
        el = meta.find(tag, ns)
        return " ".join(el.text.split()) if el is not None and el.text else ""

    # Authors from nested structure
    authors = []
    authors_el = meta.find("arxiv:authors", ns)
    if authors_el is not None:
        for author_el in authors_el.findall("arxiv:author", ns):
            name_parts = []
            fn = author_el.find("arxiv:forenames", ns)
            ln = author_el.find("arxiv:keyname", ns)
            if fn is not None and fn.text:
                name_parts.append(fn.text.strip())
            if ln is not None and ln.text:
                name_parts.append(ln.text.strip())
            if name_parts:
                authors.append(" ".join(name_parts))

    # Categories as space-separated string
    cats_el = meta.find("arxiv:categories", ns)
    categories = cats_el.text.split() if cats_el is not None and cats_el.text else []

    return {
        "arxiv_id": get_text("arxiv:id"),
        "title": get_text("arxiv:title"),
        "abstract": get_text("arxiv:abstract"),
        "authors": authors,
        "categories": categories,
        "created": get_text("arxiv:created"),
        "updated": get_text("arxiv:updated"),
        "doi": get_text("arxiv:doi") or None,
        "journal_ref": get_text("arxiv:journal-ref") or None,
        "msc_class": get_text("arxiv:msc-class") or None,
    }
def harvest_oai(
    category: str,
    from_date: str = None,
    until_date: str = None,
    proxy_url: str = None,
    save_path: str = None,
    max_retries: int = 5,
) -> List[dict]:
    """
    Harvest papers from an arXiv category using OAI-PMH.
    from_date / until_date: YYYY-MM-DD format. OAI-PMH matches these against
    the record datestamp (last modification), not the submission date.
    category: e.g., "cs.AI", "cs.LG", "math.OC"
    """
    if from_date is None:
        from_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
    ns = {
        "oai": "http://www.openarchives.org/OAI/2.0/",
        "arxiv": "http://arxiv.org/OAI/arXiv/",
    }
    # Assumption: OAI-PMH sets are defined per archive ("cs", "math"), not per
    # category, so harvest the archive and filter by category client-side.
    # Confirm the set names your endpoint accepts with verb=ListSets.
    params = {
        "verb": "ListRecords",
        "metadataPrefix": "arXiv",
        "set": category.split(".")[0],
        "from": from_date,
    }
    if until_date:
        params["until"] = until_date
    proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
    headers = {"User-Agent": "ArxivHarvester/1.0 (academic research)"}

    all_records = []
    page_num = 0
    retries = 0
    resumption_token = None
    while True:
        if resumption_token:
            fetch_params = {"verb": "ListRecords", "resumptionToken": resumption_token}
        else:
            fetch_params = params
        try:
            resp = requests.get(OAI_BASE, params=fetch_params, headers=headers,
                                proxies=proxies, timeout=120)
            resp.raise_for_status()
        except requests.RequestException as e:
            retries += 1
            print(f"OAI fetch error on page {page_num + 1}: {e}")
            if retries > max_retries:
                print("Too many consecutive errors, stopping.")
                break
            time.sleep(15)
            continue
        retries = 0
        page_num += 1
        root = ET.fromstring(resp.content)

        # Check for OAI errors
        error_el = root.find(".//oai:error", ns)
        if error_el is not None:
            error_code = error_el.get("code", "unknown")
            if error_code == "noRecordsMatch":
                print("No records match the query.")
            else:
                print(f"OAI error: {error_code}: {error_el.text}")
            break

        # Parse records
        page_records = []
        for record in root.findall(".//oai:record", ns):
            # Skip deleted records
            header = record.find("oai:header", ns)
            if header is not None and header.get("status") == "deleted":
                continue
            parsed = parse_oai_record(record, ns)
            # Keep only records listed (primary or cross-list) in the category
            if parsed and category in parsed["categories"]:
                page_records.append(parsed)
        all_records.extend(page_records)

        # Save incrementally
        if save_path and page_records:
            with open(save_path, "a") as f:
                for r in page_records:
                    f.write(json.dumps(r, default=str) + "\n")

        # Get resumption token for next page
        token_el = root.find("oai:resumptionToken", ns)
        if token_el is None:
            token_el = root.find(".//oai:resumptionToken", ns)
        set_size = int(token_el.get("completeListSize", 0)) if token_el is not None else 0
        resumption_token = token_el.text.strip() if token_el is not None and token_el.text else None
        print(f"Page {page_num}: {len(page_records)} matching records | "
              f"Kept: {len(all_records)} (set size: {set_size or '?'})")
        if not resumption_token:
            print("Harvest complete.")
            break
        time.sleep(3.5)
    return all_records
# Harvest all cs.AI papers from the last month
papers = harvest_oai(
    "cs.AI",
    from_date=(datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d"),
    save_path="cs_ai_papers.jsonl",
)
print(f"Harvested {len(papers)} papers from cs.AI")
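Which values the set parameter accepts depends on the repository, and OAI-PMH provides the ListSets verb for discovering them. The sketch below parses a ListSets response into a lookup table. The function name parse_list_sets is hypothetical, and it only assumes the standard OAI-PMH 2.0 namespace and element names (set, setSpec, setName).

```python
import xml.etree.ElementTree as ET

OAI_NS = {"oai": "http://www.openarchives.org/OAI/2.0/"}


def parse_list_sets(xml_bytes: bytes) -> dict:
    """Map setSpec -> setName from an OAI-PMH ListSets response body."""
    root = ET.fromstring(xml_bytes)
    sets = {}
    for set_el in root.findall(".//oai:set", OAI_NS):
        spec = set_el.findtext("oai:setSpec", default="", namespaces=OAI_NS)
        name = set_el.findtext("oai:setName", default="", namespaces=OAI_NS)
        if spec.strip():
            sets[spec.strip()] = name.strip()
    return sets
```

To use it against the live endpoint, fetch the same base URL used for harvesting with params={"verb": "ListSets"} and pass resp.content to the function, then check that the set you plan to harvest appears among the keys.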
Author Network Analysis
Build a co-authorship network from harvested data, then find the most connected authors and the tightest collaboration clusters:
from collections import defaultdict
from itertools import combinations

def build_coauthor_network(papers):
    """
    Build a weighted co-authorship graph.
    Returns edges with weights (number of shared papers) and node stats.
    """
    edges = defaultdict(int)
    author_papers = defaultdict(list)
    collaborators = defaultdict(set)
    for paper in papers:
        authors = paper.get("authors", []) if isinstance(paper, dict) else paper.authors
        paper_id = paper.get("arxiv_id", "") if isinstance(paper, dict) else paper.arxiv_id
        for author in authors:
            author_papers[author].append(paper_id)
        # Create weighted edges for all co-author pairs
        for a1, a2 in combinations(sorted(set(authors)), 2):
            edges[(a1, a2)] += 1
            collaborators[a1].add(a2)
            collaborators[a2].add(a1)

    # Per-author stats; collaborator sets avoid rescanning every edge per author
    author_stats = {
        author: {
            "paper_count": len(pids),
            "collaborator_count": len(collaborators[author]),
        }
        for author, pids in author_papers.items()
    }
    return {
        "edges": dict(edges),
        "author_stats": author_stats,
        "total_authors": len(author_papers),
        "total_edges": len(edges),
        "total_papers": len(papers),
    }
def find_key_authors(network, top_n=20):
    """Find the most connected/prolific authors."""
    stats = network["author_stats"]
    # By paper count
    by_papers = sorted(stats.items(), key=lambda x: -x[1]["paper_count"])[:top_n]
    # By collaboration count
    by_collabs = sorted(stats.items(), key=lambda x: -x[1]["collaborator_count"])[:top_n]
    return {
        "most_prolific": by_papers,
        "most_collaborative": by_collabs,
    }
def find_research_clusters(network, min_edge_weight=3):
    """Identify author clusters with strong collaboration ties."""
    from collections import defaultdict, deque

    strong_edges = {
        edge: weight
        for edge, weight in network["edges"].items()
        if weight >= min_edge_weight
    }
    # Build adjacency list
    adj = defaultdict(set)
    for (a1, a2) in strong_edges:
        adj[a1].add(a2)
        adj[a2].add(a1)

    # Simple connected components via BFS
    visited = set()
    clusters = []
    for node in adj:
        if node in visited:
            continue
        cluster = set()
        queue = deque([node])
        while queue:
            current = queue.popleft()
            if current in visited:
                continue
            visited.add(current)
            cluster.add(current)
            queue.extend(adj[current] - visited)
        # Keep clusters of three or more authors
        if len(cluster) > 2:
            clusters.append(sorted(cluster))
    return sorted(clusters, key=len, reverse=True)
# Usage
# Accept both ArxivPaper objects (search API) and plain dicts (OAI-PMH harvest)
papers_data = [p if isinstance(p, dict) else vars(p) for p in papers]
network = build_coauthor_network(papers_data)
print(f"Network: {network['total_authors']} authors, {network['total_edges']} co-author pairs")
key_authors = find_key_authors(network)
print("\nMost prolific authors:")
for author, stats in key_authors["most_prolific"][:10]:
    print(f"  {author}: {stats['paper_count']} papers")
clusters = find_research_clusters(network)
print(f"\nFound {len(clusters)} research clusters")
for i, cluster in enumerate(clusters[:3]):
    print(f"  Cluster {i+1} ({len(cluster)} authors): {cluster[:3]}...")
Citation Data via Semantic Scholar
arXiv doesn't provide citation counts. Use Semantic Scholar's free API:
S2_BASE = "https://api.semanticscholar.org/graph/v1"

def get_citations(
    arxiv_id: str,
    s2_api_key: str = None,
    proxy_url: str = None,
) -> Optional[dict]:
    """
    Get citation data from Semantic Scholar for an arXiv paper.
    Free tier: 100 req/5min unauthenticated, higher with API key.
    Returns None if the paper is unknown, rate limited, or the request fails.
    """
    url = f"{S2_BASE}/paper/ARXIV:{arxiv_id}"
    params = {
        "fields": (
            "citationCount,influentialCitationCount,"
            "citations.title,citations.year,citations.externalIds,"
            "references.title,references.year,"
            "authors.name,authors.hIndex"
        )
    }
    headers = {"User-Agent": "ResearchTool/1.0"}
    if s2_api_key:
        headers["x-api-key"] = s2_api_key
    proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
    try:
        resp = requests.get(url, params=params, headers=headers,
                            proxies=proxies, timeout=30)
        if resp.status_code == 404:
            return None
        if resp.status_code == 429:
            print("S2 rate limited, waiting...")
            time.sleep(30)
            return None
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException as e:
        print(f"S2 error for {arxiv_id}: {e}")
        return None

    # List fields can come back as null, so fall back to empty lists
    citations = data.get("citations") or []
    references = data.get("references") or []
    recent_citations = [
        c for c in citations
        if isinstance(c, dict) and c.get("year") and c["year"] >= 2024
    ]
    return {
        "arxiv_id": arxiv_id,
        "s2_paper_id": data.get("paperId"),
        "citation_count": data.get("citationCount") or 0,
        "influential_citations": data.get("influentialCitationCount") or 0,
        "recent_citation_count": len(recent_citations),
        "recent_citations": recent_citations[:10],
        "reference_count": len(references),
        "author_h_indices": [
            {"name": a.get("name"), "h_index": a.get("hIndex")}
            for a in (data.get("authors") or [])[:5]
        ],
    }
def enrich_with_citations(papers, max_papers=100, s2_api_key=None):
    """Add citation data to a list of papers."""
    enriched = []
    for i, paper in enumerate(papers[:max_papers]):
        arxiv_id = paper.get("arxiv_id") if isinstance(paper, dict) else paper.arxiv_id
        cite_data = get_citations(arxiv_id, s2_api_key)
        if cite_data:
            if isinstance(paper, dict):
                paper["citations"] = cite_data
            else:
                paper = vars(paper)
                paper["citations"] = cite_data
            enriched.append(paper)
        print(f"  [{i+1}/{min(len(papers), max_papers)}] {arxiv_id}: "
              f"{cite_data.get('citation_count', 0) if cite_data else 'N/A'} citations")
        time.sleep(1.5)
    return enriched
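When only the counts are needed, one request per paper is slow. Semantic Scholar's Graph API also offers a batch endpoint (POST /graph/v1/paper/batch) that takes a list of IDs. The sketch below shows one way to use it; the helper names, the batch size of 100, and the 1.5-second pause are assumptions chosen for illustration, so check the current API documentation for the actual per-request ID limit.

```python
import time
from typing import Dict, Iterator, List, Optional

S2_BATCH_URL = "https://api.semanticscholar.org/graph/v1/paper/batch"


def chunked(items: List[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


def to_s2_ids(arxiv_ids: List[str]) -> List[str]:
    """Prefix arXiv IDs the way Semantic Scholar expects them."""
    return [f"ARXIV:{arxiv_id}" for arxiv_id in arxiv_ids]


def fetch_citation_counts(
    arxiv_ids: List[str],
    api_key: Optional[str] = None,
    batch_size: int = 100,  # assumed conservative batch size
) -> Dict[str, Optional[int]]:
    """Return {arxiv_id: citationCount or None} using the batch endpoint."""
    import requests  # imported here so the helpers above stay dependency-free

    headers = {"x-api-key": api_key} if api_key else {}
    counts: Dict[str, Optional[int]] = {}
    for chunk in chunked(arxiv_ids, batch_size):
        resp = requests.post(
            S2_BATCH_URL,
            params={"fields": "citationCount"},
            json={"ids": to_s2_ids(chunk)},
            headers=headers,
            timeout=30,
        )
        resp.raise_for_status()
        # The response list is aligned with the request; unknown IDs are null
        for arxiv_id, item in zip(chunk, resp.json()):
            counts[arxiv_id] = item.get("citationCount") if item else None
        time.sleep(1.5)  # stay well under the shared rate limit
    return counts
```

The per-paper function is still the better fit when you need the citing papers themselves, since the nested citation lists make batch responses large.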
PDF Download Pipeline
Download PDFs at scale with proxy rotation:
import random
from pathlib import Path

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"

def get_proxy(session_id=None):
    """ThorData residential proxy URL."""
    user = THORDATA_USER
    if session_id:
        user = f"{THORDATA_USER}-session-{session_id}"
    return f"http://{user}:{THORDATA_PASS}@proxy.thordata.com:9000"
def download_pdf(
    arxiv_id: str,
    output_dir: str = "papers",
    proxy_url: str = None,
    skip_existing: bool = True,
) -> dict:
    """
    Download an arXiv PDF.
    Returns {"arxiv_id": ..., "path": ..., "size_bytes": ..., "status": ...}
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    # Old-style IDs contain a slash, which is not valid in a file name
    output_path = out_dir / f"{arxiv_id.replace('/', '_')}.pdf"
    if skip_existing and output_path.exists() and output_path.stat().st_size > 1000:
        return {"arxiv_id": arxiv_id, "path": str(output_path),
                "size_bytes": output_path.stat().st_size, "status": "skipped"}

    url = f"https://arxiv.org/pdf/{arxiv_id}.pdf"
    headers = {"User-Agent": "ArxivPDFDownloader/1.0 (academic research)"}
    proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
    try:
        resp = requests.get(url, headers=headers, proxies=proxies,
                            timeout=120, stream=True)
        resp.raise_for_status()
        # Check it's actually a PDF; an HTML body usually means a block page
        content_type = resp.headers.get("Content-Type", "")
        if "pdf" not in content_type.lower() and "html" in content_type.lower():
            return {"arxiv_id": arxiv_id, "status": "rate_limited",
                    "path": None, "size_bytes": 0}
        with open(output_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=65536):
                if chunk:
                    f.write(chunk)
        size = output_path.stat().st_size
        return {"arxiv_id": arxiv_id, "path": str(output_path),
                "size_bytes": size, "status": "downloaded"}
    except requests.RequestException as e:
        # Remove any partial file so a later run does not mistake it for a download
        if output_path.exists():
            output_path.unlink()
        return {"arxiv_id": arxiv_id, "status": f"error: {e}",
                "path": None, "size_bytes": 0}
def download_pdf_batch(
    arxiv_ids: list,
    output_dir: str = "papers",
    proxy_url: str = None,
    requests_per_minute: int = 10,
) -> list:
    """
    Download PDFs for a list of arXiv IDs with rate limiting.
    arXiv recommends max 4 requests/second; be conservative.
    """
    # Imported unconditionally: the jitter below needs it on every iteration,
    # including the first one, where no proxy rotation happens
    import random

    results = []
    delay = 60.0 / requests_per_minute
    for i, arxiv_id in enumerate(arxiv_ids):
        # Rotate proxy every 20 downloads to avoid bandwidth-based blocks
        # (only when a proxy was requested in the first place)
        if proxy_url and i % 20 == 0 and i > 0:
            proxy_url = get_proxy(session_id=random.randint(10000, 99999))
        result = download_pdf(arxiv_id, output_dir, proxy_url)
        results.append(result)
        print(f"[{i+1}/{len(arxiv_ids)}] {arxiv_id}: {result['status']} "
              f"({result.get('size_bytes', 0) // 1024} KB)")
        time.sleep(delay + random.uniform(0, delay * 0.3))
    return results
# Download PDFs for the top 50 cited papers
top_papers = sorted(
    [p for p in papers_data if p.get("citations", {}).get("citation_count", 0) > 0],
    key=lambda x: -x.get("citations", {}).get("citation_count", 0),
)[:50]
pdf_ids = [p["arxiv_id"] for p in top_papers]
proxy = get_proxy()
download_results = download_pdf_batch(pdf_ids, proxy_url=proxy, requests_per_minute=8)
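The Content-Type check catches most block pages, but a truncated or mislabeled download can still end up on disk with a .pdf extension. A cheap second check is to look for the PDF magic bytes at the start of the file. The sketch below is one way to do that; looks_like_pdf and the min_bytes threshold are hypothetical, with the threshold mirroring the 1000-byte cutoff used for skip_existing.

```python
from pathlib import Path


def looks_like_pdf(path, min_bytes: int = 1000) -> bool:
    """Return True if the file exists, is not tiny, and starts with %PDF-."""
    p = Path(path)
    if not p.is_file() or p.stat().st_size < min_bytes:
        return False
    with p.open("rb") as f:
        return f.read(5) == b"%PDF-"
```

Running this over the output directory after a batch gives a list of files to delete and retry, for example [r for r in download_results if r["path"] and not looks_like_pdf(r["path"])].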
Data Storage
import sqlite3
import json

def init_db(db_path="arxiv_papers.db"):
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS papers (
            arxiv_id TEXT PRIMARY KEY,
            title TEXT,
            abstract TEXT,
            authors TEXT,
            affiliations TEXT,
            categories TEXT,
            primary_category TEXT,
            published TEXT,
            updated TEXT,
            journal_ref TEXT,
            doi TEXT,
            comment TEXT,
            pdf_url TEXT,
            html_url TEXT,
            scraped_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS citations (
            arxiv_id TEXT PRIMARY KEY,
            citation_count INTEGER DEFAULT 0,
            influential_citations INTEGER DEFAULT 0,
            recent_citation_count INTEGER DEFAULT 0,
            reference_count INTEGER DEFAULT 0,
            fetched_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (arxiv_id) REFERENCES papers(arxiv_id)
        );
        CREATE TABLE IF NOT EXISTS pdf_downloads (
            arxiv_id TEXT PRIMARY KEY,
            file_path TEXT,
            size_bytes INTEGER,
            downloaded_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (arxiv_id) REFERENCES papers(arxiv_id)
        );
        CREATE INDEX IF NOT EXISTS idx_category ON papers(primary_category);
        CREATE INDEX IF NOT EXISTS idx_published ON papers(published);
        CREATE INDEX IF NOT EXISTS idx_citations ON citations(citation_count);
    """)
    conn.commit()
    return conn
def save_papers(conn, papers):
    """Save a batch of papers (ArxivPaper objects or dicts) to SQLite."""
    rows = []
    for p in papers:
        if isinstance(p, ArxivPaper):
            p = vars(p)
        categories = p.get("categories", [])
        rows.append((
            p.get("arxiv_id"), p.get("title"), p.get("abstract"),
            json.dumps(p.get("authors", [])),
            json.dumps(p.get("affiliations", [])),
            json.dumps(categories),
            # OAI-PMH records carry no primary_category or published field:
            # fall back to the first listed category and the created date
            p.get("primary_category") or (categories[0] if categories else None),
            p.get("published") or p.get("created"),
            p.get("updated"),
            p.get("journal_ref"), p.get("doi"), p.get("comment"),
            p.get("pdf_url"), p.get("html_url"),
        ))
    conn.executemany("""
        INSERT OR REPLACE INTO papers
        (arxiv_id, title, abstract, authors, affiliations, categories,
         primary_category, published, updated, journal_ref, doi,
         comment, pdf_url, html_url)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, rows)
    conn.commit()
    print(f"Saved {len(rows)} papers")
def save_citations(conn, citations_list):
    """Save citation data batch."""
    rows = [
        (
            c["arxiv_id"],
            c.get("citation_count", 0),
            c.get("influential_citations", 0),
            c.get("recent_citation_count", 0),
            c.get("reference_count", 0),
        )
        for c in citations_list if c
    ]
    conn.executemany("""
        INSERT OR REPLACE INTO citations
        (arxiv_id, citation_count, influential_citations, recent_citation_count, reference_count)
        VALUES (?,?,?,?,?)
    """, rows)
    conn.commit()

def query_trending_papers(conn, category=None, days_back=7, min_citations=5):
    """Find recently submitted papers gaining traction."""
    # Parameter order must match the order of the ? placeholders below
    params = [days_back, min_citations]
    cat_filter = ""
    if category:
        cat_filter = "AND p.primary_category = ?"
        params.append(category)
    cursor = conn.execute(f"""
        SELECT p.arxiv_id, p.title, p.authors, p.published,
               c.citation_count, c.influential_citations
        FROM papers p
        LEFT JOIN citations c ON p.arxiv_id = c.arxiv_id
        WHERE p.published > datetime('now', '-' || ? || ' days')
          AND c.citation_count >= ?
          {cat_filter}
        ORDER BY c.citation_count DESC
        LIMIT 20
    """, params)
    return cursor.fetchall()
Complete Research Pipeline
def run_research_pipeline(
    categories: list = None,
    days_back: int = 30,
    db_path: str = "arxiv_research.db",
    download_pdfs: bool = False,
    proxy_url: str = None,
):
    """
    Full arXiv data collection pipeline.
    Harvests papers, fetches citations, optionally downloads PDFs.
    """
    if categories is None:
        categories = ["cs.AI", "cs.LG", "cs.CL"]
    conn = init_db(db_path)
    from_date = (datetime.now() - timedelta(days=days_back)).strftime("%Y-%m-%d")
    total_papers = 0

    # Phase 1: Harvest metadata via OAI-PMH
    for category in categories:
        print(f"\n=== Harvesting {category} ===")
        papers = harvest_oai(category, from_date=from_date, proxy_url=proxy_url)
        save_papers(conn, papers)
        total_papers += len(papers)
        print(f"  Saved {len(papers)} papers for {category}")
        time.sleep(3)

    # Phase 2: Enrich with citation data
    print("\n=== Fetching citation data ===")
    cursor = conn.execute("""
        SELECT arxiv_id FROM papers
        WHERE arxiv_id NOT IN (SELECT arxiv_id FROM citations)
        LIMIT 500
    """)
    ids_to_enrich = [row[0] for row in cursor.fetchall()]
    print(f"Papers to enrich: {len(ids_to_enrich)}")
    citation_data = []
    for i, arxiv_id in enumerate(ids_to_enrich):
        cites = get_citations(arxiv_id, proxy_url=proxy_url)
        if cites:
            citation_data.append(cites)
        # Flush to the database every 50 papers so progress survives a crash
        if (i + 1) % 50 == 0:
            save_citations(conn, citation_data)
            citation_data = []
            print(f"  Progress: {i+1}/{len(ids_to_enrich)}")
        time.sleep(1.5)
    if citation_data:
        save_citations(conn, citation_data)

    # Phase 3: PDF downloads (optional)
    if download_pdfs:
        print("\n=== Downloading PDFs ===")
        cursor = conn.execute("""
            SELECT arxiv_id FROM papers
            WHERE arxiv_id NOT IN (SELECT arxiv_id FROM pdf_downloads)
            ORDER BY published DESC LIMIT 100
        """)
        pdf_ids = [row[0] for row in cursor.fetchall()]
        results = download_pdf_batch(pdf_ids, proxy_url=proxy_url)
        for r in results:
            if r["status"] == "downloaded":
                conn.execute("""
                    INSERT OR REPLACE INTO pdf_downloads (arxiv_id, file_path, size_bytes)
                    VALUES (?, ?, ?)
                """, (r["arxiv_id"], r["path"], r["size_bytes"]))
        conn.commit()

    # Summary
    cursor = conn.execute("SELECT COUNT(*) FROM papers")
    total_db = cursor.fetchone()[0]
    cursor = conn.execute("SELECT COUNT(*) FROM citations")
    total_cites = cursor.fetchone()[0]
    print(f"\nPipeline complete: {total_db} papers, {total_cites} with citation data")

    # Print trending papers
    print("\nTop trending recent papers:")
    trending = query_trending_papers(conn, days_back=days_back)
    for arxiv_id, title, authors_json, published, cites, influential in trending[:5]:
        all_authors = json.loads(authors_json or "[]")
        author_str = ", ".join(all_authors[:2]) + ("..." if len(all_authors) > 2 else "")
        print(f"  [{arxiv_id}] {title[:60]}...")
        print(f"    By: {author_str} | {cites} citations ({influential} influential)")

if __name__ == "__main__":
    proxy = get_proxy()
    run_research_pipeline(
        categories=["cs.AI", "cs.LG", "cs.CL"],
        days_back=30,
        proxy_url=proxy,
    )
Rate Limit Strategy
arXiv is explicit about their rate limits:
import time
import random

class ArxivRateLimiter:
    """
    Enforces arXiv's rate limiting policies.
    - API: 3 seconds between requests
    - PDF downloads: max 4 requests/second, max 1 simultaneous download
    - OAI-PMH: 3 seconds between resumption token requests
    """
    API_MIN_DELAY = 3.0
    PDF_MIN_DELAY = 0.25
    OAI_MIN_DELAY = 3.0

    def __init__(self):
        self._last_api = 0
        self._last_pdf = 0
        self._last_oai = 0

    def api_wait(self):
        elapsed = time.time() - self._last_api
        required = self.API_MIN_DELAY + random.uniform(0, 1)  # jitter
        if elapsed < required:
            time.sleep(required - elapsed)
        self._last_api = time.time()

    def pdf_wait(self):
        elapsed = time.time() - self._last_pdf
        if elapsed < self.PDF_MIN_DELAY:
            time.sleep(self.PDF_MIN_DELAY - elapsed)
        self._last_pdf = time.time()

    def oai_wait(self):
        elapsed = time.time() - self._last_oai
        required = self.OAI_MIN_DELAY + random.uniform(0, 1)  # jitter
        if elapsed < required:
            time.sleep(required - elapsed)
        self._last_oai = time.time()

rate_limiter = ArxivRateLimiter()
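Fixed delays cover the normal case, but transient failures (timeouts, 5xx responses, temporary throttling) are better handled by waiting longer after each failed attempt. The decorator below is a minimal sketch of exponential backoff with jitter. The name with_backoff, its defaults, and the injectable sleep parameter are all illustrative choices, not something arXiv prescribes.

```python
import random
import time
from functools import wraps


def with_backoff(max_attempts=4, base_delay=3.0, max_delay=60.0,
                 exceptions=(Exception,), sleep=time.sleep):
    """Retry the wrapped function, doubling the wait after each failure."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    # base_delay, 2x, 4x, ... capped at max_delay
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    # Up to 10% jitter so parallel workers do not retry in lockstep
                    sleep(delay + random.uniform(0, delay * 0.1))
        return wrapper
    return decorator
```

Passing sleep as a parameter keeps the decorator easy to test without real waiting. In a pipeline it would typically wrap the function that performs a single HTTP request, with exceptions narrowed to requests.RequestException.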
ThorData Proxy Integration
For PDF downloads and large-scale API usage, ThorData's residential proxies prevent arXiv from rate-limiting based on your IP:
def get_proxy_for_arxiv(session_id=None):
    """
    ThorData proxy for arXiv.
    Use sticky sessions for sustained PDF download sessions.
    Rotate between papers to distribute bandwidth.
    """
    if session_id:
        user = f"{THORDATA_USER}-session-{session_id}-country-us"
    else:
        user = f"{THORDATA_USER}-country-us"
    return f"http://{user}:{THORDATA_PASS}@proxy.thordata.com:9000"

# Download large batches without hitting bandwidth limits
import random

def download_with_rotation(arxiv_ids, output_dir="papers"):
    """Download PDFs with automatic proxy rotation."""
    results = []
    proxy = None
    for i, arxiv_id in enumerate(arxiv_ids):
        # New sticky session every 15 downloads; reuse the same one in between
        if i % 15 == 0:
            session_id = random.randint(10000, 99999)
            proxy = get_proxy_for_arxiv(session_id=session_id)
        result = download_pdf(arxiv_id, output_dir, proxy)
        results.append(result)
        rate_limiter.pdf_wait()
    return results
Summary
arXiv offers two access paths:
- Search API: targeted queries, up to 2,000 results per request (paginate with start for more), 3-second delay between requests
- OAI-PMH harvest: entire archives or date ranges, resumption token pagination, best for bulk collection
Pair it with Semantic Scholar for citation data (free API, with higher rate limits when you use an API key) and you have a complete research intelligence stack.
Key rules:
- Respect the 3-second API delay; arXiv may block your IP if you ignore it
- Use the OAI-PMH protocol for anything over a few hundred papers
- For PDF downloads at scale, ThorData residential proxies distribute bandwidth across a pool of IPs
- Store everything in SQLite from the start: paper metadata, citation data, and download logs
- Build resumable pipelines: OAI-PMH harvests can run for hours, so save after every page
The data is legitimately open. arXiv explicitly supports bulk access for research purposes. Use it responsibly and you have access to one of the richest academic datasets in existence.