Scraping bioRxiv Preprints: Author Networks and Topic Clusters (2026)
bioRxiv is one of the best targets for scraping scientific data. Unlike most academic publishers that lock everything behind paywalls and aggressive bot detection, bioRxiv actually wants you to access their content. They run a public API, serve clean HTML, and their robots.txt is surprisingly permissive. If you're doing bibliometrics, tracking research trends, or building datasets for ML training — this is where you start.
The interesting part isn't just grabbing abstracts. It's what you can build from the metadata: author collaboration graphs, institutional networks, topic clustering over time. That's where the real value sits.
What bioRxiv Exposes
bioRxiv's content API lives at api.biorxiv.org and returns JSON. No authentication needed. You can query by date range, server (biorxiv or medrxiv), and get back structured metadata for every preprint.
Each record includes:
- DOI and title — unique identifier and full title text
- Authors — semicolon-separated list with institutional affiliations
- Abstract — full text, not truncated
- Category — one of ~30 subject areas (neuroscience, genomics, bioinformatics, etc.)
- Dates — posted date, revision dates, version history
- Publication status — whether it got published in a journal, and which one
- License — CC-BY, CC-BY-NC, etc.
The API returns up to 100 results per call with cursor-based pagination. For bulk historical data you'll want to hit the content endpoint day by day or month by month.
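The endpoint shape and record layout described above can be sketched without touching the network. The helper name and the example field values below are illustrative, not part of the API:

```python
# Sketch of the details-endpoint URL shape and a record's key fields.
# Field names follow the API's JSON; the example values are made up.

def details_url(server: str, start: str, end: str, cursor: int = 0) -> str:
    """Build a content-API URL for a date-range query."""
    return f"https://api.biorxiv.org/details/{server}/{start}/{end}/{cursor}"

url = details_url("biorxiv", "2026-09-01", "2026-09-30")
# → https://api.biorxiv.org/details/biorxiv/2026-09-01/2026-09-30/0

# A typical record in the response's "collection" array (illustrative values):
example_record = {
    "doi": "10.1101/2026.09.15.123456",
    "title": "Example preprint title",
    "authors": "Smith, J.; Doe, A.",   # semicolon-separated
    "category": "neuroscience",
    "date": "2026-09-15",
    "version": "1",
    "license": "cc_by",
    "published": "NA",                 # or the journal DOI once published
}
```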
Why Scrape bioRxiv?
Beyond the obvious "free science data" angle, there are high-value applications:
- Bibliometric analysis: Track citation networks, collaboration patterns, and institutional output before formal publication
- Research trend detection: Identify emerging topics weeks or months before they appear in peer-reviewed journals
- ML training data: High-quality, domain-specific text with rich metadata labels (category, tools mentioned, methodology)
- Science journalism: Alert on preprints in specific areas as they're posted, before they hit mainstream coverage
- Grant intelligence: See what research directions are active at specific institutions or from specific funders
- Competitive research analysis: Track what competitors (academic groups, biotech companies) are working on
- Altmetrics: Combine preprint view counts with citation data for early impact prediction
The API Approach
Start with the official API. It's rate-limited but generous — sustained queries at one request per second work without issues.
import httpx
import time
import json
import sqlite3
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from pathlib import Path
class BioRxivAPI:
"""Client for the bioRxiv/medRxiv content API."""
BASE_URL = "https://api.biorxiv.org"
def __init__(self, delay: float = 1.0, server: str = "biorxiv"):
self.client = httpx.Client(timeout=30)
self.delay = delay
self.server = server # 'biorxiv' or 'medrxiv'
self._last_request = 0.0
def _rate_limit(self):
elapsed = time.time() - self._last_request
if elapsed < self.delay:
time.sleep(self.delay - elapsed)
self._last_request = time.time()
def fetch_date_range(
self,
start: str,
end: str,
cursor: int = 0,
verbose: bool = True,
) -> List[Dict]:
"""Fetch all preprints in a date range.
start, end: 'YYYY-MM-DD' format
"""
all_papers = []
current_cursor = cursor
while True:
self._rate_limit()
url = f"{self.BASE_URL}/details/{self.server}/{start}/{end}/{current_cursor}"
try:
resp = self.client.get(url)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as e:
print(f"[ERROR] API request failed: {e}")
break
except json.JSONDecodeError:
print(f"[ERROR] Invalid JSON response")
break
messages = data.get("messages", [{}])
if not messages:
break
total = int(messages[0].get("total", 0))
papers = data.get("collection", [])
if not papers:
break
all_papers.extend(papers)
current_cursor += len(papers)
if verbose:
print(f" {self.server}: {current_cursor}/{total} preprints")
if current_cursor >= total:
break
return all_papers
def fetch_month(self, year: int, month: int, verbose: bool = True) -> List[Dict]:
"""Fetch all preprints for a calendar month."""
start = f"{year}-{month:02d}-01"
if month == 12:
end = f"{year}-12-31"
else:
last_day = (datetime(year, month + 1, 1) - timedelta(days=1))
end = last_day.strftime("%Y-%m-%d")
if verbose:
print(f"Fetching {self.server} {year}-{month:02d} ({start} to {end})")
return self.fetch_date_range(start, end, verbose=verbose)
def fetch_recent(self, days: int = 7, verbose: bool = True) -> List[Dict]:
"""Fetch preprints from the last N days."""
end = datetime.utcnow().strftime("%Y-%m-%d")
start = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%d")
return self.fetch_date_range(start, end, verbose=verbose)
def get_paper_details(self, doi: str) -> Optional[Dict]:
"""Get all versions of a specific paper by DOI."""
self._rate_limit()
url = f"{self.BASE_URL}/details/{self.server}/{doi}"
try:
resp = self.client.get(url)
resp.raise_for_status()
data = resp.json()
collection = data.get("collection") or []
return collection[0] if collection else None
except Exception as e:
print(f"[ERROR] Failed to get {doi}: {e}")
return None
def fetch_published_papers(
self,
start: str,
end: str,
verbose: bool = True,
) -> List[Dict]:
"""Fetch preprints that have been formally published."""
all_papers = []
cursor = 0
while True:
self._rate_limit()
url = f"{self.BASE_URL}/pub/{self.server}/{start}/{end}/{cursor}"
try:
resp = self.client.get(url)
resp.raise_for_status()
data = resp.json()
except Exception as e:
print(f"[ERROR] {e}")
break
messages = data.get("messages", [{}])
total = int((messages[0] if messages else {}).get("total", 0))
papers = data.get("collection", [])
if not papers:
break
all_papers.extend(papers)
cursor += len(papers)
if verbose:
print(f" Published: {cursor}/{total}")
if cursor >= total:
break
return all_papers
Enriching With Web Scraping
The API gives you metadata but not everything. Full author ORCID identifiers, supplementary file links, view/download counts, and inline figures require scraping the preprint HTML pages.
from bs4 import BeautifulSoup
def scrape_preprint_page(
doi: str,
server: str = "biorxiv",
proxy: Optional[str] = None,
) -> Dict:
"""Scrape additional details from a preprint's HTML page."""
url = f"https://www.{server}.org/content/{doi}v1"
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
# httpx takes a single proxy URL (proxy=...), not a requests-style dict
try:
resp = httpx.get(
url,
headers=headers,
follow_redirects=True,
timeout=30,
proxy=proxy,
)
resp.raise_for_status()
except httpx.HTTPError as e:
print(f"[ERROR] Failed to fetch {doi}: {e}")
return {"doi": doi, "error": str(e)}
soup = BeautifulSoup(resp.text, "html.parser")
details = {"doi": doi}
# Extract structured author list with affiliations and ORCID
authors_detailed = []
for author_el in soup.select(".highwire-citation-authors .highwire-citation-author"):
given = author_el.select_one(".nlm-given-names")
surname = author_el.select_one(".nlm-surname")
orcid_link = author_el.select_one('a[href*="orcid.org"]')
# Affiliation via data attributes
aff_id = author_el.get("data-aff-id", "")
affil_el = soup.select_one(f"#{aff_id}") if aff_id else None
authors_detailed.append({
"given": given.text.strip() if given else "",
"surname": surname.text.strip() if surname else "",
"orcid": orcid_link["href"].split("/")[-1] if orcid_link else None,
"affiliation": affil_el.get_text(strip=True) if affil_el else "",
})
details["authors_detailed"] = authors_detailed
# Extract view/download metrics
metrics_el = soup.select_one(".article-metrics-block, .highwire-article-metrics")
if metrics_el:
for stat in metrics_el.select("[data-stat]"):
stat_name = stat.get("data-stat")
try:
details[stat_name] = int(stat.text.replace(",", "").strip())
except (ValueError, AttributeError):
pass
# Subject area tags (may differ from API category)
subjects = [s.get_text(strip=True) for s in soup.select(".highwire-article-collection-term")]
details["subjects"] = subjects
# Corresponding author email (sometimes exposed)
corresp = soup.select_one(".corresp")
if corresp:
email_match = re.search(r"[\w.+-]+@[\w-]+\.[\w.-]+", corresp.get_text())
if email_match:
details["corresponding_email"] = email_match.group(0)
# Figure count
figures = soup.select(".fig, figure, .highwire-figure")
details["figure_count"] = len(figures)
# Supplementary data links
supp_links = [
a["href"] for a in soup.select("a[href*='supplementary'], a[href*='supp']")
if a.get("href")
]
details["supplementary_links"] = supp_links[:10]
return details
def enrich_papers(
papers: List[Dict],
sample_size: Optional[int] = None,
delay: float = 1.5,
proxy: Optional[str] = None,
) -> List[Dict]:
"""Add web-scraped enrichment data to API paper records."""
import random  # needed for sampling here and for the jitter below
if sample_size:
papers = random.sample(papers, min(sample_size, len(papers)))
enriched = []
for i, paper in enumerate(papers):
doi = paper.get("doi", "")
if not doi:
enriched.append(paper)
continue
print(f" [{i+1}/{len(papers)}] {doi}")
web_data = scrape_preprint_page(doi, proxy=proxy)
# Merge API and web data
merged = {**paper, **web_data}
enriched.append(merged)
time.sleep(delay + random.uniform(0, 0.5))
return enriched
Building Author Collaboration Networks
Author co-authorship networks are one of the most revealing structures you can extract from preprint metadata.
try:
import networkx as nx
HAS_NETWORKX = True
except ImportError:
HAS_NETWORKX = False
print("Install networkx for network analysis: pip install networkx")
from collections import defaultdict
def build_author_network(papers: List[Dict]) -> Any:
"""Build a weighted co-authorship network from preprint metadata."""
if not HAS_NETWORKX:
raise ImportError("networkx required: pip install networkx")
G = nx.Graph()
author_stats = defaultdict(lambda: {"papers": 0, "categories": set(), "dois": []})
for paper in papers:
doi = paper.get("doi", "")
category = paper.get("category", "unknown")
# Parse semicolon-separated author string from API
authors_raw = paper.get("authors", "")
authors = [a.strip() for a in authors_raw.split(";") if a.strip()]
# Deduplicate authors (sometimes listed twice in API)
seen = set()
unique_authors = []
for a in authors:
if a not in seen:
seen.add(a)
unique_authors.append(a)
authors = unique_authors
# Update node stats
for author in authors:
if not G.has_node(author):
G.add_node(author)
author_stats[author]["papers"] += 1
author_stats[author]["categories"].add(category)
author_stats[author]["dois"].append(doi)
# Add weighted edges between all co-authors
for i, a1 in enumerate(authors):
for a2 in authors[i + 1:]:
if G.has_edge(a1, a2):
G[a1][a2]["weight"] += 1
G[a1][a2]["papers"].append(doi)
else:
G.add_edge(a1, a2, weight=1, papers=[doi])
# Attach stats to nodes
for author, stats in author_stats.items():
if G.has_node(author):
G.nodes[author]["paper_count"] = stats["papers"]
G.nodes[author]["categories"] = list(stats["categories"])
G.nodes[author]["dois"] = stats["dois"][:20] # Cap for memory
return G
def analyze_network(G: Any) -> Dict:
"""Compute key network statistics."""
if not HAS_NETWORKX:
return {}
stats = {
"nodes": G.number_of_nodes(),
"edges": G.number_of_edges(),
"density": nx.density(G),
"connected_components": nx.number_connected_components(G),
}
# Largest connected component
if G.number_of_nodes() > 0:
largest_cc = max(nx.connected_components(G), key=len)
lcc = G.subgraph(largest_cc)
stats["largest_component_size"] = len(largest_cc)
stats["largest_component_pct"] = round(len(largest_cc) / G.number_of_nodes() * 100, 1)
# Top authors by degree (number of collaborators)
top_by_degree = sorted(
[(n, d) for n, d in G.degree()],
key=lambda x: x[1],
reverse=True,
)[:20]
stats["top_collaborators"] = [
{
"author": name,
"collaborators": degree,
"papers": G.nodes[name].get("paper_count", 0),
"categories": G.nodes[name].get("categories", []),
}
for name, degree in top_by_degree
]
# Top edges by co-authorship weight
top_pairs = sorted(
[(u, v, d["weight"]) for u, v, d in G.edges(data=True)],
key=lambda x: x[2],
reverse=True,
)[:20]
stats["strongest_collaborations"] = [
{"author1": u, "author2": v, "joint_papers": w}
for u, v, w in top_pairs
]
return stats
def find_bridge_researchers(G: Any, min_categories: int = 2) -> List[Dict]:
"""Find researchers who work across multiple research areas."""
if not HAS_NETWORKX:
return []
bridges = []
for node, data in G.nodes(data=True):
categories = data.get("categories", [])
if len(categories) >= min_categories:
bridges.append({
"author": node,
"categories": categories,
"n_categories": len(categories),
"paper_count": data.get("paper_count", 0),
"collaborators": G.degree(node),
})
return sorted(bridges, key=lambda x: (x["n_categories"], x["paper_count"]), reverse=True)
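The core edge-weighting step — each unordered co-author pair on a paper adds one to that pair's edge weight — doesn't strictly need networkx. A minimal stdlib sketch on made-up author strings:

```python
from collections import Counter
from itertools import combinations

papers = [
    {"authors": "Smith, J.; Doe, A.; Lee, K."},
    {"authors": "Smith, J.; Doe, A."},
]

pair_weights = Counter()
for paper in papers:
    # Parse and deduplicate the semicolon-separated author string;
    # sorting makes each unordered pair a canonical key
    authors = sorted({a.strip() for a in paper["authors"].split(";") if a.strip()})
    for a1, a2 in combinations(authors, 2):
        pair_weights[(a1, a2)] += 1

print(pair_weights.most_common(1))
# → [(('Doe, A.', 'Smith, J.'), 2)]
```

This is exactly the weight that `build_author_network` stores on each edge; the Counter version just drops the node attributes and graph algorithms.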
Topic Clustering
The combination of abstracts and category labels makes topic modeling straightforward. Even without fancy ML, TF-IDF + K-Means reveals meaningful research clusters.
def cluster_preprints_by_abstract(
papers: List[Dict],
n_clusters: int = 15,
max_features: int = 5000,
) -> Optional[Dict]:
"""Cluster preprints by abstract content using TF-IDF + K-Means."""
try:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import numpy as np
except ImportError:
print("Install scikit-learn: pip install scikit-learn numpy")
return None
# Filter papers with abstracts
valid = [(p.get("doi", ""), p.get("abstract", "")) for p in papers if len(p.get("abstract", "")) > 100]
if len(valid) < n_clusters:
print(f"[WARN] Not enough papers with abstracts ({len(valid)} < {n_clusters})")
return None
dois, abstracts = zip(*valid)
# TF-IDF vectorization
vectorizer = TfidfVectorizer(
max_features=max_features,
stop_words="english",
ngram_range=(1, 2),
min_df=3,
max_df=0.95,
)
tfidf = vectorizer.fit_transform(abstracts)
# K-Means clustering
km = KMeans(n_clusters=n_clusters, random_state=42, n_init=10, max_iter=300)
labels = km.fit_predict(tfidf)
feature_names = vectorizer.get_feature_names_out()
clusters = {}
for cluster_id in range(n_clusters):
center = km.cluster_centers_[cluster_id]
top_indices = center.argsort()[-15:][::-1]
top_terms = [feature_names[j] for j in top_indices]
cluster_dois = [dois[j] for j, l in enumerate(labels) if l == cluster_id]
# Sample paper titles for the cluster
cluster_papers = [
p for p in papers
if p.get("doi") in set(cluster_dois[:50])
]
sample_titles = [p.get("title", "")[:80] for p in cluster_papers[:5]]
clusters[cluster_id] = {
"top_terms": top_terms,
"size": len(cluster_dois),
"sample_titles": sample_titles,
"dois": cluster_dois[:20],
}
# Assign cluster labels back to papers
doi_to_cluster = {doi: int(label) for doi, label in zip(dois, labels)}
return {
"n_clusters": n_clusters,
"n_papers": len(valid),
"clusters": clusters,
"doi_to_cluster": doi_to_cluster,
}
def find_emerging_topics(
papers_period1: List[Dict],
papers_period2: List[Dict],
top_n: int = 20,
) -> List[Dict]:
"""Find terms that increased in frequency between two time periods."""
try:
from sklearn.feature_extraction.text import CountVectorizer
except ImportError:
return []
def term_frequencies(papers: List[Dict]) -> Dict[str, float]:
abstracts = [p.get("abstract", "") for p in papers if p.get("abstract")]
if not abstracts:
return {}
vec = CountVectorizer(
stop_words="english",
ngram_range=(1, 2),
max_features=10000,
min_df=2,
)
counts = vec.fit_transform(abstracts)
total = counts.sum()
freqs = {}
for term, idx in vec.vocabulary_.items():
freqs[term] = float(counts[:, idx].sum()) / total
return freqs
freq1 = term_frequencies(papers_period1)
freq2 = term_frequencies(papers_period2)
emerging = []
for term in set(freq2.keys()) & set(freq1.keys()):
if freq1[term] > 0:
growth = (freq2[term] - freq1[term]) / freq1[term]
if freq2[term] > 0.0001: # Minimum frequency threshold
emerging.append({
"term": term,
"freq_period1": freq1[term],
"freq_period2": freq2[term],
"growth_pct": round(growth * 100, 1),
})
return sorted(emerging, key=lambda x: x["growth_pct"], reverse=True)[:top_n]
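The growth score above is plain relative change in normalized term frequency. On toy numbers (the helper is just the inline formula extracted for illustration):

```python
def growth_pct(freq1: float, freq2: float) -> float:
    """Relative frequency change between two periods, as a percentage."""
    return round((freq2 - freq1) / freq1 * 100, 1)

# A term at 0.1% of tokens in period 1 and 0.3% in period 2 tripled:
print(growth_pct(0.001, 0.003))  # → 200.0
```

Note that `find_emerging_topics` intersects the two vocabularies, so a term absent from period 1 never scores — a deliberate trade-off that avoids division by zero at the cost of missing brand-new terminology.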
Storage and Database Design
SQLite handles bioRxiv data well at multi-year scale. A typical year of bioRxiv data is ~100MB uncompressed.
def init_database(path: str = "biorxiv.db") -> sqlite3.Connection:
"""Initialize the bioRxiv database."""
conn = sqlite3.connect(path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.executescript("""
CREATE TABLE IF NOT EXISTS preprints (
doi TEXT PRIMARY KEY,
title TEXT,
authors TEXT, -- Semicolon-separated
abstract TEXT,
category TEXT,
server TEXT DEFAULT 'biorxiv',
date_posted TEXT,
date_revised TEXT,
version INTEGER DEFAULT 1,
published_journal TEXT,
published_doi TEXT,
license TEXT,
abstract_views INTEGER DEFAULT 0,
pdf_downloads INTEGER DEFAULT 0,
figure_count INTEGER,
has_supplementary INTEGER DEFAULT 0,
corresponding_email TEXT,
scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS authors (
id INTEGER PRIMARY KEY AUTOINCREMENT,
preprint_doi TEXT,
position INTEGER,
given_name TEXT,
surname TEXT,
orcid TEXT,
affiliation TEXT,
FOREIGN KEY (preprint_doi) REFERENCES preprints(doi)
);
CREATE TABLE IF NOT EXISTS author_stats (
name TEXT PRIMARY KEY,
paper_count INTEGER DEFAULT 0,
categories TEXT, -- JSON array
first_paper TEXT,
last_paper TEXT,
coauthor_count INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS scrape_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
start_date TEXT,
end_date TEXT,
server TEXT,
papers_fetched INTEGER,
scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_preprints_date ON preprints(date_posted DESC);
CREATE INDEX IF NOT EXISTS idx_preprints_category ON preprints(category);
CREATE INDEX IF NOT EXISTS idx_preprints_journal ON preprints(published_journal);
CREATE INDEX IF NOT EXISTS idx_authors_doi ON authors(preprint_doi);
CREATE INDEX IF NOT EXISTS idx_authors_orcid ON authors(orcid);
""")
conn.commit()
return conn
def save_papers(conn: sqlite3.Connection, papers: List[Dict]) -> int:
"""Save a list of preprints to the database."""
saved = 0
for paper in papers:
doi = paper.get("doi")
if not doi:
continue
try:
conn.execute(
"""INSERT OR REPLACE INTO preprints
(doi, title, authors, abstract, category, server,
date_posted, date_revised, version, published_journal,
published_doi, license, abstract_views, pdf_downloads,
figure_count, scraped_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
doi,
paper.get("title", "")[:500],
paper.get("authors", ""),
paper.get("abstract", ""),
paper.get("category", ""),
paper.get("server", "biorxiv"),
paper.get("date"),
paper.get("date_revised") or paper.get("date"),
int(paper.get("version", 1)),
paper.get("published_journal"),
paper.get("published_doi"),
paper.get("license", ""),
paper.get("abstract_views", 0),
paper.get("pdf_downloads", 0),
paper.get("figure_count"),
datetime.utcnow().isoformat(),
)
)
saved += 1
except Exception as e:
print(f"[ERROR] Save failed for {doi}: {e}")
conn.commit()
return saved
def query_papers_by_category(
conn: sqlite3.Connection,
category: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 1000,
) -> List[Dict]:
"""Query papers by category with optional date filter."""
query = "SELECT doi, title, authors, abstract, date_posted FROM preprints WHERE category = ?"
params = [category]
if start_date:
query += " AND date_posted >= ?"
params.append(start_date)
if end_date:
query += " AND date_posted <= ?"
params.append(end_date)
query += " ORDER BY date_posted DESC LIMIT ?"
params.append(limit)
rows = conn.execute(query, params).fetchall()
return [
{"doi": r[0], "title": r[1], "authors": r[2], "abstract": r[3], "date_posted": r[4]}
for r in rows
]
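A quick in-memory smoke test of the filter-then-order query pattern above, using a pared-down version of the preprints table (the rows are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE preprints (
        doi TEXT PRIMARY KEY,
        title TEXT,
        category TEXT,
        date_posted TEXT
    )
""")
conn.executemany(
    "INSERT INTO preprints VALUES (?,?,?,?)",
    [
        ("10.1101/a", "Cortical mapping", "neuroscience", "2026-09-10"),
        ("10.1101/b", "Genome assembly", "genomics", "2026-09-12"),
        ("10.1101/c", "Spike sorting", "neuroscience", "2026-09-20"),
    ],
)

# Same shape as query_papers_by_category: category filter, date floor, newest first
rows = conn.execute(
    "SELECT doi FROM preprints WHERE category = ? AND date_posted >= ? "
    "ORDER BY date_posted DESC LIMIT ?",
    ("neuroscience", "2026-09-01", 10),
).fetchall()
print([r[0] for r in rows])  # → ['10.1101/c', '10.1101/a']
```

ISO-8601 date strings sort lexicographically, which is why plain TEXT comparisons work for both the `>=` filter and the `ORDER BY`.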
ThorData Proxy Integration
bioRxiv is a nonprofit running on grant money — they don't have heavy bot detection. But if you're processing thousands of preprint HTML pages for enrichment data (view counts, author ORCIDs, figure counts), rotating proxies help distribute the load and avoid triggering any IP-based limits.
ThorData's residential proxies work well here. Unlike datacenter proxies that get flagged even on open-access sites, residential IPs blend in naturally.
class ThorDataProxyPool:
"""ThorData residential proxy pool for bioRxiv scraping."""
def __init__(self, username: str, password: str):
self.username = username
self.password = password
self.host = "gate.thordata.com"
self.port = 9000
def get_proxy(self, country: str = "US") -> str:
return f"http://{self.username}-country-{country}:{self.password}@{self.host}:{self.port}"
def get_rotating(self) -> str:
"""Fresh IP each call."""
return self.get_proxy()
def enrich_with_proxy(
papers: List[Dict],
proxy_pool: ThorDataProxyPool,
batch_size: int = 50,
delay: float = 1.5,
) -> List[Dict]:
"""Enrich papers with web data using proxy rotation."""
enriched = []
for i, paper in enumerate(papers):
doi = paper.get("doi", "")
if not doi:
enriched.append(paper)
continue
# Get a fresh proxy at the start of each batch, reuse it within the batch
if i % batch_size == 0:
proxy = proxy_pool.get_rotating()
web_data = scrape_preprint_page(doi, proxy=proxy)
enriched.append({**paper, **web_data})
if (i + 1) % 10 == 0:
print(f" Enriched {i + 1}/{len(papers)} papers")
time.sleep(delay + random.uniform(0, 0.5))
return enriched
Complete Production Pipeline
import random
def run_biorxiv_pipeline(
start_date: str,
end_date: str,
db_path: str = "biorxiv.db",
server: str = "biorxiv",
enrich_sample: Optional[int] = None,
proxy_pool: Optional[ThorDataProxyPool] = None,
build_network: bool = True,
cluster_topics: bool = True,
) -> Dict:
"""Complete bioRxiv data collection and analysis pipeline."""
conn = init_database(db_path)
api = BioRxivAPI(delay=1.0, server=server)
results = {}
# Step 1: Fetch paper metadata via API
print(f"\n[STEP 1] Fetching {server} papers {start_date} to {end_date}")
papers = api.fetch_date_range(start_date, end_date)
print(f" Retrieved {len(papers)} papers")
# Step 2: Save to database
print("\n[STEP 2] Saving to database")
saved = save_papers(conn, papers)
print(f" Saved {saved} papers")
# Step 3: Enrich sample with web data
if enrich_sample and enrich_sample > 0:
print(f"\n[STEP 3] Enriching {enrich_sample} papers with web data")
sample = random.sample(papers, min(enrich_sample, len(papers)))
proxy = proxy_pool.get_rotating() if proxy_pool else None
enriched = enrich_papers(sample, delay=1.5, proxy=proxy)
save_papers(conn, enriched)
print(f" Enriched {len(enriched)} papers")
# Step 4: Build author network
if build_network and HAS_NETWORKX:
print("\n[STEP 4] Building author collaboration network")
network = build_author_network(papers)
network_stats = analyze_network(network)
results["network"] = network_stats
print(f" Network: {network_stats['nodes']:,} authors, {network_stats['edges']:,} collaborations")
bridge_researchers = find_bridge_researchers(network)
results["bridge_researchers"] = bridge_researchers[:20]
print(f" Bridge researchers: {len(bridge_researchers)} found")
# Step 5: Topic clustering
if cluster_topics:
print("\n[STEP 5] Clustering research topics")
clustering = cluster_preprints_by_abstract(papers, n_clusters=20)
if clustering:
results["clustering"] = clustering
print(f" Created {clustering['n_clusters']} topic clusters from {clustering['n_papers']} papers")
for cid, cluster in sorted(clustering["clusters"].items(), key=lambda x: x[1]["size"], reverse=True)[:5]:
print(f" Cluster {cid} ({cluster['size']} papers): {', '.join(cluster['top_terms'][:5])}")
conn.execute(
"INSERT INTO scrape_log (start_date, end_date, server, papers_fetched) VALUES (?,?,?,?)",
(start_date, end_date, server, len(papers))
)
conn.commit()
conn.close()
results["papers_fetched"] = len(papers)
results["papers_saved"] = saved
return results
# Example: Fetch September 2026 neuroscience papers
if __name__ == "__main__":
# pool = ThorDataProxyPool("YOUR_USER", "YOUR_PASS")
results = run_biorxiv_pipeline(
start_date="2026-09-01",
end_date="2026-09-30",
server="biorxiv",
enrich_sample=100,
# proxy_pool=pool,
build_network=True,
cluster_topics=True,
)
print(f"\nFinal results summary:")
print(f" Papers: {results['papers_fetched']}")
if "network" in results:
net = results["network"]
print(f" Network: {net['nodes']:,} authors, {net['edges']:,} edges")
print(f" Top collaborator: {net['top_collaborators'][0]['author']}")
Rate Limiting and Being Respectful
Keep API calls to one per second and HTML scrapes to one every two seconds. bioRxiv runs on a shoestring budget — don't be the reason they add Cloudflare to a scientific preprint server.
The API has no official rate limit documentation, but from testing, anything under 60 requests per minute is fine. Daily pipelines that grab the previous day's preprints finish in under a minute — it's only historical backfills (months or years of data) where pacing matters.
For the web scraping component — if you need to process thousands of preprint pages for enrichment data, spread it over time and use ThorData rotating residential proxies. The goal is to be invisible in their server logs, not to test their infrastructure limits.
A sustainable schedule: run the API pipeline nightly to capture new papers (takes 2-5 minutes), and spread enrichment scraping over several hours at low concurrency. Cache aggressively — most bioRxiv metadata is immutable after posting; only version history, view counts, and publication status update.