Scraping Apple Podcasts Data: Charts, Episodes and Reviews (2026)

Apple Podcasts is still the dominant directory for podcast discovery, and the data it holds is genuinely useful — chart rankings by category and country, episode-level metadata, user reviews with star ratings, and RSS feed URLs that unlock the full episode history. Whether you're doing competitive research, building a podcast monitoring tool, or aggregating content for an analytics product, Apple is a primary source.

The good news: there's a legitimate Search API. The bad news: it doesn't expose charts or reviews, so those require scraping. Here's how to pull all of it.

What Data Is Available

The iTunes Search API gives you structured podcast metadata — show name, author, genre, episode count, artwork, feed URL, and country-specific store IDs. The feed URL is the most valuable piece because it points to the show's RSS feed, which contains every episode with title, description, duration, publish date, and enclosure URL.

Beyond the API, the Apple Podcasts web interface (podcasts.apple.com) exposes chart rankings by category and country, plus user reviews with star ratings.

None of the chart/review data is in the iTunes Search API. Apple deliberately keeps chart rankings out of programmatic access to prevent competitive gaming of rankings.

Genre ID Reference

Apple organizes podcasts into categories with numeric IDs. You'll need these for chart scraping:

ID Category
26 All Podcasts
1301 Arts
1303 Comedy
1304 Education
1305 Kids & Family
1307 Health & Fitness
1309 TV & Film
1310 Music
1311 News
1314 Religion & Spirituality
1315 Science
1316 Society & Culture
1318 Sports
1320 True Crime
1321 History
1323 Technology
1324 Business
1325 Fiction
1326 Leisure
1327 Government
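
As a convenience, the table can be kept as a lookup so chart URLs are built from readable names. This is a minimal sketch: the lowercase keys and the `chart_url` helper are names invented here, and the URL pattern is the one used by the chart scraper later in this post.

```python
# Genre IDs copied from the table above; the lowercase keys are names chosen for this sketch.
GENRE_IDS = {
    "all": 26, "arts": 1301, "comedy": 1303, "education": 1304,
    "kids_family": 1305, "health_fitness": 1307, "tv_film": 1309,
    "music": 1310, "news": 1311, "religion_spirituality": 1314,
    "science": 1315, "society_culture": 1316, "sports": 1318,
    "true_crime": 1320, "history": 1321, "technology": 1323,
    "business": 1324, "fiction": 1325, "leisure": 1326, "government": 1327,
}


def chart_url(genre: str, country: str = "us", limit: int = 100) -> str:
    """Build the top-podcasts RSS URL for a genre name (hypothetical helper)."""
    try:
        genre_id = GENRE_IDS[genre]
    except KeyError:
        # Fail loudly on typos instead of requesting a chart for a bogus genre
        raise ValueError(f"Unknown genre: {genre!r}") from None
    return (
        f"https://itunes.apple.com/{country}/rss/toppodcasts/"
        f"limit={limit}/genre={genre_id}/json"
    )
```

Calling `chart_url("true_crime", country="gb")` gives the URL for the UK True Crime chart.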

Using the iTunes Search API

The Search API is the right starting point. It's fast, returns JSON, and gives you feed URLs for RSS parsing.

import httpx
import asyncio
import json
import time
from typing import Optional

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "en-US,en;q=0.9",
    "Accept": "application/json, text/html",
}


async def search_podcasts(
    term: str,
    limit: int = 50,
    country: str = "us",
    proxy: Optional[str] = None,
) -> list[dict]:
    """Search iTunes for podcasts matching term."""
    url = "https://itunes.apple.com/search"
    params = {
        "term": term,
        "media": "podcast",
        "entity": "podcast",
        "limit": min(limit, 200),
        "country": country,
        "lang": "en_us",
    }
    # httpx >= 0.26 accepts a single proxy URL; the `proxies` dict was removed in 0.28
    async with httpx.AsyncClient(
        proxy=proxy,
        headers=HEADERS,
        timeout=20,
    ) as client:
        resp = await client.get(url, params=params)
        resp.raise_for_status()
        data = resp.json()

    results = []
    for item in data.get("results", []):
        results.append({
            "collection_id": item.get("collectionId"),
            "name": item.get("collectionName"),
            "author": item.get("artistName"),
            "genre": item.get("primaryGenreName"),
            "genre_ids": item.get("genreIds", []),
            "episode_count": item.get("trackCount"),
            "feed_url": item.get("feedUrl"),
            "artwork_100": item.get("artworkUrl100"),
            "artwork_600": item.get("artworkUrl600"),
            "country": item.get("country"),
            "content_advisory": item.get("contentAdvisoryRating"),
            "release_date": item.get("releaseDate"),
            "latest_episode_date": item.get("releaseDate"),
        })
    return results


async def lookup_podcast(
    collection_id: int,
    proxy: Optional[str] = None,
) -> dict:
    """Get detailed info for a specific podcast by collection ID."""
    url = "https://itunes.apple.com/lookup"
    params = {"id": collection_id, "entity": "podcast"}
    async with httpx.AsyncClient(proxy=proxy, headers=HEADERS, timeout=20) as client:
        resp = await client.get(url, params=params)
        resp.raise_for_status()
        data = resp.json()

    results = data.get("results", [])
    return results[0] if results else {}


# Usage
async def main():
    podcasts = await search_podcasts("true crime", limit=50, country="us")
    print(f"Found {len(podcasts)} podcasts")
    for p in podcasts[:5]:
        print(f"  {p['name']} by {p['author']} ({p['episode_count']} episodes)")

asyncio.run(main())

Scraping Charts

Charts use Apple's RSS feed API, which returns JSON without HTML scraping:

async def scrape_top_charts(
    genre_id: int = 26,
    country: str = "us",
    limit: int = 100,
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Fetch top podcast charts for a genre/country.

    This uses Apple's RSS feed API — the same endpoint that powers
    the web UI. It returns up to 100 entries per genre.
    """
    url = (
        f"https://itunes.apple.com/{country}/rss/toppodcasts/"
        f"limit={limit}/genre={genre_id}/json"
    )
    async with httpx.AsyncClient(proxy=proxy, headers=HEADERS, timeout=30) as client:
        resp = await client.get(url)
        if resp.status_code == 404:
            return []
        resp.raise_for_status()
        feed = resp.json()

    entries = feed.get("feed", {}).get("entry", [])
    results = []
    for rank, entry in enumerate(entries, start=1):
        collection_id = (
            entry.get("id", {})
            .get("attributes", {})
            .get("im:id")
        )
        results.append({
            "rank": rank,
            "name": entry.get("im:name", {}).get("label"),
            "author": entry.get("im:artist", {}).get("label"),
            "collection_id": collection_id,
            "genre": entry.get("category", {}).get("attributes", {}).get("label"),
            "artwork": entry.get("im:image", [{}])[-1].get("label"),  # largest size
            "content_type": entry.get("im:contentType", {}).get("attributes", {}).get("label"),
        })

    return results


async def scrape_all_genre_charts(
    country: str = "us",
    proxy: Optional[str] = None,
) -> dict[str, list[dict]]:
    """Scrape charts for all major podcast categories."""
    GENRE_MAP = {
        "all": 26,
        "true_crime": 1320,
        "comedy": 1303,
        "news": 1311,
        "science": 1315,
        "business": 1324,
        "technology": 1323,
        "sports": 1318,
        "education": 1304,
        "health_fitness": 1307,
        "society_culture": 1316,
        "history": 1321,
    }

    results = {}
    for genre_name, genre_id in GENRE_MAP.items():
        charts = await scrape_top_charts(genre_id, country=country, proxy=proxy)
        results[genre_name] = charts
        print(f"  {genre_name}: {len(charts)} entries")
        await asyncio.sleep(0.5)

    return results


# Scrape the overall chart for several countries in parallel
async def multi_country_charts(proxy: Optional[str] = None) -> dict:
    countries = ["us", "gb", "au", "ca", "de", "jp"]
    tasks = [scrape_top_charts(26, country=c, proxy=proxy) for c in countries]
    chart_lists = await asyncio.gather(*tasks, return_exceptions=True)

    return {
        country: charts
        for country, charts in zip(countries, chart_lists)
        if not isinstance(charts, Exception)
    }
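
For a monitoring tool, a single snapshot matters less than movement between snapshots. The sketch below compares two lists in the shape returned by `scrape_top_charts`; the function name `rank_changes` and the `delta` convention are assumptions made for illustration.

```python
def rank_changes(previous: list[dict], current: list[dict]) -> list[dict]:
    """Compare two chart snapshots keyed by collection_id."""
    prev_rank = {e["collection_id"]: e["rank"] for e in previous}
    changes = []
    for entry in current:
        cid = entry["collection_id"]
        old = prev_rank.get(cid)
        changes.append({
            "collection_id": cid,
            "name": entry.get("name"),
            "rank": entry["rank"],
            "previous_rank": old,
            # Positive delta = moved up the chart; None = new to the chart
            "delta": None if old is None else old - entry["rank"],
        })
    return changes
```

Shows that dropped off the chart are the IDs present in `previous` but missing from `current`; a set difference on the two ID sets covers that case.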

Scraping Reviews

Reviews use another Apple RSS endpoint:

async def scrape_reviews(
    collection_id: int,
    country: str = "us",
    max_pages: int = 10,
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Scrape user reviews for a podcast.

    Apple's review RSS endpoint provides up to 10 pages per country.
    Each page has ~10 reviews. Max ~100 reviews per country.
    """
    all_reviews = []
    async with httpx.AsyncClient(proxy=proxy, headers=HEADERS, timeout=30) as client:
        for page in range(1, max_pages + 1):
            url = (
                f"https://itunes.apple.com/{country}/rss/customerreviews/"
                f"page={page}/id={collection_id}/sortby=mostrecent/json"
            )

            resp = await client.get(url)
            if resp.status_code in (404, 400):
                break
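            if resp.status_code == 429:
                # Back off, then retry this same page once instead of skipping it
                print(f"  Rate limited on page {page}, sleeping 15s")
                await asyncio.sleep(15)
                resp = await client.get(url)
                if resp.status_code in (400, 404, 429):
                    break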
            resp.raise_for_status()

            data = resp.json()
            entries = data.get("feed", {}).get("entry", [])

            if not entries:
                break

            for entry in entries:
                # First entry is the podcast metadata, not a review
                if "im:rating" not in entry:
                    continue

                all_reviews.append({
                    "title": entry.get("title", {}).get("label"),
                    "author": entry.get("author", {}).get("name", {}).get("label"),
                    "rating": int(entry.get("im:rating", {}).get("label", 0)),
                    "body": entry.get("content", {}).get("label"),
                    "version": entry.get("im:version", {}).get("label"),
                    "vote_sum": int(entry.get("im:voteSum", {}).get("label", 0)),
                    "vote_count": int(entry.get("im:voteCount", {}).get("label", 0)),
                    "review_id": entry.get("id", {}).get("label", "").split("/")[-1],
                })

            # No next page link = we're done
            if not data.get("feed", {}).get("link"):
                break

            await asyncio.sleep(0.5)

    return all_reviews


def aggregate_review_stats(reviews: list[dict]) -> dict:
    """Calculate rating distribution and average from review list."""
    if not reviews:
        return {}

    ratings = [r["rating"] for r in reviews if r.get("rating")]
    distribution = {i: ratings.count(i) for i in range(1, 6)}
    avg = sum(ratings) / len(ratings) if ratings else 0

    return {
        "total_reviews": len(reviews),
        "average_rating": round(avg, 2),
        "distribution": distribution,
        "five_star_pct": round(distribution.get(5, 0) / len(ratings) * 100, 1) if ratings else 0,
        "one_star_pct": round(distribution.get(1, 0) / len(ratings) * 100, 1) if ratings else 0,
    }
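
The 429 handling in `scrape_reviews` uses a fixed sleep. A more general option is exponential backoff around any request. This is a rough sketch, not something Apple documents: `fetch_with_backoff`, its defaults, and the injectable `sleep` parameter are all assumptions chosen to keep the helper easy to test.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def fetch_with_backoff(
    fetch: Callable[[], Awaitable[Any]],
    max_attempts: int = 4,
    base_delay: float = 2.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Call fetch() until it returns a non-429 response or attempts run out.

    `fetch` is a zero-argument callable returning an awaitable whose result
    has a `status_code` attribute (an httpx.Response qualifies).
    """
    resp = None
    for attempt in range(max_attempts):
        resp = await fetch()
        if resp.status_code != 429:
            return resp
        if attempt < max_attempts - 1:
            # Wait 2s, 4s, 8s, ... before the next try
            await sleep(base_delay * 2 ** attempt)
    # Still rate limited after the last attempt; let the caller decide
    return resp
```

Inside the review loop this would be called as `resp = await fetch_with_backoff(lambda: client.get(url))`.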

Proxy Configuration

Both chart scraping and review scraping benefit from rotating IPs. Apple's review endpoints block repeated requests from the same IP. ThorData's residential proxies work well here since each request gets a different IP from the residential pool:

PROXY = "http://USERNAME:PASSWORD@PROXY_HOST:9000"  # placeholders: use your provider's credentials and host

# For country-specific charts, use geo-targeted proxies
def get_country_proxy(country: str) -> str:
    """Get ThorData proxy targeted to specific country."""
    country_map = {
        "us": "US", "gb": "GB", "au": "AU",
        "ca": "CA", "de": "DE", "jp": "JP",
    }
    cc = country_map.get(country, "US")
    return f"http://USER-country-{cc}:PASSWORD@PROXY_HOST:9000"  # PASSWORD and PROXY_HOST are placeholders

# Scrape charts with country-matched proxies
async def scrape_with_matching_proxy(country: str) -> list[dict]:
    proxy = get_country_proxy(country)
    return await scrape_top_charts(26, country=country, proxy=proxy)

For concurrent multi-country scraping, use a semaphore to limit parallelism:

async def scrape_multi_country_limited(countries: list[str]) -> dict:
    """Scrape charts for multiple countries with concurrency limit."""
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent requests

    async def scrape_one(country: str) -> tuple[str, list]:
        async with semaphore:
            proxy = get_country_proxy(country)
            charts = await scrape_top_charts(26, country=country, proxy=proxy)
            return country, charts

    tasks = [scrape_one(c) for c in countries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # With return_exceptions=True a failed task yields an Exception object, not a tuple
    return {r[0]: r[1] for r in results if not isinstance(r, Exception)}

Parsing Episode Feeds

Once you have a feedUrl, episode data comes from RSS. feedparser handles messy podcast RSS variants:

import re

import feedparser

async def parse_episodes(
    feed_url: str,
    proxy: Optional[str] = None,
    max_episodes: int | None = None,
) -> tuple[list[dict], dict]:
    """Parse a podcast RSS feed; returns (episodes, show metadata)."""
    async with httpx.AsyncClient(
        proxy=proxy,
        timeout=30,
        follow_redirects=True,
    ) as client:
        resp = await client.get(
            feed_url,
            headers={"User-Agent": "PodcastParser/1.0"},
        )
        resp.raise_for_status()
        raw_feed = resp.text

    feed = feedparser.parse(raw_feed)

    # Show-level metadata
    show = {
        "title": feed.feed.get("title"),
        "author": feed.feed.get("itunes_author") or feed.feed.get("author"),
        "description": feed.feed.get("description") or feed.feed.get("summary"),
        "language": feed.feed.get("language"),
        "explicit": feed.feed.get("itunes_explicit"),
        "categories": [
            tag.get("term") for tag in feed.feed.get("tags", [])
        ],
        "image": feed.feed.get("image", {}).get("href"),
    }

    entries = feed.entries[:max_episodes] if max_episodes else feed.entries
    episodes = []

    for entry in entries:
        # Parse duration from multiple possible formats
        duration_raw = entry.get("itunes_duration", "")
        duration_seconds = parse_duration(duration_raw)

        # Find audio enclosure
        audio_url = None
        for enclosure in entry.get("enclosures", []):
            if "audio" in enclosure.get("type", ""):
                audio_url = enclosure.get("href")
                break

        episodes.append({
            "title": entry.get("title"),
            "published": entry.get("published"),
            "published_parsed": entry.get("published_parsed"),
            "duration_seconds": duration_seconds,
            "duration_raw": duration_raw,
            "description": entry.get("summary"),
            "audio_url": audio_url,
            "episode_number": entry.get("itunes_episode"),
            "season": entry.get("itunes_season"),
            "episode_type": entry.get("itunes_episodetype", "full"),
            "explicit": entry.get("itunes_explicit"),
            "keywords": entry.get("itunes_keywords", ""),
            "guid": entry.get("id"),
        })

    return episodes, show


def parse_duration(duration_str: str) -> int | None:
    """
    Parse podcast episode duration to seconds.
    Handles multiple formats: '3600', '1:00:00', '60:00', '1:30'
    """
    if not duration_str:
        return None
    duration_str = str(duration_str).strip()

    # Pure seconds
    if re.match(r"^\d+$", duration_str):
        return int(duration_str)

    # HH:MM:SS or MM:SS or H:MM:SS
    parts = duration_str.split(":")
    try:
        if len(parts) == 3:
            return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
        elif len(parts) == 2:
            return int(parts[0]) * 60 + int(parts[1])
    except ValueError:
        pass

    return None

Storing Data in SQLite

import sqlite3
from datetime import datetime

def init_podcast_db(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS podcasts (
            collection_id   INTEGER PRIMARY KEY,
            name            TEXT,
            author          TEXT,
            genre           TEXT,
            episode_count   INTEGER,
            feed_url        TEXT,
            country         TEXT,
            artwork_url     TEXT,
            fetched_at      TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS chart_rankings (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            collection_id   INTEGER,
            country         TEXT,
            genre_id        INTEGER,
            rank            INTEGER,
            ranked_at       TEXT DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(collection_id, country, genre_id, ranked_at)
        );

        CREATE TABLE IF NOT EXISTS episodes (
            guid            TEXT PRIMARY KEY,
            collection_id   INTEGER,
            title           TEXT,
            published       TEXT,
            duration_seconds INTEGER,
            audio_url       TEXT,
            episode_number  INTEGER,
            season          INTEGER,
            description     TEXT,
            fetched_at      TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS reviews (
            review_id       TEXT,
            collection_id   INTEGER,
            country         TEXT,
            title           TEXT,
            author          TEXT,
            rating          INTEGER,
            body            TEXT,
            vote_sum        INTEGER,
            fetched_at      TEXT DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (review_id, collection_id, country)
        );

        CREATE INDEX IF NOT EXISTS idx_episodes_collection ON episodes(collection_id);
        CREATE INDEX IF NOT EXISTS idx_reviews_collection ON reviews(collection_id, country);
        CREATE INDEX IF NOT EXISTS idx_chart_rankings ON chart_rankings(country, genre_id, ranked_at);
    """)
    conn.commit()
    return conn


async def build_podcast_dataset(
    search_queries: list[str],
    countries: list[str] = ["us"],
    proxy: Optional[str] = None,
    db_path: str = "podcasts.db",
):
    """Build a complete podcast dataset from search queries."""
    conn = init_podcast_db(db_path)
    now = datetime.utcnow().isoformat()

    # 1. Collect podcasts via search
    all_podcasts = {}
    for query in search_queries:
        for country in countries:
            results = await search_podcasts(query, limit=50, country=country, proxy=proxy)
            for pod in results:
                cid = pod["collection_id"]
                if cid and cid not in all_podcasts:
                    all_podcasts[cid] = pod
            await asyncio.sleep(0.5)

    print(f"Collected {len(all_podcasts)} unique podcasts")

    # 2. Store podcasts
    for pod in all_podcasts.values():
        conn.execute(
            """INSERT OR REPLACE INTO podcasts
               (collection_id, name, author, genre, episode_count, feed_url, country, artwork_url)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (pod["collection_id"], pod["name"], pod["author"], pod["genre"],
             pod["episode_count"], pod["feed_url"], pod["country"], pod.get("artwork_600"))
        )
    conn.commit()

    # 3. Scrape chart rankings
    for country in countries:
        charts = await scrape_top_charts(26, country=country, proxy=proxy)
        for chart_entry in charts:
            if chart_entry.get("collection_id"):
                conn.execute(
                    """INSERT OR IGNORE INTO chart_rankings
                       (collection_id, country, genre_id, rank, ranked_at)
                       VALUES (?, ?, ?, ?, ?)""",
                    (chart_entry["collection_id"], country, 26,
                     chart_entry["rank"], now)
                )
        conn.commit()
        print(f"Stored {len(charts)} chart rankings for {country}")
        await asyncio.sleep(1)

    conn.close()
    print("Dataset build complete")
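
The schema above defines an episodes table, but `build_podcast_dataset` never fills it. One way to do so, sketched under the assumption that the episode dicts come from `parse_episodes` and that the table was created by `init_podcast_db` (the `store_episodes` name is made up here):

```python
import sqlite3


def store_episodes(
    conn: sqlite3.Connection,
    collection_id: int,
    episodes: list[dict],
) -> int:
    """Upsert parsed episodes; returns the number of rows written."""
    rows = [
        (
            ep["guid"], collection_id, ep.get("title"), ep.get("published"),
            ep.get("duration_seconds"), ep.get("audio_url"),
            ep.get("episode_number"), ep.get("season"), ep.get("description"),
        )
        for ep in episodes
        if ep.get("guid")  # guid is the primary key, so skip entries without one
    ]
    conn.executemany(
        """INSERT OR REPLACE INTO episodes
           (guid, collection_id, title, published, duration_seconds,
            audio_url, episode_number, season, description)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        rows,
    )
    conn.commit()
    return len(rows)
```

Since `parse_episodes` returns a tuple, a call would look like `episodes, show = await parse_episodes(feed_url)` followed by `store_episodes(conn, cid, episodes)`.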

Tips and Gotchas

The RSS chart endpoint is the most stable route to rankings — it's what Apple's web UI calls, and returns clean JSON without HTML scraping. Prefer it over screen-scraping podcasts.apple.com.

Review data is capped at 10 pages per country, and there's no bulk export. If you need historical reviews, you have to collect them over time. A show with 50,000 total reviews will only surface roughly the 100 most recent through the RSS endpoint.
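
Collecting over time mostly comes down to re-running the review scraper on a schedule and keeping only rows you haven't seen. A possible sketch, assuming the reviews table from `init_podcast_db` and review dicts shaped like the output of `scrape_reviews` (the helper name `store_new_reviews` is hypothetical):

```python
import sqlite3


def store_new_reviews(
    conn: sqlite3.Connection,
    collection_id: int,
    country: str,
    reviews: list[dict],
) -> int:
    """Insert unseen reviews; returns how many were actually new."""
    before = conn.total_changes
    conn.executemany(
        """INSERT OR IGNORE INTO reviews
           (review_id, collection_id, country, title, author, rating, body, vote_sum)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        [
            (
                r["review_id"], collection_id, country, r.get("title"),
                r.get("author"), r.get("rating"), r.get("body"), r.get("vote_sum"),
            )
            for r in reviews
            if r.get("review_id")
        ],
    )
    conn.commit()
    # Rows skipped by OR IGNORE do not count toward total_changes
    return conn.total_changes - before
```

The composite primary key does the deduplication, so overlapping pages from consecutive runs are harmless.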

Feed URLs change when a show migrates hosting providers. Store the collectionId as your canonical identifier and re-fetch the feed URL via the lookup API when needed rather than hardcoding it.
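
A refresh step might look like the following. It is only a sketch: `sync_feed_url` is a made-up name, and it assumes the podcasts table from `init_podcast_db` plus a raw result dict from `lookup_podcast`, which carries the feed under the `feedUrl` key.

```python
import sqlite3


def sync_feed_url(
    conn: sqlite3.Connection,
    collection_id: int,
    lookup_result: dict,
) -> bool:
    """Update the stored feed URL if the lookup result differs; True if changed."""
    fresh = lookup_result.get("feedUrl")
    if not fresh:
        return False  # lookup returned nothing usable; keep what we have
    row = conn.execute(
        "SELECT feed_url FROM podcasts WHERE collection_id = ?",
        (collection_id,),
    ).fetchone()
    if row is None or row[0] == fresh:
        return False  # unknown show, or URL unchanged
    conn.execute(
        "UPDATE podcasts SET feed_url = ? WHERE collection_id = ?",
        (fresh, collection_id),
    )
    conn.commit()
    return True
```

A sensible trigger for calling it is a feed fetch that fails with a 404 or a permanent redirect.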

Rate limiting on the Search API sits around 20 requests per minute per IP. For bulk lookups across thousands of collection IDs, rotating IPs is essential. ThorData's residential proxies work well here since each request gets a fresh IP from the residential pool.
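
If you stay on a single IP, spacing requests out is the simplest way to remain under that ceiling. Below is a minimal throttle, assuming the roughly 20-per-minute figure holds; the class name `MinIntervalLimiter` and the injectable `clock` and `sleep` parameters are choices made for this sketch.

```python
import asyncio
import time


class MinIntervalLimiter:
    """Space calls at least 60 / per_minute seconds apart."""

    def __init__(self, per_minute: int = 20, clock=time.monotonic, sleep=asyncio.sleep):
        self.interval = 60.0 / per_minute
        self._clock = clock
        self._sleep = sleep
        self._next_ok = 0.0  # earliest time the next call may proceed
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        # The lock serializes callers so concurrent tasks queue up in order
        async with self._lock:
            delay = self._next_ok - self._clock()
            if delay > 0:
                await self._sleep(delay)
            self._next_ok = max(self._clock(), self._next_ok) + self.interval
```

Create one limiter inside your main coroutine and call `await limiter.wait()` before each `client.get(...)` against the Search or lookup endpoint.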

Duration format normalization is a real problem. Different hosting providers format duration differently (3600 vs 1:00:00 vs 60:00). Always use a robust parser that handles all formats rather than assuming a single format.

Private feeds (Patreon, Supporting Cast, Supercast) require subscriber tokens embedded in the feed URL. These aren't scrapeable without valid subscriber credentials.

iTunes API vs podcasts.apple.com: The iTunes API (itunes.apple.com/search) is stable and well-supported. The web app (podcasts.apple.com) uses internal endpoints that change without notice. Stick to the iTunes API for data you need reliably, and scrape the web app only for features it exclusively offers (full chart rankings beyond the RSS feed, richer review metadata).