
Scraping Podcast Directory Data: Listen Notes, iTunes, and Spotify (2026)

Podcast data is scattered across platforms that don't talk to each other. A show might have different stats on Spotify than Apple Podcasts, and neither of them shares listener numbers publicly. If you're doing market research, tracking competitors, or building a podcast analytics product, you need to pull from multiple directories and stitch the data together yourself.

This guide covers three primary sources — Listen Notes API for search and metadata, iTunes/Apple Podcasts for RSS-based episode data, and Spotify's podcast endpoints — with working Python code for each. We also cover anti-detection strategies, proxy setup, error handling for production workloads, and the legal landscape around podcast data collection.

What Data Exists Where

Each platform exposes a different slice of the picture:

Listen Notes: full-text search across shows and episodes, rich metadata, and the Listen Score popularity estimate.

Apple Podcasts / iTunes: directory search, top charts, genre classifications, and canonical RSS feed URLs.

Spotify: show and episode catalogs with per-market availability, descriptions, and explicit flags.

RSS feeds: creator-controlled episode metadata, the most authoritative source of all.

None of them give you actual download or listener numbers. Those are locked behind each platform's analytics dashboard. What you can collect is everything else — and it's enough to build useful competitive intelligence. Podcast advertising rates are tied to download estimates, and the directional signal from platform-visible metrics is often enough for market sizing work.

Podcast hosting platforms also expose RSS feeds directly, which are the canonical data source for episode metadata. Hosts like Anchor (now Spotify for Podcasters), Buzzsprout, Libsyn, Transistor, and Podbean all publish RSS feeds that any aggregator can consume. The RSS standard for podcasts, extended by the iTunes namespace, gives you show and episode metadata that's often richer than what the directories expose.

Understanding the Podcast Data Ecosystem

Before diving into code, it helps to understand the three-layer architecture of podcast data:

Layer 1: RSS feeds. The foundational data format. Each podcast has a canonical RSS feed URL. The iTunes namespace (http://www.itunes.com/dtds/podcast-1.0.dtd) adds fields like episode type, explicit flag, season/episode numbers, and chapter data. The newer podcast: namespace adds transcripts, chapters, funding links, and value-for-value payment tags. RSS feeds are the most authoritative source for episode metadata since they're controlled by the show creator.

Layer 2: Aggregator directories. Apple Podcasts, Spotify, Pocket Casts, and others index RSS feeds and add their own data on top — ratings, listener counts (internal), editorial placements, and genre classifications. These directories give you discovery data (rankings, popularity) that's not in the raw RSS feeds.

Layer 3: Analytics platforms. Listen Notes, Podchaser, and similar tools build their own derived metrics (Listen Score, etc.) by aggregating data across directories and estimating audience sizes from proxy signals. These are useful for market research but should be treated as estimates, not hard numbers.
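The itunes: tags are read with a namespace map in the parsing code later in this guide; the newer podcast: namespace works the same way. Here's a minimal sketch that pulls funding links and transcript URLs when a feed includes them (the helper name is ours, and these tags are optional in real feeds):

```python
import xml.etree.ElementTree as ET

PODCAST_NS = {"podcast": "https://podcastindex.org/namespace/1.0"}

def extract_podcast_ns(channel: ET.Element) -> dict:
    """Collect podcast-namespace extras: the show's funding link and any
    per-episode transcript URLs, when the feed provides them."""
    funding = channel.find("podcast:funding", PODCAST_NS)
    extras = {
        "funding_url": funding.get("url", "") if funding is not None else "",
        "transcripts": [],
    }
    for item in channel.findall("item"):
        for t in item.findall("podcast:transcript", PODCAST_NS):
            extras["transcripts"].append({"url": t.get("url", ""), "type": t.get("type", "")})
    return extras
```

Feeds that predate the namespace simply return an empty funding URL and no transcripts, so the helper degrades gracefully.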

Listen Notes API

Listen Notes offers the most comprehensive podcast search API. The free tier gives you 300 requests per month — limited, but enough for targeted research. Paid plans start at $20/month for 10,000 requests.

import httpx
import time
import json
import sqlite3
from dataclasses import dataclass, asdict, field
from typing import Optional

@dataclass
class PodcastShow:
    show_id: str
    title: str
    publisher: str
    description: str
    total_episodes: int
    listen_score: int
    listen_score_global_rank: str
    language: str
    country: str
    rss_url: str
    website: str
    genres: list
    latest_episode_date: str
    earliest_pub_date: str
    update_frequency: str
    source: str = "listennotes"

@dataclass
class Episode:
    episode_id: str
    title: str
    description: str
    published_at: str
    duration_seconds: int
    audio_url: str
    show_title: str
    show_id: str
    explicit: bool = False
    source: str = "listennotes"

def build_listen_notes_client(api_key: str) -> httpx.Client:
    return httpx.Client(
        base_url="https://listen-api.listennotes.com/api/v2",
        headers={
            "X-ListenAPI-Key": api_key,
            "Accept": "application/json",
        },
        timeout=30,
    )

def search_listen_notes(
    query: str,
    client: httpx.Client,
    offset: int = 0,
    sort: str = "0",
    language: str = "English",
    safe_mode: int = 0,
) -> tuple[list[PodcastShow], int]:
    """
    Search Listen Notes for podcasts matching a query.
    Returns (shows, total_count).
    sort: 0=relevance, 1=recent episode first.
    """
    resp = client.get(
        "/search",
        params={
            "q": query,
            "type": "podcast",
            "offset": offset,
            "sort_by_date": sort,
            "language": language,
            "safe_mode": safe_mode,
        },
    )
    resp.raise_for_status()
    data = resp.json()
    total = data.get("total", 0)

    shows = []
    for result in data.get("results", []):
        shows.append(PodcastShow(
            show_id=result["id"],
            title=result.get("title_original", ""),
            publisher=result.get("publisher_original", ""),
            description=result.get("description_original", "")[:500],
            total_episodes=result.get("total_episodes", 0),
            listen_score=result.get("listen_score") or 0,
            listen_score_global_rank=result.get("listen_score_global_rank", ""),
            language=result.get("language", ""),
            country=result.get("country", ""),
            rss_url=result.get("rss", ""),
            website=result.get("website", ""),
            genres=[g.get("name", "") if isinstance(g, dict) else str(g) for g in result.get("genre_ids", [])],
            latest_episode_date=str(result.get("latest_pub_date_ms", "")),
            earliest_pub_date=str(result.get("earliest_pub_date_ms", "")),
            update_frequency=str(result.get("update_frequency_hours", "")),
        ))

    return shows, total

def paginate_search(
    query: str,
    client: httpx.Client,
    max_results: int = 100,
    delay: float = 1.5,
) -> list[PodcastShow]:
    """Paginate through Listen Notes search results."""
    all_shows = []
    offset = 0
    page_size = 10  # Listen Notes free tier returns 10 per page

    while len(all_shows) < max_results:
        shows, total = search_listen_notes(query, client, offset=offset)
        if not shows:
            break
        all_shows.extend(shows)
        if offset + page_size >= min(total, max_results):
            break
        offset += page_size
        time.sleep(delay)

    return all_shows[:max_results]
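Each page of paginate_search costs one request, so the free tier's 300-per-month quota disappears fast. A simple guard worth checking before each call (this class is our own sketch, not part of any Listen Notes SDK):

```python
class RequestBudget:
    """Tracks API requests spent against a monthly quota."""

    def __init__(self, monthly_quota: int = 300):
        self.quota = monthly_quota
        self.used = 0

    def spend(self, n: int = 1) -> bool:
        """Reserve n requests; returns False if the quota would be exceeded."""
        if self.used + n > self.quota:
            return False
        self.used += n
        return True

    @property
    def remaining(self) -> int:
        return self.quota - self.used
```

Call budget.spend() before each API request and stop (or queue work for next month) when it returns False.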

Fetching Episodes from Listen Notes

Once you have a show ID, pull its episode list:

def get_show_episodes(
    show_id: str,
    client: httpx.Client,
    max_episodes: int = 100,
    delay: float = 1.2,
) -> list[Episode]:
    """
    Fetch episodes for a specific podcast show.
    Uses next_episode_pub_date for cursor-based pagination.
    """
    episodes = []
    next_date = None

    # First request: get show info
    resp = client.get(f"/podcasts/{show_id}", params={"sort": "recent_first"})
    resp.raise_for_status()
    data = resp.json()
    show_title = data.get("title", "")

    for ep in data.get("episodes", []):
        episodes.append(_parse_ln_episode(ep, show_title, show_id))

    next_date = data.get("next_episode_pub_date")

    # Paginate remaining
    while next_date and len(episodes) < max_episodes:
        time.sleep(delay)
        resp = client.get(
            f"/podcasts/{show_id}",
            params={"sort": "recent_first", "next_episode_pub_date": next_date},
        )
        resp.raise_for_status()
        data = resp.json()

        for ep in data.get("episodes", []):
            episodes.append(_parse_ln_episode(ep, show_title, show_id))

        next_date = data.get("next_episode_pub_date")
        if not data.get("episodes"):
            break

    return episodes[:max_episodes]

def _parse_ln_episode(ep: dict, show_title: str, show_id: str) -> Episode:
    return Episode(
        episode_id=ep["id"],
        title=ep.get("title", ""),
        description=ep.get("description", "")[:500],
        published_at=str(ep.get("pub_date_ms", "")),
        duration_seconds=ep.get("audio_length_sec", 0),
        audio_url=ep.get("audio", ""),
        show_title=show_title,
        show_id=show_id,
        explicit=ep.get("explicit_content", False),
    )

def get_genre_rankings(client: httpx.Client, genre_id: int, region: str = "us") -> list[dict]:
    """
    Get best podcasts for a specific genre.
    Popular genre IDs: 67=Technology, 93=Business, 77=Sports, 133=Comedy.
    """
    resp = client.get(
        "/best_podcasts",
        params={
            "genre_id": genre_id,
            "region": region,
            "safe_mode": 0,
        },
    )
    resp.raise_for_status()
    data = resp.json()
    return data.get("podcasts", [])

iTunes Search and RSS Parsing

Apple Podcasts data comes from two places: the iTunes Search API for discovery, and the RSS feed for episode-level data. No API key needed for either.

import xml.etree.ElementTree as ET
from urllib.parse import quote_plus

def search_itunes_podcasts(
    query: str,
    limit: int = 25,
    country: str = "US",
    explicit: str = "Yes",
) -> list[dict]:
    """
    Search Apple Podcasts / iTunes for shows.
    No API key required. Rate limit: ~20 requests/minute.
    """
    resp = httpx.get(
        "https://itunes.apple.com/search",
        params={
            "term": query,
            "media": "podcast",
            "limit": limit,
            "country": country,
            "explicit": explicit,
        },
        timeout=15,
    )
    resp.raise_for_status()
    data = resp.json()

    results = []
    for item in data.get("results", []):
        results.append({
            "itunes_id": item.get("collectionId"),
            "name": item.get("collectionName", ""),
            "artist": item.get("artistName", ""),
            "feed_url": item.get("feedUrl", ""),
            "track_count": item.get("trackCount", 0),
            "genres": item.get("genres", []),
            "primary_genre": item.get("primaryGenreName", ""),
            "artwork_url_30": item.get("artworkUrl30", ""),
            "artwork_url_100": item.get("artworkUrl100", ""),
            "artwork_url_600": item.get("artworkUrl600", ""),
            "release_date": item.get("releaseDate", ""),
            "country": item.get("country", ""),
            "content_advisory": item.get("contentAdvisoryRating", ""),
        })

    return results

def get_itunes_top_charts(
    genre_id: int = 26,
    limit: int = 100,
    country: str = "us",
) -> list[dict]:
    """
    Fetch iTunes podcast top charts.
    genre_id 26 = All Podcasts. Others: 1301=Arts, 1321=Business, etc.
    """
    url = f"https://itunes.apple.com/{country}/rss/toppodcasts/limit={limit}/genre={genre_id}/json"
    resp = httpx.get(url, timeout=15)
    resp.raise_for_status()
    data = resp.json()
    feed = data.get("feed", {})
    entries = feed.get("entry", [])

    results = []
    for i, entry in enumerate(entries):
        results.append({
            "rank": i + 1,
            "name": entry.get("im:name", {}).get("label", ""),
            "artist": entry.get("im:artist", {}).get("label", ""),
            "itunes_id": entry.get("id", {}).get("attributes", {}).get("im:id", ""),
            "genre": entry.get("category", {}).get("attributes", {}).get("term", ""),
            "artwork": entry.get("im:image", [{}])[-1].get("label", ""),
        })

    return results

def parse_podcast_rss(
    feed_url: str,
    max_episodes: int = 50,
    proxy: Optional[str] = None,
    timeout: float = 20.0,
) -> dict:
    """
    Parse a podcast RSS feed for show info and episode data.
    Returns dict with 'show' and 'episodes' keys.
    """
    transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
    with httpx.Client(
        transport=transport,
        timeout=timeout,
        follow_redirects=True,
        headers={
            "User-Agent": "Mozilla/5.0 (compatible; PodcastBot/1.0; +https://example.com/bot)",
            "Accept": "application/rss+xml, application/xml, text/xml, application/atom+xml",
        },
    ) as client:
        resp = client.get(feed_url)
        resp.raise_for_status()

    root = ET.fromstring(resp.content)
    ns = {
        "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
        "podcast": "https://podcastindex.org/namespace/1.0",
        "atom": "http://www.w3.org/2005/Atom",
        "content": "http://purl.org/rss/1.0/modules/content/",
    }
    channel = root.find("channel")
    if channel is None:
        return {"show": {}, "episodes": []}

    # Show-level metadata
    show = {
        "title": channel.findtext("title", "").strip(),
        "link": channel.findtext("link", "").strip(),
        "description": (channel.findtext("description") or "")[:500].strip(),
        "language": channel.findtext("language", "").strip(),
        "author": channel.findtext("itunes:author", namespaces=ns, default="").strip(),
        "owner_email": "",
        "category": [],
        "explicit": channel.findtext("itunes:explicit", namespaces=ns, default="").strip(),
        "image": "",
        "feed_url": feed_url,
    }

    owner = channel.find("itunes:owner", ns)
    if owner is not None:
        email_el = owner.find("itunes:email", ns)
        show["owner_email"] = email_el.text.strip() if email_el is not None and email_el.text else ""

    cats = channel.findall("itunes:category", ns)
    show["category"] = [c.get("text", "") for c in cats]

    img = channel.find("itunes:image", ns)
    if img is not None:
        show["image"] = img.get("href", "")

    # Episodes
    episodes = []
    for item in channel.findall("item")[:max_episodes]:
        enclosure = item.find("enclosure")
        duration_el = item.find("itunes:duration", ns)
        ep_type_el = item.find("itunes:episodeType", ns)
        season_el = item.find("itunes:season", ns)
        episode_el = item.find("itunes:episode", ns)
        guid_el = item.find("guid")

        episodes.append({
            "guid": (guid_el.text or "").strip() if guid_el is not None else "",
            "title": (item.findtext("title") or "").strip(),
            "published": (item.findtext("pubDate") or "").strip(),
            "description": (item.findtext("description") or "")[:500].strip(),
            "duration": duration_el.text.strip() if duration_el is not None and duration_el.text else "",
            "audio_url": enclosure.get("url", "") if enclosure is not None else "",
            "audio_size_bytes": enclosure.get("length", "") if enclosure is not None else "",
            "audio_type": enclosure.get("type", "") if enclosure is not None else "",
            "episode_type": ep_type_el.text.strip() if ep_type_el is not None and ep_type_el.text else "full",
            "season": season_el.text.strip() if season_el is not None and season_el.text else "",
            "episode_number": episode_el.text.strip() if episode_el is not None and episode_el.text else "",
            "explicit": (item.findtext("itunes:explicit", namespaces=ns) or "").strip(),
        })

    return {"show": show, "episodes": episodes}
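Feeds are inconsistent about itunes:duration: some publish raw seconds ("3723"), others MM:SS or HH:MM:SS. A small normalizer (assuming those three common formats) makes the field comparable across shows:

```python
def duration_to_seconds(raw: str) -> int:
    """Convert an itunes:duration value ('3723', '62:03', '1:02:03') to seconds.
    Returns 0 for empty or unparseable values."""
    raw = (raw or "").strip()
    if not raw:
        return 0
    try:
        parts = [int(float(p)) for p in raw.split(":")]
    except ValueError:
        return 0
    seconds = 0
    for part in parts:
        seconds = seconds * 60 + part
    return seconds
```

You can run it over the duration field that parse_podcast_rss returns before storing episodes.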

Spotify Podcast Data

Spotify doesn't offer a dedicated podcast scraping API, but their Web API includes podcast endpoints. You need an OAuth token via the client credentials flow:

import base64
import threading

class SpotifyClient:
    """
    Spotify API client with automatic token refresh.
    Uses client credentials flow — no user login required.
    """

    BASE = "https://api.spotify.com/v1"

    def __init__(self, client_id: str, client_secret: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self._token: Optional[str] = None
        self._token_expires: float = 0
        self._lock = threading.Lock()

    def _refresh_token(self):
        auth = base64.b64encode(f"{self.client_id}:{self.client_secret}".encode()).decode()
        resp = httpx.post(
            "https://accounts.spotify.com/api/token",
            data={"grant_type": "client_credentials"},
            headers={
                "Authorization": f"Basic {auth}",
                "Content-Type": "application/x-www-form-urlencoded",
            },
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()
        self._token = data["access_token"]
        self._token_expires = time.time() + data["expires_in"] - 60  # refresh 60s early

    def _get_token(self) -> str:
        with self._lock:
            if not self._token or time.time() >= self._token_expires:
                self._refresh_token()
            return self._token

    def get(self, path: str, params: dict = None) -> dict:
        token = self._get_token()
        resp = httpx.get(
            f"{self.BASE}/{path.lstrip('/')}",
            params=params,
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
            },
            timeout=20,
        )
        resp.raise_for_status()
        return resp.json()

def search_spotify_podcasts(
    client: SpotifyClient,
    query: str,
    limit: int = 20,
    market: str = "US",
) -> list[dict]:
    """Search Spotify for podcast shows."""
    data = client.get(
        "/search",
        params={"q": query, "type": "show", "market": market, "limit": min(limit, 50)},
    )
    shows = []
    for item in data.get("shows", {}).get("items", []):
        if item is None:
            continue
        shows.append({
            "spotify_id": item["id"],
            "name": item["name"],
            "publisher": item.get("publisher", ""),
            "total_episodes": item.get("total_episodes", 0),
            "description": item.get("description", "")[:500],
            "languages": item.get("languages", []),
            "explicit": item.get("explicit", False),
            "media_type": item.get("media_type", "audio"),
            "external_url": item.get("external_urls", {}).get("spotify", ""),
            "html_description": item.get("html_description", "")[:200],
        })
    return shows

def get_spotify_show_episodes(
    client: SpotifyClient,
    show_id: str,
    market: str = "US",
    max_episodes: int = 100,
    delay: float = 0.8,
) -> list[dict]:
    """
    Get all episodes for a Spotify podcast show.
    Walks offset-based pages, stopping when the response's `next` field is empty.
    """
    episodes = []
    offset = 0
    limit = 50

    while len(episodes) < max_episodes:
        data = client.get(
            f"/shows/{show_id}/episodes",
            params={"market": market, "limit": limit, "offset": offset},
        )
        items = data.get("items", [])
        if not items:
            break

        for ep in items:
            if ep is None:
                continue
            episodes.append({
                "episode_id": ep["id"],
                "name": ep["name"],
                "description": ep.get("description", "")[:500],
                "release_date": ep.get("release_date", ""),
                "release_date_precision": ep.get("release_date_precision", ""),
                "duration_ms": ep.get("duration_ms", 0),
                "duration_minutes": ep.get("duration_ms", 0) // 60000,
                "language": ep.get("language", ""),
                "languages": ep.get("languages", []),
                "explicit": ep.get("explicit", False),
                "type": ep.get("type", "episode"),
                "external_url": ep.get("external_urls", {}).get("spotify", ""),
            })

        if not data.get("next"):
            break
        offset += limit
        time.sleep(delay)

    return episodes[:max_episodes]
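Spotify's release_date honors release_date_precision, so "2019" and "2019-03" are both valid values. If you sort or filter by date downstream, pad them to full dates first. A sketch, with the assumption that missing parts default to the start of the period:

```python
def normalize_release_date(date_str: str, precision: str) -> str:
    """Pad a Spotify release_date to YYYY-MM-DD based on its precision
    ('year', 'month', or 'day')."""
    if not date_str:
        return ""
    if precision == "year":
        return f"{date_str}-01-01"
    if precision == "month":
        return f"{date_str}-01"
    return date_str
```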

def get_spotify_show_details(client: SpotifyClient, show_id: str, market: str = "US") -> dict:
    """Get complete show metadata including all available fields."""
    return client.get(f"/shows/{show_id}", params={"market": market})

Cross-Platform Matching and Data Deduplication

The hardest part of podcast aggregation is matching the same show across platforms. There's no universal podcast ID.

from difflib import SequenceMatcher
import re
import unicodedata

def normalize_name(name: str) -> str:
    """Normalize a podcast name for comparison."""
    name = unicodedata.normalize("NFKC", name.lower())
    name = re.sub(r"[^\w\s]", "", name)
    name = re.sub(r"\s+", " ", name).strip()
    # Strip articles and common suffixes that vary across platforms
    for suffix in ["podcast", "show", "official", "the", "a"]:
        name = re.sub(rf"\b{suffix}\b", "", name).strip()
    return name

def match_shows(show_a: dict, show_b: dict) -> float:
    """
    Score how likely two show records represent the same podcast.
    Returns 0.0 to 1.0.
    """
    # RSS URL match is definitive
    rss_a = show_a.get("rss_url", "") or show_a.get("feed_url", "")
    rss_b = show_b.get("rss_url", "") or show_b.get("feed_url", "")
    if rss_a and rss_b and rss_a == rss_b:
        return 1.0

    # Normalize names for comparison
    title_a = normalize_name(show_a.get("title", "") or show_a.get("name", ""))
    title_b = normalize_name(show_b.get("title", "") or show_b.get("name", ""))
    title_sim = SequenceMatcher(None, title_a, title_b).ratio()

    # Publisher / artist comparison
    pub_a = normalize_name(show_a.get("publisher", "") or show_a.get("artist", ""))
    pub_b = normalize_name(show_b.get("publisher", "") or show_b.get("artist", ""))
    pub_sim = SequenceMatcher(None, pub_a, pub_b).ratio() if pub_a and pub_b else 0.5

    # Episode count similarity as a weak signal
    count_a = show_a.get("total_episodes", 0) or show_a.get("track_count", 0)
    count_b = show_b.get("total_episodes", 0) or show_b.get("track_count", 0)
    if count_a and count_b:
        max_count = max(count_a, count_b)
        min_count = min(count_a, count_b)
        count_sim = min_count / max_count if max_count else 0
    else:
        count_sim = 0.5

    # Weighted score: title is most important
    return (title_sim * 0.65) + (pub_sim * 0.25) + (count_sim * 0.10)

def deduplicate_shows(shows: list[dict], threshold: float = 0.85) -> list[dict]:
    """Remove near-duplicate shows from a mixed-platform list."""
    unique = []
    for show in shows:
        is_duplicate = False
        for existing in unique:
            if match_shows(show, existing) >= threshold:
                is_duplicate = True
                break
        if not is_duplicate:
            unique.append(show)
    return unique
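deduplicate_shows compares every pair, which gets slow past a few thousand records. A common fix is blocking: bucket shows by a cheap key and only run match_shows within each bucket. A sketch (the key choice, the first few characters of the lowercased title, is our own heuristic):

```python
from collections import defaultdict

def block_by_title_prefix(shows: list[dict], prefix_len: int = 4) -> dict:
    """Group shows into buckets by title prefix so pairwise matching
    only happens within each bucket."""
    blocks = defaultdict(list)
    for show in shows:
        title = (show.get("title") or show.get("name") or "").lower().strip()
        blocks[title[:prefix_len]].append(show)
    return dict(blocks)
```

The trade-off: duplicates whose titles diverge in the first few characters land in different buckets and are never compared, so keep the prefix short or block on several keys.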

Anti-Bot Measures and Rate Limiting

The APIs covered here are mostly well-behaved — you authenticate properly and get structured data back. The problems start when you go beyond the official endpoints or scale up.

RSS feed hosting varies wildly. Some podcast hosts (Libsyn, Buzzsprout, Anchor) serve RSS feeds without issue. Others are behind Cloudflare or rate-limit aggressively. When scraping hundreds of RSS feeds, expect 10-15% to fail on any given run.

iTunes and Spotify rate limits. iTunes Search API allows roughly 20 requests per minute before returning 429s. Spotify's API is more generous (hundreds per minute with authenticated requests) but will throttle burst traffic. Always implement exponential backoff.
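The backoff itself is a few lines; the subtlety is honoring a numeric Retry-After header when a 429 includes one, and falling back to exponential delay with jitter when it doesn't. A sketch:

```python
import random
from typing import Optional

def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 2.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry `attempt` (0-based). Honors a numeric
    Retry-After header value, otherwise exponential backoff plus jitter."""
    if retry_after:
        try:
            return min(float(retry_after), cap)
        except ValueError:
            pass  # Retry-After can also be an HTTP date; fall through to exponential
    return min(base * (2 ** attempt), cap) + random.uniform(0, 0.5)
```

Sleep for backoff_delay(i, resp.headers.get("Retry-After")) between attempts and you cover both cases.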

Scaling RSS collection. If you're monitoring thousands of shows by polling their RSS feeds, you need IP diversity. A single IP cycling through 5,000 feeds will get blocked by at least a dozen different hosting providers. ThorData's rotating residential proxies solve this — each RSS feed request exits from a different residential IP, so no single hosting provider sees suspicious volume from one address.
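Wiring a rotating gateway into the fetch functions below comes down to a proxy URL. The exact host and port depend on your provider, so the defaults here are placeholders:

```python
from urllib.parse import quote

def build_proxy_url(username: str, password: str,
                    host: str = "gateway.example.com", port: int = 9999) -> str:
    """Build an authenticated HTTP proxy URL for httpx's proxy argument.
    Host and port are placeholders; substitute your provider's gateway.
    Credentials are percent-encoded in case they contain special characters."""
    return f"http://{quote(username, safe='')}:{quote(password, safe='')}@{host}:{port}"
```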

import random
from typing import Optional

USER_AGENTS = [
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64; rv:125.0) Gecko/20100101 Firefox/125.0",
    "Mozilla/5.0 (compatible; PodcastAggregator/2.0; +https://example.com/bot)",
]

def fetch_rss_with_retry(
    feed_url: str,
    proxies: Optional[list[str]] = None,
    max_attempts: int = 3,
    base_delay: float = 2.0,
) -> Optional[str]:
    """
    Fetch RSS feed with proxy rotation and exponential backoff retry.
    Returns raw XML string or None on failure.
    """
    for i in range(max_attempts):
        proxy = random.choice(proxies) if proxies else None
        transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
        ua = random.choice(USER_AGENTS)

        try:
            with httpx.Client(
                transport=transport,
                timeout=15,
                follow_redirects=True,
            ) as client:
                resp = client.get(
                    feed_url,
                    headers={
                        "User-Agent": ua,
                        "Accept": "application/rss+xml, application/xml, text/xml, */*",
                        "Accept-Encoding": "gzip, deflate, br",
                    },
                )
                if resp.status_code == 200:
                    content_type = resp.headers.get("content-type", "")
                    if "xml" in content_type or "rss" in content_type or len(resp.text) > 500:
                        return resp.text
                elif resp.status_code == 429:
                    try:
                        wait = float(resp.headers.get("Retry-After", ""))
                    except ValueError:
                        wait = base_delay * (2 ** i)
                    print(f"Rate limited on {feed_url}, waiting {wait:.1f}s")
                    time.sleep(wait)
                elif resp.status_code in (403, 406):
                    # Rotate user agent on next attempt
                    time.sleep(base_delay * (i + 1))
                else:
                    time.sleep(base_delay * (2 ** i))
        except httpx.TimeoutException:
            print(f"Timeout on {feed_url} (attempt {i + 1})")
            time.sleep(base_delay * (2 ** i))
        except httpx.ConnectError as e:
            print(f"Connection error on {feed_url}: {e}")
            time.sleep(base_delay * (i + 1))
        except Exception as e:
            print(f"Unexpected error on {feed_url}: {e}")
            break

    return None

def batch_fetch_rss(
    feed_urls: list[str],
    proxies: Optional[list[str]] = None,
    delay: float = 0.8,
    max_attempts: int = 3,
) -> dict[str, Optional[dict]]:
    """
    Fetch and parse multiple RSS feeds with rate limiting.
    Returns dict mapping feed_url -> parsed data (or None on failure).
    """
    results = {}
    for i, url in enumerate(feed_urls):
        raw = fetch_rss_with_retry(url, proxies=proxies, max_attempts=max_attempts)
        if raw:
            try:
                # Lightweight validation: confirm the XML parses and count items
                root = ET.fromstring(raw)
                ns = {"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd"}
                channel = root.find("channel")
                if channel is not None:
                    results[url] = {
                        "title": channel.findtext("title", ""),
                        "episode_count": len(channel.findall("item")),
                    }
                else:
                    results[url] = None
            except ET.ParseError as e:
                print(f"XML parse error for {url}: {e}")
                results[url] = None
        else:
            results[url] = None

        if (i + 1) % 10 == 0:
            print(f"Progress: {i + 1}/{len(feed_urls)} feeds processed")

        time.sleep(delay + random.uniform(0, 0.3))

    return results

Production Error Handling and Storage

For a production podcast monitoring system, you need robust error handling and persistent storage:

def setup_database(db_path: str = "podcasts.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS shows (
            show_id TEXT PRIMARY KEY,
            title TEXT,
            publisher TEXT,
            description TEXT,
            total_episodes INTEGER,
            listen_score INTEGER,
            language TEXT,
            country TEXT,
            rss_url TEXT,
            website TEXT,
            genres TEXT,
            latest_episode_date TEXT,
            source TEXT,
            scraped_at TEXT DEFAULT (datetime('now')),
            updated_at TEXT
        );

        CREATE TABLE IF NOT EXISTS episodes (
            episode_id TEXT PRIMARY KEY,
            show_id TEXT,
            title TEXT,
            description TEXT,
            published_at TEXT,
            duration_seconds INTEGER,
            audio_url TEXT,
            explicit INTEGER DEFAULT 0,
            source TEXT,
            scraped_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (show_id) REFERENCES shows(show_id)
        );

        CREATE TABLE IF NOT EXISTS scrape_errors (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            resource_type TEXT,
            resource_id TEXT,
            error_type TEXT,
            error_message TEXT,
            attempt_count INTEGER DEFAULT 1,
            first_seen TEXT DEFAULT (datetime('now')),
            last_seen TEXT DEFAULT (datetime('now')),
            UNIQUE(resource_type, resource_id, error_type)  -- gives log_error's upsert a conflict target
        );

        CREATE INDEX IF NOT EXISTS idx_episodes_show ON episodes(show_id);
        CREATE INDEX IF NOT EXISTS idx_episodes_published ON episodes(published_at);
        CREATE INDEX IF NOT EXISTS idx_shows_source ON shows(source);
    """)
    conn.commit()
    return conn

def save_show(conn: sqlite3.Connection, show: PodcastShow):
    conn.execute("""
        INSERT OR REPLACE INTO shows
        (show_id, title, publisher, description, total_episodes, listen_score,
         language, country, rss_url, website, genres, latest_episode_date, source, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
    """, (
        show.show_id, show.title, show.publisher, show.description,
        show.total_episodes, show.listen_score, show.language, show.country,
        show.rss_url, show.website, json.dumps(show.genres),
        show.latest_episode_date, show.source,
    ))
    conn.commit()

def save_episodes(conn: sqlite3.Connection, episodes: list[Episode]):
    conn.executemany("""
        INSERT OR IGNORE INTO episodes
        (episode_id, show_id, title, description, published_at,
         duration_seconds, audio_url, explicit, source)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, [
        (
            ep.episode_id, ep.show_id, ep.title, ep.description,
            ep.published_at, ep.duration_seconds, ep.audio_url,
            int(ep.explicit), ep.source,
        )
        for ep in episodes
    ])
    conn.commit()

def log_error(conn: sqlite3.Connection, resource_type: str, resource_id: str,
               error_type: str, message: str):
    conn.execute("""
        INSERT INTO scrape_errors (resource_type, resource_id, error_type, error_message)
        VALUES (?, ?, ?, ?)
        ON CONFLICT DO UPDATE SET
            attempt_count = attempt_count + 1,
            last_seen = datetime('now')
    """, (resource_type, resource_id, error_type, str(message)[:500]))
    conn.commit()

Complete Pipeline Example

Putting it all together into a multi-source data collection pipeline:

def run_podcast_collection_pipeline(
    queries: list[str],
    ln_api_key: str,
    spotify_client_id: str,
    spotify_client_secret: str,
    proxy_url: Optional[str] = None,
    db_path: str = "podcasts.db",
) -> dict:
    """
    Full pipeline: search across all three platforms, deduplicate,
    enrich with RSS data, and store to SQLite.
    """
    conn = setup_database(db_path)
    ln_client = build_listen_notes_client(ln_api_key)
    spotify = SpotifyClient(spotify_client_id, spotify_client_secret)

    stats = {"ln_shows": 0, "itunes_shows": 0, "spotify_shows": 0,
             "rss_parsed": 0, "errors": 0, "total_episodes": 0}

    for query in queries:
        print(f"\n=== Processing query: {query} ===")

        # Listen Notes
        try:
            ln_shows, _ = search_listen_notes(query, ln_client)
            for show in ln_shows:
                save_show(conn, show)
            stats["ln_shows"] += len(ln_shows)
            print(f"  Listen Notes: {len(ln_shows)} shows")
        except Exception as e:
            print(f"  Listen Notes error: {e}")
            log_error(conn, "search_ln", query, type(e).__name__, str(e))
            stats["errors"] += 1

        time.sleep(1.5)

        # iTunes
        try:
            itunes_results = search_itunes_podcasts(query, limit=20)
            print(f"  iTunes: {len(itunes_results)} shows")
            stats["itunes_shows"] += len(itunes_results)

            # Parse RSS feeds for iTunes results
            for itunes_show in itunes_results[:5]:  # limit RSS parsing
                feed_url = itunes_show.get("feed_url")
                if not feed_url:
                    continue
                try:
                    rss_data = parse_podcast_rss(feed_url, max_episodes=20, proxy=proxy_url)
                    if rss_data["episodes"]:
                        stats["rss_parsed"] += 1
                        stats["total_episodes"] += len(rss_data["episodes"])
                except Exception as e:
                    log_error(conn, "rss_feed", feed_url, type(e).__name__, str(e))
                time.sleep(0.8)
        except Exception as e:
            print(f"  iTunes error: {e}")
            stats["errors"] += 1

        time.sleep(1.0)

        # Spotify
        try:
            spotify_results = search_spotify_podcasts(spotify, query, limit=20)
            print(f"  Spotify: {len(spotify_results)} shows")
            stats["spotify_shows"] += len(spotify_results)
        except Exception as e:
            print(f"  Spotify error: {e}")
            stats["errors"] += 1

        time.sleep(1.0)

    conn.close()
    print("\n=== Pipeline complete ===")
    print(json.dumps(stats, indent=2))
    return stats

ThorData Proxy Setup for Large-Scale RSS Monitoring

When scaling RSS collection to thousands of feeds, proxy rotation becomes essential. Different podcast hosting services (Libsyn, Buzzsprout, Podbean, Spreaker, Megaphone) each have their own rate limiting, and hitting them all from a single IP triggers blocks across the board.

ThorData's residential proxy network is designed exactly for this use case — large-scale distributed requests where IP diversity is the primary requirement. Their rotating residential proxy endpoints automatically assign a new residential IP for each connection, so each hosting provider sees what appears to be organic traffic from different users.

# ThorData proxy configuration for RSS monitoring
THORDATA_PROXY_ROTATING = "http://USERNAME:PASSWORD@proxy.thordata.com:9001"
# For sticky sessions (same IP for a sequence of requests):
THORDATA_PROXY_STICKY = "http://USERNAME:PASSWORD-session-{session_id}@proxy.thordata.com:9001"

def make_rss_session(session_id: Optional[str] = None) -> httpx.Client:
    """
    Create an httpx client with ThorData proxy.
    Use sticky sessions when you need consistent IP across multiple requests
    to the same hosting provider.
    """
    if session_id:
        proxy_url = THORDATA_PROXY_STICKY.format(session_id=session_id)
    else:
        proxy_url = THORDATA_PROXY_ROTATING

    return httpx.Client(
        transport=httpx.HTTPTransport(proxy=proxy_url),
        headers={
            "User-Agent": random.choice(USER_AGENTS),
            "Accept": "application/rss+xml, application/xml, text/xml",
        },
        timeout=20,
        follow_redirects=True,
    )

def monitor_show_rss_feeds(
    shows_with_feeds: list[dict],
    use_proxy: bool = True,
) -> list[dict]:
    """
    Poll RSS feeds for a list of shows. Returns shows with updated episode data.
    Uses session-based proxy grouping by hosting provider.
    """
    # Group feeds by hosting provider to use sticky sessions
    from urllib.parse import urlparse

    by_host: dict[str, list[dict]] = {}
    for show in shows_with_feeds:
        feed_url = show.get("rss_url", "")
        if not feed_url:
            continue
        host = urlparse(feed_url).netloc or "unknown"
        by_host.setdefault(host, []).append(show)

    import zlib  # crc32 is stable across runs, unlike the built-in hash()

    updated = []
    for host, host_shows in by_host.items():
        # One sticky session per hosting provider, so each host sees a
        # consistent IP for its whole batch
        proxy_url = None
        if use_proxy:
            session_id = f"podcast-{zlib.crc32(host.encode()) % 100000}"
            proxy_url = THORDATA_PROXY_STICKY.format(session_id=session_id)

        for show in host_shows:
            feed_url = show.get("rss_url", "")
            try:
                result = parse_podcast_rss(feed_url, max_episodes=10, proxy=proxy_url)
                show["latest_episodes"] = result["episodes"]
                show["rss_status"] = "ok"
            except Exception as e:
                show["rss_status"] = f"error: {e}"
            updated.append(show)
            time.sleep(1.0)

    return updated

Podcast Namespace Extensions

Modern podcast RSS feeds use the podcast: namespace for rich metadata beyond the basic iTunes tags:

def parse_podcast_namespace_extras(channel_el: ET.Element) -> dict:
    """
    Parse Podcast Namespace 2.0 extensions.
    See: https://podcastindex.org/namespace/1.0
    """
    ns = {
        "podcast": "https://podcastindex.org/namespace/1.0",
        "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    }

    extras = {}

    # Funding links
    funding_els = channel_el.findall("podcast:funding", ns)
    extras["funding"] = [
        {"url": el.get("url", ""), "text": el.text or ""}
        for el in funding_els
    ]

    # Transcripts (episode-level)
    transcript_els = channel_el.findall(".//podcast:transcript", ns)
    extras["has_transcripts"] = len(transcript_els) > 0

    # Value (crypto/streaming payments)
    value_el = channel_el.find("podcast:value", ns)
    extras["value_enabled"] = value_el is not None

    # GUID — podcast-level stable identifier
    guid_el = channel_el.find("podcast:guid", ns)
    extras["podcast_guid"] = guid_el.text.strip() if guid_el is not None and guid_el.text else ""

    # Location
    location_el = channel_el.find("podcast:location", ns)
    extras["location"] = location_el.text.strip() if location_el is not None and location_el.text else ""

    return extras

Legal Considerations

Podcast data collection is generally on solid legal ground, but there are nuances worth understanding:

RSS feeds are explicitly public. The entire podcast distribution system is built on the premise that RSS feeds are openly readable by any aggregator. Fetching and parsing RSS feeds is the same thing Apple Podcasts does — it's how the ecosystem works. There's no meaningful legal or ethical argument against this.

API terms matter. Listen Notes, Spotify, and iTunes Search all have terms of service. The Spotify Web API prohibits storing data beyond what's needed for your application and forbids using data to build competing products. Listen Notes has similar restrictions on their API data. Read the current terms before building anything commercial.

Personal data in reviews. Podcast reviews on iTunes include usernames and review text. These are public posts, but collecting them at scale for behavioral profiling or selling as a dataset raises privacy concerns. The technical capability to collect this data doesn't mean it's appropriate to do so in all contexts.

Server load. Responsible bot etiquette: identify yourself in your User-Agent header, respect robots.txt where present, and add delays between requests. For RSS feeds especially, aggressive polling wastes bandwidth and hosting costs that show creators pay for.
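That etiquette can be sketched as a small helper. The `POLITE_UA` string and `is_allowed` function are illustrative names, not from any particular library:

```python
import urllib.robotparser

# Hypothetical identifying User-Agent: name your bot and give a contact point
POLITE_UA = "MyPodcastIndexer/1.0 (+https://example.com/bot)"

def is_allowed(robots_txt: str, url: str, user_agent: str = POLITE_UA) -> bool:
    """Check a fetched robots.txt body against a target URL before crawling it."""
    rp = urllib.robotparser.RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp.can_fetch(user_agent, url)
```

Fetch `https://host/robots.txt` once per host, cache the body, and consult `is_allowed` before each request. RSS feed URLs themselves are rarely disallowed, but directory and review pages sometimes are.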

What's clearly fine: Building analytics tools, market research, competitive intelligence, recommendation systems, and content discovery applications from publicly accessible podcast data. Using the official APIs within their terms. Parsing RSS feeds for indexing or research.

Practical Notes on Reliability

Episode GUIDs. When tracking episodes across polling runs, use the <guid> element from RSS — not the title or URL, which publishers sometimes change after publication. GUIDs are supposed to be permanent and unique per episode.
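A minimal sketch of GUID-based change detection between polling runs, where `seen_guids` stands in for whatever episode IDs are already in your database:

```python
import xml.etree.ElementTree as ET

def new_episode_guids(feed_xml: str, seen_guids: set[str]) -> list[str]:
    """Return the GUIDs of feed items not seen in a previous polling run."""
    root = ET.fromstring(feed_xml)
    found = []
    for item in root.findall("./channel/item"):
        guid_el = item.find("guid")
        if guid_el is not None and guid_el.text:
            guid = guid_el.text.strip()
        else:
            # Fall back to the enclosure URL only when <guid> is missing
            enc = item.find("enclosure")
            guid = enc.get("url", "") if enc is not None else ""
        if guid and guid not in seen_guids:
            found.append(guid)
    return found
```

Because only the GUID is compared, a publisher renaming an episode after publication won't produce a duplicate record.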

Listen Score. Listen Notes' proprietary audience estimate runs 0-100. It correlates reasonably well with actual download numbers for shows in the 40-80 range. Below 30, the data gets noisy. Use it as a rough filter, not a precise metric.

Spotify episode limits. The Spotify API caps episode retrieval at 50 per request and seems to throttle pagination beyond ~500 episodes for very long-running shows. For comprehensive episode catalogs, cross-reference with the RSS feed.
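One way to drain an offset/limit endpoint like Spotify's `GET /v1/shows/{id}/episodes` is a generic paginator. The sketch below assumes the Spotify-style `{"items": [...], "total": N}` response shape; `fetch_page` is whatever function performs the authenticated request:

```python
from typing import Callable, Optional

def paginate_items(
    fetch_page: Callable[[int, int], dict],
    limit: int = 50,  # Spotify's per-request cap
    max_items: Optional[int] = None,
) -> list[dict]:
    """Collect all pages from an offset/limit API; fetch_page(limit, offset) -> page dict."""
    items: list[dict] = []
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        batch = page.get("items", [])
        items.extend(batch)
        offset += len(batch)
        # Stop on an empty page, on reaching the reported total,
        # or on hitting the caller's cap
        if not batch or offset >= page.get("total", 0):
            break
        if max_items is not None and offset >= max_items:
            break
    return items
```

If pagination stalls partway through a long back catalog, fall back to the show's RSS feed for the remaining episodes.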

Feed freshness. RSS feeds are sometimes served with aggressive caching headers. An RSS fetch might return a cached version that's 24+ hours old. Check the Last-Modified and ETag headers to detect stale cached responses and handle conditional requests properly.

Podcast data is one of the easier scraping targets because the ecosystem is built on open standards. The challenge is scale and cross-platform reconciliation, not anti-bot evasion. Start with Listen Notes for discovery, enrich with iTunes RSS data, and add Spotify if you need their specific metrics.