
Scraping bioRxiv Preprints: Author Networks and Topic Clusters (2026)

bioRxiv is one of the best targets for scraping scientific data. Unlike most academic publishers that lock everything behind paywalls and aggressive bot detection, bioRxiv actually wants you to access their content. They run a public API, serve clean HTML, and their robots.txt is surprisingly permissive. If you're doing bibliometrics, tracking research trends, or building datasets for ML training — this is where you start.

The interesting part isn't just grabbing abstracts. It's what you can build from the metadata: author collaboration graphs, institutional networks, topic clustering over time. That's where the real value sits.

What bioRxiv Exposes

bioRxiv's content API lives at api.biorxiv.org and returns JSON. No authentication needed. You can query by date range, server (biorxiv or medrxiv), and get back structured metadata for every preprint.

Each record includes:

- doi (the preprint's bioRxiv DOI)
- title and abstract
- authors, as a single semicolon-separated string
- category (subject area, e.g. neuroscience)
- date and version
- license
- publication info, once the preprint is formally published

The API returns up to 100 results per call with cursor-based pagination. For bulk historical data you'll want to hit the content endpoint day by day or month by month.
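Before writing a full client, it helps to see the shape of a single response page. The payload below is a hand-written sample, not real API output; the field names (messages, total, cursor, collection) are the ones the client code later in this post relies on:

```python
import json

# A hypothetical payload shaped like a content API response page.
sample = json.loads("""
{
  "messages": [{"status": "ok", "total": "3", "cursor": "0"}],
  "collection": [
    {"doi": "10.1101/2026.01.01.000001", "title": "Paper one"},
    {"doi": "10.1101/2026.01.02.000002", "title": "Paper two"}
  ]
}
""")

msg = sample["messages"][0]
total = int(msg["total"])            # total matches in the date range
fetched = len(sample["collection"])  # records in this page (up to 100)
next_cursor = int(msg["cursor"]) + fetched  # pass as /{cursor} next call

print(total, fetched, next_cursor)  # 3 2 2
```

Pagination is just that last line repeated: keep requesting with the advanced cursor until it reaches the reported total.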

Why Scrape bioRxiv?

Beyond the obvious "free science data" angle, there are high-value applications:

- Bibliometrics: co-authorship networks, institutional output, cross-field collaboration
- Trend tracking: which research topics are growing and which are fading
- ML training data: abstracts with clean category labels, no paywall
- Preprint-to-publication tracking: which preprints end up in journals, and where

The API Approach

Start with the official API. It's rate-limited but generous: sustained queries at one request per second work fine.

import httpx
import time
import json
import sqlite3
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from pathlib import Path


class BioRxivAPI:
    """Client for the bioRxiv/medRxiv content API."""

    BASE_URL = "https://api.biorxiv.org"

    def __init__(self, delay: float = 1.0, server: str = "biorxiv"):
        self.client = httpx.Client(timeout=30)
        self.delay = delay
        self.server = server  # 'biorxiv' or 'medrxiv'
        self._last_request = 0.0

    def _rate_limit(self):
        elapsed = time.time() - self._last_request
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self._last_request = time.time()

    def fetch_date_range(
        self,
        start: str,
        end: str,
        cursor: int = 0,
        verbose: bool = True,
    ) -> List[Dict]:
        """Fetch all preprints in a date range.

        start, end: 'YYYY-MM-DD' format
        """
        all_papers = []
        current_cursor = cursor

        while True:
            self._rate_limit()
            url = f"{self.BASE_URL}/details/{self.server}/{start}/{end}/{current_cursor}"

            try:
                resp = self.client.get(url)
                resp.raise_for_status()
                data = resp.json()
            except httpx.HTTPError as e:
                print(f"[ERROR] API request failed: {e}")
                break
            except json.JSONDecodeError:
                print("[ERROR] Invalid JSON response")
                break

            messages = data.get("messages") or []
            if not messages:
                break

            total = int(messages[0].get("total", 0))
            papers = data.get("collection", [])

            if not papers:
                break

            all_papers.extend(papers)
            current_cursor += len(papers)

            if verbose:
                print(f"  {self.server}: {current_cursor}/{total} preprints")

            if current_cursor >= total:
                break

        return all_papers

    def fetch_month(self, year: int, month: int, verbose: bool = True) -> List[Dict]:
        """Fetch all preprints for a calendar month."""
        start = f"{year}-{month:02d}-01"
        if month == 12:
            end = f"{year}-12-31"
        else:
            last_day = (datetime(year, month + 1, 1) - timedelta(days=1))
            end = last_day.strftime("%Y-%m-%d")

        if verbose:
            print(f"Fetching {self.server} {year}-{month:02d} ({start} to {end})")

        return self.fetch_date_range(start, end, verbose=verbose)

    def fetch_recent(self, days: int = 7, verbose: bool = True) -> List[Dict]:
        """Fetch preprints from the last N days."""
        end = datetime.utcnow().strftime("%Y-%m-%d")
        start = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%d")
        return self.fetch_date_range(start, end, verbose=verbose)

    def get_paper_details(self, doi: str) -> Optional[Dict]:
        """Get the first version record of a paper by DOI."""
        self._rate_limit()
        url = f"{self.BASE_URL}/details/{self.server}/{doi}"

        try:
            resp = self.client.get(url)
            resp.raise_for_status()
            data = resp.json()
            collection = data.get("collection") or []
            return collection[0] if collection else None
        except Exception as e:
            print(f"[ERROR] Failed to get {doi}: {e}")
            return None

    def fetch_published_papers(
        self,
        start: str,
        end: str,
        verbose: bool = True,
    ) -> List[Dict]:
        """Fetch preprints that have been formally published."""
        all_papers = []
        cursor = 0

        while True:
            self._rate_limit()
            url = f"{self.BASE_URL}/pub/{self.server}/{start}/{end}/{cursor}"

            try:
                resp = self.client.get(url)
                resp.raise_for_status()
                data = resp.json()
            except Exception as e:
                print(f"[ERROR] {e}")
                break

            messages = data.get("messages", [{}])
            total = int((messages[0] if messages else {}).get("total", 0))
            papers = data.get("collection", [])

            if not papers:
                break

            all_papers.extend(papers)
            cursor += len(papers)

            if verbose:
                print(f"  Published: {cursor}/{total}")

            if cursor >= total:
                break

        return all_papers

Enriching With Web Scraping

The API gives you metadata but not everything. Full author ORCID identifiers, supplementary file links, view/download counts, and inline figures require scraping the preprint HTML pages.

from bs4 import BeautifulSoup


def scrape_preprint_page(
    doi: str,
    server: str = "biorxiv",
    proxy: Optional[str] = None,
) -> Dict:
    """Scrape additional details from a preprint's HTML page."""
    # Links to v1; revised preprints may have later versions at v2, v3, ...
    url = f"https://www.{server}.org/content/{doi}v1"
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
    }

    try:
        resp = httpx.get(
            url,
            headers=headers,
            follow_redirects=True,
            timeout=30,
            proxy=proxy,  # httpx >= 0.26; older versions take proxies=
        )
        resp.raise_for_status()
    except httpx.HTTPError as e:
        print(f"[ERROR] Failed to fetch {doi}: {e}")
        return {"doi": doi, "error": str(e)}

    soup = BeautifulSoup(resp.text, "html.parser")
    details = {"doi": doi}

    # Extract structured author list with affiliations and ORCID
    authors_detailed = []
    for author_el in soup.select(".highwire-citation-authors .highwire-citation-author"):
        given = author_el.select_one(".nlm-given-names")
        surname = author_el.select_one(".nlm-surname")
        orcid_link = author_el.select_one('a[href*="orcid.org"]')

        # Affiliation via data attributes
        aff_id = author_el.get("data-aff-id", "")
        affil_el = soup.select_one(f"#{aff_id}") if aff_id else None

        authors_detailed.append({
            "given": given.text.strip() if given else "",
            "surname": surname.text.strip() if surname else "",
            "orcid": orcid_link["href"].split("/")[-1] if orcid_link else None,
            "affiliation": affil_el.get_text(strip=True) if affil_el else "",
        })
    details["authors_detailed"] = authors_detailed

    # Extract view/download metrics
    metrics_el = soup.select_one(".article-metrics-block, .highwire-article-metrics")
    if metrics_el:
        for stat in metrics_el.select("[data-stat]"):
            stat_name = stat.get("data-stat")
            try:
                details[stat_name] = int(stat.text.replace(",", "").strip())
            except (ValueError, AttributeError):
                pass

    # Subject area tags (may differ from API category)
    subjects = [s.get_text(strip=True) for s in soup.select(".highwire-article-collection-term")]
    details["subjects"] = subjects

    # Corresponding author email (sometimes exposed)
    corresp = soup.select_one(".corresp")
    if corresp:
        email_match = re.search(r"[\w.+-]+@[\w-]+\.[\w.-]+", corresp.get_text())
        if email_match:
            details["corresponding_email"] = email_match.group(0)

    # Figure count
    figures = soup.select(".fig, figure, .highwire-figure")
    details["figure_count"] = len(figures)

    # Supplementary data links
    supp_links = [
        a["href"] for a in soup.select("a[href*='supplementary'], a[href*='supp']")
        if a.get("href")
    ]
    details["supplementary_links"] = supp_links[:10]

    return details


def enrich_papers(
    papers: List[Dict],
    sample_size: Optional[int] = None,
    delay: float = 1.5,
    proxy: Optional[str] = None,
) -> List[Dict]:
    """Add web-scraped enrichment data to API paper records."""
    import random  # used for sampling and for the jittered sleep below

    if sample_size:
        papers = random.sample(papers, min(sample_size, len(papers)))

    enriched = []
    for i, paper in enumerate(papers):
        doi = paper.get("doi", "")
        if not doi:
            enriched.append(paper)
            continue

        print(f"  [{i+1}/{len(papers)}] {doi}")
        web_data = scrape_preprint_page(doi, proxy=proxy)

        # Merge API and web data
        merged = {**paper, **web_data}
        enriched.append(merged)

        time.sleep(delay + random.uniform(0, 0.5))

    return enriched

Building Author Collaboration Networks

Co-authorship networks are one of the most revealing structures you can extract from preprint metadata.

try:
    import networkx as nx
    HAS_NETWORKX = True
except ImportError:
    HAS_NETWORKX = False
    print("Install networkx for network analysis: pip install networkx")

from collections import defaultdict


def build_author_network(papers: List[Dict]) -> Any:
    """Build a weighted co-authorship network from preprint metadata."""
    if not HAS_NETWORKX:
        raise ImportError("networkx required: pip install networkx")

    G = nx.Graph()
    author_stats = defaultdict(lambda: {"papers": 0, "categories": set(), "dois": []})

    for paper in papers:
        doi = paper.get("doi", "")
        category = paper.get("category", "unknown")

        # Parse semicolon-separated author string from API
        authors_raw = paper.get("authors", "")
        authors = [a.strip() for a in authors_raw.split(";") if a.strip()]

        # Deduplicate authors (sometimes listed twice in API)
        seen = set()
        unique_authors = []
        for a in authors:
            if a not in seen:
                seen.add(a)
                unique_authors.append(a)
        authors = unique_authors

        # Update node stats
        for author in authors:
            if not G.has_node(author):
                G.add_node(author)

            author_stats[author]["papers"] += 1
            author_stats[author]["categories"].add(category)
            author_stats[author]["dois"].append(doi)

        # Add weighted edges between all co-authors
        for i, a1 in enumerate(authors):
            for a2 in authors[i + 1:]:
                if G.has_edge(a1, a2):
                    G[a1][a2]["weight"] += 1
                    G[a1][a2]["papers"].append(doi)
                else:
                    G.add_edge(a1, a2, weight=1, papers=[doi])

    # Attach stats to nodes
    for author, stats in author_stats.items():
        if G.has_node(author):
            G.nodes[author]["paper_count"] = stats["papers"]
            G.nodes[author]["categories"] = list(stats["categories"])
            G.nodes[author]["dois"] = stats["dois"][:20]  # Cap for memory

    return G


def analyze_network(G: Any) -> Dict:
    """Compute key network statistics."""
    if not HAS_NETWORKX:
        return {}

    stats = {
        "nodes": G.number_of_nodes(),
        "edges": G.number_of_edges(),
        "density": nx.density(G),
        "connected_components": nx.number_connected_components(G),
    }

    # Largest connected component
    if G.number_of_nodes() > 0:
        largest_cc = max(nx.connected_components(G), key=len)
        lcc = G.subgraph(largest_cc)
        stats["largest_component_size"] = len(largest_cc)
        stats["largest_component_pct"] = round(len(largest_cc) / G.number_of_nodes() * 100, 1)

    # Top authors by degree (number of collaborators)
    top_by_degree = sorted(
        [(n, d) for n, d in G.degree()],
        key=lambda x: x[1],
        reverse=True,
    )[:20]
    stats["top_collaborators"] = [
        {
            "author": name,
            "collaborators": degree,
            "papers": G.nodes[name].get("paper_count", 0),
            "categories": G.nodes[name].get("categories", []),
        }
        for name, degree in top_by_degree
    ]

    # Top edges by co-authorship weight
    top_pairs = sorted(
        [(u, v, d["weight"]) for u, v, d in G.edges(data=True)],
        key=lambda x: x[2],
        reverse=True,
    )[:20]
    stats["strongest_collaborations"] = [
        {"author1": u, "author2": v, "joint_papers": w}
        for u, v, w in top_pairs
    ]

    return stats


def find_bridge_researchers(G: Any, min_categories: int = 2) -> List[Dict]:
    """Find researchers who work across multiple research areas."""
    if not HAS_NETWORKX:
        return []

    bridges = []
    for node, data in G.nodes(data=True):
        categories = data.get("categories", [])
        if len(categories) >= min_categories:
            bridges.append({
                "author": node,
                "categories": categories,
                "n_categories": len(categories),
                "paper_count": data.get("paper_count", 0),
                "collaborators": G.degree(node),
            })

    return sorted(bridges, key=lambda x: (x["n_categories"], x["paper_count"]), reverse=True)

Topic Clustering

The combination of abstracts and category labels makes topic modeling straightforward. Even without fancy ML, TF-IDF + K-Means reveals meaningful research clusters.

def cluster_preprints_by_abstract(
    papers: List[Dict],
    n_clusters: int = 15,
    max_features: int = 5000,
) -> Optional[Dict]:
    """Cluster preprints by abstract content using TF-IDF + K-Means."""
    try:
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.cluster import KMeans
        import numpy as np
    except ImportError:
        print("Install scikit-learn: pip install scikit-learn numpy")
        return None

    # Filter papers with abstracts (abstract can be None in API records)
    valid = [
        (p.get("doi", ""), p.get("abstract") or "")
        for p in papers
        if len(p.get("abstract") or "") > 100
    ]
    if len(valid) < n_clusters:
        print(f"[WARN] Not enough papers with abstracts ({len(valid)} < {n_clusters})")
        return None

    dois, abstracts = zip(*valid)

    # TF-IDF vectorization
    vectorizer = TfidfVectorizer(
        max_features=max_features,
        stop_words="english",
        ngram_range=(1, 2),
        min_df=3,
        max_df=0.95,
    )
    tfidf = vectorizer.fit_transform(abstracts)

    # K-Means clustering
    km = KMeans(n_clusters=n_clusters, random_state=42, n_init=10, max_iter=300)
    labels = km.fit_predict(tfidf)

    feature_names = vectorizer.get_feature_names_out()
    clusters = {}

    for cluster_id in range(n_clusters):
        center = km.cluster_centers_[cluster_id]
        top_indices = center.argsort()[-15:][::-1]
        top_terms = [feature_names[j] for j in top_indices]

        cluster_dois = [dois[j] for j, l in enumerate(labels) if l == cluster_id]

        # Sample paper titles for the cluster
        doi_set = set(cluster_dois[:50])
        cluster_papers = [p for p in papers if p.get("doi") in doi_set]
        sample_titles = [p.get("title", "")[:80] for p in cluster_papers[:5]]

        clusters[cluster_id] = {
            "top_terms": top_terms,
            "size": len(cluster_dois),
            "sample_titles": sample_titles,
            "dois": cluster_dois[:20],
        }

    # Assign cluster labels back to papers
    doi_to_cluster = {doi: int(label) for doi, label in zip(dois, labels)}

    return {
        "n_clusters": n_clusters,
        "n_papers": len(valid),
        "clusters": clusters,
        "doi_to_cluster": doi_to_cluster,
    }


def find_emerging_topics(
    papers_period1: List[Dict],
    papers_period2: List[Dict],
    top_n: int = 20,
) -> List[Dict]:
    """Find terms that increased in frequency between two time periods."""
    try:
        from sklearn.feature_extraction.text import CountVectorizer
    except ImportError:
        return []

    def term_frequencies(papers: List[Dict]) -> Dict[str, float]:
        abstracts = [p.get("abstract", "") for p in papers if p.get("abstract")]
        if not abstracts:
            return {}
        vec = CountVectorizer(
            stop_words="english",
            ngram_range=(1, 2),
            max_features=10000,
            min_df=2,
        )
        counts = vec.fit_transform(abstracts)
        total = counts.sum()
        freqs = {}
        for term, idx in vec.vocabulary_.items():
            freqs[term] = float(counts[:, idx].sum()) / total
        return freqs

    freq1 = term_frequencies(papers_period1)
    freq2 = term_frequencies(papers_period2)

    emerging = []
    for term in set(freq2.keys()) & set(freq1.keys()):
        if freq1[term] > 0:
            growth = (freq2[term] - freq1[term]) / freq1[term]
            if freq2[term] > 0.0001:  # Minimum frequency threshold
                emerging.append({
                    "term": term,
                    "freq_period1": freq1[term],
                    "freq_period2": freq2[term],
                    "growth_pct": round(growth * 100, 1),
                })

    return sorted(emerging, key=lambda x: x["growth_pct"], reverse=True)[:top_n]

Storage and Database Design

SQLite handles bioRxiv data well at multi-year scale. A typical year of bioRxiv data is ~100MB uncompressed.

def init_database(path: str = "biorxiv.db") -> sqlite3.Connection:
    """Initialize the bioRxiv database."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS preprints (
            doi TEXT PRIMARY KEY,
            title TEXT,
            authors TEXT,          -- Semicolon-separated
            abstract TEXT,
            category TEXT,
            server TEXT DEFAULT 'biorxiv',
            date_posted TEXT,
            date_revised TEXT,
            version INTEGER DEFAULT 1,
            published_journal TEXT,
            published_doi TEXT,
            license TEXT,
            abstract_views INTEGER DEFAULT 0,
            pdf_downloads INTEGER DEFAULT 0,
            figure_count INTEGER,
            has_supplementary INTEGER DEFAULT 0,
            corresponding_email TEXT,
            scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS authors (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            preprint_doi TEXT,
            position INTEGER,
            given_name TEXT,
            surname TEXT,
            orcid TEXT,
            affiliation TEXT,
            FOREIGN KEY (preprint_doi) REFERENCES preprints(doi)
        );

        CREATE TABLE IF NOT EXISTS author_stats (
            name TEXT PRIMARY KEY,
            paper_count INTEGER DEFAULT 0,
            categories TEXT,       -- JSON array
            first_paper TEXT,
            last_paper TEXT,
            coauthor_count INTEGER DEFAULT 0
        );

        CREATE TABLE IF NOT EXISTS scrape_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            start_date TEXT,
            end_date TEXT,
            server TEXT,
            papers_fetched INTEGER,
            scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_preprints_date ON preprints(date_posted DESC);
        CREATE INDEX IF NOT EXISTS idx_preprints_category ON preprints(category);
        CREATE INDEX IF NOT EXISTS idx_preprints_journal ON preprints(published_journal);
        CREATE INDEX IF NOT EXISTS idx_authors_doi ON authors(preprint_doi);
        CREATE INDEX IF NOT EXISTS idx_authors_orcid ON authors(orcid);
    """)

    conn.commit()
    return conn


def save_papers(conn: sqlite3.Connection, papers: List[Dict]) -> int:
    """Save a list of preprints to the database."""
    saved = 0

    for paper in papers:
        doi = paper.get("doi")
        if not doi:
            continue

        try:
            conn.execute(
                """INSERT OR REPLACE INTO preprints
                   (doi, title, authors, abstract, category, server,
                    date_posted, date_revised, version, published_journal,
                    published_doi, license, abstract_views, pdf_downloads,
                    figure_count, scraped_at)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    doi,
                    paper.get("title", "")[:500],
                    paper.get("authors", ""),
                    paper.get("abstract", ""),
                    paper.get("category", ""),
                    paper.get("server", "biorxiv"),
                    paper.get("date"),
                    paper.get("date_revised") or paper.get("date"),
                    int(paper.get("version", 1)),
                    paper.get("published_journal"),
                    paper.get("published_doi"),
                    paper.get("license", ""),
                    paper.get("abstract_views", 0),
                    paper.get("pdf_downloads", 0),
                    paper.get("figure_count"),
                    datetime.utcnow().isoformat(),
                )
            )
            saved += 1
        except Exception as e:
            print(f"[ERROR] Save failed for {doi}: {e}")

    conn.commit()
    return saved


def query_papers_by_category(
    conn: sqlite3.Connection,
    category: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    limit: int = 1000,
) -> List[Dict]:
    """Query papers by category with optional date filter."""
    query = "SELECT doi, title, authors, abstract, date_posted FROM preprints WHERE category = ?"
    params = [category]

    if start_date:
        query += " AND date_posted >= ?"
        params.append(start_date)
    if end_date:
        query += " AND date_posted <= ?"
        params.append(end_date)

    query += " ORDER BY date_posted DESC LIMIT ?"
    params.append(limit)

    rows = conn.execute(query, params).fetchall()
    return [
        {"doi": r[0], "title": r[1], "authors": r[2], "abstract": r[3], "date_posted": r[4]}
        for r in rows
    ]

ThorData Proxy Integration

bioRxiv is a nonprofit running on grant money — they don't have heavy bot detection. But if you're processing thousands of preprint HTML pages for enrichment data (view counts, author ORCIDs, figure counts), rotating proxies help distribute the load and avoid triggering any IP-based limits.

ThorData's residential proxies work well here. Unlike datacenter proxies that get flagged even on open-access sites, residential IPs blend in naturally.

class ThorDataProxyPool:
    """ThorData residential proxy pool for bioRxiv scraping."""

    def __init__(self, username: str, password: str):
        self.username = username
        self.password = password
        self.host = "gate.thordata.com"
        self.port = 9000

    def get_proxy(self, country: str = "US") -> str:
        return f"http://{self.username}-country-{country}:{self.password}@{self.host}:{self.port}"

    def get_rotating(self) -> str:
        """Fresh IP each call."""
        return self.get_proxy()


def enrich_with_proxy(
    papers: List[Dict],
    proxy_pool: ThorDataProxyPool,
    batch_size: int = 50,
    delay: float = 1.5,
) -> List[Dict]:
    """Enrich papers with web data using proxy rotation."""
    import random  # needed for the jittered sleep below

    enriched = []

    for i, paper in enumerate(papers):
        doi = paper.get("doi", "")
        if not doi:
            enriched.append(paper)
            continue

        # Rotate proxy every batch
        proxy = proxy_pool.get_rotating() if i % batch_size == 0 else None

        web_data = scrape_preprint_page(doi, proxy=proxy)
        enriched.append({**paper, **web_data})

        if (i + 1) % 10 == 0:
            print(f"  Enriched {i + 1}/{len(papers)} papers")

        time.sleep(delay + random.uniform(0, 0.5))

    return enriched

Complete Production Pipeline

import random

def run_biorxiv_pipeline(
    start_date: str,
    end_date: str,
    db_path: str = "biorxiv.db",
    server: str = "biorxiv",
    enrich_sample: Optional[int] = None,
    proxy_pool: Optional[ThorDataProxyPool] = None,
    build_network: bool = True,
    cluster_topics: bool = True,
) -> Dict:
    """Complete bioRxiv data collection and analysis pipeline."""
    conn = init_database(db_path)
    api = BioRxivAPI(delay=1.0, server=server)
    results = {}

    # Step 1: Fetch paper metadata via API
    print(f"\n[STEP 1] Fetching {server} papers {start_date} to {end_date}")
    papers = api.fetch_date_range(start_date, end_date)
    print(f"  Retrieved {len(papers)} papers")

    # Step 2: Save to database
    print("\n[STEP 2] Saving to database")
    saved = save_papers(conn, papers)
    print(f"  Saved {saved} papers")

    # Step 3: Enrich sample with web data
    if enrich_sample and enrich_sample > 0:
        print(f"\n[STEP 3] Enriching {enrich_sample} papers with web data")
        sample = random.sample(papers, min(enrich_sample, len(papers)))
        proxy = proxy_pool.get_rotating() if proxy_pool else None
        enriched = enrich_papers(sample, delay=1.5, proxy=proxy)
        save_papers(conn, enriched)
        print(f"  Enriched {len(enriched)} papers")

    # Step 4: Build author network
    if build_network and HAS_NETWORKX:
        print("\n[STEP 4] Building author collaboration network")
        network = build_author_network(papers)
        network_stats = analyze_network(network)
        results["network"] = network_stats
        print(f"  Network: {network_stats['nodes']:,} authors, {network_stats['edges']:,} collaborations")

        bridge_researchers = find_bridge_researchers(network)
        results["bridge_researchers"] = bridge_researchers[:20]
        print(f"  Bridge researchers: {len(bridge_researchers)} found")

    # Step 5: Topic clustering
    if cluster_topics:
        print("\n[STEP 5] Clustering research topics")
        clustering = cluster_preprints_by_abstract(papers, n_clusters=20)
        if clustering:
            results["clustering"] = clustering
            print(f"  Created {clustering['n_clusters']} topic clusters from {clustering['n_papers']} papers")
            for cid, cluster in sorted(clustering["clusters"].items(), key=lambda x: x[1]["size"], reverse=True)[:5]:
                print(f"  Cluster {cid} ({cluster['size']} papers): {', '.join(cluster['top_terms'][:5])}")

    conn.execute(
        "INSERT INTO scrape_log (start_date, end_date, server, papers_fetched) VALUES (?,?,?,?)",
        (start_date, end_date, server, len(papers))
    )
    conn.commit()
    conn.close()

    results["papers_fetched"] = len(papers)
    results["papers_saved"] = saved
    return results


# Example: Fetch September 2026 neuroscience papers
if __name__ == "__main__":
    # pool = ThorDataProxyPool("YOUR_USER", "YOUR_PASS")
    results = run_biorxiv_pipeline(
        start_date="2026-09-01",
        end_date="2026-09-30",
        server="biorxiv",
        enrich_sample=100,
        # proxy_pool=pool,
        build_network=True,
        cluster_topics=True,
    )
    print(f"\nFinal results summary:")
    print(f"  Papers: {results['papers_fetched']}")
    if "network" in results:
        net = results["network"]
        print(f"  Network: {net['nodes']:,} authors, {net['edges']:,} edges")
        print(f"  Top collaborator: {net['top_collaborators'][0]['author']}")

Rate Limiting and Being Respectful

Keep API calls to 1/second and HTML scrapes to one every 2 seconds. bioRxiv is a nonprofit running on grant money — don't be the reason they add Cloudflare to a scientific preprint server.

The API has no official rate limit documentation, but from testing, anything under 60 requests per minute is fine. Daily pipelines that grab the previous day's preprints finish in under a minute — it's only historical backfills (months or years of data) where pacing matters.
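For long backfills, transient failures are more likely than rate-limit bans, so it's worth wrapping calls in a retry with exponential backoff. This is a generic sketch, not part of the client above; `with_backoff` and its injectable `sleep` parameter are my own additions (the injection exists so the demo runs instantly), and you'd wrap something like `api.fetch_date_range` with it:

```python
import time
import random

def with_backoff(fn, retries=4, base_delay=1.0, sleep=time.sleep):
    """Call fn(), retrying with exponential backoff plus jitter on failure."""
    for attempt in range(retries):
        try:
            return fn()
        except Exception:
            if attempt == retries - 1:
                raise  # out of retries: surface the error
            # 1s, 2s, 4s, ... plus jitter so retries don't synchronize
            sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))

# Demo with a function that fails twice before succeeding:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated transient error")
    return "ok"

print(with_backoff(flaky, sleep=lambda s: None))  # ok
```

Usage against the real client would be `with_backoff(lambda: api.fetch_month(2026, 9))`, keeping the 1 request/second pacing inside the client itself.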

For the web scraping component — if you need to process thousands of preprint pages for enrichment data, spread it over time and use ThorData rotating residential proxies. The goal is to be invisible in their server logs, not to test their infrastructure limits.

A sustainable scraping schedule: run the API pipeline nightly to capture new papers (takes 2-5 minutes), and schedule enrichment scraping to run over several hours at low concurrency. Cache aggressively: preprint metadata rarely changes after initial posting, with new versions, view counts, and publication status being the main exceptions.
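The nightly run can resume from the scrape_log table instead of refetching fixed windows. A sketch, assuming the scrape_log schema from init_database above; `next_fetch_window` is a hypothetical helper, and the demo uses a minimal in-memory copy of the table:

```python
import sqlite3
from datetime import datetime, timedelta

def next_fetch_window(conn, server="biorxiv"):
    """Return (start, end) for an incremental run: resume the day after the
    last logged end_date, or default to yesterday on a fresh database."""
    row = conn.execute(
        "SELECT MAX(end_date) FROM scrape_log WHERE server = ?", (server,)
    ).fetchone()
    if row and row[0]:
        last = datetime.strptime(row[0], "%Y-%m-%d")
        start = (last + timedelta(days=1)).strftime("%Y-%m-%d")
    else:
        start = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
    end = datetime.utcnow().strftime("%Y-%m-%d")
    return start, end

# Demo on an in-memory database with one logged run:
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE scrape_log (
    start_date TEXT, end_date TEXT, server TEXT, papers_fetched INTEGER)""")
conn.execute(
    "INSERT INTO scrape_log VALUES ('2026-09-01', '2026-09-30', 'biorxiv', 5000)"
)
start, end = next_fetch_window(conn)
print(start)  # 2026-10-01
```

Feed the returned window straight into run_biorxiv_pipeline and each nightly run picks up exactly where the last one stopped.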