← Back to blog

Building a Multi-Source News Monitor: NewsAPI, GDELT, and CommonCrawl (2026)

Building a Multi-Source News Monitor: NewsAPI, GDELT, and CommonCrawl (2026)

Relying on a single news source for monitoring is fragile. APIs go down, rate limits hit at the worst times, and every provider has blind spots in their coverage. The practical solution is to pull from multiple sources and deduplicate. This guide builds a news monitoring pipeline that combines three complementary data sources: NewsAPI for real-time headlines, GDELT for global event data, and CommonCrawl for historical article text.

By the end, you'll have a system that catches stories across sources, deduplicates them, and outputs a clean feed you can filter by topic, region, or time window.

Architecture Overview

Each source has different strengths: NewsAPI delivers clean, real-time headlines through a simple authenticated API; GDELT offers free, keyless access to global event coverage; and CommonCrawl provides archived page content for historical backfill.

The pipeline pulls from all three, normalizes the output into a common schema, and deduplicates by URL and title similarity.

Source 1: NewsAPI

NewsAPI is the quickest to get running. Sign up at newsapi.org for a free key.

from dataclasses import dataclass, asdict, field
from datetime import datetime, timedelta, timezone

import httpx

@dataclass
class Article:
    """Normalized article record shared by every pipeline source.

    All fetchers (NewsAPI, GDELT, RSS, ...) map their payloads into this
    schema so deduplication and storage can treat them uniformly.
    """
    title: str
    url: str
    source: str          # publisher name (NewsAPI) or domain (GDELT), depending on origin
    published_at: str    # timestamp string in whatever format the source returns — not normalized
    description: str = ""
    content: str = ""    # NOTE: the free NewsAPI tier truncates this at ~200 chars
    origin: str = ""  # which pipeline source found this
def fetch_newsapi(
    query: str,
    api_key: str,
    days_back: int = 7,
    page_size: int = 100,
) -> list[Article]:
    """Fetch articles from NewsAPI's /everything endpoint.

    Args:
        query: Search terms (NewsAPI query syntax).
        api_key: Key from newsapi.org.
        days_back: How far back to search; the free tier caps history at ~30 days.
        page_size: Articles per response (NewsAPI maximum is 100).

    Returns:
        Articles tagged with origin="newsapi".

    Raises:
        httpx.HTTPStatusError: on non-2xx responses (e.g. 429 when the
            100 requests/day free-tier limit is exhausted).
    """
    # datetime.utcnow() is deprecated (3.12+); use an aware UTC timestamp.
    # strftime("%Y-%m-%d") produces the same date string either way.
    from_date = (datetime.now(timezone.utc) - timedelta(days=days_back)).strftime("%Y-%m-%d")

    resp = httpx.get(
        "https://newsapi.org/v2/everything",
        params={
            "q": query,
            "from": from_date,
            "sortBy": "publishedAt",
            "pageSize": page_size,
            "language": "en",
            "apiKey": api_key,
        },
        timeout=20,
    )
    resp.raise_for_status()
    data = resp.json()

    # Map the NewsAPI payload into the shared Article schema; .get() keeps
    # a partially-populated item from raising KeyError.
    articles = []
    for item in data.get("articles", []):
        articles.append(Article(
            title=item.get("title", ""),
            url=item.get("url", ""),
            source=item.get("source", {}).get("name", ""),
            published_at=item.get("publishedAt", ""),
            description=item.get("description", ""),
            content=item.get("content", ""),
            origin="newsapi",
        ))

    return articles

Limitations to know about. The free tier truncates article content at 200 characters. You only get articles from the last 30 days. And the 100 requests/day limit means you need to batch your queries carefully — don't poll every minute.

Source 2: GDELT Project

GDELT is a different beast entirely. It processes news from virtually every country and encodes events using the CAMEO taxonomy — a structured system that classifies who did what to whom. The DOC API lets you search for articles by keyword:

from urllib.parse import quote

def fetch_gdelt_articles(
    query: str,
    mode: str = "artlist",
    max_records: int = 75,
    timespan: str = "7d",
) -> list[Article]:
    """Search the GDELT DOC API for news articles.

    Args:
        query: Keyword query (GDELT DOC API syntax).
        mode: DOC API output mode; "artlist" returns an article list.
        max_records: Maximum articles to return (API cap is 250).
        timespan: Lookback window, e.g. "7d", "24h".

    Returns:
        Articles tagged with origin="gdelt". GDELT supplies only
        title/url/domain/seendate, so description and content are empty.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses.
    """
    # Let httpx build the query string: it percent-encodes each parameter
    # correctly, instead of hand-assembling the URL with quote(), which
    # only covers the query term and double-encodes reserved characters.
    resp = httpx.get(
        "https://api.gdeltproject.org/api/v2/doc/doc",
        params={
            "query": query,
            "mode": mode,
            "maxrecords": max_records,
            "timespan": timespan,
            "format": "json",
            "sort": "datedesc",
        },
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()

    articles = []
    for item in data.get("articles", []):
        articles.append(Article(
            title=item.get("title", ""),
            url=item.get("url", ""),
            source=item.get("domain", ""),
            published_at=item.get("seendate", ""),
            description="",
            content="",
            origin="gdelt",
        ))

    return articles

GDELT also offers a GKG (Global Knowledge Graph) dataset that extracts themes, people, organizations, and locations from each article — useful if you're building topic-level monitoring. The helper below sticks to the DOC API's article list (restricted to English-language sources); point it at the /api/v2/gkg/gkg endpoint when you need raw GKG records:

def fetch_gdelt_gkg(query: str, timespan: str = "7d") -> list[dict]:
    """Fetch GDELT article records filtered to English-language sources.

    NOTE(review): despite the name, this queries the DOC API's artlist
    mode, not the dedicated GKG endpoint — swap the URL for
    /api/v2/gkg/gkg if you need raw Global Knowledge Graph records.

    Returns:
        Raw article dicts from the API (not mapped to Article).

    Raises:
        httpx.HTTPStatusError: on non-2xx responses.
    """
    # params= lets httpx handle percent-encoding instead of quote().
    resp = httpx.get(
        "https://api.gdeltproject.org/api/v2/doc/doc",
        params={
            "query": query,
            "mode": "artlist",
            "maxrecords": 50,
            "timespan": timespan,
            "format": "json",
            "sourcelang": "english",
        },
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json().get("articles", [])

No API key needed. GDELT is fully open. The catch is that it rate-limits aggressively — roughly 1 request per 5 seconds. Build in sleeps.

Source 3: CommonCrawl Index

CommonCrawl archives billions of web pages. You won't use it for real-time monitoring, but it's powerful for backfilling article text that other sources only give you titles for:

def search_commoncrawl(
    domain: str,
    index: str = "CC-MAIN-2026-22",
    limit: int = 50,
) -> list[dict]:
    """Search a CommonCrawl CDX index for captures from a specific domain.

    The index server returns newline-delimited JSON, one record per
    capture. Besides the display fields, `offset` and `filename` are kept
    because fetch_commoncrawl_content() needs them to build the WARC
    range request.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses.
    """
    import json  # hoisted: was previously re-imported inside the per-line loop

    resp = httpx.get(
        f"https://index.commoncrawl.org/{index}-index",
        params={
            "url": f"{domain}/*",
            "output": "json",
            "limit": limit,
        },
        timeout=30,
    )
    resp.raise_for_status()

    results = []
    for line in resp.text.strip().split("\n"):
        if not line:
            continue
        record = json.loads(line)
        results.append({
            "url": record.get("url"),
            "timestamp": record.get("timestamp"),
            "status": record.get("status"),
            "mime": record.get("mime"),
            "length": record.get("length"),
            # Required downstream by fetch_commoncrawl_content():
            "offset": record.get("offset"),
            "filename": record.get("filename"),
        })

    return results

def fetch_commoncrawl_content(warc_info: dict) -> str:
    """Fetch one page's content from a CommonCrawl WARC file via a range request.

    Args:
        warc_info: A CDX index record containing "filename", "offset",
            and "length" (as returned by search_commoncrawl).

    Returns:
        The decoded record payload, or "" when the record is missing
        required fields or cannot be decompressed/parsed (best-effort).
    """
    import gzip  # hoisted from mid-function for readability

    offset = int(warc_info.get("offset", 0))
    length = int(warc_info.get("length", 0))
    filename = warc_info.get("filename", "")

    if not filename or not length:
        return ""

    # BUG FIX: the URL previously contained a literal "(unknown)" placeholder;
    # the WARC path must come from the index record's filename.
    url = f"https://data.commoncrawl.org/{filename}"
    # Range request pulls only this record's bytes out of the multi-GB WARC.
    headers = {"Range": f"bytes={offset}-{offset + length - 1}"}

    resp = httpx.get(url, headers=headers, timeout=30)
    resp.raise_for_status()

    try:
        raw = gzip.decompress(resp.content)
        # WARC records have headers then content separated by double newline:
        # WARC headers \r\n\r\n HTTP headers \r\n\r\n body
        parts = raw.split(b"\r\n\r\n", 2)
        if len(parts) >= 3:
            return parts[2].decode("utf-8", errors="replace")
    except Exception:
        pass  # best-effort: a malformed record yields an empty string
    return ""

Deduplication

Multiple sources will find the same articles. Deduplicate by URL first, then by title similarity for cases where different sources link to different URLs for the same story:

from difflib import SequenceMatcher
from urllib.parse import urlparse

def normalize_url(url: str) -> str:
    """Canonicalize a URL for duplicate detection.

    Drops the query string and fragment (where tracking parameters live),
    trims any trailing slash, and lowercases the result so two links to
    the same story compare equal.
    """
    pieces = urlparse(url)
    base = "".join((pieces.scheme, "://", pieces.netloc, pieces.path))
    return base.rstrip("/").lower()

def deduplicate_articles(articles: list[Article], similarity_threshold: float = 0.85) -> list[Article]:
    """Drop duplicate articles: first by normalized URL, then by fuzzy title match.

    Title comparison uses difflib's SequenceMatcher ratio against every
    title already accepted, so distinct URLs pointing at the same story
    still collapse to one entry. First occurrence wins.
    """
    kept: list = []
    known_urls: set = set()
    known_titles: list = []

    for candidate in articles:
        url_key = normalize_url(candidate.url)
        if url_key in known_urls:
            continue

        title_key = candidate.title.lower()
        # any() short-circuits on the first sufficiently-similar title,
        # mirroring the original early-break loop.
        looks_duplicate = any(
            SequenceMatcher(None, title_key, prior).ratio() >= similarity_threshold
            for prior in known_titles
        )
        if looks_duplicate:
            continue

        known_urls.add(url_key)
        known_titles.append(title_key)
        kept.append(candidate)

    return kept

Handling Anti-Bot Measures

When you follow article URLs to fetch full text, you hit each publisher's own bot defenses. This is where the pipeline gets harder.

Cloudflare and Akamai protect most major news sites. Direct httpx.get() calls get blocked on roughly 40% of URLs in a typical news corpus. You have two options: use a headless browser for the blocked ones, or accept partial content from the API responses.

Rate limiting across domains. Even though you're hitting different publishers, your outbound IP is the constant. News sites share threat intelligence — if your IP gets flagged on one Cloudflare-protected site, others may block you preemptively.

Residential proxies for full-text fetching. If you need the complete article text (not just the truncated API excerpt), rotating residential IPs are the difference between a 40% success rate and a 90%+ one. ThorData's proxy service handles this well — their residential pool covers enough geographic diversity that you're not hammering any single exit node, and the rotation happens automatically per request.

def fetch_article_text(url: str, proxy: str = None) -> str:
    """Fetch full article text, handling common anti-bot responses.

    Sends browser-like headers, optionally routes through *proxy*
    (e.g. a rotating residential endpoint), and returns "" on HTTP 403
    so callers can fall back to the API-provided excerpt.

    Returns:
        Up to 5000 characters of tag-stripped paragraph text.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses other than 403.
    """
    import re  # hoisted: was previously imported mid-function

    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                       "AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "en-US,en;q=0.9",
    }

    transport = httpx.HTTPTransport(proxy=proxy) if proxy else None

    with httpx.Client(transport=transport, headers=headers, timeout=20, follow_redirects=True) as client:
        resp = client.get(url)

        if resp.status_code == 403:
            return ""  # blocked by bot detection

        resp.raise_for_status()

        # Basic content extraction (use trafilatura for production).
        # Removed the unused `html.parser.HTMLParser` import — only the
        # regex-based extraction below is actually used.
        paragraphs = re.findall(r"<p[^>]*>(.*?)</p>", resp.text, re.DOTALL)
        text = "\n".join(re.sub(r"<[^>]+>", "", p) for p in paragraphs)
        return text[:5000]  # cap so DB rows stay bounded

Persistent Storage in SQLite

For production monitoring, store articles to SQLite so you can query across runs:

import sqlite3
import json
from datetime import datetime

def init_news_db(db_path: str = "news_monitor.db") -> sqlite3.Connection:
    """Open (creating tables on first use) the SQLite store for the pipeline.

    The UNIQUE constraint on articles.url gives cross-run deduplication
    for free; pipeline_runs keeps per-run bookkeeping so source health
    can be tracked over time.
    """
    db = sqlite3.connect(db_path)
    schema = """
        CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT UNIQUE,
            title TEXT,
            source TEXT,
            published_at TEXT,
            description TEXT,
            content TEXT,
            origin TEXT,
            full_text TEXT,
            fetched_at TEXT DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS pipeline_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            query TEXT,
            ran_at TEXT DEFAULT (datetime('now')),
            newsapi_count INTEGER DEFAULT 0,
            gdelt_count INTEGER DEFAULT 0,
            total_unique INTEGER DEFAULT 0,
            errors TEXT DEFAULT '[]'
        );

        CREATE INDEX IF NOT EXISTS idx_articles_source ON articles(source);
        CREATE INDEX IF NOT EXISTS idx_articles_pub ON articles(published_at DESC);
        CREATE INDEX IF NOT EXISTS idx_articles_origin ON articles(origin);
    """
    db.executescript(schema)
    db.commit()
    return db


def save_articles(conn: sqlite3.Connection, articles: list[Article]) -> int:
    """Batch insert articles, skipping duplicates by URL.

    Returns:
        The number of rows actually inserted (rows skipped by the
        UNIQUE(url) constraint via INSERT OR IGNORE are not counted).
    """
    saved = 0
    for a in articles:
        try:
            # total_changes delta tells us whether this row was inserted
            # or IGNOREd — avoids the extra per-row `SELECT changes()`
            # round-trip the previous version issued.
            before = conn.total_changes
            conn.execute("""
                INSERT OR IGNORE INTO articles
                    (url, title, source, published_at, description, content, origin)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (a.url, a.title, a.source, a.published_at,
                  a.description, a.content, a.origin))
            if conn.total_changes > before:
                saved += 1
        except sqlite3.Error:
            # Best-effort batch: one malformed row shouldn't abort the run.
            pass
    conn.commit()
    return saved


def query_articles_by_keyword(
    conn: sqlite3.Connection,
    keyword: str,
    days_back: int = 7,
    limit: int = 50,
) -> list[dict]:
    """Search stored articles for *keyword* in title or description.

    Only rows whose published_at falls within the last *days_back* days
    are returned, newest first, capped at *limit*.
    """
    pattern = f"%{keyword}%"
    cursor = conn.execute("""
        SELECT url, title, source, published_at, description, origin
        FROM articles
        WHERE (title LIKE ? OR description LIKE ?)
          AND published_at >= datetime('now', '-' || ? || ' days')
        ORDER BY published_at DESC
        LIMIT ?
    """, (pattern, pattern, days_back, limit))

    columns = ("url", "title", "source", "published_at", "description", "origin")
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

The Complete Pipeline

Tie everything together into a single monitoring function:

import json
import time

def run_news_monitor(
    query: str,
    newsapi_key: str,
    proxy: str = None,
    db_path: str = "news_monitor.db",
) -> list[dict]:
    """Run the full multi-source news monitoring pipeline.

    Queries NewsAPI then GDELT, pools the results, deduplicates, stores
    new rows in SQLite at *db_path*, logs the run in pipeline_runs, and
    returns the deduplicated articles as plain dicts.
    """
    conn = init_news_db(db_path)
    collected: list = []
    failures: list = []
    per_source = {"newsapi": 0, "gdelt": 0}

    # Source 1: NewsAPI
    print(f"[NewsAPI] Searching for: {query}")
    try:
        batch = fetch_newsapi(query, newsapi_key)
    except Exception as e:
        failures.append(f"NewsAPI: {e}")
        print(f"  NewsAPI error: {e}")
    else:
        collected.extend(batch)
        per_source["newsapi"] = len(batch)
        print(f"  Found {len(batch)} articles")

    # Be polite to GDELT's aggressive rate limiting.
    time.sleep(2)

    # Source 2: GDELT
    print(f"[GDELT] Searching for: {query}")
    try:
        batch = fetch_gdelt_articles(query)
    except Exception as e:
        failures.append(f"GDELT: {e}")
        print(f"  GDELT error: {e}")
    else:
        collected.extend(batch)
        per_source["gdelt"] = len(batch)
        print(f"  Found {len(batch)} articles")

    # Collapse duplicates found by more than one source.
    unique = deduplicate_articles(collected)
    print(f"\nTotal: {len(collected)} raw → {len(unique)} after dedup")

    # Persist; UNIQUE(url) makes re-runs idempotent.
    saved = save_articles(conn, unique)
    print(f"New articles stored: {saved}")

    # Record run-level bookkeeping for source-health monitoring.
    conn.execute(
        "INSERT INTO pipeline_runs (query, newsapi_count, gdelt_count, total_unique, errors) VALUES (?,?,?,?,?)",
        (query, per_source["newsapi"], per_source["gdelt"], len(unique), json.dumps(failures)),
    )
    conn.commit()
    conn.close()

    return [asdict(a) for a in unique]


# Usage example — substitute your own key from newsapi.org
if __name__ == "__main__":
    feed = run_news_monitor(
        query="artificial intelligence regulation",
        newsapi_key="YOUR_NEWSAPI_KEY",
    )
    # Preview the five top entries of the deduplicated feed.
    for entry in feed[:5]:
        print(f"  [{entry['origin']}] {entry['title'][:70]}")
        print(f"    {entry['url']}")

Entity Extraction for Topic Clustering

Once you have articles stored, extract named entities to cluster by topic:

import re
from collections import Counter

def extract_entities_simple(text: str) -> dict:
    """Regex-only named-entity extraction (no spaCy dependency).

    Heuristic patterns: organizations must end in a corporate/government
    suffix, people need a leading title, and locations come from a fixed
    country/bloc list. Returns deduplicated "organizations", "people",
    and "locations" lists (order not guaranteed).
    """
    found_orgs = re.findall(r'\b[A-Z][a-zA-Z]+(?:\s+[A-Z][a-zA-Z]+)*(?:\s+(?:LLC|Inc|Corp|Ltd|Group|Committee|Senate|Congress|Agency|Department))\b', text)
    found_people = re.findall(r'\b(?:President|Senator|CEO|Director|Minister)\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)?\b', text)
    found_places = re.findall(r'\b(?:United States|European Union|China|Russia|UK|Germany|France|Japan|India|Brazil)\b', text)

    return {
        "organizations": list(set(found_orgs)),
        "people": list(set(found_people)),
        "locations": list(set(found_places)),
    }


def cluster_articles_by_entity(
    conn: sqlite3.Connection,
    entity: str,
    field: str = "title",
) -> list[dict]:
    """Find all articles mentioning a specific entity.

    Args:
        conn: Open connection to the news database.
        entity: Substring to search for (LIKE match).
        field: Column to search — must be one of the articles columns.

    Raises:
        ValueError: if *field* is not a known column. The column name is
            interpolated into the SQL string, so it must be whitelisted
            to prevent SQL injection; only *entity* is parameterized.
    """
    allowed_fields = {"url", "title", "source", "published_at",
                      "description", "content", "origin", "full_text"}
    if field not in allowed_fields:
        raise ValueError(f"Unsupported field: {field!r}")

    rows = conn.execute(
        f"SELECT url, title, source, published_at FROM articles WHERE {field} LIKE ? ORDER BY published_at DESC LIMIT 100",
        (f"%{entity}%",),
    ).fetchall()
    return [{"url": r[0], "title": r[1], "source": r[2], "published_at": r[3]} for r in rows]


def top_sources_by_volume(
    conn: sqlite3.Connection,
    days_back: int = 30,
) -> list[dict]:
    """Rank the 20 most prolific news sources over the last *days_back* days."""
    cursor = conn.execute("""
        SELECT source, COUNT(*) as count
        FROM articles
        WHERE published_at >= datetime('now', '-' || ? || ' days')
        GROUP BY source
        ORDER BY count DESC
        LIMIT 20
    """, (days_back,))
    return [{"source": name, "count": total} for name, total in cursor.fetchall()]

Practical Tips

Schedule, don't poll. Run the pipeline on a cron schedule (every 6-12 hours) rather than polling continuously. NewsAPI's free tier limit and GDELT's rate limiting make continuous polling wasteful.

Use trafilatura for text extraction. The regex approach in the example works for demos, but trafilatura handles the full spectrum of news site HTML structures far better. Install it with pip install trafilatura and replace the paragraph extraction with trafilatura.extract(html).

Store incrementally. Use SQLite with a unique constraint on the normalized URL. This gives you deduplication for free across pipeline runs and makes historical queries trivial.

Monitor source health. Each API will go down eventually. Log which sources returned results on each run. If one source consistently fails, you'll know immediately instead of wondering why your coverage dropped.

Residential proxies for full-text fetch. When fetching complete article text from publisher sites (beyond the API excerpt), use ThorData rotating residential proxies to avoid IP blocks. News sites aggressively block datacenter ranges — residential IPs blend in with normal reader traffic.

The real value of multi-source monitoring isn't just coverage — it's reliability. When NewsAPI goes down (and it will), GDELT keeps feeding you. When GDELT's data is too noisy for a specific topic, NewsAPI gives you cleaner results. The combination is more resilient than any single source, and the SQLite history lets you answer questions like "when did coverage of this topic spike?" without re-fetching data you've already collected.

Adding RSS Feed Support

Major publishers expose RSS feeds that update in real time and require no authentication. Adding RSS to your pipeline gives you a fourth source with no rate limits:

import xml.etree.ElementTree as ET
from urllib.parse import urlparse
import hashlib

# Feed registry: display name -> feed URL. Mix of RSS 2.0 and Atom feeds;
# fetch_rss_feed handles both formats. No API keys or rate limits apply.
RSS_FEEDS = {
    "Reuters": "https://feeds.reuters.com/reuters/topNews",
    "BBC": "https://feeds.bbci.co.uk/news/rss.xml",
    "AP News": "https://rsshub.app/apnews/topics/apf-topnews",
    "The Verge": "https://www.theverge.com/rss/index.xml",
    "TechCrunch": "https://techcrunch.com/feed/",
    "Ars Technica": "https://feeds.arstechnica.com/arstechnica/index",
    "Wired": "https://www.wired.com/feed/rss",
    "Hacker News": "https://news.ycombinator.com/rss",
}


def fetch_rss_feed(feed_url: str, source_name: str) -> list[Article]:
    """Fetch and parse one RSS/Atom feed into Article objects.

    Tries RSS 2.0 ``<item>`` elements first; if none are found, falls
    back to Atom ``<entry>`` elements. Returns [] on any network or
    parse error so a single dead feed cannot abort a multi-feed run.
    """
    try:
        resp = httpx.get(feed_url, timeout=15, follow_redirects=True)
        resp.raise_for_status()
    except Exception as e:
        print(f"  RSS error for {source_name}: {e}")
        return []

    articles = []
    try:
        root = ET.fromstring(resp.content)

        # RSS 2.0 items
        for item in root.findall(".//item"):
            title = item.findtext("title", "").strip()
            url = item.findtext("link", "").strip()
            pub_date = item.findtext("pubDate", "").strip()
            description = item.findtext("description", "").strip()

            # Some feeds carry the article URL only in a permalink <guid>.
            if not url:
                guid = item.findtext("guid", "")
                url = guid if guid.startswith("http") else ""

            if url and title:
                articles.append(Article(
                    title=title,
                    url=url,
                    source=source_name,
                    published_at=pub_date,
                    description=description[:500],
                    origin="rss",
                ))

        # Atom feed entries (only when no RSS items were found).
        # The unused `ns` mapping was removed; the namespace is applied
        # directly via one shared prefix instead of repeated literals.
        if not articles:
            atom = "{http://www.w3.org/2005/Atom}"
            for entry in root.findall(f".//{atom}entry"):
                title_el = entry.find(f"{atom}title")
                link_el = entry.find(f"{atom}link")
                published_el = entry.find(f"{atom}published")
                summary_el = entry.find(f"{atom}summary")

                url = link_el.get("href", "") if link_el is not None else ""
                title = title_el.text if title_el is not None else ""
                pub_date = published_el.text if published_el is not None else ""
                description = summary_el.text if summary_el is not None else ""

                if url and title:
                    articles.append(Article(
                        title=title.strip(),
                        url=url,
                        source=source_name,
                        published_at=pub_date,
                        description=(description or "")[:500],
                        origin="rss",
                    ))

    except ET.ParseError as e:
        print(f"  XML parse error for {source_name}: {e}")

    return articles


def fetch_all_rss_feeds(feeds: dict = None) -> list[Article]:
    """Fetch every configured RSS feed and pool the results.

    Falls back to the module-level RSS_FEEDS registry when *feeds* is
    None (or empty). Prints a per-source article count as it goes.
    """
    selected = feeds or RSS_FEEDS
    pooled: list = []

    for source_name, feed_url in selected.items():
        batch = fetch_rss_feed(feed_url, source_name)
        pooled.extend(batch)
        print(f"  [{source_name}] {len(batch)} articles")

    return pooled

Keyword Scoring and Relevance Ranking

Raw article counts don't tell you what matters. Score articles by keyword relevance to surface the most useful content:

import re
from collections import Counter

# Keywords organized by importance tier.
# score_article() adds the tier's weight (x0.5) for every term found in
# the article's combined title+description text.
KEYWORD_WEIGHTS = {
    # High-value terms (3 points each)
    "high": ["acquisition", "merger", "IPO", "breach", "recall", "bankruptcy", "lawsuit", "indictment"],
    # Medium-value terms (2 points each)
    "medium": ["funding", "launch", "partnership", "regulation", "ban", "fine", "settlement"],
    # Low-value terms (1 point each)
    "low": ["report", "study", "analysis", "survey", "statement"],
}


def score_article(article: Article, topic_keywords: list) -> float:
    """Score an article's relevance to a set of topic keywords.

    Score = topic keyword matches (title worth 2x description)
          + 0.5 x tier-weighted KEYWORD_WEIGHTS hits
          + a recency bonus of up to 5 points decaying over ~5 days.

    published_at is parsed as an RFC 2822 date (the RSS pubDate format);
    unparseable or missing dates simply forfeit the recency bonus.
    """
    title_lower = article.title.lower()
    desc_lower = article.description.lower()
    text = f"{title_lower} {desc_lower}"

    # Base score: topic keyword matches in title (2x) vs description (1x)
    topic_score = 0
    for kw in topic_keywords:
        kw_lower = kw.lower()
        if kw_lower in title_lower:
            topic_score += 2.0
        elif kw_lower in desc_lower:
            topic_score += 1.0

    # Bonus for high-importance news terms.
    # BUG FIX: `text` is lowercased, so keywords must be lowercased too —
    # previously uppercase terms like "IPO" could never match.
    tier_weights = {"high": 3, "medium": 2, "low": 1}  # hoisted out of the loop
    importance_score = 0
    for tier, keywords in KEYWORD_WEIGHTS.items():
        weight = tier_weights[tier]
        for kw in keywords:
            if kw.lower() in text:
                importance_score += weight

    # Recency bonus (decay over time — newer articles score higher)
    recency_bonus = 0.0
    try:
        from email.utils import parsedate_to_datetime
        pub_dt = parsedate_to_datetime(article.published_at)
        hours_ago = (datetime.now(pub_dt.tzinfo) - pub_dt).total_seconds() / 3600
        recency_bonus = max(0, 5 - hours_ago / 24)  # Full bonus for <24h old
    except Exception:
        pass  # no/invalid date -> no bonus

    return topic_score + importance_score * 0.5 + recency_bonus


def rank_articles(articles: list[Article], topic_keywords: list) -> list[tuple]:
    """Return (article, score) pairs sorted by descending relevance score."""
    pairs = [(item, score_article(item, topic_keywords)) for item in articles]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)


# Example: rank news about AI regulation
# NOTE: module-level demo code — fetch_all_rss_feeds() hits every feed in
# RSS_FEEDS over the network as soon as this runs; wrap it in an
# `if __name__ == "__main__":` guard for real use.
ai_keywords = ["artificial intelligence", "AI", "machine learning", "regulation", "ChatGPT", "LLM"]
all_articles = fetch_all_rss_feeds()
ranked = rank_articles(all_articles, ai_keywords)

print("Top 10 most relevant articles:")
for article, score in ranked[:10]:
    print(f"  [{score:.1f}] {article.title[:60]}")
    print(f"    {article.source} — {article.url[:60]}")

Error Handling and Source Health Monitoring

Production pipelines need to gracefully handle source failures without silently dropping coverage:

import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class SourceHealth:
    """Rolling health record for one upstream news source."""
    name: str
    last_success: Optional[str] = None   # ISO timestamp of the most recent good fetch
    consecutive_failures: int = 0
    total_fetches: int = 0
    total_articles: int = 0
    errors: list = field(default_factory=list)
    # Lifetime failure counter — appended after the original fields so
    # positional construction stays backward-compatible. Needed because
    # `errors` is capped at 10 entries and cannot be used to count.
    total_failures: int = 0

    def record_success(self, article_count: int):
        """Log a successful fetch that returned *article_count* articles."""
        # NOTE: datetime.utcnow() is deprecated in 3.12; kept for
        # timestamp-format stability with existing stored data.
        self.last_success = datetime.utcnow().isoformat()
        self.consecutive_failures = 0
        self.total_fetches += 1
        self.total_articles += article_count

    def record_failure(self, error: str):
        """Log a failed fetch, keeping only the 10 most recent error details."""
        self.consecutive_failures += 1
        self.total_fetches += 1
        self.total_failures += 1
        self.errors.append({"time": datetime.utcnow().isoformat(), "error": error})
        if len(self.errors) > 10:
            self.errors = self.errors[-10:]  # Keep last 10 errors

    @property
    def is_healthy(self) -> bool:
        """A source is healthy until it fails 3 times in a row."""
        return self.consecutive_failures < 3

    @property
    def success_rate(self) -> float:
        """Lifetime fraction of fetches that succeeded.

        BUG FIX: previously derived the failure count from len(errors),
        but the error list is capped at 10 entries, so the rate drifted
        upward once a source had failed more than 10 times.
        """
        if self.total_fetches == 0:
            return 0.0
        return (self.total_fetches - self.total_failures) / self.total_fetches


def run_monitored_pipeline(
    query: str,
    newsapi_key: str,
    rss_feeds: dict = None,
    proxy: str = None,
    db_path: str = "news_monitor.db",
) -> dict:
    """Run pipeline with full source health tracking.

    Queries NewsAPI and GDELT (plus any RSS feeds supplied), wrapping
    each fetch in a SourceHealth record so failures are counted instead
    of silently dropped. Deduplicates the pooled articles, stores new
    rows in SQLite at *db_path*, prints a run summary, and returns
    counts plus a per-source health snapshot.

    NOTE(review): *proxy* is accepted but never referenced in this body
    — presumably intended for full-text fetching; confirm before use.
    """
    conn = init_news_db(db_path)
    all_articles = []
    source_health = {}  # source name -> SourceHealth

    # NewsAPI
    health = SourceHealth("newsapi")
    try:
        results = fetch_newsapi(query, newsapi_key)
        health.record_success(len(results))
        all_articles.extend(results)
    except Exception as e:
        health.record_failure(str(e))
    source_health["newsapi"] = health

    # Brief pause before hitting GDELT, which rate-limits aggressively.
    time.sleep(2)

    # GDELT
    health = SourceHealth("gdelt")
    try:
        results = fetch_gdelt_articles(query)
        health.record_success(len(results))
        all_articles.extend(results)
    except Exception as e:
        health.record_failure(str(e))
    source_health["gdelt"] = health

    # RSS feeds — one health record per feed.
    if rss_feeds:
        for source_name, feed_url in rss_feeds.items():
            health = SourceHealth(source_name)
            try:
                results = fetch_rss_feed(feed_url, source_name)
                health.record_success(len(results))
                all_articles.extend(results)
            except Exception as e:
                health.record_failure(str(e))
            source_health[source_name] = health

    # Dedup and store
    unique = deduplicate_articles(all_articles)
    saved = save_articles(conn, unique)

    # Report
    print(f"\nPipeline run complete:")
    print(f"  Raw articles: {len(all_articles)}")
    print(f"  After dedup: {len(unique)}")
    print(f"  New to DB: {saved}")
    print(f"\nSource health:")
    for name, h in source_health.items():
        status = "OK" if h.is_healthy else "DEGRADED"
        print(f"  {name}: {status} (failures: {h.consecutive_failures})")

    conn.close()
    return {
        "total_raw": len(all_articles),
        "total_unique": len(unique),
        "saved": saved,
        "source_health": {k: {"healthy": v.is_healthy, "failures": v.consecutive_failures} for k, v in source_health.items()},
    }