Scraping Podcast Directory Data: Listen Notes, iTunes, and Spotify (2026)
Podcast data is scattered across platforms that don't talk to each other. A show might have different stats on Spotify than on Apple Podcasts, and neither platform shares listener numbers publicly. If you're doing market research, tracking competitors, or building a podcast analytics product, you need to pull from multiple directories and stitch the data together yourself.
This guide covers three primary sources — Listen Notes API for search and metadata, iTunes/Apple Podcasts for RSS-based episode data, and Spotify's podcast endpoints — with working Python code for each. We also cover anti-detection strategies, proxy setup, error handling for production workloads, and the legal landscape around podcast data collection.
What Data Exists Where
Each platform exposes different slices of the picture:
- Listen Notes — podcast-level metadata, episode listings, genre rankings, estimated audience size (Listen Score), search across 3M+ podcasts
- iTunes / Apple Podcasts — RSS feed URLs, category rankings, ratings/reviews (by country), episode lists via RSS
- Spotify — episode counts, show descriptions, episode-level data including duration and release date. Full listening stats are only available to show owners via Spotify for Podcasters
None of them give you actual download or listener numbers. Those are locked behind each platform's analytics dashboard. What you can collect is everything else — and it's enough to build useful competitive intelligence. Podcast advertising rates are tied to download estimates, and the directional signal from platform-visible metrics is often enough for market sizing work.
Podcast hosting platforms also expose RSS feeds directly, which are the canonical data source for episode metadata. Hosts like Anchor (now Spotify for Podcasters), Buzzsprout, Libsyn, Transistor, and Podbean all publish RSS feeds that any aggregator can consume. The RSS standard for podcasts, extended by the iTunes namespace, gives you show and episode metadata that's often richer than what the directories expose.
Understanding the Podcast Data Ecosystem
Before diving into code, it helps to understand the three-layer architecture of podcast data:
Layer 1: RSS feeds. The foundational data format. Each podcast has a canonical RSS feed URL. The iTunes namespace (http://www.itunes.com/dtds/podcast-1.0.dtd) adds fields like episode type, explicit flag, season/episode numbers, and chapter data. The newer podcast: namespace adds transcripts, chapters, funding links, and value-for-value payment tags. RSS feeds are the most authoritative source for episode metadata since they're controlled by the show creator.
Layer 2: Aggregator directories. Apple Podcasts, Spotify, Pocket Casts, and others index RSS feeds and add their own data on top — ratings, listener counts (internal), editorial placements, and genre classifications. These directories give you discovery data (rankings, popularity) that's not in the raw RSS feeds.
Layer 3: Analytics platforms. Listen Notes, Podchaser, and similar tools build their own derived metrics (Listen Score, etc.) by aggregating data across directories and estimating audience sizes from proxy signals. These are useful for market research but should be treated as estimates, not hard numbers.
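To make layer 1 concrete, here's a minimal sketch of what an RSS item with iTunes-namespace tags looks like on the wire and how the fields come out of ElementTree. The feed snippet and helper name are invented for illustration:

```python
import xml.etree.ElementTree as ET

# Invented sample: one <item> with standard RSS fields plus iTunes-namespace
# extensions, as published by the show creator (layer 1).
SAMPLE_FEED = """<rss xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd" version="2.0">
<channel>
<title>Example Show</title>
<item>
<title>Pilot</title>
<enclosure url="https://example.com/ep1.mp3" length="12345678" type="audio/mpeg"/>
<itunes:episodeType>full</itunes:episodeType>
<itunes:season>1</itunes:season>
<itunes:episode>1</itunes:episode>
<itunes:explicit>false</itunes:explicit>
</item>
</channel>
</rss>"""

NS = {"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd"}

def describe_item(xml_text: str) -> dict:
    """Extract the layer-1 fields from the first <item> of a feed."""
    item = ET.fromstring(xml_text).find("channel/item")
    enclosure = item.find("enclosure")
    return {
        "title": item.findtext("title", ""),
        "audio_url": enclosure.get("url", "") if enclosure is not None else "",
        "episode_type": item.findtext("itunes:episodeType", "", NS),
        "season": item.findtext("itunes:season", "", NS),
        "episode": item.findtext("itunes:episode", "", NS),
    }
```

Note that namespaced tags must be looked up with the namespace mapping; a plain `find("itunes:season")` without it won't match.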
Listen Notes API
Listen Notes offers the most comprehensive podcast search API. The free tier gives you 300 requests per month — limited, but enough for targeted research. Paid plans start at $20/month for 10,000 requests.
import httpx
import time
import json
import sqlite3
from dataclasses import dataclass, asdict, field
from typing import Optional
@dataclass
class PodcastShow:
show_id: str
title: str
publisher: str
description: str
total_episodes: int
listen_score: int
listen_score_global_rank: str
language: str
country: str
rss_url: str
website: str
genres: list
latest_episode_date: str
earliest_pub_date: str
update_frequency: str
source: str = "listennotes"
@dataclass
class Episode:
episode_id: str
title: str
description: str
published_at: str
duration_seconds: int
audio_url: str
show_title: str
show_id: str
explicit: bool = False
source: str = "listennotes"
def build_listen_notes_client(api_key: str) -> httpx.Client:
return httpx.Client(
base_url="https://listen-api.listennotes.com/api/v2",
headers={
"X-ListenAPI-Key": api_key,
"Accept": "application/json",
},
timeout=30,
)
def search_listen_notes(
query: str,
client: httpx.Client,
offset: int = 0,
sort: str = "0",
language: str = "English",
safe_mode: int = 0,
) -> tuple[list[PodcastShow], int]:
"""
Search Listen Notes for podcasts matching a query.
Returns (shows, total_count).
sort: 0=relevance, 1=recent episode first.
"""
resp = client.get(
"/search",
params={
"q": query,
"type": "podcast",
"offset": offset,
"sort_by_date": sort,
"language": language,
"safe_mode": safe_mode,
},
)
resp.raise_for_status()
data = resp.json()
total = data.get("total", 0)
shows = []
for result in data.get("results", []):
shows.append(PodcastShow(
show_id=result["id"],
title=result.get("title_original", ""),
publisher=result.get("publisher_original", ""),
description=result.get("description_original", "")[:500],
total_episodes=result.get("total_episodes", 0),
listen_score=result.get("listen_score") or 0,
listen_score_global_rank=result.get("listen_score_global_rank", ""),
language=result.get("language", ""),
country=result.get("country", ""),
rss_url=result.get("rss", ""),
website=result.get("website", ""),
            genres=[g if isinstance(g, str) else g.get("name", "") if isinstance(g, dict) else str(g) for g in result.get("genre_ids", [])],
            latest_episode_date=str(result.get("latest_pub_date_ms", "")),
            earliest_pub_date=str(result.get("earliest_pub_date_ms", "")),
            update_frequency=str(result.get("update_frequency_hours", "")),
))
return shows, total
def paginate_search(
query: str,
client: httpx.Client,
max_results: int = 100,
delay: float = 1.5,
) -> list[PodcastShow]:
"""Paginate through Listen Notes search results."""
all_shows = []
offset = 0
page_size = 10 # Listen Notes free tier returns 10 per page
while len(all_shows) < max_results:
shows, total = search_listen_notes(query, client, offset=offset)
if not shows:
break
all_shows.extend(shows)
if offset + page_size >= min(total, max_results):
break
offset += page_size
time.sleep(delay)
return all_shows[:max_results]
Fetching Episodes from Listen Notes
Once you have a show ID, pull its episode list:
def get_show_episodes(
show_id: str,
client: httpx.Client,
max_episodes: int = 100,
delay: float = 1.2,
) -> list[Episode]:
"""
Fetch episodes for a specific podcast show.
Uses next_episode_pub_date for cursor-based pagination.
"""
episodes = []
next_date = None
# First request: get show info
resp = client.get(f"/podcasts/{show_id}", params={"sort": "recent_first"})
resp.raise_for_status()
data = resp.json()
show_title = data.get("title", "")
for ep in data.get("episodes", []):
episodes.append(_parse_ln_episode(ep, show_title, show_id))
next_date = data.get("next_episode_pub_date")
# Paginate remaining
while next_date and len(episodes) < max_episodes:
time.sleep(delay)
resp = client.get(
f"/podcasts/{show_id}",
params={"sort": "recent_first", "next_episode_pub_date": next_date},
)
resp.raise_for_status()
data = resp.json()
for ep in data.get("episodes", []):
episodes.append(_parse_ln_episode(ep, show_title, show_id))
next_date = data.get("next_episode_pub_date")
if not data.get("episodes"):
break
return episodes[:max_episodes]
def _parse_ln_episode(ep: dict, show_title: str, show_id: str) -> Episode:
return Episode(
episode_id=ep["id"],
title=ep.get("title", ""),
description=ep.get("description", "")[:500],
published_at=str(ep.get("pub_date_ms", "")),
duration_seconds=ep.get("audio_length_sec", 0),
audio_url=ep.get("audio", ""),
show_title=show_title,
show_id=show_id,
explicit=ep.get("explicit_content", False),
)
def get_genre_rankings(client: httpx.Client, genre_id: int, region: str = "us") -> list[dict]:
"""
Get best podcasts for a specific genre.
Popular genre IDs: 67=Technology, 93=Business, 77=Sports, 133=Comedy.
"""
resp = client.get(
"/best_podcasts",
params={
"genre_id": genre_id,
"region": region,
"safe_mode": 0,
},
)
resp.raise_for_status()
data = resp.json()
return data.get("podcasts", [])
iTunes Search and RSS Parsing
Apple Podcasts data comes from two places: the iTunes Search API for discovery, and the RSS feed for episode-level data. No API key needed for either.
import xml.etree.ElementTree as ET
from urllib.parse import quote_plus
def search_itunes_podcasts(
query: str,
limit: int = 25,
country: str = "US",
explicit: str = "Yes",
) -> list[dict]:
"""
Search Apple Podcasts / iTunes for shows.
No API key required. Rate limit: ~20 requests/minute.
"""
resp = httpx.get(
"https://itunes.apple.com/search",
params={
"term": query,
"media": "podcast",
"limit": limit,
"country": country,
"explicit": explicit,
},
timeout=15,
)
resp.raise_for_status()
data = resp.json()
results = []
for item in data.get("results", []):
results.append({
"itunes_id": item.get("collectionId"),
"name": item.get("collectionName", ""),
"artist": item.get("artistName", ""),
"feed_url": item.get("feedUrl", ""),
"track_count": item.get("trackCount", 0),
"genres": item.get("genres", []),
"primary_genre": item.get("primaryGenreName", ""),
"artwork_url_30": item.get("artworkUrl30", ""),
"artwork_url_100": item.get("artworkUrl100", ""),
"artwork_url_600": item.get("artworkUrl600", ""),
"release_date": item.get("releaseDate", ""),
"country": item.get("country", ""),
"content_advisory": item.get("contentAdvisoryRating", ""),
})
return results
def get_itunes_top_charts(
genre_id: int = 26,
limit: int = 100,
country: str = "us",
) -> list[dict]:
"""
Fetch iTunes podcast top charts.
genre_id 26 = All Podcasts. Others: 1301=Arts, 1321=Business, etc.
"""
url = f"https://itunes.apple.com/{country}/rss/toppodcasts/limit={limit}/genre={genre_id}/json"
resp = httpx.get(url, timeout=15)
resp.raise_for_status()
data = resp.json()
feed = data.get("feed", {})
entries = feed.get("entry", [])
results = []
for i, entry in enumerate(entries):
results.append({
"rank": i + 1,
"name": entry.get("im:name", {}).get("label", ""),
"artist": entry.get("im:artist", {}).get("label", ""),
"itunes_id": entry.get("id", {}).get("attributes", {}).get("im:id", ""),
"genre": entry.get("category", {}).get("attributes", {}).get("term", ""),
"artwork": entry.get("im:image", [{}])[-1].get("label", ""),
})
return results
def parse_podcast_rss(
feed_url: str,
max_episodes: int = 50,
proxy: str = None,
timeout: float = 20.0,
) -> dict:
"""
Parse a podcast RSS feed for show info and episode data.
Returns dict with 'show' and 'episodes' keys.
"""
transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
with httpx.Client(
transport=transport,
timeout=timeout,
follow_redirects=True,
headers={
"User-Agent": "Mozilla/5.0 (compatible; PodcastBot/1.0; +https://example.com/bot)",
"Accept": "application/rss+xml, application/xml, text/xml, application/atom+xml",
},
) as client:
resp = client.get(feed_url)
resp.raise_for_status()
root = ET.fromstring(resp.content)
ns = {
"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
"podcast": "https://podcastindex.org/namespace/1.0",
"atom": "http://www.w3.org/2005/Atom",
"content": "http://purl.org/rss/1.0/modules/content/",
}
channel = root.find("channel")
if channel is None:
return {"show": {}, "episodes": []}
# Show-level metadata
show = {
"title": channel.findtext("title", "").strip(),
"link": channel.findtext("link", "").strip(),
"description": (channel.findtext("description") or "")[:500].strip(),
"language": channel.findtext("language", "").strip(),
"author": channel.findtext("itunes:author", namespaces=ns, default="").strip(),
"owner_email": "",
"category": [],
"explicit": channel.findtext("itunes:explicit", namespaces=ns, default="").strip(),
"image": "",
"feed_url": feed_url,
}
owner = channel.find("itunes:owner", ns)
if owner is not None:
email_el = owner.find("itunes:email", ns)
show["owner_email"] = email_el.text.strip() if email_el is not None and email_el.text else ""
cats = channel.findall("itunes:category", ns)
show["category"] = [c.get("text", "") for c in cats]
img = channel.find("itunes:image", ns)
if img is not None:
show["image"] = img.get("href", "")
# Episodes
episodes = []
for item in channel.findall("item")[:max_episodes]:
enclosure = item.find("enclosure")
duration_el = item.find("itunes:duration", ns)
ep_type_el = item.find("itunes:episodeType", ns)
season_el = item.find("itunes:season", ns)
episode_el = item.find("itunes:episode", ns)
guid_el = item.find("guid")
episodes.append({
"guid": (guid_el.text or "").strip() if guid_el is not None else "",
"title": (item.findtext("title") or "").strip(),
"published": (item.findtext("pubDate") or "").strip(),
"description": (item.findtext("description") or "")[:500].strip(),
"duration": duration_el.text.strip() if duration_el is not None and duration_el.text else "",
"audio_url": enclosure.get("url", "") if enclosure is not None else "",
"audio_size_bytes": enclosure.get("length", "") if enclosure is not None else "",
"audio_type": enclosure.get("type", "") if enclosure is not None else "",
"episode_type": ep_type_el.text.strip() if ep_type_el is not None and ep_type_el.text else "full",
"season": season_el.text.strip() if season_el is not None and season_el.text else "",
"episode_number": episode_el.text.strip() if episode_el is not None and episode_el.text else "",
"explicit": (item.findtext("itunes:explicit", namespaces=ns) or "").strip(),
})
return {"show": show, "episodes": episodes}
Spotify Podcast Data
Spotify doesn't offer a dedicated podcast scraping API, but their Web API includes podcast endpoints. You need an OAuth token via the client credentials flow:
import base64
import threading
class SpotifyClient:
"""
Spotify API client with automatic token refresh.
Uses client credentials flow — no user login required.
"""
BASE = "https://api.spotify.com/v1"
def __init__(self, client_id: str, client_secret: str):
self.client_id = client_id
self.client_secret = client_secret
self._token: Optional[str] = None
self._token_expires: float = 0
self._lock = threading.Lock()
def _refresh_token(self):
auth = base64.b64encode(f"{self.client_id}:{self.client_secret}".encode()).decode()
resp = httpx.post(
"https://accounts.spotify.com/api/token",
data={"grant_type": "client_credentials"},
headers={
"Authorization": f"Basic {auth}",
"Content-Type": "application/x-www-form-urlencoded",
},
timeout=15,
)
resp.raise_for_status()
data = resp.json()
self._token = data["access_token"]
self._token_expires = time.time() + data["expires_in"] - 60 # refresh 60s early
def _get_token(self) -> str:
with self._lock:
if not self._token or time.time() >= self._token_expires:
self._refresh_token()
return self._token
def get(self, path: str, params: dict = None) -> dict:
token = self._get_token()
resp = httpx.get(
f"{self.BASE}/{path.lstrip('/')}",
params=params,
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json",
},
timeout=20,
)
resp.raise_for_status()
return resp.json()
def search_spotify_podcasts(
client: SpotifyClient,
query: str,
limit: int = 20,
market: str = "US",
) -> list[dict]:
"""Search Spotify for podcast shows."""
data = client.get(
"/search",
params={"q": query, "type": "show", "market": market, "limit": min(limit, 50)},
)
shows = []
for item in data.get("shows", {}).get("items", []):
if item is None:
continue
shows.append({
"spotify_id": item["id"],
"name": item["name"],
"publisher": item.get("publisher", ""),
"total_episodes": item.get("total_episodes", 0),
"description": item.get("description", "")[:500],
"languages": item.get("languages", []),
"explicit": item.get("explicit", False),
"media_type": item.get("media_type", "audio"),
"external_url": item.get("external_urls", {}).get("spotify", ""),
"html_description": item.get("html_description", "")[:200],
})
return shows
def get_spotify_show_episodes(
client: SpotifyClient,
show_id: str,
market: str = "US",
max_episodes: int = 100,
delay: float = 0.8,
) -> list[dict]:
"""
Get all episodes for a Spotify podcast show.
Handles pagination via next cursor.
"""
episodes = []
offset = 0
limit = 50
while len(episodes) < max_episodes:
data = client.get(
f"/shows/{show_id}/episodes",
params={"market": market, "limit": limit, "offset": offset},
)
items = data.get("items", [])
if not items:
break
for ep in items:
if ep is None:
continue
episodes.append({
"episode_id": ep["id"],
"name": ep["name"],
"description": ep.get("description", "")[:500],
"release_date": ep.get("release_date", ""),
"release_date_precision": ep.get("release_date_precision", ""),
"duration_ms": ep.get("duration_ms", 0),
"duration_minutes": ep.get("duration_ms", 0) // 60000,
"language": ep.get("language", ""),
"languages": ep.get("languages", []),
"explicit": ep.get("explicit", False),
"type": ep.get("type", "episode"),
"external_url": ep.get("external_urls", {}).get("spotify", ""),
})
if not data.get("next"):
break
offset += limit
time.sleep(delay)
return episodes[:max_episodes]
def get_spotify_show_details(client: SpotifyClient, show_id: str, market: str = "US") -> dict:
"""Get complete show metadata including all available fields."""
return client.get(f"/shows/{show_id}", params={"market": market})
Cross-Platform Matching and Data Deduplication
The hardest part of podcast aggregation is matching the same show across platforms. There's no universal podcast ID.
from difflib import SequenceMatcher
import re
import unicodedata
def normalize_name(name: str) -> str:
"""Normalize a podcast name for comparison."""
name = unicodedata.normalize("NFKC", name.lower())
name = re.sub(r"[^\w\s]", "", name)
name = re.sub(r"\s+", " ", name).strip()
    # Strip articles and filler words that vary across platforms
    for word in ["podcast", "show", "official", "the", "a"]:
        name = re.sub(rf"\b{word}\b", "", name)
    # Collapse any doubled spaces left behind
    return re.sub(r"\s+", " ", name).strip()
def match_shows(show_a: dict, show_b: dict) -> float:
"""
Score how likely two show records represent the same podcast.
Returns 0.0 to 1.0.
"""
# RSS URL match is definitive
rss_a = show_a.get("rss_url", "") or show_a.get("feed_url", "")
rss_b = show_b.get("rss_url", "") or show_b.get("feed_url", "")
if rss_a and rss_b and rss_a == rss_b:
return 1.0
# Normalize names for comparison
title_a = normalize_name(show_a.get("title", "") or show_a.get("name", ""))
title_b = normalize_name(show_b.get("title", "") or show_b.get("name", ""))
title_sim = SequenceMatcher(None, title_a, title_b).ratio()
# Publisher / artist comparison
pub_a = normalize_name(show_a.get("publisher", "") or show_a.get("artist", ""))
pub_b = normalize_name(show_b.get("publisher", "") or show_b.get("artist", ""))
pub_sim = SequenceMatcher(None, pub_a, pub_b).ratio() if pub_a and pub_b else 0.5
# Episode count similarity as a weak signal
count_a = show_a.get("total_episodes", 0) or show_a.get("track_count", 0)
count_b = show_b.get("total_episodes", 0) or show_b.get("track_count", 0)
if count_a and count_b:
max_count = max(count_a, count_b)
min_count = min(count_a, count_b)
count_sim = min_count / max_count if max_count else 0
else:
count_sim = 0.5
# Weighted score: title is most important
return (title_sim * 0.65) + (pub_sim * 0.25) + (count_sim * 0.10)
def deduplicate_shows(shows: list[dict], threshold: float = 0.85) -> list[dict]:
"""Remove near-duplicate shows from a mixed-platform list."""
unique = []
for show in shows:
is_duplicate = False
for existing in unique:
if match_shows(show, existing) >= threshold:
is_duplicate = True
break
if not is_duplicate:
unique.append(show)
return unique
Anti-Bot Measures and Rate Limiting
The APIs covered here are mostly well-behaved — you authenticate properly and get structured data back. The problems start when you go beyond the official endpoints or scale up.
RSS feed hosting varies wildly. Some podcast hosts (Libsyn, Buzzsprout, Anchor) serve RSS feeds without issue. Others are behind Cloudflare or rate-limit aggressively. When scraping hundreds of RSS feeds, expect 10-15% to fail on any given run.
iTunes and Spotify rate limits. iTunes Search API allows roughly 20 requests per minute before returning 429s. Spotify's API is more generous (hundreds per minute with authenticated requests) but will throttle burst traffic. Always implement exponential backoff.
Scaling RSS collection. If you're monitoring thousands of shows by polling their RSS feeds, you need IP diversity. A single IP cycling through 5,000 feeds will get blocked by at least a dozen different hosting providers. ThorData's rotating residential proxies solve this — each RSS feed request exits from a different residential IP, so no single hosting provider sees suspicious volume from one address.
import random
from typing import Optional
USER_AGENTS = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64; rv:125.0) Gecko/20100101 Firefox/125.0",
"Mozilla/5.0 (compatible; PodcastAggregator/2.0; +https://example.com/bot)",
]
def fetch_rss_with_retry(
feed_url: str,
proxies: Optional[list[str]] = None,
max_attempts: int = 3,
base_delay: float = 2.0,
) -> Optional[str]:
"""
Fetch RSS feed with proxy rotation and exponential backoff retry.
Returns raw XML string or None on failure.
"""
for i in range(max_attempts):
proxy = random.choice(proxies) if proxies else None
transport = httpx.HTTPTransport(proxy=proxy) if proxy else None
ua = random.choice(USER_AGENTS)
try:
with httpx.Client(
transport=transport,
timeout=15,
follow_redirects=True,
) as client:
resp = client.get(
feed_url,
headers={
"User-Agent": ua,
"Accept": "application/rss+xml, application/xml, text/xml, */*",
"Accept-Encoding": "gzip, deflate, br",
},
)
                if resp.status_code == 200:
                    content_type = resp.headers.get("content-type", "")
                    if "xml" in content_type or "rss" in content_type or len(resp.text) > 500:
                        return resp.text
                elif resp.status_code == 429:
                    # Retry-After may be seconds or an HTTP-date; fall back to backoff
                    try:
                        wait = float(resp.headers.get("Retry-After", ""))
                    except ValueError:
                        wait = base_delay * (2 ** i)
                    print(f"Rate limited on {feed_url}, waiting {wait:.1f}s")
                    time.sleep(wait)
                elif resp.status_code in (403, 406):
                    # Likely UA filtering; a new user agent is chosen on the next attempt
                    time.sleep(base_delay * (i + 1))
                else:
                    time.sleep(base_delay * (2 ** i))
except httpx.TimeoutException:
print(f"Timeout on {feed_url} (attempt {i + 1})")
time.sleep(base_delay * (2 ** i))
except httpx.ConnectError as e:
print(f"Connection error on {feed_url}: {e}")
time.sleep(base_delay * (i + 1))
except Exception as e:
print(f"Unexpected error on {feed_url}: {e}")
break
return None
def batch_fetch_rss(
feed_urls: list[str],
proxies: Optional[list[str]] = None,
delay: float = 0.8,
max_attempts: int = 3,
) -> dict[str, Optional[dict]]:
"""
Fetch and parse multiple RSS feeds with rate limiting.
Returns dict mapping feed_url -> parsed data (or None on failure).
"""
results = {}
for i, url in enumerate(feed_urls):
raw = fetch_rss_with_retry(url, proxies=proxies, max_attempts=max_attempts)
if raw:
try:
# Re-use the parse function but feed raw content
root = ET.fromstring(raw)
ns = {"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd"}
channel = root.find("channel")
if channel is not None:
results[url] = {
"title": channel.findtext("title", ""),
"episode_count": len(channel.findall("item")),
}
else:
results[url] = None
except ET.ParseError as e:
print(f"XML parse error for {url}: {e}")
results[url] = None
else:
results[url] = None
if (i + 1) % 10 == 0:
print(f"Progress: {i + 1}/{len(feed_urls)} feeds processed")
time.sleep(delay + random.uniform(0, 0.3))
return results
Production Error Handling and Storage
For a production podcast monitoring system, you need robust error handling and persistent storage:
def setup_database(db_path: str = "podcasts.db") -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.executescript("""
CREATE TABLE IF NOT EXISTS shows (
show_id TEXT PRIMARY KEY,
title TEXT,
publisher TEXT,
description TEXT,
total_episodes INTEGER,
listen_score INTEGER,
language TEXT,
country TEXT,
rss_url TEXT,
website TEXT,
genres TEXT,
latest_episode_date TEXT,
source TEXT,
scraped_at TEXT DEFAULT (datetime('now')),
updated_at TEXT
);
CREATE TABLE IF NOT EXISTS episodes (
episode_id TEXT PRIMARY KEY,
show_id TEXT,
title TEXT,
description TEXT,
published_at TEXT,
duration_seconds INTEGER,
audio_url TEXT,
explicit INTEGER DEFAULT 0,
source TEXT,
scraped_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (show_id) REFERENCES shows(show_id)
);
    CREATE TABLE IF NOT EXISTS scrape_errors (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        resource_type TEXT,
        resource_id TEXT,
        error_type TEXT,
        error_message TEXT,
        attempt_count INTEGER DEFAULT 1,
        first_seen TEXT DEFAULT (datetime('now')),
        last_seen TEXT DEFAULT (datetime('now')),
        UNIQUE(resource_type, resource_id, error_type)
    );
CREATE INDEX IF NOT EXISTS idx_episodes_show ON episodes(show_id);
CREATE INDEX IF NOT EXISTS idx_episodes_published ON episodes(published_at);
CREATE INDEX IF NOT EXISTS idx_shows_source ON shows(source);
""")
conn.commit()
return conn
def save_show(conn: sqlite3.Connection, show: PodcastShow):
conn.execute("""
INSERT OR REPLACE INTO shows
(show_id, title, publisher, description, total_episodes, listen_score,
language, country, rss_url, website, genres, latest_episode_date, source, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
""", (
show.show_id, show.title, show.publisher, show.description,
show.total_episodes, show.listen_score, show.language, show.country,
show.rss_url, show.website, json.dumps(show.genres),
show.latest_episode_date, show.source,
))
conn.commit()
def save_episodes(conn: sqlite3.Connection, episodes: list[Episode]):
conn.executemany("""
INSERT OR IGNORE INTO episodes
(episode_id, show_id, title, description, published_at,
duration_seconds, audio_url, explicit, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [
(
ep.episode_id, ep.show_id, ep.title, ep.description,
ep.published_at, ep.duration_seconds, ep.audio_url,
int(ep.explicit), ep.source,
)
for ep in episodes
])
conn.commit()
def log_error(conn: sqlite3.Connection, resource_type: str, resource_id: str,
error_type: str, message: str):
conn.execute("""
INSERT INTO scrape_errors (resource_type, resource_id, error_type, error_message)
VALUES (?, ?, ?, ?)
ON CONFLICT DO UPDATE SET
attempt_count = attempt_count + 1,
last_seen = datetime('now')
""", (resource_type, resource_id, error_type, str(message)[:500]))
conn.commit()
Complete Pipeline Example
Putting it all together into a multi-source data collection pipeline:
def run_podcast_collection_pipeline(
queries: list[str],
ln_api_key: str,
spotify_client_id: str,
spotify_client_secret: str,
proxy_url: Optional[str] = None,
db_path: str = "podcasts.db",
) -> dict:
"""
Full pipeline: search across all three platforms, deduplicate,
enrich with RSS data, and store to SQLite.
"""
conn = setup_database(db_path)
ln_client = build_listen_notes_client(ln_api_key)
spotify = SpotifyClient(spotify_client_id, spotify_client_secret)
proxies = [proxy_url] if proxy_url else None
stats = {"ln_shows": 0, "itunes_shows": 0, "spotify_shows": 0,
"rss_parsed": 0, "errors": 0, "total_episodes": 0}
for query in queries:
print(f"\n=== Processing query: {query} ===")
# Listen Notes
try:
ln_shows, _ = search_listen_notes(query, ln_client)
for show in ln_shows:
save_show(conn, show)
stats["ln_shows"] += len(ln_shows)
print(f" Listen Notes: {len(ln_shows)} shows")
except Exception as e:
print(f" Listen Notes error: {e}")
log_error(conn, "search_ln", query, type(e).__name__, str(e))
stats["errors"] += 1
time.sleep(1.5)
# iTunes
try:
itunes_results = search_itunes_podcasts(query, limit=20)
print(f" iTunes: {len(itunes_results)} shows")
stats["itunes_shows"] += len(itunes_results)
# Parse RSS feeds for iTunes results
for itunes_show in itunes_results[:5]: # limit RSS parsing
feed_url = itunes_show.get("feed_url")
if not feed_url:
continue
try:
rss_data = parse_podcast_rss(feed_url, max_episodes=20, proxy=proxy_url)
if rss_data["episodes"]:
stats["rss_parsed"] += 1
stats["total_episodes"] += len(rss_data["episodes"])
except Exception as e:
log_error(conn, "rss_feed", feed_url, type(e).__name__, str(e))
time.sleep(0.8)
except Exception as e:
print(f" iTunes error: {e}")
stats["errors"] += 1
time.sleep(1.0)
# Spotify
try:
spotify_results = search_spotify_podcasts(spotify, query, limit=20)
print(f" Spotify: {len(spotify_results)} shows")
stats["spotify_shows"] += len(spotify_results)
except Exception as e:
print(f" Spotify error: {e}")
stats["errors"] += 1
time.sleep(1.0)
conn.close()
print(f"\n=== Pipeline complete ===")
print(json.dumps(stats, indent=2))
return stats
ThorData Proxy Setup for Large-Scale RSS Monitoring
When scaling RSS collection to thousands of feeds, proxy rotation becomes essential. Different podcast hosting services (Libsyn, Buzzsprout, Podbean, Spreaker, Megaphone) each have their own rate limiting, and hitting them all from a single IP triggers blocks across the board.
ThorData's residential proxy network is designed exactly for this use case — large-scale distributed requests where IP diversity is the primary requirement. Their rotating residential proxy endpoints automatically assign a new residential IP for each connection, so each hosting provider sees what appears to be organic traffic from different users.
# ThorData proxy configuration for RSS monitoring
THORDATA_PROXY_ROTATING = "http://USERNAME:[email protected]:9001"
# For sticky sessions (same IP for a sequence of requests):
THORDATA_PROXY_STICKY = "http://USERNAME:PASSWORD-session-{session_id}@proxy.thordata.com:9001"
def make_rss_session(session_id: Optional[str] = None) -> httpx.Client:
"""
Create an httpx client with ThorData proxy.
Use sticky sessions when you need consistent IP across multiple requests
to the same hosting provider.
"""
if session_id:
proxy_url = THORDATA_PROXY_STICKY.format(session_id=session_id)
else:
proxy_url = THORDATA_PROXY_ROTATING
return httpx.Client(
transport=httpx.HTTPTransport(proxy=proxy_url),
headers={
"User-Agent": random.choice(USER_AGENTS),
"Accept": "application/rss+xml, application/xml, text/xml",
},
timeout=20,
follow_redirects=True,
)
def monitor_show_rss_feeds(
shows_with_feeds: list[dict],
use_proxy: bool = True,
) -> list[dict]:
"""
Poll RSS feeds for a list of shows. Returns shows with updated episode data.
Uses session-based proxy grouping by hosting provider.
"""
    # Group feeds by hosting provider so sticky sessions reuse one IP per host
    from urllib.parse import urlparse
    by_host: dict[str, list[dict]] = {}
    for show in shows_with_feeds:
        feed_url = show.get("rss_url", "")
        if not feed_url:
            continue
        host = urlparse(feed_url).netloc or "unknown"
        by_host.setdefault(host, []).append(show)
    updated = []
    for host, host_shows in by_host.items():
        # Sticky session per hosting provider: one residential IP for the batch
        session_id = f"podcast-{hash(host) % 100000}" if use_proxy else None
        proxy = THORDATA_PROXY_STICKY.format(session_id=session_id) if session_id else None
        for show in host_shows:
            feed_url = show.get("rss_url", "")
            try:
                result = parse_podcast_rss(feed_url, max_episodes=10, proxy=proxy)
                show["latest_episodes"] = result["episodes"]
                show["rss_status"] = "ok"
            except Exception as e:
                show["rss_status"] = f"error: {e}"
            updated.append(show)
            time.sleep(1.0)
return updated
Podcast Namespace Extensions
Modern podcast RSS feeds use the podcast: namespace for rich metadata beyond the basic iTunes tags:
def parse_podcast_namespace_extras(channel_el: ET.Element) -> dict:
"""
Parse Podcast Namespace 2.0 extensions.
See: https://podcastindex.org/namespace/1.0
"""
ns = {
"podcast": "https://podcastindex.org/namespace/1.0",
"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
}
extras = {}
# Funding links
funding_els = channel_el.findall("podcast:funding", ns)
extras["funding"] = [
{"url": el.get("url", ""), "text": el.text or ""}
for el in funding_els
]
# Transcripts (episode-level)
transcript_els = channel_el.findall(".//podcast:transcript", ns)
extras["has_transcripts"] = len(transcript_els) > 0
# Value (crypto/streaming payments)
value_el = channel_el.find("podcast:value", ns)
extras["value_enabled"] = value_el is not None
# GUID — podcast-level stable identifier
guid_el = channel_el.find("podcast:guid", ns)
extras["podcast_guid"] = guid_el.text.strip() if guid_el is not None and guid_el.text else ""
# Location
location_el = channel_el.find("podcast:location", ns)
extras["location"] = location_el.text.strip() if location_el is not None and location_el.text else ""
return extras
Legal and Ethical Considerations
Podcast data collection is generally on solid legal ground, but there are nuances worth understanding:
RSS feeds are explicitly public. The entire podcast distribution system is built on the premise that RSS feeds are openly readable by any aggregator. Fetching and parsing RSS feeds is the same thing Apple Podcasts does — it's how the ecosystem works. There's no meaningful legal or ethical argument against this.
API terms matter. Listen Notes, Spotify, and iTunes Search all have terms of service. The Spotify Web API prohibits storing data beyond what's needed for your application and forbids using data to build competing products. Listen Notes has similar restrictions on their API data. Read the current terms before building anything commercial.
Personal data in reviews. Podcast reviews on iTunes include usernames and review text. These are public posts, but collecting them at scale for behavioral profiling or selling as a dataset raises privacy concerns. The technical capability to collect this data doesn't mean it's appropriate to do so in all contexts.
Server load. Responsible bot etiquette: identify yourself in your User-Agent header, respect robots.txt where present, and add delays between requests. For RSS feeds especially, aggressive polling wastes bandwidth and hosting costs that show creators pay for.
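Checking robots.txt takes only a few lines with the standard library. A minimal sketch (the user-agent string is a placeholder; substitute your own bot name):

```python
from urllib.robotparser import RobotFileParser

BOT_UA = "PodcastAggregator/2.0"  # placeholder UA; use your own bot identifier

def robots_allows(url: str, robots_txt: str, user_agent: str = BOT_UA) -> bool:
    """Evaluate a robots.txt body against a target URL for a given user agent."""
    rp = RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp.can_fetch(user_agent, url)
```

In practice, fetch `https://<host>/robots.txt` once per host, cache the body, and call this check before each feed request; an empty or missing robots.txt evaluates as allow-all.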
What's clearly fine: Building analytics tools, market research, competitive intelligence, recommendation systems, and content discovery applications from publicly accessible podcast data. Using the official APIs within their terms. Parsing RSS feeds for indexing or research.
Practical Notes on Reliability
Episode GUIDs. When tracking episodes across polling runs, use the <guid> element from RSS — not the title or URL, which publishers sometimes change after publication. GUIDs are supposed to be permanent and unique per episode.
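A sketch of GUID-based change detection between polling runs, operating on episode dicts shaped like the output of parse_podcast_rss above (the helper name is ours):

```python
def diff_new_episodes(previous_guids: set[str], episodes: list[dict]) -> list[dict]:
    """Return episodes whose <guid> wasn't seen in an earlier polling run.

    Episodes without a GUID are skipped rather than treated as new, since
    they can't be tracked reliably across runs anyway.
    """
    return [
        ep for ep in episodes
        if ep.get("guid") and ep["guid"] not in previous_guids
    ]
```

Persist the seen-GUID set per show (the episodes table above works for this) and refresh it after every run.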
Listen Score. Listen Notes' proprietary audience estimate runs 0-100. It correlates reasonably well with actual download numbers for shows in the 40-80 range. Below 30, the data gets noisy. Use it as a rough filter, not a precise metric.
Spotify episode limits. The Spotify API caps episode retrieval at 50 per request and seems to throttle pagination beyond ~500 episodes for very long-running shows. For comprehensive episode catalogs, cross-reference with the RSS feed.
Feed freshness. RSS feeds are sometimes served with aggressive caching headers. An RSS fetch might return a cached version that's 24+ hours old. Check the Last-Modified and ETag headers to detect stale cached responses and handle conditional requests properly.
Podcast data is one of the easier scraping targets because the ecosystem is built on open standards. The challenge is scale and cross-platform reconciliation, not anti-bot evasion. Start with Listen Notes for discovery, enrich with iTunes RSS data, and add Spotify if you need their specific metrics.