Building a Multi-Source News Monitor: NewsAPI, GDELT, and CommonCrawl (2026)
Relying on a single news source for monitoring is fragile. APIs go down, rate limits hit at the worst times, and every provider has blind spots in their coverage. The practical solution is to pull from multiple sources and deduplicate. This guide builds a news monitoring pipeline that combines three complementary data sources: NewsAPI for real-time headlines, GDELT for global event data, and CommonCrawl for historical article text.
By the end, you'll have a system that catches stories across sources, deduplicates them, and outputs a clean feed you can filter by topic, region, or time window.
Architecture Overview
Each source has different strengths:
- NewsAPI — fast, well-structured, covers major English-language outlets with a simple REST API. Free tier gives 100 requests/day with articles up to a month old.
- GDELT Project — massive open dataset tracking news events globally in 65+ languages. Updated every 15 minutes. No API key required. Coverage is broad but data is noisier.
- CommonCrawl — petabyte-scale web archive. Not a news source per se, but contains the full text of articles that other APIs only give you headlines for. Good for backfilling and historical analysis.
The pipeline pulls from all three, normalizes the output into a common schema, and deduplicates by URL and title similarity.
Source 1: NewsAPI
NewsAPI is the quickest to get running. Sign up at newsapi.org for a free key.
import httpx
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict, field
@dataclass
class Article:
    """Normalized article record shared by every pipeline source."""
    title: str
    url: str
    source: str        # outlet name (NewsAPI), domain (GDELT), or feed name (RSS)
    published_at: str  # timestamp string as provided by the source — format varies per origin
    description: str = ""
    content: str = ""  # NewsAPI free tier truncates this at ~200 chars (see notes below)
    origin: str = ""  # which pipeline source found this
def fetch_newsapi(
    query: str,
    api_key: str,
    days_back: int = 7,
    page_size: int = 100,
) -> list[Article]:
    """Fetch articles from NewsAPI's /everything endpoint.

    Queries English-language articles published in the last `days_back`
    days, newest first, and maps each result onto the shared Article schema.
    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    since = datetime.utcnow() - timedelta(days=days_back)
    request_params = {
        "q": query,
        "from": since.strftime("%Y-%m-%d"),
        "sortBy": "publishedAt",
        "pageSize": page_size,
        "language": "en",
        "apiKey": api_key,
    }
    resp = httpx.get(
        "https://newsapi.org/v2/everything",
        params=request_params,
        timeout=20,
    )
    resp.raise_for_status()
    payload = resp.json()
    return [
        Article(
            title=item.get("title", ""),
            url=item.get("url", ""),
            source=item.get("source", {}).get("name", ""),
            published_at=item.get("publishedAt", ""),
            description=item.get("description", ""),
            content=item.get("content", ""),
            origin="newsapi",
        )
        for item in payload.get("articles", [])
    ]
Limitations to know about. The free tier truncates article content at 200 characters. You only get articles from the last 30 days. And the 100 requests/day limit means you need to batch your queries carefully — don't poll every minute.
Source 2: GDELT Project
GDELT is a different beast entirely. It processes news from virtually every country and encodes events using the CAMEO taxonomy — a structured system that classifies who did what to whom. The DOC API lets you search for articles by keyword:
from urllib.parse import quote
def fetch_gdelt_articles(
    query: str,
    mode: str = "artlist",
    max_records: int = 75,
    timespan: str = "7d",
) -> list[Article]:
    """Search the GDELT DOC 2.0 API for news articles.

    Args:
        query: keyword query (URL-encoded before sending).
        mode: DOC API mode; "artlist" returns a JSON article list.
        max_records: maximum number of articles to request.
        timespan: lookback window, e.g. "7d" or "24h".

    Returns:
        Articles with title/url/domain/seendate populated. GDELT returns no
        body text, so description and content are left empty.
    """
    encoded_query = quote(query)
    # BUG FIX: the timespan separator was written as "×pan=" — an HTML
    # "&times;" mojibake of "&timespan=" — so the lookback window was
    # silently dropped by the API. Restore the literal parameter name.
    url = (
        f"https://api.gdeltproject.org/api/v2/doc/doc"
        f"?query={encoded_query}"
        f"&mode={mode}"
        f"&maxrecords={max_records}"
        f"&timespan={timespan}"
        f"&format=json"
        f"&sort=datedesc"
    )
    resp = httpx.get(url, timeout=30)
    resp.raise_for_status()
    data = resp.json()
    articles = []
    for item in data.get("articles", []):
        articles.append(Article(
            title=item.get("title", ""),
            url=item.get("url", ""),
            source=item.get("domain", ""),
            published_at=item.get("seendate", ""),
            description="",
            content="",
            origin="gdelt",
        ))
    return articles
GDELT also publishes a GKG (Global Knowledge Graph) dataset that extracts themes, people, organizations, and locations from each article. The helper below approximates that by querying the DOC API restricted to English-language sources — useful if you're building topic-level monitoring:
def fetch_gdelt_gkg(query: str, timespan: str = "7d") -> list[dict]:
    """Fetch GDELT article records restricted to English-language sources.

    NOTE(review): despite the name, this queries the DOC API in artlist
    mode with a sourcelang filter — not the separate GKG dataset. Confirm
    intent before relying on it for entity extraction.
    """
    encoded_query = quote(query)
    # BUG FIX: same "×pan=" mojibake as fetch_gdelt_articles — restore the
    # literal "&timespan=" parameter so the window is actually applied.
    url = (
        f"https://api.gdeltproject.org/api/v2/doc/doc"
        f"?query={encoded_query}"
        f"&mode=artlist"
        f"&maxrecords=50"
        f"&timespan={timespan}"
        f"&format=json"
        f"&sourcelang=english"
    )
    resp = httpx.get(url, timeout=30)
    resp.raise_for_status()
    return resp.json().get("articles", [])
No API key needed. GDELT is fully open. The catch is that it rate-limits aggressively — roughly 1 request per 5 seconds. Build in sleeps.
Source 3: CommonCrawl Index
CommonCrawl archives billions of web pages. You won't use it for real-time monitoring, but it's powerful for backfilling article text that other sources only give you titles for:
def search_commoncrawl(
    domain: str,
    index: str = "CC-MAIN-2026-22",
    limit: int = 50,
) -> list[dict]:
    """Search a CommonCrawl CDX index for captures of a domain.

    The endpoint streams NDJSON (one JSON object per line). Returns one
    dict per capture. FIX: now also passes through "filename" and "offset",
    which fetch_commoncrawl_content below requires — the original dropped
    them, making its output unusable for WARC retrieval.
    """
    import json  # hoisted: the original re-imported json on every loop iteration

    resp = httpx.get(
        f"https://index.commoncrawl.org/{index}-index",
        params={"url": f"{domain}/*", "output": "json", "limit": limit},
        timeout=30,
    )
    resp.raise_for_status()
    results = []
    for line in resp.text.strip().split("\n"):
        if not line:
            continue
        record = json.loads(line)
        results.append({
            "url": record.get("url"),
            "timestamp": record.get("timestamp"),
            "status": record.get("status"),
            "mime": record.get("mime"),
            "length": record.get("length"),
            # Needed downstream for WARC byte-range fetches.
            "filename": record.get("filename"),
            "offset": record.get("offset"),
        })
    return results
def fetch_commoncrawl_content(warc_info: dict) -> str:
    """Fetch and decode one page capture from a CommonCrawl WARC file.

    Expects a CDX record dict with "filename", "offset", and "length".
    Returns "" on missing fields or any decode failure (best-effort).
    """
    import gzip

    offset = int(warc_info.get("offset", 0))
    length = int(warc_info.get("length", 0))
    filename = warc_info.get("filename", "")
    if not filename or not length:
        return ""
    # BUG FIX: the WARC URL contained the literal placeholder "(unknown)".
    # The correct location is the CDX record's "filename" path under
    # data.commoncrawl.org.
    url = f"https://data.commoncrawl.org/{filename}"
    # A Range request fetches only this record's gzip member, not the
    # whole multi-GB WARC file.
    headers = {"Range": f"bytes={offset}-{offset + length - 1}"}
    resp = httpx.get(url, headers=headers, timeout=30)
    resp.raise_for_status()
    try:
        raw = gzip.decompress(resp.content)
        # WARC record layout: WARC headers, HTTP headers, body — each
        # separated by a blank CRLF line; the body is the third part.
        parts = raw.split(b"\r\n\r\n", 2)
        if len(parts) >= 3:
            return parts[2].decode("utf-8", errors="replace")
    except Exception:
        # Best-effort: truncated or corrupt records yield "" rather than raising.
        pass
    return ""
Deduplication
Multiple sources will find the same articles. Deduplicate by URL first, then by title similarity for cases where different sources link to different URLs for the same story:
from difflib import SequenceMatcher
from urllib.parse import urlparse
def normalize_url(url: str) -> str:
    """Canonicalize a URL for duplicate comparison.

    Drops the query string (tracking params) and fragment, strips any
    trailing slash, and lowercases the result.
    """
    parts = urlparse(url)
    rebuilt = "".join((parts.scheme, "://", parts.netloc, parts.path))
    return rebuilt.rstrip("/").lower()
def deduplicate_articles(articles: list[Article], similarity_threshold: float = 0.85) -> list[Article]:
    """Remove duplicate articles by normalized URL, then by fuzzy title match.

    An article is kept only if its normalized URL is new AND no previously
    kept title matches it at or above `similarity_threshold`.
    """
    kept: list[Article] = []
    known_urls: set = set()
    known_titles: list = []
    for candidate in articles:
        url_key = normalize_url(candidate.url)
        if url_key in known_urls:
            continue
        lowered = candidate.title.lower()
        title_clash = any(
            SequenceMatcher(None, lowered, prior).ratio() >= similarity_threshold
            for prior in known_titles
        )
        if title_clash:
            continue
        known_urls.add(url_key)
        known_titles.append(lowered)
        kept.append(candidate)
    return kept
Handling Anti-Bot Measures
When you follow article URLs to fetch full text, you hit each publisher's own bot defenses. This is where the pipeline gets harder.
Cloudflare and Akamai protect most major news sites. Direct httpx.get() calls get blocked on roughly 40% of URLs in a typical news corpus. You have two options: use a headless browser for the blocked ones, or accept partial content from the API responses.
Rate limiting across domains. Even though you're hitting different publishers, your outbound IP is the constant. News sites share threat intelligence — if your IP gets flagged on one Cloudflare-protected site, others may block you preemptively.
Residential proxies for full-text fetching. If you need the complete article text (not just the truncated API excerpt), rotating residential IPs are the difference between a 40% success rate and a 90%+ one. ThorData's proxy service handles this well — their residential pool covers enough geographic diversity that you're not hammering any single exit node, and the rotation happens automatically per request.
def fetch_article_text(url: str, proxy: str | None = None) -> str:
    """Fetch full article text, handling common anti-bot responses.

    Returns up to 5000 chars of crudely extracted paragraph text, or ""
    when the site answers 403 (bot detection). Other HTTP errors raise.
    FIX: removed the unused html.parser import and hoisted the re import
    out of the response-handling path.
    """
    import re

    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "en-US,en;q=0.9",
    }
    transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
    with httpx.Client(transport=transport, headers=headers, timeout=20, follow_redirects=True) as client:
        resp = client.get(url)
        if resp.status_code == 403:
            return ""  # blocked by bot detection
        resp.raise_for_status()
        # Quick paragraph extraction (use trafilatura for production).
        paragraphs = re.findall(r"<p[^>]*>(.*?)</p>", resp.text, re.DOTALL)
        text = "\n".join(re.sub(r"<[^>]+>", "", p) for p in paragraphs)
        return text[:5000]
Persistent Storage in SQLite
For production monitoring, store articles to SQLite so you can query across runs:
import sqlite3
import json
from datetime import datetime
def init_news_db(db_path: str = "news_monitor.db") -> sqlite3.Connection:
    """Open the news database, creating tables and indexes if absent."""
    schema = """
    CREATE TABLE IF NOT EXISTS articles (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        url TEXT UNIQUE,
        title TEXT,
        source TEXT,
        published_at TEXT,
        description TEXT,
        content TEXT,
        origin TEXT,
        full_text TEXT,
        fetched_at TEXT DEFAULT (datetime('now'))
    );
    CREATE TABLE IF NOT EXISTS pipeline_runs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        query TEXT,
        ran_at TEXT DEFAULT (datetime('now')),
        newsapi_count INTEGER DEFAULT 0,
        gdelt_count INTEGER DEFAULT 0,
        total_unique INTEGER DEFAULT 0,
        errors TEXT DEFAULT '[]'
    );
    CREATE INDEX IF NOT EXISTS idx_articles_source ON articles(source);
    CREATE INDEX IF NOT EXISTS idx_articles_pub ON articles(published_at DESC);
    CREATE INDEX IF NOT EXISTS idx_articles_origin ON articles(origin);
    """
    db = sqlite3.connect(db_path)
    db.executescript(schema)
    db.commit()
    return db
def save_articles(conn: sqlite3.Connection, articles: "list[Article]") -> int:
    """Batch-insert articles, skipping rows whose URL already exists.

    Returns the number of rows actually inserted. One malformed row does
    not abort the batch (deliberate best-effort, as in the original).
    """
    saved = 0
    for a in articles:
        try:
            cur = conn.execute("""
                INSERT OR IGNORE INTO articles
                (url, title, source, published_at, description, content, origin)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (a.url, a.title, a.source, a.published_at,
                  a.description, a.content, a.origin))
            # IMPROVEMENT: cursor.rowcount is 1 for a real insert and 0 when
            # IGNORE skipped a duplicate — no need for the original's extra
            # "SELECT changes()" round trip, which could also misreport under
            # interleaved writes on the same connection.
            if cur.rowcount > 0:
                saved += 1
        except sqlite3.Error:
            # Best-effort: skip rows that violate constraints other than URL.
            pass
    conn.commit()
    return saved
def query_articles_by_keyword(
    conn: sqlite3.Connection,
    keyword: str,
    days_back: int = 7,
    limit: int = 50,
) -> list[dict]:
    """Search stored articles for a keyword in title or description."""
    pattern = f"%{keyword}%"
    sql = """
        SELECT url, title, source, published_at, description, origin
        FROM articles
        WHERE (title LIKE ? OR description LIKE ?)
          AND published_at >= datetime('now', '-' || ? || ' days')
        ORDER BY published_at DESC
        LIMIT ?
    """
    columns = ("url", "title", "source", "published_at", "description", "origin")
    cursor = conn.execute(sql, (pattern, pattern, days_back, limit))
    return [dict(zip(columns, row)) for row in cursor]
The Complete Pipeline
Tie everything together into a single monitoring function:
import json
import time
def run_news_monitor(
    query: str,
    newsapi_key: str,
    proxy: str = None,
    db_path: str = "news_monitor.db",
) -> list[dict]:
    """Run the full multi-source news monitoring pipeline.

    Fetches from NewsAPI and GDELT, deduplicates, persists new rows to
    SQLite, logs the run in pipeline_runs, and returns the unique articles
    as plain dicts. Per-source failures are recorded, not raised.
    """
    db = init_news_db(db_path)
    collected = []
    failures = []
    per_source = {"newsapi": 0, "gdelt": 0}

    # Source 1: NewsAPI
    print(f"[NewsAPI] Searching for: {query}")
    try:
        batch = fetch_newsapi(query, newsapi_key)
    except Exception as exc:
        failures.append(f"NewsAPI: {exc}")
        print(f" NewsAPI error: {exc}")
    else:
        collected.extend(batch)
        per_source["newsapi"] = len(batch)
        print(f" Found {len(batch)} articles")
    time.sleep(2)

    # Source 2: GDELT
    print(f"[GDELT] Searching for: {query}")
    try:
        batch = fetch_gdelt_articles(query)
    except Exception as exc:
        failures.append(f"GDELT: {exc}")
        print(f" GDELT error: {exc}")
    else:
        collected.extend(batch)
        per_source["gdelt"] = len(batch)
        print(f" Found {len(batch)} articles")

    # Deduplicate across sources.
    unique = deduplicate_articles(collected)
    print(f"\nTotal: {len(collected)} raw → {len(unique)} after dedup")

    # Persist and record the run.
    saved = save_articles(db, unique)
    print(f"New articles stored: {saved}")
    db.execute(
        "INSERT INTO pipeline_runs (query, newsapi_count, gdelt_count, total_unique, errors) VALUES (?,?,?,?,?)",
        (query, per_source["newsapi"], per_source["gdelt"], len(unique), json.dumps(failures)),
    )
    db.commit()
    db.close()
    return [asdict(a) for a in unique]
# Usage: run a sample query and print the top five results.
if __name__ == "__main__":
    feed = run_news_monitor(
        query="artificial intelligence regulation",
        newsapi_key="YOUR_NEWSAPI_KEY",
    )
    for entry in feed[:5]:
        print(f" [{entry['origin']}] {entry['title'][:70]}")
        print(f" {entry['url']}")
Entity Extraction for Topic Clustering
Once you have articles stored, extract named entities to cluster by topic:
import re
from collections import Counter
def extract_entities_simple(text: str) -> dict:
    """Simple regex-based entity extraction without spaCy.

    Returns deduplicated (unordered) lists under "organizations",
    "people", and "locations".
    """
    # Organizations: capitalized phrase ending in a corporate/civic suffix.
    org_pattern = r'\b[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+)*(?:\s+(?:LLC|Inc|Corp|Ltd|Group|Committee|Senate|Congress|Agency|Department))\b'
    # People: honorific/title followed by one or two capitalized names.
    person_pattern = r'\b(?:President|Senator|CEO|Director|Minister)\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)?\b'
    # Locations: fixed list of major countries/blocs.
    location_pattern = r'\b(?:United States|European Union|China|Russia|UK|Germany|France|Japan|India|Brazil)\b'
    return {
        "organizations": list(set(re.findall(org_pattern, text))),
        "people": list(set(re.findall(person_pattern, text))),
        "locations": list(set(re.findall(location_pattern, text))),
    }
def cluster_articles_by_entity(
    conn: sqlite3.Connection,
    entity: str,
    field: str = "title",
) -> list[dict]:
    """Find all articles mentioning a specific entity.

    Args:
        conn: open connection to the news database.
        entity: substring to search for (LIKE match).
        field: which text column to search.

    Raises:
        ValueError: if `field` is not a known text column.

    SECURITY FIX: `field` was interpolated into the SQL unchecked, which is
    a SQL-injection vector if it ever comes from user input. It is now
    validated against a whitelist of real columns before interpolation.
    """
    allowed_fields = {"title", "description", "content", "source", "url", "full_text"}
    if field not in allowed_fields:
        raise ValueError(f"unsupported field: {field!r}")
    rows = conn.execute(
        f"SELECT url, title, source, published_at FROM articles WHERE {field} LIKE ? ORDER BY published_at DESC LIMIT 100",
        (f"%{entity}%",),
    ).fetchall()
    return [{"url": r[0], "title": r[1], "source": r[2], "published_at": r[3]} for r in rows]
def top_sources_by_volume(
    conn: sqlite3.Connection,
    days_back: int = 30,
) -> list[dict]:
    """Rank news sources by article count in a time window."""
    cursor = conn.execute("""
        SELECT source, COUNT(*) as count
        FROM articles
        WHERE published_at >= datetime('now', '-' || ? || ' days')
        GROUP BY source
        ORDER BY count DESC
        LIMIT 20
    """, (days_back,))
    return [{"source": src, "count": n} for src, n in cursor]
Practical Tips
Schedule, don't poll. Run the pipeline on a cron schedule (every 6-12 hours) rather than polling continuously. NewsAPI's free tier limit and GDELT's rate limiting make continuous polling wasteful.
Use trafilatura for text extraction. The regex approach in the example works for demos, but trafilatura handles the full spectrum of news site HTML structures far better. Install it with pip install trafilatura and replace the paragraph extraction with trafilatura.extract(html).
Store incrementally. Use SQLite with a unique constraint on the normalized URL. This gives you deduplication for free across pipeline runs and makes historical queries trivial.
Monitor source health. Each API will go down eventually. Log which sources returned results on each run. If one source consistently fails, you'll know immediately instead of wondering why your coverage dropped.
Residential proxies for full-text fetch. When fetching complete article text from publisher sites (beyond the API excerpt), use ThorData rotating residential proxies to avoid IP blocks. News sites aggressively block datacenter ranges — residential IPs blend in with normal reader traffic.
The real value of multi-source monitoring isn't just coverage — it's reliability. When NewsAPI goes down (and it will), GDELT keeps feeding you. When GDELT's data is too noisy for a specific topic, NewsAPI gives you cleaner results. The combination is more resilient than any single source, and the SQLite history lets you answer questions like "when did coverage of this topic spike?" without re-fetching data you've already collected.
Adding RSS Feed Support
Major publishers expose RSS feeds that update in real time and require no authentication. Adding RSS to your pipeline gives you a fourth source with no rate limits:
import xml.etree.ElementTree as ET
from urllib.parse import urlparse
import hashlib
# Default feed set: display name -> RSS/Atom feed URL.
# NOTE(review): public feed URLs rot over time (some publishers have
# retired or moved theirs) — verify these still resolve before relying
# on them in production.
RSS_FEEDS = {
    "Reuters": "https://feeds.reuters.com/reuters/topNews",
    "BBC": "https://feeds.bbci.co.uk/news/rss.xml",
    "AP News": "https://rsshub.app/apnews/topics/apf-topnews",
    "The Verge": "https://www.theverge.com/rss/index.xml",
    "TechCrunch": "https://techcrunch.com/feed/",
    "Ars Technica": "https://feeds.arstechnica.com/arstechnica/index",
    "Wired": "https://www.wired.com/feed/rss",
    "Hacker News": "https://news.ycombinator.com/rss",
}
def fetch_rss_feed(feed_url: str, source_name: str) -> list[Article]:
    """Fetch and parse a single RSS 2.0 or Atom feed into Article objects.

    Best-effort: network errors and XML parse errors are printed and yield
    an empty list rather than raising. Atom entries are only consulted when
    no RSS <item> elements were found.
    FIX: removed the dead `ns` namespace dict the original built but never
    passed to findall; the Atom namespace is now a single shared prefix.
    """
    try:
        resp = httpx.get(feed_url, timeout=15, follow_redirects=True)
        resp.raise_for_status()
    except Exception as e:
        print(f" RSS error for {source_name}: {e}")
        return []
    articles = []
    try:
        root = ET.fromstring(resp.content)
        # RSS 2.0 items
        for item in root.findall(".//item"):
            title = item.findtext("title", "").strip()
            url = item.findtext("link", "").strip()
            pub_date = item.findtext("pubDate", "").strip()
            description = item.findtext("description", "").strip()
            if not url:
                # Some feeds carry the permalink in <guid> instead of <link>.
                guid = item.findtext("guid", "")
                url = guid if guid.startswith("http") else ""
            if url and title:
                articles.append(Article(
                    title=title,
                    url=url,
                    source=source_name,
                    published_at=pub_date,
                    description=description[:500],
                    origin="rss",
                ))
        # Atom feed entries
        if not articles:
            atom = "{http://www.w3.org/2005/Atom}"
            for entry in root.findall(f".//{atom}entry"):
                title_el = entry.find(f"{atom}title")
                link_el = entry.find(f"{atom}link")
                published_el = entry.find(f"{atom}published")
                summary_el = entry.find(f"{atom}summary")
                url = link_el.get("href", "") if link_el is not None else ""
                title = title_el.text if title_el is not None else ""
                pub_date = published_el.text if published_el is not None else ""
                description = summary_el.text if summary_el is not None else ""
                if url and title:
                    articles.append(Article(
                        title=title.strip(),
                        url=url,
                        source=source_name,
                        published_at=pub_date,
                        description=(description or "")[:500],
                        origin="rss",
                    ))
    except ET.ParseError as e:
        print(f" XML parse error for {source_name}: {e}")
    return articles
def fetch_all_rss_feeds(feeds: dict = None) -> list[Article]:
    """Fetch every configured RSS feed and concatenate the results.

    Falls back to the module-level RSS_FEEDS mapping when `feeds` is
    None or empty (matching the original truthiness check).
    """
    selected = feeds or RSS_FEEDS
    collected = []
    for name, feed_url in selected.items():
        batch = fetch_rss_feed(feed_url, name)
        collected.extend(batch)
        print(f" [{name}] {len(batch)} articles")
    return collected
Keyword Scoring and Relevance Ranking
Raw article counts don't tell you what matters. Score articles by keyword relevance to surface the most useful content:
import re
from collections import Counter
# Keywords organized by importance tier (weights applied in score_article).
KEYWORD_WEIGHTS = {
    # High-value terms (3 points each)
    "high": ["acquisition", "merger", "IPO", "breach", "recall", "bankruptcy", "lawsuit", "indictment"],
    # Medium-value terms (2 points each)
    "medium": ["funding", "launch", "partnership", "regulation", "ban", "fine", "settlement"],
    # Low-value terms (1 point each)
    "low": ["report", "study", "analysis", "survey", "statement"],
}

def score_article(article: "Article", topic_keywords: list) -> float:
    """Score an article's relevance to a set of topic keywords.

    Score = topic matches (title counts 2.0, description 1.0)
          + 0.5 * tier-weighted importance-term matches
          + a recency bonus that decays over ~5 days (0 when published_at
            is missing or not an RFC 2822 date).
    """
    text = f"{article.title} {article.description}".lower()
    title_lower = article.title.lower()
    desc_lower = article.description.lower()
    # Base score: topic keyword matches in title (2x) vs description (1x).
    topic_score = 0
    for kw in topic_keywords:
        kw_lower = kw.lower()
        if kw_lower in title_lower:
            topic_score += 2.0
        elif kw_lower in desc_lower:
            topic_score += 1.0
    # Bonus for high-importance news terms.
    importance_score = 0
    tier_weight = {"high": 3, "medium": 2, "low": 1}
    for tier, keywords in KEYWORD_WEIGHTS.items():
        weight = tier_weight[tier]
        for kw in keywords:
            # BUG FIX: `text` is lowercased but tier keywords such as "IPO"
            # are not, so uppercase keywords could never match. Compare
            # case-insensitively.
            if kw.lower() in text:
                importance_score += weight
    # Recency bonus (decays over time — newer articles score higher).
    recency_bonus = 0.0
    try:
        from email.utils import parsedate_to_datetime
        pub_dt = parsedate_to_datetime(article.published_at)
        hours_ago = (datetime.now(pub_dt.tzinfo) - pub_dt).total_seconds() / 3600
        recency_bonus = max(0, 5 - hours_ago / 24)  # Full bonus for <24h old
    except Exception:
        # published_at formats vary by source; unparseable dates get no bonus.
        pass
    return topic_score + importance_score * 0.5 + recency_bonus
def rank_articles(articles: list[Article], topic_keywords: list) -> list[tuple]:
    """Return (article, score) pairs sorted by relevance score, highest first."""
    return sorted(
        ((item, score_article(item, topic_keywords)) for item in articles),
        key=lambda pair: pair[1],
        reverse=True,
    )
# Example: rank news about AI regulation.
# FIX: the original ran these RSS fetches at module import time; guard the
# demo under __main__ so importing this module never hits the network.
if __name__ == "__main__":
    ai_keywords = ["artificial intelligence", "AI", "machine learning", "regulation", "ChatGPT", "LLM"]
    all_articles = fetch_all_rss_feeds()
    ranked = rank_articles(all_articles, ai_keywords)
    print("Top 10 most relevant articles:")
    for article, score in ranked[:10]:
        print(f" [{score:.1f}] {article.title[:60]}")
        print(f" {article.source} — {article.url[:60]}")
Error Handling and Source Health Monitoring
Production pipelines need to gracefully handle source failures without silently dropping coverage:
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class SourceHealth:
    """Rolling health stats for one news source across pipeline fetches."""
    name: str
    last_success: Optional[str] = None  # ISO timestamp of last successful fetch
    consecutive_failures: int = 0
    total_fetches: int = 0
    total_articles: int = 0
    errors: list = field(default_factory=list)  # last 10 failures, newest last
    # Lifetime failure count. New field, placed last so the positional
    # __init__ order of the original fields is unchanged.
    total_failures: int = 0

    def record_success(self, article_count: int):
        """Mark a successful fetch that returned `article_count` articles."""
        # NOTE: utcnow() is naive (and deprecated in 3.12+); kept for
        # consistency with the rest of this file.
        self.last_success = datetime.utcnow().isoformat()
        self.consecutive_failures = 0
        self.total_fetches += 1
        self.total_articles += article_count

    def record_failure(self, error: str):
        """Mark a failed fetch; only the 10 most recent error details are kept."""
        self.consecutive_failures += 1
        self.total_fetches += 1
        self.total_failures += 1
        self.errors.append({"time": datetime.utcnow().isoformat(), "error": error})
        if len(self.errors) > 10:
            self.errors = self.errors[-10:]  # Keep last 10 errors

    @property
    def is_healthy(self) -> bool:
        # Three consecutive failures marks a source as degraded.
        return self.consecutive_failures < 3

    @property
    def success_rate(self) -> float:
        """Lifetime success ratio in [0, 1]; 0.0 before any fetch.

        BUG FIX: previously derived the failure count from len(self.errors),
        which record_failure caps at 10 entries — after more than 10
        failures the reported rate drifted upward. Uses the dedicated
        total_failures counter instead.
        """
        if self.total_fetches == 0:
            return 0.0
        return (self.total_fetches - self.total_failures) / self.total_fetches
def run_monitored_pipeline(
    query: str,
    newsapi_key: str,
    rss_feeds: dict = None,
    proxy: str = None,
    db_path: str = "news_monitor.db",
) -> dict:
    """Run pipeline with full source health tracking.

    Queries NewsAPI and GDELT for `query`, optionally pulls each feed in
    `rss_feeds` ({source name -> feed URL}), dedupes the combined results,
    stores new rows in SQLite, prints a per-source health report, and
    returns summary counts. Per-source failures are recorded in a
    SourceHealth entry instead of raising.

    Args:
        query: search phrase passed to NewsAPI and GDELT.
        newsapi_key: NewsAPI API key.
        rss_feeds: optional mapping of source name -> RSS feed URL.
        proxy: accepted but unused in this function — presumably intended
            for full-text fetching; TODO confirm and thread through or drop.
        db_path: SQLite database file path.

    Returns:
        dict with total_raw, total_unique, saved, and source_health
        ({name: {"healthy": bool, "failures": int}}).
    """
    conn = init_news_db(db_path)
    all_articles = []
    source_health = {}
    # NewsAPI
    health = SourceHealth("newsapi")
    try:
        results = fetch_newsapi(query, newsapi_key)
        health.record_success(len(results))
        all_articles.extend(results)
    except Exception as e:
        health.record_failure(str(e))
    source_health["newsapi"] = health
    # Brief pause before the next source (see the rate-limit notes earlier
    # in this file).
    time.sleep(2)
    # GDELT
    health = SourceHealth("gdelt")
    try:
        results = fetch_gdelt_articles(query)
        health.record_success(len(results))
        all_articles.extend(results)
    except Exception as e:
        health.record_failure(str(e))
    source_health["gdelt"] = health
    # RSS feeds — one SourceHealth entry per feed, keyed by source name.
    if rss_feeds:
        for source_name, feed_url in rss_feeds.items():
            health = SourceHealth(source_name)
            try:
                results = fetch_rss_feed(feed_url, source_name)
                health.record_success(len(results))
                all_articles.extend(results)
            except Exception as e:
                health.record_failure(str(e))
            source_health[source_name] = health
    # Dedup and store
    unique = deduplicate_articles(all_articles)
    saved = save_articles(conn, unique)
    # Report
    print(f"\nPipeline run complete:")
    print(f" Raw articles: {len(all_articles)}")
    print(f" After dedup: {len(unique)}")
    print(f" New to DB: {saved}")
    print(f"\nSource health:")
    for name, h in source_health.items():
        status = "OK" if h.is_healthy else "DEGRADED"
        print(f" {name}: {status} (failures: {h.consecutive_failures})")
    conn.close()
    return {
        "total_raw": len(all_articles),
        "total_unique": len(unique),
        "saved": saved,
        "source_health": {k: {"healthy": v.is_healthy, "failures": v.consecutive_failures} for k, v in source_health.items()},
    }