Scraping npm Security Advisories: CVE Tracking & Vulnerability Data with Python (2026)
Supply chain attacks hit npm hard in recent years. Malicious packages, compromised maintainer accounts, and quietly-introduced vulnerabilities have burned teams who assumed "installed from npm" meant "safe." Manual checking doesn't scale: the average Node.js project has hundreds of transitive dependencies, each a potential attack surface.
Automated tracking — pulling advisory data into a local database you can query, diff, and alert on — is the only way to stay on top of this at any reasonable scale. This guide covers three complementary data sources and shows how to combine them into a practical vulnerability monitoring pipeline.
Data Sources Overview
Three sources together cover npm vulnerabilities comprehensively:
- npm audit API — the same endpoint the `npm audit` CLI command hits. No auth required. Returns advisory data cross-referenced against specific package versions in your dependency tree.
- GitHub Advisory Database (GHSA) — GraphQL API with rich CVE cross-references, CVSS scores, CWE classifications, and affected version ranges. Updated frequently by GitHub's security team and the broader security community.
- OSS Index (Sonatype) — REST API for component vulnerability lookups using package coordinates (purl format). 128 requests/period anonymous, higher limits with a free account. Strong coverage of transitive dependency chains.
Each source has different strengths. npm's API is the most direct for per-project auditing. GHSA has the best CVE metadata and is maintained by people who care about accuracy. OSS Index excels at batch lookups when you have a large manifest to check.
Querying the npm Audit Endpoint
The npm audit API accepts a POST body that mimics a package-lock.json structure. You can construct minimal payloads for targeted package lookups without a full project context.
import requests
import json
import time
# npm's quick-audit endpoint — the same URL the `npm audit` CLI POSTs to.
NPM_AUDIT_URL = "https://registry.npmjs.org/-/npm/v1/security/audits"
def query_npm_audit(package_name: str, version: str) -> dict:
    """Query the npm audit API for a specific package version.

    Builds a minimal package-lock-shaped payload so the audit endpoint can
    cross-reference the single package against known advisories without a
    full project context.

    Args:
        package_name: npm package name (e.g. "lodash").
        version: exact version to audit (e.g. "4.17.15").

    Returns:
        The parsed JSON audit report from the registry.

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    payload = {
        "name": "audit-check",
        "version": "1.0.0",
        "requires": {package_name: version},
        "dependencies": {
            package_name: {
                "version": version,
                "integrity": "",
                "requires": {},
            }
        },
    }
    # The registry routes the request like the real npm CLI when this
    # header is present.
    headers = {"npm-command": "audit"}
    # json= serializes the payload and sets Content-Type: application/json
    # automatically — the previous data=json.dumps(...) plus a manual
    # Content-Type header did the same thing by hand.
    resp = requests.post(
        NPM_AUDIT_URL,
        headers=headers,
        json=payload,
        timeout=15,
    )
    resp.raise_for_status()
    return resp.json()
def query_npm_audit_bulk(packages: dict) -> dict:
    """
    Query the npm audit API for multiple packages at once.

    Args:
        packages: dict of {name: version} to audit in a single request.

    Returns:
        The parsed JSON audit report covering all submitted packages.

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    # Mimic a package-lock.json dependency map for the whole batch.
    deps = {
        name: {"version": ver, "integrity": "", "requires": {}}
        for name, ver in packages.items()
    }
    payload = {
        "name": "bulk-audit",
        "version": "1.0.0",
        "requires": packages,
        "dependencies": deps,
    }
    headers = {"npm-command": "audit"}
    # json= handles serialization and the Content-Type header (previously
    # done manually via data=json.dumps and an explicit header).
    resp = requests.post(
        NPM_AUDIT_URL,
        headers=headers,
        json=payload,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()
def extract_advisories(audit_result: dict) -> list[dict]:
    """Flatten the `advisories` section of an npm audit response.

    Returns one dict per advisory, ordered most-severe first
    (critical, high, moderate, low, then anything unrecognized).
    """
    severity_rank = {"critical": 0, "high": 1, "moderate": 2, "low": 3}
    flattened = [
        {
            "id": adv_id,
            "module_name": details.get("module_name"),
            "severity": details.get("severity"),
            "title": details.get("title"),
            "cves": details.get("cves", []),
            "cwe": details.get("cwe"),
            "vulnerable_versions": details.get("vulnerable_versions"),
            "patched_versions": details.get("patched_versions"),
            "recommendation": details.get("recommendation"),
            "references": details.get("references"),
            "url": details.get("url"),
            "updated_at": details.get("updated"),
        }
        for adv_id, details in audit_result.get("advisories", {}).items()
    ]
    # Stable sort preserves source order within the same severity tier.
    flattened.sort(key=lambda entry: severity_rank.get(entry["severity"], 4))
    return flattened
if __name__ == "__main__":
    # Demo: audit one known-vulnerable lodash release and print each finding.
    findings = extract_advisories(query_npm_audit("lodash", "4.17.15"))
    for advisory in findings:
        severity_label = advisory["severity"].upper()
        print(f"[{severity_label}] {advisory['title']}")
        print(f" CVEs: {', '.join(advisory['cves']) or 'none'}")
        print(f" Affected: {advisory['vulnerable_versions']}")
        print(f" Fix: {advisory['patched_versions']}")
        print()
Querying the npm Security Advisory REST API
Separately from the audit endpoint, npm exposes a security advisories REST API that lets you query advisories directly by ID or package name:
# Base URL for npm's advisory REST API (distinct from the audit endpoint).
NPM_SECURITY_API = "https://registry.npmjs.org/-/npm/v1/security/advisories"
def get_advisory_by_id(advisory_id: str) -> dict:
    """Fetch one npm security advisory by its numeric ID.

    Raises requests.HTTPError on a non-2xx response.
    """
    response = requests.get(f"{NPM_SECURITY_API}/{advisory_id}", timeout=15)
    response.raise_for_status()
    return response.json()
def search_advisories_for_package(package_name: str) -> list[dict]:
    """Return every published advisory affecting `package_name`."""
    response = requests.get(
        f"{NPM_SECURITY_API}/search",
        params={"package": package_name},
        timeout=15,
    )
    response.raise_for_status()
    return response.json().get("objects", [])
def paginate_all_advisories(page_size: int = 100, max_pages: int = 50) -> list[dict]:
    """Walk the full npm advisory listing page by page.

    Stops at the first empty page or after `max_pages` pages, whichever
    comes first, pausing briefly between requests to stay polite.
    """
    collected = []
    for page in range(max_pages):
        response = requests.get(
            NPM_SECURITY_API,
            params={"page": page, "perPage": page_size},
            timeout=20,
        )
        response.raise_for_status()
        batch = response.json().get("objects", [])
        if not batch:
            break
        collected.extend(batch)
        print(f" Page {page}: fetched {len(batch)} advisories (total: {len(collected)})")
        time.sleep(0.5)
    return collected
GitHub Advisory Database (GraphQL)
GHSA gives you richer metadata than the npm audit API alone. The GraphQL endpoint lets you filter by ecosystem and paginate through thousands of records with full CVE cross-references and CVSS scoring.
import requests

# GitHub's single GraphQL endpoint; all advisory queries POST here.
GITHUB_GRAPHQL_URL = "https://api.github.com/graphql"
# NOTE(review): placeholder value — load the real token from an environment
# variable or secret store rather than committing it to source control.
GITHUB_TOKEN = "ghp_your_token_here"  # needs security_events or public_repo scope
ADVISORY_QUERY = """
query($cursor: String) {
securityAdvisories(ecosystem: NPM, first: 50, after: $cursor) {
pageInfo {
hasNextPage
endCursor
}
nodes {
ghsaId
summary
description
severity
publishedAt
updatedAt
withdrawnAt
cvss {
score
vectorString
}
cwes(first: 5) {
nodes {
cweId
name
}
}
identifiers {
type
value
}
references {
url
}
vulnerabilities(first: 10) {
nodes {
package {
name
ecosystem
}
vulnerableVersionRange
firstPatchedVersion {
identifier
}
}
}
}
}
}
"""
def fetch_github_advisories(max_pages: int = 50) -> list[dict]:
    """Paginate through all npm security advisories in GitHub's database.

    Follows the GraphQL cursor until `hasNextPage` is false or `max_pages`
    pages have been fetched. When the rate limit runs low, sleeps until the
    reported reset time; if the response itself was throttled (403/429),
    the page is re-requested after the sleep.

    Returns:
        The raw advisory nodes, in API order.
    """
    headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Content-Type": "application/json",
        "X-Github-Next-Global-ID": "1",
    }

    def post_page(cursor):
        # One GraphQL POST for a single page of results.
        return requests.post(
            GITHUB_GRAPHQL_URL,
            headers=headers,
            json={"query": ADVISORY_QUERY, "variables": {"cursor": cursor}},
            timeout=30,
        )

    cursor = None
    results = []
    for page_num in range(max_pages):
        resp = post_page(cursor)
        # Check rate limit headers before consuming the response.
        remaining = int(resp.headers.get("X-RateLimit-Remaining", 5000))
        if remaining < 100:
            reset_ts = int(resp.headers.get("X-RateLimit-Reset", 0))
            wait = max(0, reset_ts - int(time.time())) + 5
            print(f" Rate limit low ({remaining} remaining). Sleeping {wait}s.")
            time.sleep(wait)
            # BUG FIX: previously the code slept and then still processed the
            # stale throttled response, so raise_for_status() raised anyway.
            # Re-issue the request now that the window has reset.
            if resp.status_code in (403, 429):
                resp = post_page(cursor)
        resp.raise_for_status()
        data = resp.json()
        if "errors" in data:
            print(f" GraphQL errors: {data['errors']}")
            break
        page = data["data"]["securityAdvisories"]
        results.extend(page["nodes"])
        print(f" Page {page_num + 1}: {len(page['nodes'])} advisories")
        if not page["pageInfo"]["hasNextPage"]:
            break
        cursor = page["pageInfo"]["endCursor"]
        time.sleep(0.3)  # Polite pause between pages
    return results
def parse_github_advisory(node: dict) -> dict:
    """Flatten one GHSA GraphQL advisory node into a plain dict.

    Extracts CVE identifiers, per-package vulnerable ranges with the first
    patched version, CWE labels, and CVSS data from the nested node shape.
    """
    cve_ids = [
        ident["value"] for ident in node["identifiers"] if ident["type"] == "CVE"
    ]

    affected_packages = []
    for vuln_node in node.get("vulnerabilities", {}).get("nodes", []):
        package_info = vuln_node.get("package", {})
        first_patched = vuln_node.get("firstPatchedVersion")
        affected_packages.append({
            "name": package_info.get("name"),
            "vulnerable_range": vuln_node.get("vulnerableVersionRange"),
            # firstPatchedVersion is null when no fix has shipped yet.
            "first_patch": first_patched["identifier"] if first_patched else None,
        })

    cwe_labels = [
        f"{cwe['cweId']} ({cwe['name']})"
        for cwe in node.get("cwes", {}).get("nodes", [])
    ]
    cvss_info = node.get("cvss", {}) or {}
    return {
        "ghsa_id": node.get("ghsaId"),
        "cve_ids": cve_ids,
        "summary": node.get("summary"),
        "severity": node.get("severity"),
        "cvss_score": cvss_info.get("score"),
        "cvss_vector": cvss_info.get("vectorString"),
        "cwes": cwe_labels,
        "published": node.get("publishedAt"),
        "updated": node.get("updatedAt"),
        "withdrawn": node.get("withdrawnAt"),
        "packages": affected_packages,
        "references": [ref["url"] for ref in node.get("references", [])],
    }
if __name__ == "__main__":
    # Pull a few pages and show a one-line summary for the first 5 advisories.
    for raw_node in fetch_github_advisories(max_pages=3)[:5]:
        parsed = parse_github_advisory(raw_node)
        print(f"{parsed['ghsa_id']} | {parsed['severity']} | CVSS: {parsed['cvss_score']} | "
              f"CVEs: {parsed['cve_ids']} | {parsed['summary'][:60]}")
OSS Index (Sonatype)
OSS Index uses package coordinates — for npm, that's pkg:npm/package-name@version. You can batch up to 128 coordinates per request, making it efficient for checking a full lockfile.
# OSS Index component-report endpoint; accepts up to 128 purl coordinates per POST.
OSS_INDEX_URL = "https://ossindex.sonatype.org/api/v3/component-report"
def query_oss_index(
    packages: list[tuple[str, str]],
    username: str = "",
    token: str = "",
) -> list[dict]:
    """
    Check vulnerabilities for a list of packages via OSS Index.

    Args:
        packages: list of (name, version) tuples.
        username: optional OSS Index account name; with `token` enables
            HTTP Basic auth (anonymous requests get a lower rate limit).
        token: API token paired with `username`.

    Returns:
        One component-report dict per coordinate queried.

    Raises:
        requests.HTTPError: on a non-2xx response (after one 429 retry).
    """
    # OSS Index allows max 128 coords per request
    def chunks(lst, n):
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    def post_batch(batch):
        # Single POST of one coordinate batch (extracted so the 429-retry
        # path no longer duplicates the request construction).
        return requests.post(
            OSS_INDEX_URL,
            json={"coordinates": batch},
            auth=auth,
            timeout=30,
        )

    coordinates = [f"pkg:npm/{name}@{version}" for name, version in packages]
    auth = (username, token) if username else None
    all_results = []
    for batch in chunks(coordinates, 128):
        resp = post_batch(batch)
        if resp.status_code == 429:
            # Back off for the server-reported period, then retry once.
            retry_after = int(resp.headers.get("X-RateLimit-Reset", 60))
            print(f" Rate limited. Waiting {retry_after}s...")
            time.sleep(retry_after)
            resp = post_batch(batch)
        resp.raise_for_status()
        all_results.extend(resp.json())
        time.sleep(0.5)
    return all_results
def parse_oss_results(reports: list[dict]) -> list[dict]:
    """Extract vulnerability data from OSS Index component reports.

    Handles scoped packages ("pkg:npm/@scope/name@1.2.3") correctly: the
    coordinate is split on the LAST "@" so the scope's leading "@" stays
    part of the package name.

    Returns:
        Flat dicts for every vulnerable component, sorted by CVSS score,
        highest first. Components with no vulnerabilities are skipped.
    """
    vulnerable = []
    for report in reports:
        vulns = report.get("vulnerabilities", [])
        if not vulns:
            continue
        coord = report.get("coordinates", "")
        # Coordinate format: pkg:npm/name@version or pkg:npm/@scope/name@version
        pkg_part = coord.replace("pkg:npm/", "")
        # BUG FIX: partition("@") split on the FIRST "@", which mangled
        # scoped packages ("@scope/name@1.0.0" -> name="", version="scope/...").
        # rpartition splits on the last "@" — the version separator.
        name, _, version = pkg_part.rpartition("@")
        for v in vulns:
            vulnerable.append({
                "package": name,
                "version": version,
                "cve": v.get("cve"),
                "cvss_score": v.get("cvssScore"),
                "cvss_vector": v.get("cvssVector"),
                "title": v.get("title"),
                "description": v.get("description", "")[:200],
                "reference": v.get("reference"),
                "excluded": v.get("excluded", False),
            })
    return sorted(vulnerable, key=lambda x: float(x["cvss_score"] or 0), reverse=True)
if __name__ == "__main__":
    # Demo: check a handful of well-known packages against OSS Index.
    sample_packages = [
        ("express", "4.18.2"),
        ("axios", "0.21.1"),
        ("lodash", "4.17.15"),
        ("moment", "2.29.1"),
        ("underscore", "1.12.0"),
    ]
    findings = parse_oss_results(query_oss_index(sample_packages))
    if not findings:
        print("No known vulnerabilities found.")
    else:
        print(f"Found {len(findings)} vulnerabilities:\n")
        for finding in findings:
            print(f" [{finding['cvss_score']:>4}] {finding['package']}@{finding['version']}: {finding['title']}")
Anti-Bot Measures and Rate Limiting in Production
These three APIs have distinct rate limit behaviors:
npm audit API: No documented rate limit for the audit endpoint, but aggressive use from a single IP triggers informal throttling. The npm registry itself limits to ~1000 req/hour per IP for unauthenticated calls. For bulk auditing of thousands of packages, distribute requests over time or across IPs.
GitHub GraphQL API: 5,000 points per hour per authenticated token. Each advisory node returned costs roughly 1 point. Requesting 50 advisories per page at 50 pages = ~2,500 points — half your budget for one full pull. Use conditional requests (If-Modified-Since) on subsequent runs to skip pages that haven't changed.
OSS Index: 128 coordinates per request, with a rate limit period of roughly 1 hour for anonymous use. Register a free account to get higher limits with HTTP Basic auth.
When you're auditing large monorepos — thousands of transitive dependencies, running multiple times per day — sequential requests from a single IP will hit ceilings. Distributing across IPs is the practical fix. ThorData offers residential proxy pools where each rotating exit IP gets its own rate limit counters, letting you run parallel workers:
import requests
import random

# ThorData residential proxy credentials and gateway endpoint.
# NOTE(review): placeholder values — load real credentials from the
# environment or a secret store, not source control.
THORDATA_USER = "your_thordata_user"
THORDATA_PASS = "your_thordata_pass"
THORDATA_HOST = "proxy.thordata.net"
THORDATA_PORT = 10000
def make_proxy_session(session_id: str | None = None) -> requests.Session:
    """Create a requests session routed through a ThorData proxy.

    Args:
        session_id: when given, builds a sticky-session username so the
            proxy pins the same exit IP for the life of this id; when None,
            the "-rotate" username requests a new exit IP per request.
            (Annotation fixed: the default is None, so the parameter is
            `str | None`, not `str`.)

    Returns:
        A requests.Session with http/https proxies configured.
    """
    s = requests.Session()
    if session_id:
        # Sticky session — same exit IP for the life of this session_id
        username = f"{THORDATA_USER}-session-{session_id}"
    else:
        # Rotating — new IP on each request
        username = f"{THORDATA_USER}-rotate"
    proxy_url = f"http://{username}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"
    s.proxies = {
        "http": proxy_url,
        "https": proxy_url,
    }
    return s
def parallel_audit_packages(
    package_batches: list[list[tuple]],
    workers: int = 5,
) -> list[dict]:
    """Audit batches of packages concurrently, one proxy session per batch.

    Each batch gets its own sticky session id so the proxy pins a distinct
    exit IP per batch. Failed batches are logged and skipped; results from
    completed batches are merged in completion order (not submit order).
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_batch(batch_with_id):
        # One OSS Index POST for one batch, through a dedicated session.
        batch, batch_id = batch_with_id
        session = make_proxy_session(session_id=f"batch-{batch_id}")
        coordinates = [f"pkg:npm/{name}@{ver}" for name, ver in batch]
        response = session.post(
            OSS_INDEX_URL,
            json={"coordinates": coordinates},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

    merged = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = {
            pool.submit(run_batch, (batch, index)): index
            for index, batch in enumerate(package_batches)
        }
        for future in as_completed(pending):
            try:
                merged.extend(future.result())
            except Exception as exc:
                print(f" Batch failed: {exc}")
    return merged
Building a Local CVE Database
Persist advisories in SQLite so you can track when new ones appear, diff between runs, and query by package or CVE ID:
import sqlite3
from datetime import datetime
def init_db(db_path: str = "npm_advisories.db") -> sqlite3.Connection:
    """Open (creating if needed) the advisory tracking database.

    Tables:
      advisories        — one row per advisory, keyed by source-specific id.
      advisory_packages — per-package affected ranges (many per advisory).
      scan_runs         — bookkeeping row per scrape run.

    Returns:
        An open sqlite3.Connection with the schema committed.
    """
    conn = sqlite3.connect(db_path)
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS advisories (
            id TEXT PRIMARY KEY,
            source TEXT NOT NULL,
            package_name TEXT,
            severity TEXT,
            cvss_score REAL,
            cvss_vector TEXT,
            title TEXT,
            summary TEXT,
            cve_ids TEXT,
            cwe_ids TEXT,
            vulnerable_versions TEXT,
            patched_versions TEXT,
            url TEXT,
            references_json TEXT,
            withdrawn BOOLEAN DEFAULT 0,
            first_seen TEXT,
            last_seen TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS advisory_packages (
            advisory_id TEXT NOT NULL,
            package_name TEXT NOT NULL,
            vulnerable_range TEXT,
            first_patch TEXT,
            FOREIGN KEY (advisory_id) REFERENCES advisories(id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS scan_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_at TEXT DEFAULT (datetime('now')),
            source TEXT,
            new_count INTEGER,
            updated_count INTEGER
        )
        """,
    )
    for statement in schema_statements:
        conn.execute(statement)
    conn.commit()
    return conn
def upsert_advisory(
    conn: sqlite3.Connection,
    adv: dict,
    source: str,
) -> tuple[bool, bool]:
    """
    Insert or update an advisory row.

    Args:
        conn: open connection with the `advisories` table present.
        adv: normalized advisory dict (keys from any of the three sources).
        source: short source label, e.g. "npm", "ghsa", "ossindex".

    Returns:
        (is_new, is_updated): is_new is True on first insert; is_updated is
        True when an existing row's severity or patched_versions changed.
    """
    from datetime import timezone  # module level imports only `datetime`

    # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated
    # (Python 3.12+) and produced naive timestamps.
    now = datetime.now(timezone.utc).isoformat()
    existing = conn.execute(
        "SELECT id, severity, patched_versions FROM advisories WHERE id = ?",
        (adv["id"],),
    ).fetchone()
    if existing:
        old_sev = existing[1]
        old_patch = existing[2]
        new_sev = adv.get("severity")
        new_patch = adv.get("patched_versions")
        # "Updated" means the fields downstream alerting keys on changed:
        # a severity re-rating or a patch version appearing/changing.
        changed = (old_sev != new_sev) or (old_patch != new_patch)
        conn.execute(
            """UPDATE advisories SET
            last_seen=?, severity=?, cvss_score=?, patched_versions=?,
            summary=?, withdrawn=?
            WHERE id=?""",
            (now, new_sev, adv.get("cvss_score"), new_patch,
             adv.get("summary"), adv.get("withdrawn", False), adv["id"]),
        )
        conn.commit()
        return False, changed
    conn.execute(
        """INSERT INTO advisories
        (id, source, package_name, severity, cvss_score, cvss_vector,
        title, summary, cve_ids, cwe_ids, vulnerable_versions,
        patched_versions, url, references_json, withdrawn,
        first_seen, last_seen)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            adv["id"], source,
            # npm audit dicts use module_name; GHSA/OSS parsers use package_name.
            adv.get("module_name") or adv.get("package_name"),
            adv.get("severity"),
            adv.get("cvss_score"),
            adv.get("cvss_vector"),
            adv.get("title"),
            adv.get("summary"),
            json.dumps(adv.get("cves") or adv.get("cve_ids") or []),
            json.dumps(adv.get("cwes") or []),
            adv.get("vulnerable_versions"),
            adv.get("patched_versions"),
            adv.get("url"),
            json.dumps(adv.get("references") or []),
            adv.get("withdrawn", False),
            now, now,
        ),
    )
    conn.commit()
    return True, False
def get_new_since(conn: sqlite3.Connection, since: str) -> list:
    """Return advisories first seen after `since` (ISO timestamp).

    Withdrawn advisories are excluded; results are ordered severest first,
    breaking ties by CVSS score descending (NULL scores last).
    """
    query = """SELECT id, source, package_name, severity, cvss_score, title, cve_ids
        FROM advisories
        WHERE first_seen > ? AND withdrawn = 0
        ORDER BY
        CASE severity
        WHEN 'critical' THEN 0 WHEN 'high' THEN 1
        WHEN 'moderate' THEN 2 WHEN 'low' THEN 3
        ELSE 4 END,
        cvss_score DESC NULLS LAST"""
    return conn.execute(query, (since,)).fetchall()
def get_critical_unpatched(conn: sqlite3.Connection) -> list:
    """Return critical/high advisories that still have no patched version.

    Withdrawn advisories are excluded; results are ordered by CVSS score
    descending with NULL scores last.
    """
    query = """SELECT package_name, severity, cvss_score, title, cve_ids, url
        FROM advisories
        WHERE severity IN ('critical', 'high')
        AND (patched_versions IS NULL OR patched_versions = '')
        AND withdrawn = 0
        ORDER BY cvss_score DESC NULLS LAST"""
    return conn.execute(query).fetchall()
Deduplication Across Sources
The same vulnerability appears across all three sources with different IDs. Use CVE IDs as the canonical key:
def deduplicate_by_cve(advisories: list[dict]) -> dict:
    """
    Merge advisories from multiple sources, keyed by CVE ID.

    Advisories without any CVE fall back to their own GHSA/advisory id (or
    the object's id() as a last resort). When two sources report the same
    CVE, the record with the higher CVSS score wins, with its fields laid
    over the earlier record's.
    """
    merged = {}
    for advisory in advisories:
        cve_list = advisory.get("cve_ids") or advisory.get("cves") or []
        if not cve_list:
            # No CVE — key by the advisory's own identifier instead.
            fallback_key = advisory.get("ghsa_id") or advisory.get("id") or str(id(advisory))
            merged[fallback_key] = advisory
            continue
        for cve_id in cve_list:
            previous = merged.get(cve_id)
            if previous is None:
                merged[cve_id] = advisory
            elif (advisory.get("cvss_score") or 0) > (previous.get("cvss_score") or 0):
                # Higher-scored record wins; earlier fields survive unless overridden.
                merged[cve_id] = {**previous, **advisory}
    return merged
def normalize_severity_to_cvss(severity: str, cvss_score: float = None) -> float:
"""Normalize string severity labels to a 0-10 CVSS float."""
if cvss_score is not None:
return float(cvss_score)
mapping = {
"critical": 9.5,
"high": 8.0,
"moderate": 5.0,
"medium": 5.0,
"low": 2.0,
}
return mapping.get((severity or "").lower(), 0.0)
Alerting and CI/CD Integration
The natural home for this pipeline is a CI check that fails the build when new critical advisories are found:
import sys
def ci_audit_check(
    packages: dict,
    db_path: str = "npm_advisories.db",
    fail_on: tuple[str, ...] = ("critical", "high"),
) -> int:
    """
    Run a full audit check suitable for CI/CD integration.

    Args:
        packages: {name: version} mapping of dependencies to audit.
        db_path: SQLite database file; opening it via init_db() ensures the
            advisory schema exists (the connection is not otherwise used here).
        fail_on: severities that block the build. Annotation fixed to match
            the tuple default (was list[str]); any container supporting `in`
            still works at runtime.

    Returns:
        Exit code: 0 = clean, 1 = blocking vulnerabilities found.
    """
    conn = init_db(db_path)
    audit_result = query_npm_audit_bulk(packages)
    advisories = extract_advisories(audit_result)
    blocking = [a for a in advisories if a["severity"] in fail_on]
    if blocking:
        print(f"\n{'='*60}")
        print(f"VULNERABILITY CHECK FAILED: {len(blocking)} blocking issue(s)")
        print(f"{'='*60}\n")
        for adv in blocking:
            print(f"[{adv['severity'].upper()}] {adv['module_name']}")
            print(f" Title: {adv['title']}")
            print(f" CVEs: {', '.join(adv['cves']) or 'none'}")
            print(f" Affected: {adv['vulnerable_versions']}")
            print(f" Fix: {adv['patched_versions'] or 'No patch available'}")
            print(f" More: {adv['url']}\n")
        conn.close()
        return 1
    print(f"Audit passed. Checked {len(packages)} packages, "
          f"{len(advisories)} advisory(ies) found (none blocking).")
    conn.close()
    return 0
if __name__ == "__main__":
    # Exit non-zero when blocking advisories are found, so CI fails the build.
    manifest = {
        "express": "4.18.2",
        "axios": "1.6.0",
        "lodash": "4.17.21",
        "webpack": "5.88.0",
    }
    sys.exit(ci_audit_check(manifest))
Practical Tips for Production Pipelines
Semver matching: Use the semver package to programmatically check whether your pinned version falls inside vulnerable_versions, rather than relying on string comparison:
import semver
def is_version_vulnerable(version: str, vulnerable_range: str) -> bool:
    """Check if a version satisfies a semver vulnerability range.

    NOTE(review): python-semver's Version.match() takes a single comparator
    (e.g. ">=1.2.3"), not full npm range expressions like "<1.2 || >=2.0" —
    compound ranges end up in the substring fallback; verify against the
    ranges your sources actually emit.
    """
    try:
        return semver.Version.parse(version).match(vulnerable_range)
    except (ValueError, TypeError):
        # ValueError: `version` isn't strict semver (e.g. "1.0").
        # TypeError: `vulnerable_range` is None — previously uncaught, which
        # crashed the caller instead of reaching the fallback below.
        # Fallback to string contains for non-standard ranges
        return version in (vulnerable_range or "")
Scheduling: Run the full scrape nightly. Store the run timestamp and diff first_seen between runs to generate "new this week" reports.
Slack / webhook alerting: When get_new_since() returns critical/high advisories, post to a Slack webhook with package name, CVE, severity, and upgrade instructions.
Transitive dependency expansion: The npm registry's package endpoint returns the full dependency tree for any package. Combine that with advisory lookups to flag vulnerabilities in transitive dependencies that don't appear directly in your package.json.
CVSS v3 vs v4: OSS Index returns CVSS v3 vectors. GitHub's database includes both v3 and v4 scores when available. Normalize to CVSS v3 for consistent sorting if you're mixing sources.
Running all three sources in sequence and merging by CVE ID into SQLite gives you a queryable audit trail that survives past any single npm audit run, lets you track when advisories get patched, and surfaces the intersection between what's vulnerable and what's actually in your codebase.