# Scraping Apple Podcasts Data: Charts, Episodes and Reviews (2026)
Apple Podcasts is still the dominant directory for podcast discovery, and the data it holds is genuinely useful — chart rankings by category and country, episode-level metadata, user reviews with star ratings, and RSS feed URLs that unlock the full episode history. Whether you're doing competitive research, building a podcast monitoring tool, or aggregating content for an analytics product, Apple is a primary source.
The good news: there's a legitimate Search API. The bad news: it doesn't expose charts or reviews, so those require scraping. Here's how to pull all of it.
## What Data Is Available
The iTunes Search API gives you structured podcast metadata — show name, author, genre, episode count, artwork, feed URL, and country-specific store IDs. The feed URL is the most valuable piece because it points to the show's RSS feed, which contains every episode with title, description, duration, publish date, and enclosure URL.
Beyond the API, the Apple Podcasts web interface (podcasts.apple.com) exposes:
- Chart rankings by category and country — top shows, top episodes
- User reviews including rating text, star scores, and reviewer usernames
- Ratings summary (average stars, total ratings count)
- Show metadata in more detail than the API
None of the chart or review data is exposed through the iTunes Search API. Apple serves it through separate RSS-style endpoints and the web interface instead, so collecting it means scraping those.
## Genre ID Reference
Apple organizes podcasts into categories with numeric IDs. You'll need these for chart scraping:
| ID | Category |
|---|---|
| 26 | All Podcasts |
| 1301 | Arts |
| 1303 | Comedy |
| 1304 | Education |
| 1305 | Kids & Family |
| 1307 | Health & Fitness |
| 1309 | TV & Film |
| 1310 | Music |
| 1311 | News |
| 1314 | Religion & Spirituality |
| 1315 | Science |
| 1316 | Society & Culture |
| 1318 | Sports |
| 1320 | True Crime |
| 1321 | History |
| 1323 | Technology |
| 1324 | Business |
| 1325 | Fiction |
| 1326 | Leisure |
| 1327 | Government |
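The table above is easier to use behind a small lookup so chart URLs can be built by name. A minimal sketch (the `GENRE_IDS` dict and `chart_url` helper are illustrative names of my own, not part of any Apple API):

```python
# Sketch: friendly-name lookup over Apple's numeric genre IDs.
GENRE_IDS = {
    "all": 26,
    "comedy": 1303,
    "news": 1311,
    "true_crime": 1320,
    "technology": 1323,
    "business": 1324,
}

def chart_url(genre: str, country: str = "us", limit: int = 100) -> str:
    """Build the RSS chart URL for a named genre (raises KeyError on unknown names)."""
    genre_id = GENRE_IDS[genre]
    return (
        f"https://itunes.apple.com/{country}/rss/toppodcasts/"
        f"limit={limit}/genre={genre_id}/json"
    )

print(chart_url("true_crime"))
# https://itunes.apple.com/us/rss/toppodcasts/limit=100/genre=1320/json
```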
## Using the iTunes Search API

The Search API is the right starting point. It's fast, returns JSON, and gives you feed URLs for RSS parsing.
```python
import asyncio
from typing import Optional

import httpx

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "en-US,en;q=0.9",
    "Accept": "application/json, text/html",
}


async def search_podcasts(
    term: str,
    limit: int = 50,
    country: str = "us",
    proxy: Optional[str] = None,
) -> list[dict]:
    """Search iTunes for podcasts matching term."""
    url = "https://itunes.apple.com/search"
    params = {
        "term": term,
        "media": "podcast",
        "entity": "podcast",
        "limit": min(limit, 200),
        "country": country,
        "lang": "en_us",
    }
    proxies = {"http://": proxy, "https://": proxy} if proxy else None
    async with httpx.AsyncClient(
        proxies=proxies,
        headers=HEADERS,
        timeout=20,
    ) as client:
        resp = await client.get(url, params=params)
        resp.raise_for_status()
        data = resp.json()

    results = []
    for item in data.get("results", []):
        results.append({
            "collection_id": item.get("collectionId"),
            "name": item.get("collectionName"),
            "author": item.get("artistName"),
            "genre": item.get("primaryGenreName"),
            "genre_ids": item.get("genreIds", []),
            "episode_count": item.get("trackCount"),
            "feed_url": item.get("feedUrl"),
            "artwork_100": item.get("artworkUrl100"),
            "artwork_600": item.get("artworkUrl600"),
            "country": item.get("country"),
            "content_advisory": item.get("contentAdvisoryRating"),
            # iTunes "releaseDate" is the publish date of the most recent episode
            "latest_episode_date": item.get("releaseDate"),
        })
    return results


async def lookup_podcast(
    collection_id: int,
    proxy: Optional[str] = None,
) -> dict:
    """Get detailed info for a specific podcast by collection ID."""
    url = "https://itunes.apple.com/lookup"
    params = {"id": collection_id, "entity": "podcast"}
    proxies = {"http://": proxy, "https://": proxy} if proxy else None
    async with httpx.AsyncClient(proxies=proxies, headers=HEADERS, timeout=20) as client:
        resp = await client.get(url, params=params)
        resp.raise_for_status()
        data = resp.json()
    results = data.get("results", [])
    return results[0] if results else {}


# Usage
async def main():
    podcasts = await search_podcasts("true crime", limit=50, country="us")
    print(f"Found {len(podcasts)} podcasts")
    for p in podcasts[:5]:
        print(f"  {p['name']} by {p['author']} ({p['episode_count']} episodes)")

asyncio.run(main())
```
## Scraping Charts

Charts use Apple's RSS feed API, which returns JSON without HTML scraping:
```python
async def scrape_top_charts(
    genre_id: int = 26,
    country: str = "us",
    limit: int = 100,
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Fetch top podcast charts for a genre/country.

    This uses Apple's RSS feed API — the same endpoint that powers
    the web UI. It returns up to 100 entries per genre.
    """
    url = (
        f"https://itunes.apple.com/{country}/rss/toppodcasts/"
        f"limit={limit}/genre={genre_id}/json"
    )
    proxies = {"http://": proxy, "https://": proxy} if proxy else None
    async with httpx.AsyncClient(proxies=proxies, headers=HEADERS, timeout=30) as client:
        resp = await client.get(url)
        if resp.status_code == 404:
            return []
        resp.raise_for_status()
        feed = resp.json()

    entries = feed.get("feed", {}).get("entry", [])
    results = []
    for rank, entry in enumerate(entries, start=1):
        collection_id = (
            entry.get("id", {})
            .get("attributes", {})
            .get("im:id")
        )
        results.append({
            "rank": rank,
            "name": entry.get("im:name", {}).get("label"),
            "author": entry.get("im:artist", {}).get("label"),
            "collection_id": collection_id,
            "genre": entry.get("category", {}).get("attributes", {}).get("label"),
            "artwork": entry.get("im:image", [{}])[-1].get("label"),  # largest size
            "content_type": entry.get("im:contentType", {}).get("attributes", {}).get("label"),
        })
    return results


async def scrape_all_genre_charts(
    country: str = "us",
    proxy: Optional[str] = None,
) -> dict[str, list[dict]]:
    """Scrape charts for all major podcast categories."""
    GENRE_MAP = {
        "all": 26,
        "true_crime": 1320,
        "comedy": 1303,
        "news": 1311,
        "science": 1315,
        "business": 1324,
        "technology": 1323,
        "sports": 1318,
        "education": 1304,
        "health_fitness": 1307,
        "society_culture": 1316,
        "history": 1321,
    }
    results = {}
    for genre_name, genre_id in GENRE_MAP.items():
        charts = await scrape_top_charts(genre_id, country=country, proxy=proxy)
        results[genre_name] = charts
        print(f"  {genre_name}: {len(charts)} entries")
        await asyncio.sleep(0.5)
    return results


# Scrape charts for several country storefronts in parallel
async def multi_country_charts(proxy: Optional[str] = None) -> dict:
    countries = ["us", "gb", "au", "ca", "de", "jp"]
    tasks = [scrape_top_charts(26, country=c, proxy=proxy) for c in countries]
    chart_lists = await asyncio.gather(*tasks, return_exceptions=True)
    return {
        country: charts
        for country, charts in zip(countries, chart_lists)
        if not isinstance(charts, Exception)
    }
```
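Chart snapshots become far more useful once you diff them: the rank movement between two scrapes is the signal. A small pure-function sketch (the `rank_deltas` helper is my own naming, operating on the dicts returned by `scrape_top_charts`):

```python
def rank_deltas(old: list[dict], new: list[dict]) -> list[dict]:
    """Diff two chart snapshots; positive delta means the show climbed."""
    old_ranks = {e["collection_id"]: e["rank"] for e in old}
    deltas = []
    for entry in new:
        prev = old_ranks.get(entry["collection_id"])
        deltas.append({
            "collection_id": entry["collection_id"],
            "name": entry.get("name"),
            "rank": entry["rank"],
            # None marks a new entrant that wasn't in the old snapshot
            "delta": (prev - entry["rank"]) if prev is not None else None,
        })
    return deltas


# Made-up sample snapshots
old = [{"collection_id": "100", "rank": 1}, {"collection_id": "200", "rank": 2}]
new = [{"collection_id": "200", "rank": 1, "name": "Show B"},
       {"collection_id": "300", "rank": 2, "name": "Show C"}]
for d in rank_deltas(old, new):
    print(d["collection_id"], d["delta"])
```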
## Scraping Reviews

Reviews use another Apple RSS endpoint:
```python
async def scrape_reviews(
    collection_id: int,
    country: str = "us",
    max_pages: int = 10,
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Scrape user reviews for a podcast.

    Apple's review RSS endpoint provides up to 10 pages per country.
    Each page has ~10 reviews. Max ~100 reviews per country.
    """
    all_reviews = []
    proxies = {"http://": proxy, "https://": proxy} if proxy else None
    async with httpx.AsyncClient(proxies=proxies, headers=HEADERS, timeout=30) as client:
        for page in range(1, max_pages + 1):
            url = (
                f"https://itunes.apple.com/{country}/rss/customerreviews/"
                f"page={page}/id={collection_id}/sortby=mostrecent/json"
            )
            resp = await client.get(url)
            if resp.status_code == 429:
                print(f"  Rate limited on page {page}, sleeping 15s")
                await asyncio.sleep(15)
                resp = await client.get(url)  # retry the same page once
            if resp.status_code in (404, 400):
                break
            resp.raise_for_status()
            data = resp.json()
            entries = data.get("feed", {}).get("entry", [])
            if not entries:
                break
            for entry in entries:
                # First entry is the podcast metadata, not a review
                if "im:rating" not in entry:
                    continue
                all_reviews.append({
                    "title": entry.get("title", {}).get("label"),
                    "author": entry.get("author", {}).get("name", {}).get("label"),
                    "rating": int(entry.get("im:rating", {}).get("label", 0)),
                    "body": entry.get("content", {}).get("label"),
                    "version": entry.get("im:version", {}).get("label"),
                    "vote_sum": int(entry.get("im:voteSum", {}).get("label", 0)),
                    "vote_count": int(entry.get("im:voteCount", {}).get("label", 0)),
                    "review_id": entry.get("id", {}).get("label", "").split("/")[-1],
                })
            # No next page link = we're done
            if not data.get("feed", {}).get("link"):
                break
            await asyncio.sleep(0.5)
    return all_reviews
```
```python
def aggregate_review_stats(reviews: list[dict]) -> dict:
    """Calculate rating distribution and average from review list."""
    if not reviews:
        return {}
    ratings = [r["rating"] for r in reviews if r.get("rating")]
    distribution = {i: ratings.count(i) for i in range(1, 6)}
    avg = sum(ratings) / len(ratings) if ratings else 0
    return {
        "total_reviews": len(reviews),
        "average_rating": round(avg, 2),
        "distribution": distribution,
        "five_star_pct": round(distribution.get(5, 0) / len(ratings) * 100, 1) if ratings else 0,
        "one_star_pct": round(distribution.get(1, 0) / len(ratings) * 100, 1) if ratings else 0,
    }
```
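Beyond aggregate stats, the low-star reviews are usually the ones worth reading in full. A trivial filter sketch (`low_rated` is a hypothetical helper of mine, working on the review dicts produced above):

```python
def low_rated(reviews: list[dict], max_stars: int = 2) -> list[dict]:
    """Return reviews at or below a star threshold, lowest rating first."""
    flagged = [r for r in reviews if r.get("rating") and r["rating"] <= max_stars]
    return sorted(flagged, key=lambda r: r["rating"])


# Made-up sample reviews
sample = [
    {"rating": 5, "title": "Love it"},
    {"rating": 1, "title": "Ads every 2 minutes"},
    {"rating": 2, "title": "Went downhill"},
]
for r in low_rated(sample):
    print(r["rating"], r["title"])
```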
## Proxy Configuration
Both chart scraping and review scraping benefit from rotating IPs. Apple's review endpoints block repeated requests from the same IP. ThorData's residential proxies work well here since each request gets a different IP from the residential pool:
```python
PROXY = "http://USERNAME:[email protected]:9000"


# For country-specific charts, use geo-targeted proxies
def get_country_proxy(country: str) -> str:
    """Get ThorData proxy targeted to specific country."""
    country_map = {
        "us": "US", "gb": "GB", "au": "AU",
        "ca": "CA", "de": "DE", "jp": "JP",
    }
    cc = country_map.get(country, "US")
    return f"http://USER-country-{cc}:[email protected]:9000"


# Scrape charts with country-matched proxies
async def scrape_with_matching_proxy(country: str) -> list[dict]:
    proxy = get_country_proxy(country)
    return await scrape_top_charts(26, country=country, proxy=proxy)
```
For concurrent multi-country scraping, use a semaphore to limit parallelism:
```python
async def scrape_multi_country_limited(countries: list[str]) -> dict:
    """Scrape charts for multiple countries with concurrency limit."""
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent requests

    async def scrape_one(country: str) -> tuple[str, list]:
        async with semaphore:
            proxy = get_country_proxy(country)
            charts = await scrape_top_charts(26, country=country, proxy=proxy)
            return country, charts

    tasks = [scrape_one(c) for c in countries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # gather returns Exception objects in place of failed results, so filter
    # them out before unpacking the (country, charts) tuples
    return {
        country: charts
        for country, charts in (r for r in results if not isinstance(r, Exception))
    }
```
## Parsing Episode Feeds

Once you have a `feedUrl`, episode data comes from RSS. `feedparser` handles messy podcast RSS variants:
```python
import re

import feedparser


async def parse_episodes(
    feed_url: str,
    proxy: Optional[str] = None,
    max_episodes: int | None = None,
) -> tuple[list[dict], dict]:
    """Parse episodes (and show-level metadata) from a podcast RSS feed."""
    proxies = {"http://": proxy, "https://": proxy} if proxy else None
    async with httpx.AsyncClient(
        proxies=proxies,
        timeout=30,
        follow_redirects=True,
    ) as client:
        resp = await client.get(
            feed_url,
            headers={"User-Agent": "PodcastParser/1.0"},
        )
        resp.raise_for_status()
        raw_feed = resp.text

    feed = feedparser.parse(raw_feed)

    # Show-level metadata
    show = {
        "title": feed.feed.get("title"),
        "author": feed.feed.get("itunes_author") or feed.feed.get("author"),
        "description": feed.feed.get("description") or feed.feed.get("summary"),
        "language": feed.feed.get("language"),
        "explicit": feed.feed.get("itunes_explicit"),
        "categories": [
            tag.get("term") for tag in feed.feed.get("tags", [])
        ],
        "image": feed.feed.get("image", {}).get("href"),
    }

    entries = feed.entries[:max_episodes] if max_episodes else feed.entries
    episodes = []
    for entry in entries:
        # Parse duration from multiple possible formats
        duration_raw = entry.get("itunes_duration", "")
        duration_seconds = parse_duration(duration_raw)

        # Find audio enclosure
        audio_url = None
        for enclosure in entry.get("enclosures", []):
            if "audio" in enclosure.get("type", ""):
                audio_url = enclosure.get("href")
                break

        episodes.append({
            "title": entry.get("title"),
            "published": entry.get("published"),
            "published_parsed": entry.get("published_parsed"),
            "duration_seconds": duration_seconds,
            "duration_raw": duration_raw,
            "description": entry.get("summary"),
            "audio_url": audio_url,
            "episode_number": entry.get("itunes_episode"),
            "season": entry.get("itunes_season"),
            "episode_type": entry.get("itunes_episodetype", "full"),
            "explicit": entry.get("itunes_explicit"),
            "keywords": entry.get("itunes_keywords", ""),
            "guid": entry.get("id"),
        })
    return episodes, show


def parse_duration(duration_str: str) -> int | None:
    """
    Parse podcast episode duration to seconds.

    Handles multiple formats: '3600', '1:00:00', '60:00', '1:30'
    """
    if not duration_str:
        return None
    duration_str = str(duration_str).strip()

    # Pure seconds
    if re.match(r"^\d+$", duration_str):
        return int(duration_str)

    # HH:MM:SS or MM:SS or H:MM:SS
    parts = duration_str.split(":")
    try:
        if len(parts) == 3:
            return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
        elif len(parts) == 2:
            return int(parts[0]) * 60 + int(parts[1])
    except ValueError:
        pass
    return None
```
## Storing Data in SQLite
```python
import sqlite3
from datetime import datetime, timezone


def init_podcast_db(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS podcasts (
            collection_id INTEGER PRIMARY KEY,
            name TEXT,
            author TEXT,
            genre TEXT,
            episode_count INTEGER,
            feed_url TEXT,
            country TEXT,
            artwork_url TEXT,
            fetched_at TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS chart_rankings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            collection_id INTEGER,
            country TEXT,
            genre_id INTEGER,
            rank INTEGER,
            ranked_at TEXT DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(collection_id, country, genre_id, ranked_at)
        );

        CREATE TABLE IF NOT EXISTS episodes (
            guid TEXT PRIMARY KEY,
            collection_id INTEGER,
            title TEXT,
            published TEXT,
            duration_seconds INTEGER,
            audio_url TEXT,
            episode_number INTEGER,
            season INTEGER,
            description TEXT,
            fetched_at TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS reviews (
            review_id TEXT,
            collection_id INTEGER,
            country TEXT,
            title TEXT,
            author TEXT,
            rating INTEGER,
            body TEXT,
            vote_sum INTEGER,
            fetched_at TEXT DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (review_id, collection_id, country)
        );

        CREATE INDEX IF NOT EXISTS idx_episodes_collection ON episodes(collection_id);
        CREATE INDEX IF NOT EXISTS idx_reviews_collection ON reviews(collection_id, country);
        CREATE INDEX IF NOT EXISTS idx_chart_rankings ON chart_rankings(country, genre_id, ranked_at);
    """)
    conn.commit()
    return conn


async def build_podcast_dataset(
    search_queries: list[str],
    countries: list[str] = ["us"],
    proxy: Optional[str] = None,
    db_path: str = "podcasts.db",
):
    """Build a complete podcast dataset from search queries."""
    conn = init_podcast_db(db_path)
    now = datetime.now(timezone.utc).isoformat()

    # 1. Collect podcasts via search
    all_podcasts = {}
    for query in search_queries:
        for country in countries:
            results = await search_podcasts(query, limit=50, country=country, proxy=proxy)
            for pod in results:
                cid = pod["collection_id"]
                if cid and cid not in all_podcasts:
                    all_podcasts[cid] = pod
            await asyncio.sleep(0.5)
    print(f"Collected {len(all_podcasts)} unique podcasts")

    # 2. Store podcasts
    for pod in all_podcasts.values():
        conn.execute(
            """INSERT OR REPLACE INTO podcasts
               (collection_id, name, author, genre, episode_count, feed_url, country, artwork_url)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (pod["collection_id"], pod["name"], pod["author"], pod["genre"],
             pod["episode_count"], pod["feed_url"], pod["country"], pod.get("artwork_600"))
        )
    conn.commit()

    # 3. Scrape chart rankings
    for country in countries:
        charts = await scrape_top_charts(26, country=country, proxy=proxy)
        for chart_entry in charts:
            if chart_entry.get("collection_id"):
                conn.execute(
                    """INSERT OR IGNORE INTO chart_rankings
                       (collection_id, country, genre_id, rank, ranked_at)
                       VALUES (?, ?, ?, ?, ?)""",
                    (chart_entry["collection_id"], country, 26,
                     chart_entry["rank"], now)
                )
        conn.commit()
        print(f"Stored {len(charts)} chart rankings for {country}")
        await asyncio.sleep(1)

    conn.close()
    print("Dataset build complete")
```
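With `chart_rankings` storing one row per show/country/genre/timestamp, rank movement between two snapshots is a self-join away. A sketch against an in-memory database with made-up sample rows (the query itself is what matters):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE chart_rankings (
    collection_id INTEGER, country TEXT, genre_id INTEGER,
    rank INTEGER, ranked_at TEXT)""")
rows = [
    (101, "us", 26, 5, "2026-01-01"),
    (101, "us", 26, 2, "2026-01-02"),
    (102, "us", 26, 1, "2026-01-01"),
    (102, "us", 26, 4, "2026-01-02"),
]
conn.executemany("INSERT INTO chart_rankings VALUES (?, ?, ?, ?, ?)", rows)

# Positive movement = the show climbed between the two snapshots
movers = conn.execute("""
    SELECT a.collection_id, a.rank AS old_rank, b.rank AS new_rank,
           a.rank - b.rank AS movement
    FROM chart_rankings a
    JOIN chart_rankings b
      ON a.collection_id = b.collection_id
     AND a.country = b.country AND a.genre_id = b.genre_id
    WHERE a.ranked_at = '2026-01-01' AND b.ranked_at = '2026-01-02'
    ORDER BY movement DESC
""").fetchall()
for cid, old, new, move in movers:
    print(cid, old, "->", new, f"({move:+d})")
# 101 5 -> 2 (+3)
# 102 1 -> 4 (-3)
```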
## Tips and Gotchas
The RSS chart endpoint is the most stable route to rankings — it's what Apple's web UI calls, and returns clean JSON without HTML scraping. Prefer it over screen-scraping podcasts.apple.com.
Review data is capped at 10 pages per country. If you need historical reviews, you need to have been collecting them over time. There's no bulk export. A show with 50,000 total reviews will only surface the 100 most recent through the API.
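The practical consequence is to run collection on a schedule and let the database dedupe. A sketch showing how the `reviews` table's composite primary key plus `INSERT OR IGNORE` makes repeated scrapes idempotent (in-memory DB and made-up rows; the real schema from the SQLite section has more columns):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE reviews (
    review_id TEXT, collection_id INTEGER, country TEXT, rating INTEGER,
    PRIMARY KEY (review_id, collection_id, country))""")

def upsert_reviews(conn, collection_id, country, reviews):
    """INSERT OR IGNORE: reviews seen on a previous run are silently skipped."""
    conn.executemany(
        "INSERT OR IGNORE INTO reviews VALUES (?, ?, ?, ?)",
        [(r["review_id"], collection_id, country, r["rating"]) for r in reviews],
    )

day1 = [{"review_id": "a", "rating": 5}, {"review_id": "b", "rating": 1}]
day2 = [{"review_id": "b", "rating": 1}, {"review_id": "c", "rating": 4}]  # "b" overlaps
upsert_reviews(conn, 101, "us", day1)
upsert_reviews(conn, 101, "us", day2)
count = conn.execute("SELECT COUNT(*) FROM reviews").fetchone()[0]
print(count)  # 3 unique reviews stored, not 4
```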
Feed URLs change when a show migrates hosting providers. Store the collectionId as your canonical identifier and re-fetch the feed URL via the lookup API when needed rather than hardcoding it.
Rate limiting on the Search API sits around 20 requests per minute per IP. For bulk lookups across thousands of collection IDs, rotating IPs is essential. ThorData's residential proxies work well here since each request gets a fresh IP from the residential pool.
Duration format normalization is a real problem. Different hosting providers format duration differently (3600 vs 1:00:00 vs 60:00). Always use a robust parser that handles all formats rather than assuming a single format.
Private feeds (Patreon, Supporting Cast, Supercast) require subscriber tokens embedded in the feed URL. These aren't scrapeable without valid subscriber credentials.
iTunes API vs podcasts.apple.com: The iTunes API (itunes.apple.com/search) is stable and well-supported. The web app (podcasts.apple.com) uses internal endpoints that change without notice. Stick to the iTunes API for data you need reliably, and scrape the web app only for features it exclusively offers (full chart rankings beyond the RSS feed, richer review metadata).