← Back to blog

How to Scrape News Articles via RSS in 2026: Full-Text Extraction at Scale

How to Scrape News Articles via RSS in 2026: Full-Text Extraction at Scale

News data powers sentiment analysis, media monitoring, financial signal extraction, event detection, and training datasets. The tricky part isn't finding articles — RSS feeds publish them constantly. The tricky part is getting the full text without hitting paywalls, Cloudflare challenges, consent walls, or geo-restrictions on every domain you touch.

This guide covers the full stack: feed discovery, full-text extraction, paywall workarounds, bulk historical data sources, proxy integration with ThorData, and a production SQLite pipeline you can run on a schedule.

Why RSS Is Still the Best Entry Point in 2026

Despite the proliferation of APIs, push notifications, and social media feeds, RSS remains the most reliable starting point for news collection. Here's why:

Publisher-maintained — RSS feeds are maintained by the publisher's CMS. They're more stable than scraping front pages, which change layout constantly.

Machine-readable by design — Feed endpoints rarely have Cloudflare challenges because they're explicitly meant to be machine-read. Publishers don't want to protect their RSS feeds — they want you to syndicate their content.

Structured metadata — Title, URL, publish date, author, and often a summary come pre-parsed. You get structured data without HTML parsing.

Push semantics — Poll a feed every 15 minutes and you get new articles as they publish, without crawling the homepage.

Legal clarity — RSS is an explicitly public syndication mechanism. Publishers who provide RSS feeds are explicitly inviting machine consumption of their metadata.

The limitation: most feeds include only a summary or the first few paragraphs of the article. Getting the full text requires a second hop to the article URL — but the feed gives you a clean target list with timestamps, which is far better than crawling.

Feed Discovery

If you don't know a site's feed URL, there are reliable patterns to check:

import httpx
from bs4 import BeautifulSoup
from typing import Optional

# Conventional feed locations, probed in order when the HTML <head>
# advertises no feed (covers WordPress, Blogger, Hugo, and common CMS defaults)
COMMON_FEED_PATHS = [
    "/feed",
    "/rss",
    "/feed.xml",
    "/atom.xml",
    "/rss.xml",
    "/feeds/posts/default",
    "/index.xml",
    "/blog/feed",
    "/news/feed",
    "/feed/news",
    "/syndication.xml",
]

def discover_feeds(base_url: str, proxy: Optional[str] = None) -> list[str]:
    """
    Discover RSS/Atom feed URLs for a website.

    First looks for <link rel="alternate"> tags in the HTML head,
    then falls back to probing common feed paths with HEAD requests.

    Args:
        base_url: Site root, e.g. "https://example.com".
        proxy: Optional proxy URL applied to all request schemes.

    Returns:
        De-duplicated list of feed URLs, in discovery order (may be empty).
    """
    import urllib.parse

    base_url = base_url.rstrip("/")
    client_kwargs = {
        "timeout": 15,
        "follow_redirects": True,
        "headers": {"User-Agent": "Mozilla/5.0 (compatible; FeedFinder/1.0)"},
    }
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    found_feeds = []

    # First: check HTML head for feed links (most reliable)
    try:
        with httpx.Client(**client_kwargs) as client:
            resp = client.get(base_url)
        if resp.status_code == 200:
            soup = BeautifulSoup(resp.text, "html.parser")
            for link in soup.find_all("link", rel=True):
                rel = link.get("rel", [])
                if isinstance(rel, list):
                    rel = " ".join(rel)
                link_type = link.get("type", "")
                if "alternate" in rel and ("rss" in link_type or "atom" in link_type):
                    href = link.get("href", "")
                    if href:
                        # urljoin resolves absolute, root-relative, relative,
                        # and protocol-relative ("//host/feed") hrefs alike;
                        # naive "base_url + href" only handled root-relative.
                        found_feeds.append(urllib.parse.urljoin(base_url + "/", href))
    except Exception:
        pass  # discovery is best-effort; fall through to path probing

    # Second: try common paths
    if not found_feeds:
        with httpx.Client(**client_kwargs) as client:
            for path in COMMON_FEED_PATHS:
                try:
                    resp = client.head(f"{base_url}{path}")
                    if resp.status_code == 200:
                        content_type = resp.headers.get("content-type", "")
                        if "xml" in content_type or "rss" in content_type or "atom" in content_type:
                            found_feeds.append(f"{base_url}{path}")
                except Exception:
                    continue

    # De-duplicate while preserving order (list(set(...)) scrambled it)
    return list(dict.fromkeys(found_feeds))


# Example — performs live HTTP requests; returns [] when nothing is found
feeds = discover_feeds("https://techcrunch.com")
print(f"Found feeds: {feeds}")

Major News RSS Feeds

Most major news publishers maintain reliable RSS feeds:

# Feed URLs current as of writing — publishers occasionally move or retire
# feeds, so verify any single entry before depending on it in production
MAJOR_NEWS_FEEDS = {
    # Wire services
    "reuters_top": "https://feeds.reuters.com/reuters/topNews",
    "reuters_tech": "https://feeds.reuters.com/reuters/technologyNews",
    "reuters_business": "https://feeds.reuters.com/reuters/businessNews",
    "ap_news": "https://feeds.apnews.com/rss/apf-topnews",

    # Newspapers
    "nytimes_home": "https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml",
    "nytimes_tech": "https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml",
    "guardian_world": "https://www.theguardian.com/world/rss",
    "guardian_tech": "https://www.theguardian.com/uk/technology/rss",
    "bbc_news": "https://feeds.bbci.co.uk/news/rss.xml",
    "bbc_tech": "https://feeds.bbci.co.uk/news/technology/rss.xml",
    "wsj_world": "https://feeds.a.dj.com/rss/RSSWorldNews.xml",

    # Tech-focused
    "techcrunch": "https://techcrunch.com/feed/",
    "ars_technica": "https://feeds.arstechnica.com/arstechnica/index",
    "wired": "https://www.wired.com/feed/rss",
    "verge": "https://www.theverge.com/rss/index.xml",
    "hacker_news": "https://hnrss.org/frontpage",

    # Finance
    "ft_news": "https://www.ft.com/?format=rss",
    "bloomberg_tech": "https://feeds.bloomberg.com/technology/news.rss",
    "cnbc_top": "https://www.cnbc.com/id/100003114/device/rss/rss.html",
}

Parsing RSS Feeds with feedparser

feedparser handles RSS 0.9x, RSS 1.0, RSS 2.0, Atom, and several edge cases automatically:

import feedparser
import time
import httpx
from typing import Optional

def fetch_feed(
    url: str,
    proxy: Optional[str] = None,
    etag: Optional[str] = None,
    modified: Optional[str] = None,
) -> dict:
    """
    Parse an RSS or Atom feed with conditional GET support.

    Uses etag and last-modified headers to avoid re-fetching unchanged feeds.

    Args:
        url: Feed URL.
        proxy: Optional proxy URL, applied to both http and https.
        etag: ETag value returned by the previous successful fetch.
        modified: Last-Modified value from the previous successful fetch.

    Returns:
        {'articles': [...], 'etag': str, 'modified': str, 'updated': bool},
        plus 'feed_title'/'feed_link' when the feed was actually fetched.
    """
    kwargs = {}
    if proxy:
        import urllib.request
        # BUG FIX: feedparser expects a list of urllib *handler* objects and
        # builds its own opener from them. The previous code passed a
        # pre-built OpenerDirector (the build_opener(...) result), which
        # feedparser cannot install — so the proxy was silently ignored.
        kwargs["handlers"] = [
            urllib.request.ProxyHandler({
                "http": proxy,
                "https": proxy,
            })
        ]

    if etag:
        kwargs["etag"] = etag
    if modified:
        kwargs["modified"] = modified

    feed = feedparser.parse(url, **kwargs)

    # 304 Not Modified — feed hasn't changed since the last conditional GET
    if feed.get("status") == 304:
        return {"articles": [], "etag": etag, "modified": modified, "updated": False}

    articles = []
    for entry in feed.entries:
        articles.append({
            "title": entry.get("title", ""),
            "url": entry.get("link", ""),
            "published": entry.get("published", ""),
            "published_parsed": entry.get("published_parsed"),
            "summary": entry.get("summary", "")[:500],  # cap summary size
            "author": entry.get("author", ""),
            "tags": [t.get("term", "") for t in entry.get("tags", [])],
            "media_content": [
                m.get("url") for m in entry.get("media_content", [])
                if m.get("url")
            ],
        })

    return {
        "articles": articles,
        "feed_title": feed.feed.get("title", ""),
        "feed_link": feed.feed.get("link", ""),
        "etag": feed.get("etag"),
        "modified": feed.get("modified"),
        "updated": True,
    }


def poll_feeds(
    feed_urls: list[str],
    state: Optional[dict] = None,
) -> tuple[list[dict], dict]:
    """
    Poll multiple feeds, using etag/modified headers to skip unchanged feeds.

    Args:
        feed_urls: Feed URLs to poll.
        state: Mapping of url -> {"etag", "modified"} from the previous poll;
            None (the annotated default — was mis-typed as plain `dict`)
            means a cold start and every feed is fetched unconditionally.

    Returns:
        (new_articles, updated_state) — pass updated_state back on the
        next call to keep conditional GETs working.
    """
    if state is None:
        state = {}

    all_articles = []
    new_state = {}

    for url in feed_urls:
        prev = state.get(url, {})
        result = fetch_feed(
            url,
            etag=prev.get("etag"),
            modified=prev.get("modified"),
        )

        if result.get("updated"):
            # Tag each article with its origin feed before aggregating
            for article in result["articles"]:
                article["source_feed"] = url
                article["source_title"] = result.get("feed_title", "")
            all_articles.extend(result["articles"])

        new_state[url] = {
            "etag": result.get("etag"),
            "modified": result.get("modified"),
        }

    return all_articles, new_state

Install feedparser: uv pip install feedparser

Full-Text Extraction with newspaper4k

newspaper4k (the maintained fork of newspaper3k) downloads the article URL and extracts the main content:

from newspaper import Article
from typing import Optional
import time

def extract_with_newspaper(
    url: str,
    proxy: Optional[str] = None,
    language: str = "en",
) -> dict:
    """
    Download and extract full article text using newspaper4k.

    Runs newspaper's optional NLP pass for keywords and a summary; NLP
    failures are swallowed. On download/parse failure the result carries
    an "error" key and an empty "text".
    """
    extra = {"proxies": {"http": proxy, "https": proxy}} if proxy else {}
    article = Article(url, language=language, **extra)

    try:
        article.download()
        article.parse()
    except Exception as e:
        return {"url": url, "error": str(e), "text": ""}

    try:
        article.nlp()
    except Exception:
        pass  # NLP is optional, don't fail if it errors

    body = article.text or ""
    return {
        "url": url,
        "title": article.title or "",
        "text": body,
        "authors": article.authors or [],
        "publish_date": str(article.publish_date) if article.publish_date else None,
        "top_image": article.top_image or "",
        "keywords": article.keywords or [],
        "summary": article.summary or "",
        "meta_lang": article.meta_lang or language,
        "word_count": len(body.split()),
    }


# Quick test — placeholder URL; swap in a real article before running
article = extract_with_newspaper("https://techcrunch.com/some-article")
print(f"Title: {article['title']}")
print(f"Words: {article['word_count']}")
print(f"Keywords: {article['keywords'][:5]}")  # empty unless article.nlp() succeeded

Install: uv pip install newspaper4k lxml_html_clean

When newspaper4k Fails

newspaper4k misclassifies about 5-10% of articles — grabbing sidebar text, missing the article body, or pulling boilerplate. Watch for:

- Very short text returns (under 200 words) on articles that should be longer
- Navigation menu items appearing in the text
- Missing the main content on sites with unusual layouts

In these cases, fall back to readability-lxml.

Alternative: readability-lxml

Mozilla's Readability algorithm, ported to Python:

import httpx
from readability import Document
from bs4 import BeautifulSoup
from typing import Optional

def extract_with_readability(
    url: str,
    proxy: Optional[str] = None,
    country: str = "us",
) -> dict:
    """
    Fetch a page and extract its main article with readability-lxml.

    Returns both the cleaned HTML (for structured extraction) and the
    plain text. Tends to beat newspaper4k on unusual page layouts.

    NOTE(review): `country` is not used inside this function — geo routing
    happens via the proxy URL built by the caller; the parameter is kept
    for interface stability.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        # Consent cookie for GDPR-sensitive sites
        "Cookie": "gdpr=1; consent=1; cookieconsent_status=dismiss",
    }

    client_kwargs = {"timeout": 20, "follow_redirects": True, "headers": headers}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    try:
        with httpx.Client(**client_kwargs) as client:
            resp = client.get(url)
        resp.raise_for_status()
    except Exception as e:
        return {"url": url, "error": str(e), "text": ""}

    doc = Document(resp.text)
    cleaned_html = doc.summary()

    # Strip tags from the cleaned fragment to get readable plain text
    plain_text = BeautifulSoup(cleaned_html, "html.parser").get_text(
        separator="\n", strip=True
    )

    return {
        "url": url,
        "title": doc.title() or "",
        "short_title": doc.short_title() or "",
        "html": cleaned_html,
        "text": plain_text,
        "word_count": len(plain_text.split()),
        "char_count": len(plain_text),
    }

Install: uv pip install readability-lxml

Choosing Between newspaper4k and readability-lxml

def extract_article(
    url: str,
    proxy: Optional[str] = None,
    min_words: int = 150,
) -> dict:
    """
    Smart extraction: newspaper4k first, readability-lxml as fallback.

    Falls back outright when newspaper4k errors, and prefers the
    readability result when newspaper4k yields fewer than min_words
    words and readability extracts more.
    """
    primary = extract_with_newspaper(url, proxy=proxy)

    # Hard failure — readability is the only remaining option
    if primary.get("error"):
        return extract_with_readability(url, proxy=proxy)

    words = primary.get("word_count", 0)
    if words < min_words:
        # Suspiciously short extraction — possibly sidebar/boilerplate grab
        print(f"newspaper4k returned {words} words, trying readability")
        secondary = extract_with_readability(url, proxy=proxy)
        if secondary.get("word_count", 0) > words:
            return secondary

    return primary

Paywall Bypass via archive.org

For paywalled articles, the Wayback Machine often has a cached copy. This is explicitly public infrastructure — no scraping, just a well-documented API:

import httpx
from typing import Optional

def get_wayback_url(
    article_url: str,
    from_year: int = 2020,
    prefer_recent: bool = True,
) -> Optional[str]:
    """
    Query the Wayback Machine CDX API to find a snapshot of a paywalled article.

    Args:
        article_url: Original (live) article URL.
        from_year: Ignore snapshots captured before Jan 1 of this year.
        prefer_recent: When True, consider the newest snapshots first.

    Returns:
        A usable web.archive.org URL, or None if no snapshot is found.
    """
    cdx_api = "https://web.archive.org/cdx/search/cdx"
    params = {
        "url": article_url,
        "output": "json",
        "limit": 5,
        "fl": "timestamp,statuscode,original,mimetype",
        # BUG FIX: the CDX API takes *repeated* `filter` params. httpx encodes
        # a list value as filter=...&filter=...; the previous "filter2" key
        # was not a recognized parameter and was silently ignored.
        "filter": ["statuscode:200", "mimetype:text/html"],
        "from": f"{from_year}0101",
    }

    if prefer_recent:
        params["sort"] = "reverse"

    try:
        with httpx.Client(timeout=10) as client:
            resp = client.get(cdx_api, params=params)
        results = resp.json()
    except Exception:
        return None  # network/JSON failure — treat as "no snapshot"

    if len(results) < 2:  # First row is the CDX column header
        return None

    # Pick the first valid snapshot row
    for row in results[1:]:
        if len(row) >= 3:
            timestamp = row[0]
            original_url = row[2]
            return f"https://web.archive.org/web/{timestamp}/{original_url}"

    return None


def extract_paywalled_article(url: str) -> dict:
    """Extract a paywalled article by way of an archive.org snapshot."""
    wayback_url = get_wayback_url(url)
    if wayback_url is None:
        return {"url": url, "error": "No archive found", "text": ""}

    print(f"Using archive: {wayback_url}")
    extracted = extract_with_readability(wayback_url)
    # Record both the live URL and the snapshot actually fetched
    extracted["source_url"] = url
    extracted["archive_url"] = wayback_url
    return extracted


# Usage — placeholder URL; only works if archive.org holds a snapshot
article = extract_paywalled_article("https://www.wsj.com/articles/some-article")
if article.get("text"):
    print(f"Retrieved {article['word_count']} words from archive")

Bulk Historical Data with CommonCrawl

For historical coverage at scale (millions of articles), CommonCrawl is the right tool. It indexes petabytes of web crawl data — you download from their S3 bucket rather than hitting publisher servers:

import httpx
import json
import gzip
import io
from typing import Iterator

def query_commoncrawl_index(
    domain: str,
    crawl: str = "CC-MAIN-2025-18",
    url_filter: Optional[str] = None,
    limit: int = 100,
) -> list[dict]:
    """
    Query CommonCrawl's index API for URLs from a given domain.

    Args:
        domain: Domain to search, e.g. "reuters.com".
        crawl: CC crawl identifier. Check
            https://commoncrawl.org/the-data/get-started/ for current crawl
            IDs. New crawls happen roughly monthly.
        url_filter: Optional explicit URL pattern; defaults to "{domain}/*".
        limit: Maximum number of index records to return.

    Returns:
        List of {url, timestamp, status, filename, offset, length, mime}
        dicts for fetching the actual content.
    """
    index_url = f"https://index.commoncrawl.org/{crawl}-index"
    url_pattern = url_filter or f"{domain}/*"

    params = {
        "url": url_pattern,
        "output": "json",
        "limit": limit,
        "fl": "url,timestamp,status,filename,offset,length,mime",
        # BUG FIX: the CDX-style index accepts *repeated* `filter` params.
        # httpx encodes a list as filter=...&filter=...; the previous
        # "filter2" key was not a recognized parameter.
        "filter": ["status:200", "mime:text/html"],
    }

    with httpx.Client(timeout=30) as client:
        resp = client.get(index_url, params=params)

    if resp.status_code != 200:
        print(f"CommonCrawl index error: {resp.status_code}")
        return []

    # Response is NDJSON: one JSON record per line
    records = []
    for line in resp.text.strip().splitlines():
        if line.strip():
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # skip malformed lines rather than failing the batch

    return records


def fetch_commoncrawl_content(record: dict) -> Optional[str]:
    """
    Fetch the actual HTML content of a page from CommonCrawl's storage.

    Uses an HTTP Range request against the WARC file named in the index
    record to retrieve just that record's bytes.

    Args:
        record: An index record carrying "filename", "offset", "length".

    Returns:
        The HTML payload, or None on any failure.
    """
    filename = record.get("filename", "")
    offset = int(record.get("offset", 0))
    length = int(record.get("length", 0))

    if not filename:
        return None

    # BUG FIX: the URL previously contained a literal "(unknown)" placeholder
    # instead of interpolating the WARC path from the index record.
    url = f"https://data.commoncrawl.org/{filename}"
    headers = {"Range": f"bytes={offset}-{offset + length - 1}"}

    try:
        with httpx.Client(timeout=30) as client:
            resp = client.get(url, headers=headers)

        if resp.status_code not in (200, 206):  # 206 = Partial Content
            return None

        # Each record is an independently gzip-compressed WARC member
        raw = gzip.decompress(resp.content)
        text = raw.decode("utf-8", errors="replace")

        # WARC layout: WARC headers, blank line, HTTP headers, blank line, body
        parts = text.split("\r\n\r\n", 2)
        if len(parts) >= 3:
            return parts[2]  # The HTML content
        elif len(parts) >= 2:
            return parts[1]
        return None  # malformed record

    except Exception as e:
        print(f"CommonCrawl fetch error: {e}")
        return None


def bulk_historical_crawl(
    domain: str,
    crawl: str = "CC-MAIN-2025-18",
    max_articles: int = 100,
) -> Iterator[dict]:
    """
    Yield extracted article dicts for a domain from CommonCrawl.

    Far kinder than hammering publisher servers for historical data —
    the content is served from CommonCrawl's own storage.
    """
    records = query_commoncrawl_index(domain, crawl=crawl, limit=max_articles)
    print(f"Found {len(records)} records for {domain} in {crawl}")

    for rec in records:
        page_html = fetch_commoncrawl_content(rec)
        if not page_html:
            continue

        doc = Document(page_html)
        body = BeautifulSoup(doc.summary(), "html.parser").get_text(
            separator="\n", strip=True
        )

        # Skip very short pages (nav stubs, error pages, etc.)
        if len(body.split()) < 100:
            continue

        yield {
            "url": rec.get("url"),
            "timestamp": rec.get("timestamp"),
            "title": doc.title(),
            "text": body,
            "word_count": len(body.split()),
            "source": "commoncrawl",
            "crawl": crawl,
        }


# Collect Reuters articles from CommonCrawl without hitting their servers
# (relies on `time`, `Document`, and `BeautifulSoup` imported earlier in the file)
for article in bulk_historical_crawl("reuters.com", max_articles=50):
    print(f"{article['timestamp']}: {article['title'][:60]} ({article['word_count']} words)")
    time.sleep(0.1)  # S3 is fast, but be considerate

Anti-Bot Measures and Proxy Strategy

News sites run a range of bot defenses in 2026:

Cloudflare (Most Common)

Turnstile challenges and JS fingerprinting are common on mid-tier publishers. The RSS feed itself is usually not Cloudflare-protected, but the article pages often are.

Consent Walls

Many European publishers serve cookie consent walls. The classic workaround is sending the consent cookie in your request:

CONSENT_HEADERS = {
    # "euconsent-v2=..." is a placeholder — substitute a real TCF consent
    # string captured from a browser session for sites that validate it
    "Cookie": "gdpr=1; consent=1; euconsent-v2=...; cookieconsent_status=dismiss",
    "Accept-Language": "en-GB,en;q=0.9",
}

For more stubborn consent walls, use geo-targeting in your proxy to route from a US IP — many EU publishers serve simpler versions to US visitors.

Geo-Restrictions

Publishers like BBC, DW, or local news outlets often show different content or block access based on geography. ThorData's geo-targeting handles this:

# Geo-targeted proxy URLs via make_proxy() (defined in the ThorData section)
# BBC content with UK IP
proxy_uk = make_proxy(country="gb")
# US-only financial news
proxy_us = make_proxy(country="us")
# German news from Germany
proxy_de = make_proxy(country="de")

Paywalls

For hard paywalls, fall back to the archive.org approach covered earlier — fetch a Wayback Machine snapshot of the article instead of the live page.

Rate Limiting Per Domain

Most news sites allow 30-100 requests per hour per IP. Across a multi-domain scraper, distribute requests with a minimum 3-5 second delay per domain:

from collections import defaultdict

class DomainRateLimiter:
    """Track last request time per domain and enforce minimum delays."""

    def __init__(self, default_delay: float = 3.0):
        # domain -> epoch seconds of last request; defaultdict(float) yields
        # 0.0, so the first request to any domain never waits
        self.last_request = defaultdict(float)
        self.default_delay = default_delay

    def wait_for(self, url: str, domain_delay: Optional[float] = None):
        """Sleep just long enough to honor the per-domain minimum delay.

        Args:
            url: Target URL; the delay is keyed on its host.
            domain_delay: Per-call override; None uses default_delay.
                (Was mis-annotated `float = None`; also switched from
                `or` to an `is None` test so an explicit 0 disables
                waiting instead of silently falling back.)
        """
        import urllib.parse
        domain = urllib.parse.urlparse(url).netloc
        delay = self.default_delay if domain_delay is None else domain_delay
        elapsed = time.time() - self.last_request[domain]
        if elapsed < delay:
            time.sleep(delay - elapsed)
        self.last_request[domain] = time.time()


rate_limiter = DomainRateLimiter(default_delay=3.0)

ThorData Integration

ThorData's rotating residential proxy network provides IPs from real ISP ranges that pass ASN-level geo and reputation checks. This is specifically important for news scrapers because:

  1. News sites use ASN-level reputation — AWS and DigitalOcean ranges are well-known proxy origins
  2. Geo-restrictions require country-specific routing, which ThorData supports
  3. Consent wall variants differ by geography — routing from the right country can serve simpler pages
import httpx
import random
import string
from typing import Optional

# ThorData credentials — replace with your own account values
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "gate.thordata.net"
THORDATA_PORT = 9000

def make_proxy(country: str = "us", session_id: Optional[str] = None) -> str:
    """
    Build a ThorData residential proxy URL.

    Args:
        country: Two-letter exit-country code (geo-targeting).
        session_id: Sticky-session token; reuse the same id across requests
            to keep the same exit IP for multi-page article fetches.
            (Was mis-annotated `str = None`; now Optional[str].)

    Returns:
        Proxy URL in the form http://user:pass@host:port.
    """
    user = f"{THORDATA_USER}-country-{country}"
    if session_id:
        user += f"-session-{session_id}"
    return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"


def fetch_article_html(
    url: str,
    country: str = "us",
    use_sticky: bool = True,
) -> Optional[str]:
    """
    Fetch raw article HTML through a geo-targeted residential proxy.

    use_sticky pins the request to one exit IP by attaching a random
    session id to the proxy credentials. Returns the HTML body, or
    None on any failure.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Cookie": "gdpr=1; consent=1",
    }

    session_id = (
        "".join(random.choices(string.ascii_lowercase, k=8)) if use_sticky else None
    )
    proxy = make_proxy(country=country, session_id=session_id)

    client_kwargs = {
        "proxies": {"all://": proxy},
        "headers": headers,
        "timeout": 25,
        "follow_redirects": True,
    }
    try:
        with httpx.Client(**client_kwargs) as client:
            resp = client.get(url)
            resp.raise_for_status()
            return resp.text
    except Exception as e:
        print(f"Fetch error for {url}: {e}")
        return None

Building a Production Pipeline

import sqlite3
import json
import time
import random
import feedparser
from datetime import datetime, timezone
from typing import Optional
from readability import Document
from bs4 import BeautifulSoup

def init_db(path: str = "news.db") -> sqlite3.Connection:
    """
    Open (and create, if needed) the news article database.

    Sets up the feed_state and articles tables plus the indexes the
    analytics queries rely on, then returns the open connection.
    """
    conn = sqlite3.connect(path)

    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS feed_state (
            feed_url TEXT PRIMARY KEY,
            feed_title TEXT,
            etag TEXT,
            modified TEXT,
            last_polled TEXT,
            article_count INTEGER DEFAULT 0
        )
    """,
        """
        CREATE TABLE IF NOT EXISTS articles (
            url TEXT PRIMARY KEY,
            title TEXT,
            text TEXT,
            html TEXT,
            word_count INTEGER,
            published TEXT,
            author TEXT,
            source_domain TEXT,
            source_feed TEXT,
            tags TEXT,
            keywords TEXT,
            scraped_at TEXT NOT NULL
        )
    """,
        "CREATE INDEX IF NOT EXISTS idx_articles_published ON articles(published DESC)",
        "CREATE INDEX IF NOT EXISTS idx_articles_domain ON articles(source_domain)",
        "CREATE INDEX IF NOT EXISTS idx_articles_scraped ON articles(scraped_at DESC)",
    )
    for statement in ddl_statements:
        conn.execute(statement)

    conn.commit()
    return conn


def get_domain(url: str) -> str:
    """Return the network-location (host[:port]) component of *url*."""
    from urllib.parse import urlparse
    return urlparse(url).netloc


def save_article(conn: sqlite3.Connection, article: dict):
    """Persist one extracted article; duplicate URLs are silently ignored."""
    url = article.get("url")
    if not url or not article.get("text"):
        return  # nothing useful to store

    # Authors may arrive as a list (newspaper4k) or a single string (feed)
    authors = article.get("authors")
    if isinstance(authors, list):
        author = ", ".join(article.get("authors", []))
    else:
        author = article.get("author", "")

    row = (
        url,
        article.get("title", ""),
        article.get("text", ""),
        article.get("html", ""),
        article.get("word_count", 0),
        article.get("published") or article.get("publish_date"),
        author,
        get_domain(url),
        article.get("source_feed", ""),
        json.dumps(article.get("tags", [])),
        json.dumps(article.get("keywords", [])),
        datetime.now(timezone.utc).isoformat(),
    )
    conn.execute("""
        INSERT OR IGNORE INTO articles
        (url, title, text, html, word_count, published, author,
         source_domain, source_feed, tags, keywords, scraped_at)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
    """, row)
    conn.commit()


def update_feed_state(conn: sqlite3.Connection, feed_url: str, result: dict, article_count: int):
    """Upsert the polling bookkeeping row for one feed."""
    params = (
        feed_url,
        result.get("feed_title", ""),
        result.get("etag"),
        result.get("modified"),
        datetime.now(timezone.utc).isoformat(),
        article_count,
    )
    # COALESCE keeps the previous etag/modified when this poll didn't return
    # them; article_count accumulates across runs.
    conn.execute("""
        INSERT INTO feed_state (feed_url, feed_title, etag, modified, last_polled, article_count)
        VALUES (?,?,?,?,?,?)
        ON CONFLICT(feed_url) DO UPDATE SET
            feed_title=excluded.feed_title,
            etag=COALESCE(excluded.etag, feed_state.etag),
            modified=COALESCE(excluded.modified, feed_state.modified),
            last_polled=excluded.last_polled,
            article_count=feed_state.article_count + excluded.article_count
    """, params)
    conn.commit()


def run_pipeline(
    feeds: list[str],
    db_path: str = "news.db",
    use_proxy: bool = False,
    country: str = "us",
    min_words: int = 150,
    delay_range: tuple = (2, 5),
):
    """
    Complete news scraping pipeline.

    1. Poll each RSS feed for new articles (conditional GET via etag/modified)
    2. Extract full text for unseen article URLs, with archive.org fallback
    3. Save to SQLite and update per-feed polling state

    Args:
        feeds: RSS/Atom feed URLs to poll each run.
        db_path: SQLite database file (created by init_db on first run).
        use_proxy: Route article fetches through make_proxy() when True.
        country: Proxy exit-country code used when use_proxy is True.
        min_words: Minimum extracted word count required to keep an article.
        delay_range: (low, high) seconds for the random per-domain delay.
    """
    conn = init_db(db_path)
    rate_limiter = DomainRateLimiter(default_delay=3.0)

    # Load existing feed states for conditional GETs (304 -> feed skipped)
    feed_states = {}
    for row in conn.execute("SELECT feed_url, etag, modified FROM feed_state").fetchall():
        feed_states[row[0]] = {"etag": row[1], "modified": row[2]}

    # Track seen URLs to avoid re-fetching articles already stored
    seen_urls = {row[0] for row in conn.execute("SELECT url FROM articles")}

    total_new = 0

    for feed_url in feeds:
        domain = get_domain(feed_url)  # NOTE(review): unused below — candidate for removal
        print(f"\nPolling: {feed_url}")

        prev_state = feed_states.get(feed_url, {})
        result = fetch_feed(
            feed_url,
            etag=prev_state.get("etag"),
            modified=prev_state.get("modified"),
        )

        if not result.get("updated"):
            print(f"  Not modified (304), skipping")
            continue

        new_articles = result.get("articles", [])
        print(f"  Found {len(new_articles)} articles in feed")

        saved_count = 0
        for feed_item in new_articles:
            url = feed_item.get("url", "")
            if not url or url in seen_urls:
                continue

            # Rate limit per article domain (articles may live on a CDN
            # domain distinct from the feed's host)
            rate_limiter.wait_for(url, domain_delay=random.uniform(*delay_range))

            # Fetch and extract full text
            proxy = None
            if use_proxy:
                proxy = make_proxy(country=country)

            full = extract_article(url, proxy=proxy, min_words=min_words)

            if full.get("error") or full.get("word_count", 0) < min_words:
                # Extraction failed or came up short — try archive.org fallback
                wayback = get_wayback_url(url)
                if wayback:
                    full = extract_with_readability(wayback, proxy=proxy)
                    full["source_url"] = url

            if full.get("word_count", 0) >= min_words:
                # Merge feed-level metadata the extractors don't provide
                full.update({
                    "source_feed": feed_url,
                    "published": feed_item.get("published"),
                    "author": feed_item.get("author"),
                    "tags": feed_item.get("tags", []),
                })
                save_article(conn, full)
                seen_urls.add(url)
                saved_count += 1
                total_new += 1
                print(f"  Saved: {full.get('title', url)[:60]} ({full.get('word_count', 0)} words)")

        update_feed_state(conn, feed_url, result, saved_count)
        print(f"  Saved {saved_count} new articles")

    print(f"\nTotal new articles this run: {total_new}")
    conn.close()


# Schedule this with cron: */15 * * * * python3 pipeline.py
if __name__ == "__main__":
    # (removed a redundant function-local `import random` — the module is
    # already imported at file level for run_pipeline's delay jitter)
    FEEDS_TO_MONITOR = list(MAJOR_NEWS_FEEDS.values())

    run_pipeline(
        feeds=FEEDS_TO_MONITOR,
        db_path="news.db",
        use_proxy=False,  # Set True for Cloudflare-protected sites
        min_words=150,
        delay_range=(2, 5),
    )

Analytics Queries

def articles_by_domain(conn: sqlite3.Connection) -> list[dict]:
    """Count articles collected per news source, busiest source first."""
    query = """
        SELECT source_domain, COUNT(*) as count, MAX(scraped_at) as latest
        FROM articles
        GROUP BY source_domain
        ORDER BY count DESC
    """
    return [
        {"domain": domain, "count": count, "latest": latest}
        for domain, count, latest in conn.execute(query).fetchall()
    ]


def keyword_frequency(conn: sqlite3.Connection, days: int = 7) -> list[dict]:
    """
    Find the most common keywords in recently scraped articles.

    Args:
        conn: Open connection with the articles table.
        days: Look-back window, measured against scraped_at.

    Returns:
        Up to 50 {"keyword", "count"} dicts, most frequent first.
    """
    # Import timezone locally as well: the original imported only
    # datetime/timedelta here and silently depended on a module-level
    # `timezone` import — a NameError when this snippet stands alone.
    from datetime import datetime, timedelta, timezone
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()

    rows = conn.execute("""
        SELECT keywords FROM articles
        WHERE scraped_at >= ? AND keywords != '[]'
    """, (cutoff,)).fetchall()

    from collections import Counter
    counts = Counter()
    for row in rows:
        try:
            kws = json.loads(row[0])
            for kw in kws:
                # Skip very short tokens that are mostly noise
                if isinstance(kw, str) and len(kw) > 3:
                    counts[kw.lower()] += 1
        except Exception:
            pass  # one malformed keywords column shouldn't kill the report

    return [{"keyword": k, "count": v} for k, v in counts.most_common(50)]


def articles_by_hour(conn: sqlite3.Connection) -> list[dict]:
    """Publishing volume bucketed by hour of day (UTC).

    NOTE(review): SUBSTR(published, 12, 2) extracts the hour only when
    `published` is an ISO-8601 string ("2026-01-05T14:30:00..."). RSS
    feeds usually publish RFC 822 dates ("Mon, 05 Jan 2026 14:30:00 GMT"),
    and the pipeline stores the feed's string verbatim — confirm the
    stored format before trusting this bucketing.
    """
    rows = conn.execute("""
        SELECT SUBSTR(published, 12, 2) as hour, COUNT(*) as count
        FROM articles
        WHERE published IS NOT NULL
        GROUP BY hour
        ORDER BY hour
    """).fetchall()
    return [{"hour": r[0], "count": r[1]} for r in rows]

Summary