← Back to blog

Scraping npm Security Advisories: CVE Tracking & Vulnerability Data with Python (2026)

Scraping npm Security Advisories: CVE Tracking & Vulnerability Data with Python (2026)

Supply chain attacks hit npm hard in recent years. Malicious packages, compromised maintainer accounts, and quietly-introduced vulnerabilities have burned teams who assumed "installed from npm" meant "safe." Manual checking doesn't scale: the average Node.js project has hundreds of transitive dependencies, each a potential attack surface.

Automated tracking — pulling advisory data into a local database you can query, diff, and alert on — is the only way to stay on top of this at any reasonable scale. This guide covers three complementary data sources and shows how to combine them into a practical vulnerability monitoring pipeline.

Data Sources Overview

Three sources together cover npm vulnerabilities comprehensively:

Each source has different strengths. npm's API is the most direct for per-project auditing. GHSA has the best CVE metadata and is maintained by people who care about accuracy. OSS Index excels at batch lookups when you have a large manifest to check.

Querying the npm Audit Endpoint

The npm audit API accepts a POST body that mimics a package-lock.json structure. You can construct minimal payloads for targeted package lookups without a full project context.

import requests
import json
import time

NPM_AUDIT_URL = "https://registry.npmjs.org/-/npm/v1/security/audits"

def query_npm_audit(package_name: str, version: str) -> dict:
    """Query the npm audit API for a specific package version."""
    payload = {
        "name": "audit-check",
        "version": "1.0.0",
        "requires": {package_name: version},
        "dependencies": {
            package_name: {
                "version": version,
                "integrity": "",
                "requires": {}
            }
        }
    }

    headers = {
        "Content-Type": "application/json",
        "npm-command": "audit",
    }

    resp = requests.post(
        NPM_AUDIT_URL,
        headers=headers,
        data=json.dumps(payload),
        timeout=15
    )
    resp.raise_for_status()
    return resp.json()


def query_npm_audit_bulk(packages: dict) -> dict:
    """
    Query the npm audit API for multiple packages at once.
    packages: dict of {name: version}
    """
    deps = {
        name: {"version": ver, "integrity": "", "requires": {}}
        for name, ver in packages.items()
    }

    payload = {
        "name": "bulk-audit",
        "version": "1.0.0",
        "requires": packages,
        "dependencies": deps,
    }

    headers = {
        "Content-Type": "application/json",
        "npm-command": "audit",
    }

    resp = requests.post(
        NPM_AUDIT_URL,
        headers=headers,
        data=json.dumps(payload),
        timeout=30
    )
    resp.raise_for_status()
    return resp.json()


def extract_advisories(audit_result: dict) -> list[dict]:
    """Parse advisory objects from an npm audit response."""
    advisories = []
    for adv_id, adv in audit_result.get("advisories", {}).items():
        advisories.append({
            "id": adv_id,
            "module_name": adv.get("module_name"),
            "severity": adv.get("severity"),
            "title": adv.get("title"),
            "cves": adv.get("cves", []),
            "cwe": adv.get("cwe"),
            "vulnerable_versions": adv.get("vulnerable_versions"),
            "patched_versions": adv.get("patched_versions"),
            "recommendation": adv.get("recommendation"),
            "references": adv.get("references"),
            "url": adv.get("url"),
            "updated_at": adv.get("updated"),
        })
    return sorted(advisories, key=lambda x: (
        {"critical": 0, "high": 1, "moderate": 2, "low": 3}.get(x["severity"], 4)
    ))


if __name__ == "__main__":
    # Single package check
    result = query_npm_audit("lodash", "4.17.15")
    for adv in extract_advisories(result):
        print(f"[{adv['severity'].upper()}] {adv['title']}")
        print(f"  CVEs: {', '.join(adv['cves']) or 'none'}")
        print(f"  Affected: {adv['vulnerable_versions']}")
        print(f"  Fix: {adv['patched_versions']}")
        print()

Querying the npm Security Advisory REST API

Separately from the audit endpoint, npm exposes a security advisories REST API that lets you query advisories directly by ID or package name:

NPM_SECURITY_API = "https://registry.npmjs.org/-/npm/v1/security/advisories"

def get_advisory_by_id(advisory_id: str) -> dict:
    """Fetch a single npm security advisory by its numeric ID."""
    url = f"{NPM_SECURITY_API}/{advisory_id}"
    resp = requests.get(url, timeout=15)
    resp.raise_for_status()
    return resp.json()


def search_advisories_for_package(package_name: str) -> list[dict]:
    """Search all published advisories affecting a package."""
    url = f"{NPM_SECURITY_API}/search"
    params = {"package": package_name}
    resp = requests.get(url, params=params, timeout=15)
    resp.raise_for_status()

    data = resp.json()
    return data.get("objects", [])


def paginate_all_advisories(page_size: int = 100, max_pages: int = 50) -> list[dict]:
    """Walk through all npm security advisories with pagination."""
    all_advisories = []
    page = 0

    while page < max_pages:
        params = {"page": page, "perPage": page_size}
        resp = requests.get(NPM_SECURITY_API, params=params, timeout=20)
        resp.raise_for_status()
        data = resp.json()

        batch = data.get("objects", [])
        if not batch:
            break

        all_advisories.extend(batch)
        print(f"  Page {page}: fetched {len(batch)} advisories (total: {len(all_advisories)})")
        page += 1
        time.sleep(0.5)

    return all_advisories

GitHub Advisory Database (GraphQL)

GHSA gives you richer metadata than the npm audit API alone. The GraphQL endpoint lets you filter by ecosystem and paginate through thousands of records with full CVE cross-references and CVSS scoring.

import requests

GITHUB_GRAPHQL_URL = "https://api.github.com/graphql"
GITHUB_TOKEN = "ghp_your_token_here"  # needs security_events or public_repo scope

ADVISORY_QUERY = """
query($cursor: String) {
  securityAdvisories(ecosystem: NPM, first: 50, after: $cursor) {
    pageInfo {
      hasNextPage
      endCursor
    }
    nodes {
      ghsaId
      summary
      description
      severity
      publishedAt
      updatedAt
      withdrawnAt
      cvss {
        score
        vectorString
      }
      cwes(first: 5) {
        nodes {
          cweId
          name
        }
      }
      identifiers {
        type
        value
      }
      references {
        url
      }
      vulnerabilities(first: 10) {
        nodes {
          package {
            name
            ecosystem
          }
          vulnerableVersionRange
          firstPatchedVersion {
            identifier
          }
        }
      }
    }
  }
}
"""


def fetch_github_advisories(max_pages: int = 50) -> list[dict]:
    """Paginate through all npm security advisories in GitHub's database."""
    headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Content-Type": "application/json",
        "X-Github-Next-Global-ID": "1",
    }
    cursor = None
    results = []

    for page_num in range(max_pages):
        variables = {"cursor": cursor}
        resp = requests.post(
            GITHUB_GRAPHQL_URL,
            headers=headers,
            json={"query": ADVISORY_QUERY, "variables": variables},
            timeout=30
        )

        # Check rate limit headers
        remaining = int(resp.headers.get("X-RateLimit-Remaining", 5000))
        if remaining < 100:
            reset_ts = int(resp.headers.get("X-RateLimit-Reset", 0))
            wait = max(0, reset_ts - int(time.time())) + 5
            print(f"  Rate limit low ({remaining} remaining). Sleeping {wait}s.")
            time.sleep(wait)

        resp.raise_for_status()
        data = resp.json()

        if "errors" in data:
            print(f"  GraphQL errors: {data['errors']}")
            break

        page = data["data"]["securityAdvisories"]
        results.extend(page["nodes"])
        print(f"  Page {page_num + 1}: {len(page['nodes'])} advisories")

        if not page["pageInfo"]["hasNextPage"]:
            break

        cursor = page["pageInfo"]["endCursor"]
        time.sleep(0.3)  # Polite pause between pages

    return results


def parse_github_advisory(node: dict) -> dict:
    """Normalize a GHSA advisory node into a flat dict."""
    cves = [i["value"] for i in node["identifiers"] if i["type"] == "CVE"]
    ghsa_ids = [i["value"] for i in node["identifiers"] if i["type"] == "GHSA"]

    packages = []
    for vuln in node.get("vulnerabilities", {}).get("nodes", []):
        pkg = vuln.get("package", {})
        patched = vuln.get("firstPatchedVersion")
        packages.append({
            "name": pkg.get("name"),
            "vulnerable_range": vuln.get("vulnerableVersionRange"),
            "first_patch": patched["identifier"] if patched else None,
        })

    cwes = [f"{c['cweId']} ({c['name']})" for c in node.get("cwes", {}).get("nodes", [])]

    cvss = node.get("cvss", {}) or {}

    return {
        "ghsa_id": node.get("ghsaId"),
        "cve_ids": cves,
        "summary": node.get("summary"),
        "severity": node.get("severity"),
        "cvss_score": cvss.get("score"),
        "cvss_vector": cvss.get("vectorString"),
        "cwes": cwes,
        "published": node.get("publishedAt"),
        "updated": node.get("updatedAt"),
        "withdrawn": node.get("withdrawnAt"),
        "packages": packages,
        "references": [r["url"] for r in node.get("references", [])],
    }


if __name__ == "__main__":
    advisories = fetch_github_advisories(max_pages=3)
    for raw in advisories[:5]:
        adv = parse_github_advisory(raw)
        print(f"{adv['ghsa_id']} | {adv['severity']} | CVSS: {adv['cvss_score']} | "
              f"CVEs: {adv['cve_ids']} | {adv['summary'][:60]}")

OSS Index (Sonatype)

OSS Index uses package coordinates — for npm, that's pkg:npm/package-name@version. You can batch up to 128 coordinates per request, making it efficient for checking a full lockfile.

OSS_INDEX_URL = "https://ossindex.sonatype.org/api/v3/component-report"


def query_oss_index(
    packages: list[tuple[str, str]],
    username: str = "",
    token: str = "",
) -> list[dict]:
    """
    Check vulnerabilities for a list of packages via OSS Index.
    packages: list of (name, version) tuples
    """
    # OSS Index allows max 128 coords per request
    def chunks(lst, n):
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    coordinates = [f"pkg:npm/{name}@{version}" for name, version in packages]
    auth = (username, token) if username else None

    all_results = []
    for batch in chunks(coordinates, 128):
        resp = requests.post(
            OSS_INDEX_URL,
            json={"coordinates": batch},
            auth=auth,
            timeout=30
        )

        if resp.status_code == 429:
            retry_after = int(resp.headers.get("X-RateLimit-Reset", 60))
            print(f"  Rate limited. Waiting {retry_after}s...")
            time.sleep(retry_after)
            resp = requests.post(
                OSS_INDEX_URL,
                json={"coordinates": batch},
                auth=auth,
                timeout=30
            )

        resp.raise_for_status()
        all_results.extend(resp.json())
        time.sleep(0.5)

    return all_results


def parse_oss_results(reports: list[dict]) -> list[dict]:
    """Extract vulnerability data from OSS Index component reports."""
    vulnerable = []
    for report in reports:
        vulns = report.get("vulnerabilities", [])
        if not vulns:
            continue

        coord = report.get("coordinates", "")
        # Parse pkg:npm/name@version
        pkg_part = coord.replace("pkg:npm/", "")
        name, _, version = pkg_part.partition("@")

        for v in vulns:
            vulnerable.append({
                "package": name,
                "version": version,
                "cve": v.get("cve"),
                "cvss_score": v.get("cvssScore"),
                "cvss_vector": v.get("cvssVector"),
                "title": v.get("title"),
                "description": v.get("description", "")[:200],
                "reference": v.get("reference"),
                "excluded": v.get("excluded", False),
            })

    return sorted(vulnerable, key=lambda x: float(x["cvss_score"] or 0), reverse=True)


if __name__ == "__main__":
    packages = [
        ("express", "4.18.2"),
        ("axios", "0.21.1"),
        ("lodash", "4.17.15"),
        ("moment", "2.29.1"),
        ("underscore", "1.12.0"),
    ]

    reports = query_oss_index(packages)
    results = parse_oss_results(reports)

    if results:
        print(f"Found {len(results)} vulnerabilities:\n")
        for r in results:
            print(f"  [{r['cvss_score']:>4}] {r['package']}@{r['version']}: {r['title']}")
    else:
        print("No known vulnerabilities found.")

Anti-Bot Measures and Rate Limiting in Production

These three APIs have distinct rate limit behaviors:

npm audit API: No documented rate limit for the audit endpoint, but aggressive use from a single IP triggers informal throttling. The npm registry itself limits to ~1000 req/hour per IP for unauthenticated calls. For bulk auditing of thousands of packages, distribute requests over time or across IPs.

GitHub GraphQL API: 5,000 points per hour per authenticated token. Each advisory node returned costs roughly 1 point. Requesting 50 advisories per page at 50 pages = ~2,500 points — half your budget for one full pull. Use conditional requests (If-Modified-Since) on subsequent runs to skip pages that haven't changed.

OSS Index: 128 coordinates per request, with a rate limit period of roughly 1 hour for anonymous use. Register a free account to get higher limits with HTTP Basic auth.

When you're auditing large monorepos — thousands of transitive dependencies, running multiple times per day — sequential requests from a single IP will hit ceilings. Distributing across IPs is the practical fix. ThorData offers residential proxy pools where each rotating exit IP gets its own rate limit counters, letting you run parallel workers:

import requests
import random

THORDATA_USER = "your_thordata_user"
THORDATA_PASS = "your_thordata_pass"
THORDATA_HOST = "proxy.thordata.net"
THORDATA_PORT = 10000


def make_proxy_session(session_id: str = None) -> requests.Session:
    """Create a requests session routed through a ThorData proxy."""
    s = requests.Session()

    if session_id:
        # Sticky session — same exit IP for the life of this session_id
        username = f"{THORDATA_USER}-session-{session_id}"
    else:
        # Rotating — new IP on each request
        username = f"{THORDATA_USER}-rotate"

    proxy_url = f"http://{username}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"
    s.proxies = {
        "http": proxy_url,
        "https": proxy_url,
    }
    return s


def parallel_audit_packages(
    package_batches: list[list[tuple]],
    workers: int = 5,
) -> list[dict]:
    """Audit multiple batches of packages in parallel via proxied sessions."""
    from concurrent.futures import ThreadPoolExecutor, as_completed

    all_results = []

    def audit_batch(batch_with_id):
        batch, batch_id = batch_with_id
        session = make_proxy_session(session_id=f"batch-{batch_id}")
        # Use session for OSS Index call
        coords = [f"pkg:npm/{name}@{ver}" for name, ver in batch]
        resp = session.post(
            OSS_INDEX_URL,
            json={"coordinates": coords},
            timeout=30
        )
        resp.raise_for_status()
        return resp.json()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {
            pool.submit(audit_batch, (batch, i)): i
            for i, batch in enumerate(package_batches)
        }
        for future in as_completed(futures):
            try:
                results = future.result()
                all_results.extend(results)
            except Exception as e:
                print(f"  Batch failed: {e}")

    return all_results

Building a Local CVE Database

Persist advisories in SQLite so you can track when new ones appear, diff between runs, and query by package or CVE ID:

import sqlite3
from datetime import datetime


def init_db(db_path: str = "npm_advisories.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS advisories (
            id              TEXT PRIMARY KEY,
            source          TEXT NOT NULL,
            package_name    TEXT,
            severity        TEXT,
            cvss_score      REAL,
            cvss_vector     TEXT,
            title           TEXT,
            summary         TEXT,
            cve_ids         TEXT,
            cwe_ids         TEXT,
            vulnerable_versions TEXT,
            patched_versions    TEXT,
            url             TEXT,
            references_json TEXT,
            withdrawn       BOOLEAN DEFAULT 0,
            first_seen      TEXT,
            last_seen       TEXT
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS advisory_packages (
            advisory_id     TEXT NOT NULL,
            package_name    TEXT NOT NULL,
            vulnerable_range TEXT,
            first_patch     TEXT,
            FOREIGN KEY (advisory_id) REFERENCES advisories(id)
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS scan_runs (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            run_at      TEXT DEFAULT (datetime('now')),
            source      TEXT,
            new_count   INTEGER,
            updated_count INTEGER
        )
    """)

    conn.commit()
    return conn


def upsert_advisory(
    conn: sqlite3.Connection,
    adv: dict,
    source: str,
) -> tuple[bool, bool]:
    """
    Insert or update an advisory. Returns (is_new, is_updated).
    """
    now = datetime.utcnow().isoformat()
    existing = conn.execute(
        "SELECT id, severity, patched_versions FROM advisories WHERE id = ?",
        (adv["id"],)
    ).fetchone()

    if existing:
        old_sev = existing[1]
        old_patch = existing[2]
        new_sev = adv.get("severity")
        new_patch = adv.get("patched_versions")

        changed = (old_sev != new_sev) or (old_patch != new_patch)

        conn.execute(
            """UPDATE advisories SET
               last_seen=?, severity=?, cvss_score=?, patched_versions=?,
               summary=?, withdrawn=?
               WHERE id=?""",
            (now, new_sev, adv.get("cvss_score"), new_patch,
             adv.get("summary"), adv.get("withdrawn", False), adv["id"])
        )
        conn.commit()
        return False, changed

    else:
        conn.execute(
            """INSERT INTO advisories
               (id, source, package_name, severity, cvss_score, cvss_vector,
                title, summary, cve_ids, cwe_ids, vulnerable_versions,
                patched_versions, url, references_json, withdrawn,
                first_seen, last_seen)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                adv["id"], source,
                adv.get("module_name") or adv.get("package_name"),
                adv.get("severity"),
                adv.get("cvss_score"),
                adv.get("cvss_vector"),
                adv.get("title"),
                adv.get("summary"),
                json.dumps(adv.get("cves") or adv.get("cve_ids") or []),
                json.dumps(adv.get("cwes") or []),
                adv.get("vulnerable_versions"),
                adv.get("patched_versions"),
                adv.get("url"),
                json.dumps(adv.get("references") or []),
                adv.get("withdrawn", False),
                now, now
            )
        )
        conn.commit()
        return True, False


def get_new_since(conn: sqlite3.Connection, since: str) -> list:
    """Get all advisories first seen after a given ISO timestamp."""
    rows = conn.execute(
        """SELECT id, source, package_name, severity, cvss_score, title, cve_ids
           FROM advisories
           WHERE first_seen > ? AND withdrawn = 0
           ORDER BY
             CASE severity
               WHEN 'critical' THEN 0 WHEN 'high' THEN 1
               WHEN 'moderate' THEN 2 WHEN 'low' THEN 3
               ELSE 4 END,
             cvss_score DESC NULLS LAST""",
        (since,)
    ).fetchall()
    return rows


def get_critical_unpatched(conn: sqlite3.Connection) -> list:
    """Find critical/high advisories where no patch version is available."""
    rows = conn.execute(
        """SELECT package_name, severity, cvss_score, title, cve_ids, url
           FROM advisories
           WHERE severity IN ('critical', 'high')
             AND (patched_versions IS NULL OR patched_versions = '')
             AND withdrawn = 0
           ORDER BY cvss_score DESC NULLS LAST""",
    ).fetchall()
    return rows

Deduplication Across Sources

The same vulnerability appears across all three sources with different IDs. Use CVE IDs as the canonical key:

def deduplicate_by_cve(advisories: list[dict]) -> dict:
    """
    Merge advisories from multiple sources using CVE ID as canonical key.
    Returns dict keyed by CVE ID (or advisory ID if no CVE).
    """
    canonical = {}

    for adv in advisories:
        cves = adv.get("cve_ids") or adv.get("cves") or []

        if cves:
            for cve in cves:
                if cve not in canonical:
                    canonical[cve] = adv
                else:
                    # Merge: keep highest CVSS score and most detailed description
                    existing = canonical[cve]
                    if (adv.get("cvss_score") or 0) > (existing.get("cvss_score") or 0):
                        canonical[cve] = {**existing, **adv}
        else:
            # No CVE — use advisory's own ID
            key = adv.get("ghsa_id") or adv.get("id") or str(id(adv))
            canonical[key] = adv

    return canonical


def normalize_severity_to_cvss(severity: str, cvss_score: float = None) -> float:
    """Normalize string severity labels to a 0-10 CVSS float."""
    if cvss_score is not None:
        return float(cvss_score)

    mapping = {
        "critical": 9.5,
        "high": 8.0,
        "moderate": 5.0,
        "medium": 5.0,
        "low": 2.0,
    }
    return mapping.get((severity or "").lower(), 0.0)

Alerting and CI/CD Integration

The natural home for this pipeline is a CI check that fails the build when new critical advisories are found:

import sys

def ci_audit_check(
    packages: dict,
    db_path: str = "npm_advisories.db",
    fail_on: list[str] = ("critical", "high"),
) -> int:
    """
    Run a full audit check suitable for CI/CD integration.
    Returns exit code: 0 = clean, 1 = vulnerabilities found.
    """
    conn = init_db(db_path)

    package_list = list(packages.items())
    audit_result = query_npm_audit_bulk(packages)
    advisories = extract_advisories(audit_result)

    blocking = [
        a for a in advisories
        if a["severity"] in fail_on
    ]

    if blocking:
        print(f"\n{'='*60}")
        print(f"VULNERABILITY CHECK FAILED: {len(blocking)} blocking issue(s)")
        print(f"{'='*60}\n")

        for adv in blocking:
            print(f"[{adv['severity'].upper()}] {adv['module_name']}")
            print(f"  Title: {adv['title']}")
            print(f"  CVEs: {', '.join(adv['cves']) or 'none'}")
            print(f"  Affected: {adv['vulnerable_versions']}")
            print(f"  Fix: {adv['patched_versions'] or 'No patch available'}")
            print(f"  More: {adv['url']}\n")

        conn.close()
        return 1

    print(f"Audit passed. Checked {len(package_list)} packages, "
          f"{len(advisories)} advisory(ies) found (none blocking).")
    conn.close()
    return 0


if __name__ == "__main__":
    packages_to_check = {
        "express": "4.18.2",
        "axios": "1.6.0",
        "lodash": "4.17.21",
        "webpack": "5.88.0",
    }
    exit_code = ci_audit_check(packages_to_check)
    sys.exit(exit_code)

Practical Tips for Production Pipelines

Semver matching: Use the semver package to programmatically check whether your pinned version falls inside vulnerable_versions, rather than relying on string comparison:

import semver

def is_version_vulnerable(version: str, vulnerable_range: str) -> bool:
    """Check if a version satisfies a semver vulnerability range."""
    try:
        return semver.Version.parse(version).match(vulnerable_range)
    except ValueError:
        # Fallback to string contains for non-standard ranges
        return version in (vulnerable_range or "")

Scheduling: Run the full scrape nightly. Store the run timestamp and diff first_seen between runs to generate "new this week" reports.

Slack / webhook alerting: When get_new_since() returns critical/high advisories, post to a Slack webhook with package name, CVE, severity, and upgrade instructions.

Transitive dependency expansion: The npm registry's package endpoint returns the full dependency tree for any package. Combine that with advisory lookups to flag vulnerabilities in transitive dependencies that don't appear directly in your package.json.

CVSS v3 vs v4: OSS Index returns CVSS v3 vectors. GitHub's database includes both v3 and v4 scores when available. Normalize to CVSS v3 for consistent sorting if you're mixing sources.

Running all three sources in sequence and merging by CVE ID into SQLite gives you a queryable audit trail that survives past any single npm audit run, lets you track when advisories get patched, and surfaces the intersection between what's vulnerable and what's actually in your codebase.