
Scraping Hulu: Content Catalog and Availability Tracking (2026)


Hulu is a weird target for scraping. It's owned by Disney, runs a massive content library with licensed shows that rotate in and out, and there's no public API. But unlike Netflix, Hulu's catalog is partially browsable without authentication — their marketing pages, show detail pages, and browse sections expose a lot of metadata to search engines. That's your way in.

The main use case? Tracking what content comes and goes. Hulu's library changes constantly as licensing deals expire and new ones start. If you're building a streaming comparison service, monitoring cord-cutting options, or just want to know when a show leaves the platform — you need this data.

What's Accessible

Without logging in, Hulu exposes:

- Hub and browse pages (/hub/movies, /hub/tv-shows, plus genre and network hubs) with content listings embedded in the HTML
- Show and movie detail pages carrying JSON-LD structured data, Open Graph tags, and Next.js page data
- The "expiring soon" hub, which lists titles about to leave the platform

With a subscription account you can additionally reach personalized account data, but that means managing authentication tokens and session state.

For this guide, we focus on the public-facing data. It's enough for catalog tracking and doesn't require dealing with authentication at all.

Dependencies and Setup

pip install httpx[http2] playwright beautifulsoup4 selectolax
playwright install chromium

We use httpx for lightweight static page fetching and Playwright for pages that require JavaScript execution or Cloudflare bypass.

Anti-Bot Setup

Hulu uses a combination of Cloudflare and custom bot detection.

Cloudflare WAF. The standard Cloudflare challenge page shows up for suspicious traffic. Datacenter IPs, high request rates, and missing browser fingerprints all trigger it.

Client-side telemetry. Hulu's JavaScript sends behavioral telemetry back to their analytics. While this isn't a hard block, anomalous patterns (no mouse events, instant page loads, sequential URL access) contribute to an escalating suspicion score.

Rate limiting. Hulu applies rate limits per IP. Hit too many pages in a short window and you'll start seeing 429 responses or Cloudflare challenge pages.

The good news: Hulu's detection is a step below Amazon or Ticketmaster. A stealth browser with residential proxies handles it reliably. For the proxy side, ThorData's residential network works well here — their US residential IPs pass Cloudflare checks without issues, and since Hulu is US-only, that's the only geo you need.
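Before escalating to a browser, it helps to recognize when a response is a Cloudflare interstitial rather than real content. A minimal heuristic sketch — the status codes and marker strings below are assumptions drawn from typical challenge pages, not an official list, so tune them against what you actually see:

```python
# Heuristic Cloudflare-challenge detection. Marker strings and status codes
# are assumptions based on common challenge pages, not a documented contract.
CHALLENGE_MARKERS = ("just a moment", "cf-challenge", "challenge-platform", "cf_chl")

def looks_like_cloudflare_challenge(status_code: int, body: str) -> bool:
    """Return True when a response resembles a Cloudflare interstitial page."""
    if status_code not in (403, 503):
        return False
    lowered = body.lower()
    return any(marker in lowered for marker in CHALLENGE_MARKERS)
```

When this returns True for a page, retry it through a stealth browser path instead of hammering it again with plain HTTP requests.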

Rotating User-Agents

Keep a realistic pool of browser strings. Cloudflare and Hulu's CDN both inspect User-Agent headers:

import random

USER_AGENTS = [
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
]

def random_headers() -> dict:
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Cache-Control": "max-age=0",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
    }

Scraping the Browse Pages

Hulu's browse pages are the best starting point for catalog discovery. They're server-side rendered with content data embedded in the HTML.

import httpx
import json
import random
import time
from bs4 import BeautifulSoup
from dataclasses import dataclass, field

@dataclass
class HuluShow:
    """Represents a Hulu content item."""
    entity_id: str = ""
    title: str = ""
    description: str = ""
    content_type: str = ""  # movie, series
    genres: list = field(default_factory=list)
    rating: str = ""
    seasons: int = 0
    premiere_date: str = ""
    network: str = ""
    url: str = ""
    thumbnail: str = ""


class HuluScraper:
    """Scrape Hulu content catalog from public pages."""

    BASE = "https://www.hulu.com"

    def __init__(self, proxy: str = None, delay: float = 3.0):
        self.proxy = proxy
        self.delay = delay
        self._build_client()

    def _build_client(self):
        client_kwargs = {
            "headers": random_headers(),
            "timeout": 30,
            "follow_redirects": True,
            "http2": True,  # we installed httpx[http2] above; HTTP/2 looks more browser-like
        }
        if self.proxy:
            client_kwargs["proxy"] = self.proxy  # httpx >= 0.26; older versions used `proxies`
        self.client = httpx.Client(**client_kwargs)

    def _get(self, path: str) -> httpx.Response:
        """Fetch a page with automatic header rotation and delay."""
        # Rotate headers per request
        self.client.headers.update(random_headers())
        resp = self.client.get(f"{self.BASE}{path}")
        time.sleep(self.delay + random.uniform(-0.5, 1.5))
        return resp

    def scrape_hub_page(self, path: str = "/hub/movies") -> list:
        """Scrape a hub/browse page for content listings."""
        resp = self._get(path)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        items = []

        # Look for Next.js data blob
        next_data = soup.find("script", id="__NEXT_DATA__")
        if next_data:
            try:
                data = json.loads(next_data.string)
                page_props = data.get("props", {}).get("pageProps", {})
                collections = page_props.get("collections", [])

                for collection in collections:
                    for item in collection.get("items", []):
                        items.append({
                            "entity_id": item.get("id", ""),
                            "title": item.get("name", ""),
                            "description": item.get("description", ""),
                            "content_type": item.get("type", ""),
                            "thumbnail": item.get("artwork", {}).get("horizontal", {}).get("path", ""),
                            "premiere_date": item.get("premiereDate", ""),
                            "rating": item.get("rating", ""),
                            "url": f"{self.BASE}/series/{item.get('id', '')}",
                        })
            except (json.JSONDecodeError, KeyError, TypeError) as e:
                print(f"Next.js data parse error on {path}: {e}")

        # Fallback: extract from Open Graph and schema markup
        if not items:
            items.extend(self._extract_schema_items(soup, path))

        return items

    def _extract_schema_items(self, soup: BeautifulSoup, path: str) -> list:
        """Extract items from schema.org markup as a fallback."""
        items = []
        for script in soup.find_all("script", type="application/ld+json"):
            try:
                ld = json.loads(script.string)
                if ld.get("@type") == "ItemList":
                    for element in ld.get("itemListElement", []):
                        item_data = element.get("item", {})
                        items.append({
                            "entity_id": item_data.get("@id", "").split("/")[-1],
                            "title": item_data.get("name", ""),
                            "description": item_data.get("description", ""),
                            "content_type": item_data.get("@type", "").lower(),
                            "url": item_data.get("url", ""),
                        })
            except (json.JSONDecodeError, KeyError):
                continue
        return items

Show Detail Scraping

Individual show pages have the richest metadata. Here's where you get episode-level data.

    def scrape_show_page(self, show_url: str) -> dict:
        """Scrape detailed metadata from a show's page."""
        # Handle relative URLs
        if show_url.startswith("/"):
            show_url = f"{self.BASE}{show_url}"

        resp = self.client.get(show_url)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        time.sleep(self.delay)

        show = {"url": show_url}

        # JSON-LD structured data — most reliable source
        ld_scripts = soup.find_all("script", type="application/ld+json")
        for script in ld_scripts:
            try:
                ld = json.loads(script.string)
                if ld.get("@type") in ("TVSeries", "Movie", "TVSeason"):
                    show["jsonld"] = ld
                    show["title"] = ld.get("name", "")
                    show["description"] = ld.get("description", "")
                    show["genres"] = ld.get("genre", [])
                    show["rating"] = ld.get("contentRating", "")
                    show["start_date"] = ld.get("startDate", "")
                    show["end_date"] = ld.get("endDate", "")
                    show["creator"] = ld.get("creator", {})

                    # Aggregate rating if present
                    if "aggregateRating" in ld:
                        show["aggregate_rating"] = ld["aggregateRating"].get("ratingValue")
                        show["rating_count"] = ld["aggregateRating"].get("ratingCount")

                    # Season/episode structure
                    if "containsSeason" in ld:
                        seasons = ld["containsSeason"]
                        show["season_count"] = len(seasons)
                        show["seasons"] = []
                        for season in seasons:
                            s = {
                                "number": season.get("seasonNumber"),
                                "episode_count": season.get("numberOfEpisodes"),
                                "name": season.get("name", ""),
                            }
                            if "episode" in season:
                                s["episodes"] = [{
                                    "number": ep.get("episodeNumber"),
                                    "title": ep.get("name", ""),
                                    "description": ep.get("description", ""),
                                    "date": ep.get("datePublished", ""),
                                    "duration": ep.get("timeRequired", ""),
                                } for ep in season["episode"]]
                            show["seasons"].append(s)
                    break
            except (json.JSONDecodeError, KeyError, TypeError):
                continue

        # Next.js page data as backup
        next_data = soup.find("script", id="__NEXT_DATA__")
        if next_data and "title" not in show:
            try:
                data = json.loads(next_data.string)
                details = data.get("props", {}).get("pageProps", {}).get("details", {})
                show["title"] = details.get("name", show.get("title", ""))
                show["network"] = details.get("network", "")
                show["description"] = details.get("description", show.get("description", ""))
                show["tags"] = details.get("tags", [])
            except (json.JSONDecodeError, KeyError):
                pass

        # Open Graph fallback for basic fields
        og_title = soup.find("meta", property="og:title")
        og_desc = soup.find("meta", property="og:description")
        og_image = soup.find("meta", property="og:image")
        if og_title and not show.get("title"):
            show["title"] = og_title.get("content", "")
        if og_desc and not show.get("description"):
            show["description"] = og_desc.get("content", "")
        if og_image:
            show["thumbnail"] = og_image.get("content", "")

        return show

Full Catalog Discovery

To build a complete catalog, crawl all the browse pages systematically.

    def discover_full_catalog(self) -> list:
        """Crawl all major hub and genre pages for catalog discovery."""
        hub_paths = [
            "/hub/movies",
            "/hub/tv-shows",
            "/hub/hulu-originals",
            "/hub/recently-added",
            "/hub/expiring-soon",
            "/hub/networks/abc",
            "/hub/networks/fx",
            "/hub/networks/nbc",
            "/hub/networks/cbs",
            "/hub/networks/bravo",
            "/hub/networks/a-and-e",
            "/hub/genre/comedy",
            "/hub/genre/drama",
            "/hub/genre/action",
            "/hub/genre/horror",
            "/hub/genre/documentary",
            "/hub/genre/anime",
            "/hub/genre/reality-tv",
            "/hub/genre/kids",
            "/hub/genre/sci-fi",
            "/hub/genre/thriller",
            "/hub/genre/crime",
            "/hub/genre/romance",
            "/hub/genre/animated",
        ]

        all_items = {}  # deduplicate by entity_id

        for path in hub_paths:
            print(f"Scraping {path}...")
            try:
                items = self.scrape_hub_page(path)
                before = len(all_items)
                for item in items:
                    eid = item.get("entity_id")
                    if eid and eid not in all_items:
                        all_items[eid] = item
                new_items = len(all_items) - before
                print(f"  Found {len(items)} items (+{new_items} new, {len(all_items)} unique total)")
            except Exception as e:
                print(f"  Error on {path}: {e}")

        return list(all_items.values())

Playwright Fallback for Cloudflare-Blocked Pages

When httpx runs into Cloudflare challenges on certain Hulu pages, a stealth-configured Playwright browser can pass the JS challenge and recover the same data:

from playwright.sync_api import sync_playwright
import json
import random

def scrape_hulu_with_playwright(path: str, proxy: str = None) -> list:
    """Use Playwright to scrape Hulu hub pages that trigger Cloudflare challenges."""
    BASE = "https://www.hulu.com"

    launch_kwargs = {
        "headless": True,
        "args": [
            "--disable-blink-features=AutomationControlled",
            "--no-sandbox",
            "--disable-dev-shm-usage",
        ],
    }
    if proxy:
        launch_kwargs["proxy"] = {"server": proxy}

    items = []

    with sync_playwright() as p:
        browser = p.chromium.launch(**launch_kwargs)
        context = browser.new_context(
            user_agent=random.choice(USER_AGENTS),
            locale="en-US",
            timezone_id="America/New_York",
            viewport={"width": 1440, "height": 900},
        )

        # Mask webdriver signals
        context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
            window.chrome = { runtime: {} };
        """)

        page = context.new_page()
        page.goto(f"{BASE}{path}", wait_until="networkidle", timeout=45000)
        page.wait_for_timeout(2000)

        # Try to extract Next.js data from the page
        items_json = page.evaluate("""() => {
            const nextData = document.getElementById('__NEXT_DATA__');
            if (!nextData) return null;
            try {
                const data = JSON.parse(nextData.textContent);
                const collections = data?.props?.pageProps?.collections || [];
                const items = [];
                for (const col of collections) {
                    for (const item of (col.items || [])) {
                        items.push({
                            entity_id: item.id || '',
                            title: item.name || '',
                            description: item.description || '',
                            content_type: item.type || '',
                            premiere_date: item.premiereDate || '',
                        });
                    }
                }
                return items;
            } catch(e) { return null; }
        }""")

        if items_json:
            items = items_json

        browser.close()

    return items


def scrape_episode_list_playwright(show_url: str, proxy: str = None) -> list:
    """Scrape the episode list from a Hulu show page via Playwright."""
    launch_kwargs = {"headless": True, "args": ["--no-sandbox"]}
    if proxy:
        launch_kwargs["proxy"] = {"server": proxy}

    episodes = []

    with sync_playwright() as p:
        browser = p.chromium.launch(**launch_kwargs)
        context = browser.new_context(
            user_agent=random.choice(USER_AGENTS),
            locale="en-US",
        )
        page = context.new_page()
        page.goto(show_url, wait_until="networkidle", timeout=45000)
        page.wait_for_timeout(3000)

        # Look for episode cards
        episode_cards = page.query_selector_all("[class*='episode'], [data-testid*='episode']")
        for card in episode_cards:
            title_el = card.query_selector("h4, [class*='title']")
            desc_el = card.query_selector("[class*='description'], p")
            num_el = card.query_selector("[class*='number'], [class*='season']")

            episode = {}
            if title_el:
                episode["title"] = title_el.inner_text().strip()
            if desc_el:
                episode["description"] = desc_el.inner_text().strip()
            if num_el:
                episode["number"] = num_el.inner_text().strip()
            if episode:
                episodes.append(episode)

        browser.close()

    return episodes

Availability Change Tracking

The real value is tracking changes over time. Run the catalog scraper daily and diff the results.

import json
import sqlite3
from datetime import datetime

def init_hulu_db(path: str = "hulu_catalog.db") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS shows (
            entity_id TEXT PRIMARY KEY,
            title TEXT,
            content_type TEXT,
            genres TEXT,
            rating TEXT,
            network TEXT,
            season_count INTEGER,
            description TEXT,
            premiere_date TEXT,
            thumbnail TEXT,
            first_seen TEXT,
            last_seen TEXT,
            status TEXT DEFAULT 'active'
        );
        CREATE TABLE IF NOT EXISTS catalog_changes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            entity_id TEXT,
            title TEXT,
            change_type TEXT,  -- 'added' or 'removed'
            detected_at TEXT
        );
        CREATE TABLE IF NOT EXISTS snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            snapshot_date TEXT,
            total_shows INTEGER,
            total_movies INTEGER,
            total_series INTEGER,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        );
        CREATE INDEX IF NOT EXISTS idx_changes_type ON catalog_changes(change_type);
        CREATE INDEX IF NOT EXISTS idx_changes_date ON catalog_changes(detected_at);
    """)
    conn.commit()
    return conn


def update_catalog(conn: sqlite3.Connection, current_items: list) -> dict:
    """Compare current scrape with stored catalog, record changes."""
    now = datetime.utcnow().isoformat()

    # Get previously known active items
    cursor = conn.execute(
        "SELECT entity_id, title FROM shows WHERE status = 'active'"
    )
    known = {row[0]: row[1] for row in cursor.fetchall()}
    current = {item["entity_id"]: item for item in current_items if item.get("entity_id")}

    added_titles = []
    removed_titles = []

    # Detect new additions
    for eid, item in current.items():
        if eid not in known:
            conn.execute(
                """INSERT OR REPLACE INTO shows
                   (entity_id, title, content_type, genres, rating,
                    network, description, premiere_date, thumbnail,
                    first_seen, last_seen, status)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active')""",
                (eid, item.get("title"), item.get("content_type"),
                 json.dumps(item.get("genres", [])), item.get("rating"),
                 item.get("network", ""), item.get("description", ""),
                 item.get("premiere_date", ""), item.get("thumbnail", ""),
                 now, now),
            )
            conn.execute(
                "INSERT INTO catalog_changes (entity_id, title, change_type, detected_at) VALUES (?, ?, 'added', ?)",
                (eid, item.get("title"), now),
            )
            added_titles.append(item.get("title", eid))
        else:
            conn.execute(
                "UPDATE shows SET last_seen = ? WHERE entity_id = ?", (now, eid)
            )

    # Detect removals
    for eid, title in known.items():
        if eid not in current:
            conn.execute(
                "INSERT INTO catalog_changes (entity_id, title, change_type, detected_at) VALUES (?, ?, 'removed', ?)",
                (eid, title, now),
            )
            conn.execute(
                "UPDATE shows SET status = 'removed' WHERE entity_id = ?", (eid,)
            )
            removed_titles.append(title)

    # Record snapshot stats
    total = len(current)
    movies = sum(1 for i in current.values() if i.get("content_type") == "movie")
    series = sum(1 for i in current.values() if i.get("content_type") in ("series", "tv"))
    conn.execute(
        "INSERT INTO snapshots (snapshot_date, total_shows, total_movies, total_series) VALUES (?, ?, ?, ?)",
        (now[:10], total, movies, series)
    )

    conn.commit()

    print(f"Catalog update: +{len(added_titles)} added, -{len(removed_titles)} removed")
    return {"added": added_titles, "removed": removed_titles}

Enriching New Items with Detail Data

When new titles appear, trigger a detail-page enrichment run to capture full metadata:

def enrich_new_items(
    conn: sqlite3.Connection,
    scraper: HuluScraper,
    added_ids: list,
    limit: int = 20,
):
    """Scrape detail pages for newly discovered items."""
    if not added_ids:
        return  # avoid building an invalid "IN ()" clause
    placeholders = ",".join("?" * len(added_ids))
    cursor = conn.execute(
        f"SELECT entity_id, title FROM shows WHERE entity_id IN ({placeholders}) AND status='active'",
        added_ids,
    )
    rows = cursor.fetchall()

    for eid, title in rows[:limit]:
        url = f"https://www.hulu.com/series/{eid}"
        print(f"  Enriching: {title}")
        try:
            details = scraper.scrape_show_page(url)
            conn.execute(
                """UPDATE shows
                   SET genres = ?, rating = ?, season_count = ?, description = ?
                   WHERE entity_id = ?""",
                (
                    json.dumps(details.get("genres", [])),
                    details.get("rating", ""),
                    details.get("season_count"),
                    details.get("description", ""),
                    eid,
                )
            )
            conn.commit()
        except Exception as e:
            print(f"  Failed to enrich {title}: {e}")

Running the Pipeline

def daily_catalog_update(proxy: str = None, db_path: str = "hulu_catalog.db"):
    """Run a full catalog scan and record changes."""
    scraper = HuluScraper(proxy=proxy, delay=3.0)
    conn = init_hulu_db(db_path)

    print("Discovering catalog...")
    items = scraper.discover_full_catalog()
    print(f"Found {len(items)} total items")

    print("Checking for changes...")
    changes = update_catalog(conn, items)

    # Enrich new items with detailed metadata
    if changes["added"]:
        print(f"Enriching {len(changes['added'])} new titles...")
        # Get entity_ids for added titles
        added_ids = [
            item["entity_id"] for item in items
            if item.get("title") in changes["added"]
        ]
        enrich_new_items(conn, scraper, added_ids[:20])

    # Report catalog health
    total_active = conn.execute(
        "SELECT COUNT(*) FROM shows WHERE status='active'"
    ).fetchone()[0]
    total_movies = conn.execute(
        "SELECT COUNT(*) FROM shows WHERE status='active' AND content_type='movie'"
    ).fetchone()[0]

    print(f"\nCatalog status:")
    print(f"  Active titles: {total_active}")
    print(f"  Movies: {total_movies}")
    print(f"  Series: {total_active - total_movies}")

    conn.close()
    print("Done.")


def query_recent_changes(db_path: str = "hulu_catalog.db", days: int = 7) -> dict:
    """Query the database for catalog changes over the past N days."""
    conn = sqlite3.connect(db_path)
    from datetime import timedelta
    since = (datetime.utcnow() - timedelta(days=days)).isoformat()

    added = conn.execute(
        "SELECT title, detected_at FROM catalog_changes WHERE change_type='added' AND detected_at > ?",
        (since,)
    ).fetchall()

    removed = conn.execute(
        "SELECT title, detected_at FROM catalog_changes WHERE change_type='removed' AND detected_at > ?",
        (since,)
    ).fetchall()

    conn.close()
    return {
        "period_days": days,
        "added": [{"title": r[0], "date": r[1][:10]} for r in added],
        "removed": [{"title": r[0], "date": r[1][:10]} for r in removed],
    }


if __name__ == "__main__":
    PROXY = "http://YOUR_USER:[email protected]:9000"
    daily_catalog_update(proxy=PROXY)

    # Print recent changes
    changes = query_recent_changes(days=7)
    print(f"\nLast 7 days: {len(changes['added'])} added, {len(changes['removed'])} removed")
    for item in changes["added"][:5]:
        print(f"  + {item['title']} ({item['date']})")
    for item in changes["removed"][:5]:
        print(f"  - {item['title']} ({item['date']})")

Comparing Catalogs Across Streaming Services

The real competitive intelligence comes from comparing multiple services. With a unified schema, you can track Hulu against Netflix, Max, and Peacock:

def build_cross_service_schema(db_path: str = "streaming_catalog.db") -> sqlite3.Connection:
    """Multi-service catalog tracking schema."""
    conn = sqlite3.connect(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS titles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            service TEXT NOT NULL,
            external_id TEXT,
            title TEXT,
            content_type TEXT,
            genres TEXT,
            rating TEXT,
            premiere_date TEXT,
            status TEXT DEFAULT 'active',
            first_seen TEXT,
            last_seen TEXT,
            UNIQUE(service, external_id)
        );
        CREATE TABLE IF NOT EXISTS service_changes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            service TEXT,
            title TEXT,
            change_type TEXT,
            detected_at TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_titles_service ON titles(service, status);
        CREATE INDEX IF NOT EXISTS idx_titles_name ON titles(title);
    """)
    conn.commit()
    return conn


def find_exclusives(conn: sqlite3.Connection, service: str) -> list:
    """Find titles available on one service but no others."""
    return conn.execute("""
        SELECT t1.title, t1.content_type, t1.genres
        FROM titles t1
        WHERE t1.service = ?
          AND t1.status = 'active'
          AND NOT EXISTS (
              SELECT 1 FROM titles t2
              WHERE t2.title = t1.title
                AND t2.service != t1.service
                AND t2.status = 'active'
          )
        ORDER BY t1.title
    """, (service,)).fetchall()

Error Handling and Reliability

Production catalog tracking needs graceful error handling:

import time
import random
from functools import wraps

def retry_on_failure(max_attempts: int = 3, base_delay: float = 5.0):
    """Decorator for automatic retry with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except (httpx.HTTPStatusError, httpx.TimeoutException) as e:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 2)
                    print(f"Attempt {attempt+1} failed ({e}), retrying in {delay:.1f}s...")
                    time.sleep(delay)
        return wrapper
    return decorator


@retry_on_failure(max_attempts=3, base_delay=10.0)
def safe_scrape_hub(scraper: HuluScraper, path: str) -> list:
    """Hub page scraping with retry."""
    return scraper.scrape_hub_page(path)

Practical Notes

Hulu is US-only. You need a US IP address. If you're scraping from outside the US, your proxy must be US-based or you'll just get redirected to the "not available in your region" page. ThorData's residential pool has extensive US coverage, which is exactly what you need here.

Browse pages are your best source. Individual show pages have more detail, but the browse pages give you catalog breadth. Scrape browse pages daily for catalog tracking, and only hit individual show pages when you detect something new.

Content rotation is real. Shows appear and disappear within the same month. Licensed content from NBC, ABC, and other networks has complex windowing agreements. If you're not tracking daily, you'll miss short availability windows entirely.

Don't hammer the site. One request every 3-5 seconds is fine. Hulu's catalog is maybe 5,000-8,000 titles total — you can cover the browse pages in under an hour at a reasonable pace. There's no need to rush and risk getting your IP flagged.

The "expiring soon" hub is gold. Hulu actually publishes which titles are leaving the platform soon at /hub/expiring-soon. Scraping this daily gives you a 30-day advance warning of catalog changes — useful for recommendation tools or alert services.

Data Analysis: Catalog Intelligence

def analyze_catalog(db_path: str = "hulu_catalog.db") -> dict:
    """Generate catalog health and trend statistics."""
    conn = sqlite3.connect(db_path)

    # Overall size
    total = conn.execute("SELECT COUNT(*) FROM shows WHERE status='active'").fetchone()[0]

    # By content type
    by_type = dict(conn.execute(
        "SELECT content_type, COUNT(*) FROM shows WHERE status='active' GROUP BY content_type"
    ).fetchall())

    # Addition/removal rate over 30 days
    from datetime import timedelta
    since = (datetime.utcnow() - timedelta(days=30)).isoformat()
    added_30d = conn.execute(
        "SELECT COUNT(*) FROM catalog_changes WHERE change_type='added' AND detected_at > ?", (since,)
    ).fetchone()[0]
    removed_30d = conn.execute(
        "SELECT COUNT(*) FROM catalog_changes WHERE change_type='removed' AND detected_at > ?", (since,)
    ).fetchone()[0]

    # Catalog churn rate
    churn_rate = round((added_30d + removed_30d) / max(total, 1) * 100, 1)

    conn.close()

    return {
        "total_active": total,
        "by_content_type": by_type,
        "added_last_30_days": added_30d,
        "removed_last_30_days": removed_30d,
        "catalog_churn_rate_pct": churn_rate,
    }

The catalog change data is what makes this worthwhile. Streaming comparison sites charge subscription fees for exactly this kind of information. Build the dataset consistently and it becomes genuinely valuable — especially when you layer in metadata like genre distribution, network sourcing, and availability window lengths.
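One of those layered metrics — availability window length — falls straight out of the `catalog_changes` table. A naive sketch: it assumes at most one add/remove cycle per title, so repeated cycles would need tighter event pairing:

```python
import sqlite3
from datetime import datetime

def availability_windows(conn: sqlite3.Connection) -> list:
    """Days between 'added' and 'removed' events for titles that have both."""
    rows = conn.execute("""
        SELECT a.entity_id, a.title, a.detected_at, r.detected_at
        FROM catalog_changes a
        JOIN catalog_changes r
          ON r.entity_id = a.entity_id
         AND r.change_type = 'removed'
         AND r.detected_at > a.detected_at
        WHERE a.change_type = 'added'
    """).fetchall()
    windows = []
    for eid, title, added_at, removed_at in rows:
        days = (datetime.fromisoformat(removed_at) - datetime.fromisoformat(added_at)).days
        windows.append({"entity_id": eid, "title": title, "window_days": days})
    return windows
```

Averaging `window_days` by network or genre tells you which licensing deals produce short rotations and which content sticks around.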