Scraping Hulu: Content Catalog and Availability Tracking (2026)
Hulu is a weird target for scraping. It's owned by Disney, runs a massive content library with licensed shows that rotate in and out, and there's no public API. But unlike Netflix, Hulu's catalog is partially browsable without authentication — their marketing pages, show detail pages, and browse sections expose a lot of metadata to search engines. That's your way in.
The main use case? Tracking what content comes and goes. Hulu's library changes constantly as licensing deals expire and new ones start. If you're building a streaming comparison service, monitoring cord-cutting options, or just want to know when a show leaves the platform — you need this data.
What's Accessible
Without logging in, Hulu exposes:
- Show and movie pages — title, description, genre, maturity rating, season/episode counts
- Browse categories — curated collections, genre pages, network-specific hubs (FX, ABC, etc.)
- Episode metadata — titles, air dates, descriptions, episode numbers
- Hub pages — Hulu Originals, trending, newly added, expiring soon
- Structured JSON-LD data — embedded directly in page HTML for SEO purposes
- Content thumbnails and artwork — accessible via CDN URLs
- Show premiere and end dates — useful for tracking licensing windows
With a subscription account (more complex, and outside the scope of this guide):
- Full episode details — runtime, availability dates, extras
- Watch history data — if you're scraping your own account
- Recommendation data — personalized suggestions
- Live TV schedule — for Hulu + Live TV subscribers
For this guide, we focus on the public-facing data. It's enough for catalog tracking and doesn't require dealing with authentication tokens.
Dependencies and Setup
pip install httpx[http2] playwright beautifulsoup4 selectolax
playwright install chromium
We use httpx for lightweight static page fetching and Playwright for pages that require JavaScript execution or Cloudflare bypass.
Anti-Bot Setup
Hulu uses a combination of Cloudflare and custom bot detection.
Cloudflare WAF. The standard Cloudflare challenge page shows up for suspicious traffic. Datacenter IPs, high request rates, and missing browser fingerprints all trigger it.
Client-side telemetry. Hulu's JavaScript sends behavioral telemetry back to their analytics. While this isn't a hard block on its own, anomalous patterns (no mouse events, instant page loads, strictly sequential URL access) make your traffic easier to flag over time.
Rate limiting. Hulu applies rate limits per IP. Hit too many pages in a short window and you'll start seeing 429 responses or Cloudflare challenge pages.
The good news: Hulu's detection is a step below Amazon or Ticketmaster. A stealth browser with residential proxies handles it reliably. For the proxy side, ThorData's residential network works well here — their US residential IPs pass Cloudflare checks without issues, and since Hulu is US-only, that's the only geo you need.
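Before falling back to a full browser, it helps to recognize when you're actually being challenged. The check below is a heuristic sketch, not a definitive Cloudflare signature — status codes and page markers vary, so tune it against the responses you actually see:

import httpx

def looks_like_cloudflare_challenge(resp: httpx.Response) -> bool:
    """Heuristic: does this response look like a Cloudflare challenge or rate limit?"""
    if resp.status_code in (403, 429, 503):
        return True
    server = resp.headers.get("server", "").lower()
    body = resp.text[:2000].lower()
    # "Just a moment..." is the usual challenge page title; challenge-platform
    # is the script path Cloudflare's interstitial loads from.
    return "cloudflare" in server and ("just a moment" in body or "challenge-platform" in body)

A True result is your cue to slow down, rotate the proxy, or hand the URL to the Playwright fallback covered later.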
Rotating User-Agents
Keep a realistic pool of browser strings. Cloudflare and Hulu's CDN both inspect User-Agent headers:
import random
USER_AGENTS = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
]
def random_headers() -> dict:
return {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Cache-Control": "max-age=0",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
}
Scraping the Browse Pages
Hulu's browse pages are the best starting point for catalog discovery. They're server-side rendered with content data embedded in the HTML.
import httpx
import json
import re
import time
from bs4 import BeautifulSoup
from dataclasses import dataclass, field
@dataclass
class HuluShow:
"""Represents a Hulu content item."""
entity_id: str = ""
title: str = ""
description: str = ""
content_type: str = "" # movie, series
genres: list = field(default_factory=list)
rating: str = ""
seasons: int = 0
premiere_date: str = ""
network: str = ""
url: str = ""
thumbnail: str = ""
class HuluScraper:
"""Scrape Hulu content catalog from public pages."""
BASE = "https://www.hulu.com"
def __init__(self, proxy: str = None, delay: float = 3.0):
self.proxy = proxy
self.delay = delay
self._build_client()
def _build_client(self):
client_kwargs = {
"headers": random_headers(),
"timeout": 30,
"follow_redirects": True,
}
        if self.proxy:
            # httpx >= 0.26 takes a single proxy URL via "proxy";
            # older releases used a "proxies" mapping instead.
            client_kwargs["proxy"] = self.proxy
self.client = httpx.Client(**client_kwargs)
def _get(self, path: str) -> httpx.Response:
"""Fetch a page with automatic header rotation and delay."""
# Rotate headers per request
self.client.headers.update(random_headers())
resp = self.client.get(f"{self.BASE}{path}")
time.sleep(self.delay + random.uniform(-0.5, 1.5))
return resp
def scrape_hub_page(self, path: str = "/hub/movies") -> list:
"""Scrape a hub/browse page for content listings."""
resp = self._get(path)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
items = []
# Look for Next.js data blob
next_data = soup.find("script", id="__NEXT_DATA__")
if next_data:
try:
data = json.loads(next_data.string)
page_props = data.get("props", {}).get("pageProps", {})
collections = page_props.get("collections", [])
for collection in collections:
for item in collection.get("items", []):
items.append({
"entity_id": item.get("id", ""),
"title": item.get("name", ""),
"description": item.get("description", ""),
"content_type": item.get("type", ""),
"thumbnail": item.get("artwork", {}).get("horizontal", {}).get("path", ""),
"premiere_date": item.get("premiereDate", ""),
"rating": item.get("rating", ""),
"url": f"{self.BASE}/series/{item.get('id', '')}",
})
except (json.JSONDecodeError, KeyError, TypeError) as e:
print(f"Next.js data parse error on {path}: {e}")
# Fallback: extract from Open Graph and schema markup
if not items:
items.extend(self._extract_schema_items(soup, path))
return items
def _extract_schema_items(self, soup: BeautifulSoup, path: str) -> list:
"""Extract items from schema.org markup as a fallback."""
items = []
for script in soup.find_all("script", type="application/ld+json"):
try:
ld = json.loads(script.string)
if ld.get("@type") == "ItemList":
for element in ld.get("itemListElement", []):
item_data = element.get("item", {})
items.append({
"entity_id": item_data.get("@id", "").split("/")[-1],
"title": item_data.get("name", ""),
"description": item_data.get("description", ""),
"content_type": item_data.get("@type", "").lower(),
"url": item_data.get("url", ""),
})
except (json.JSONDecodeError, KeyError):
continue
return items
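A quick smoke test of the hub scraper before wiring it into anything bigger (pass a proxy URL instead of None if you're running from a datacenter IP):

# Pull one hub page and print a few titles to confirm parsing works.
scraper = HuluScraper(proxy=None, delay=3.0)
movies = scraper.scrape_hub_page("/hub/movies")
for item in movies[:5]:
    print(item["title"], "-", item.get("premiere_date") or "no premiere date")
print(f"{len(movies)} items on /hub/movies")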
Show Detail Scraping
Individual show pages have the richest metadata. Here's where you get episode-level data.
def scrape_show_page(self, show_url: str) -> dict:
"""Scrape detailed metadata from a show's page."""
# Handle relative URLs
if show_url.startswith("/"):
show_url = f"{self.BASE}{show_url}"
resp = self.client.get(show_url)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
time.sleep(self.delay)
show = {"url": show_url}
# JSON-LD structured data — most reliable source
ld_scripts = soup.find_all("script", type="application/ld+json")
for script in ld_scripts:
try:
ld = json.loads(script.string)
if ld.get("@type") in ("TVSeries", "Movie", "TVSeason"):
show["jsonld"] = ld
show["title"] = ld.get("name", "")
show["description"] = ld.get("description", "")
show["genres"] = ld.get("genre", [])
show["rating"] = ld.get("contentRating", "")
show["start_date"] = ld.get("startDate", "")
show["end_date"] = ld.get("endDate", "")
show["creator"] = ld.get("creator", {})
# Aggregate rating if present
if "aggregateRating" in ld:
show["aggregate_rating"] = ld["aggregateRating"].get("ratingValue")
show["rating_count"] = ld["aggregateRating"].get("ratingCount")
# Season/episode structure
if "containsSeason" in ld:
seasons = ld["containsSeason"]
show["season_count"] = len(seasons)
show["seasons"] = []
for season in seasons:
s = {
"number": season.get("seasonNumber"),
"episode_count": season.get("numberOfEpisodes"),
"name": season.get("name", ""),
}
if "episode" in season:
s["episodes"] = [{
"number": ep.get("episodeNumber"),
"title": ep.get("name", ""),
"description": ep.get("description", ""),
"date": ep.get("datePublished", ""),
"duration": ep.get("timeRequired", ""),
} for ep in season["episode"]]
show["seasons"].append(s)
break
except (json.JSONDecodeError, KeyError, TypeError):
continue
# Next.js page data as backup
next_data = soup.find("script", id="__NEXT_DATA__")
if next_data and "title" not in show:
try:
data = json.loads(next_data.string)
details = data.get("props", {}).get("pageProps", {}).get("details", {})
show["title"] = details.get("name", show.get("title", ""))
show["network"] = details.get("network", "")
show["description"] = details.get("description", show.get("description", ""))
show["tags"] = details.get("tags", [])
except (json.JSONDecodeError, KeyError):
pass
# Open Graph fallback for basic fields
og_title = soup.find("meta", property="og:title")
og_desc = soup.find("meta", property="og:description")
og_image = soup.find("meta", property="og:image")
if og_title and not show.get("title"):
show["title"] = og_title.get("content", "")
if og_desc and not show.get("description"):
show["description"] = og_desc.get("content", "")
if og_image:
show["thumbnail"] = og_image.get("content", "")
return show
Full Catalog Discovery
To build a complete catalog, crawl all the browse pages systematically.
def discover_full_catalog(self) -> list:
"""Crawl all major hub and genre pages for catalog discovery."""
hub_paths = [
"/hub/movies",
"/hub/tv-shows",
"/hub/hulu-originals",
"/hub/recently-added",
"/hub/expiring-soon",
"/hub/networks/abc",
"/hub/networks/fx",
"/hub/networks/nbc",
"/hub/networks/cbs",
"/hub/networks/bravo",
"/hub/networks/a-and-e",
"/hub/genre/comedy",
"/hub/genre/drama",
"/hub/genre/action",
"/hub/genre/horror",
"/hub/genre/documentary",
"/hub/genre/anime",
"/hub/genre/reality-tv",
"/hub/genre/kids",
"/hub/genre/sci-fi",
"/hub/genre/thriller",
"/hub/genre/crime",
"/hub/genre/romance",
"/hub/genre/animated",
]
all_items = {} # deduplicate by entity_id
for path in hub_paths:
print(f"Scraping {path}...")
try:
items = self.scrape_hub_page(path)
before = len(all_items)
for item in items:
eid = item.get("entity_id")
if eid and eid not in all_items:
all_items[eid] = item
new_items = len(all_items) - before
print(f" Found {len(items)} items (+{new_items} new, {len(all_items)} unique total)")
except Exception as e:
print(f" Error on {path}: {e}")
return list(all_items.values())
Playwright Fallback for Cloudflare-Blocked Pages
When httpx gets Cloudflare challenges on certain Hulu pages, Playwright with stealth handles the JS challenge:
from playwright.sync_api import sync_playwright
import json
import random  # for picking a User-Agent from the USER_AGENTS pool defined earlier
def scrape_hulu_with_playwright(path: str, proxy: str = None) -> list:
"""Use Playwright to scrape Hulu hub pages that trigger Cloudflare challenges."""
BASE = "https://www.hulu.com"
launch_kwargs = {
"headless": True,
"args": [
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-dev-shm-usage",
],
}
if proxy:
launch_kwargs["proxy"] = {"server": proxy}
items = []
with sync_playwright() as p:
browser = p.chromium.launch(**launch_kwargs)
context = browser.new_context(
user_agent=random.choice(USER_AGENTS),
locale="en-US",
timezone_id="America/New_York",
viewport={"width": 1440, "height": 900},
)
# Mask webdriver signals
context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
window.chrome = { runtime: {} };
""")
page = context.new_page()
page.goto(f"{BASE}{path}", wait_until="networkidle", timeout=45000)
page.wait_for_timeout(2000)
# Try to extract Next.js data from the page
items_json = page.evaluate("""() => {
const nextData = document.getElementById('__NEXT_DATA__');
if (!nextData) return null;
try {
const data = JSON.parse(nextData.textContent);
const collections = data?.props?.pageProps?.collections || [];
const items = [];
for (const col of collections) {
for (const item of (col.items || [])) {
items.push({
entity_id: item.id || '',
title: item.name || '',
description: item.description || '',
content_type: item.type || '',
premiere_date: item.premiereDate || '',
});
}
}
return items;
} catch(e) { return null; }
}""")
if items_json:
items = items_json
browser.close()
return items
def scrape_episode_list_playwright(show_url: str, proxy: str = None) -> list:
"""Scrape the episode list from a Hulu show page via Playwright."""
launch_kwargs = {"headless": True, "args": ["--no-sandbox"]}
if proxy:
launch_kwargs["proxy"] = {"server": proxy}
episodes = []
with sync_playwright() as p:
browser = p.chromium.launch(**launch_kwargs)
context = browser.new_context(
user_agent=random.choice(USER_AGENTS),
locale="en-US",
)
page = context.new_page()
page.goto(show_url, wait_until="networkidle", timeout=45000)
page.wait_for_timeout(3000)
# Look for episode cards
episode_cards = page.query_selector_all("[class*='episode'], [data-testid*='episode']")
for card in episode_cards:
title_el = card.query_selector("h4, [class*='title']")
desc_el = card.query_selector("[class*='description'], p")
num_el = card.query_selector("[class*='number'], [class*='season']")
episode = {}
if title_el:
episode["title"] = title_el.inner_text().strip()
if desc_el:
episode["description"] = desc_el.inner_text().strip()
if num_el:
episode["number"] = num_el.inner_text().strip()
if episode:
episodes.append(episode)
browser.close()
return episodes
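To make the two paths cooperate, a thin wrapper can try the cheap httpx scraper first and only spin up Playwright when the response looks blocked. This is a sketch built from the functions above; adjust the status codes to whatever challenge behavior you actually observe:

def scrape_hub_with_fallback(scraper: HuluScraper, path: str, proxy: str = None) -> list:
    """Try the httpx scraper first; fall back to Playwright on a likely Cloudflare block."""
    try:
        items = scraper.scrape_hub_page(path)
        if items:
            return items
    except httpx.HTTPStatusError as e:
        if e.response.status_code not in (403, 429, 503):
            raise  # unrelated HTTP error -- don't hide it behind the fallback
    # Empty result or challenge-like status: escalate to the full browser.
    return scrape_hulu_with_playwright(path, proxy=proxy)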
Availability Change Tracking
The real value is tracking changes over time. Run the catalog scraper daily and diff the results.
import sqlite3
from datetime import datetime
def init_hulu_db(path: str = "hulu_catalog.db") -> sqlite3.Connection:
conn = sqlite3.connect(path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS shows (
entity_id TEXT PRIMARY KEY,
title TEXT,
content_type TEXT,
genres TEXT,
rating TEXT,
network TEXT,
season_count INTEGER,
description TEXT,
premiere_date TEXT,
thumbnail TEXT,
first_seen TEXT,
last_seen TEXT,
status TEXT DEFAULT 'active'
);
CREATE TABLE IF NOT EXISTS catalog_changes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_id TEXT,
title TEXT,
change_type TEXT, -- 'added' or 'removed'
detected_at TEXT
);
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_date TEXT,
total_shows INTEGER,
total_movies INTEGER,
total_series INTEGER,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_changes_type ON catalog_changes(change_type);
CREATE INDEX IF NOT EXISTS idx_changes_date ON catalog_changes(detected_at);
""")
conn.commit()
return conn
def update_catalog(conn: sqlite3.Connection, current_items: list) -> dict:
"""Compare current scrape with stored catalog, record changes."""
now = datetime.utcnow().isoformat()
# Get previously known active items
cursor = conn.execute(
"SELECT entity_id, title FROM shows WHERE status = 'active'"
)
known = {row[0]: row[1] for row in cursor.fetchall()}
current = {item["entity_id"]: item for item in current_items if item.get("entity_id")}
added_titles = []
removed_titles = []
# Detect new additions
for eid, item in current.items():
if eid not in known:
conn.execute(
"""INSERT OR REPLACE INTO shows
(entity_id, title, content_type, genres, rating,
network, description, premiere_date, thumbnail,
first_seen, last_seen, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active')""",
(eid, item.get("title"), item.get("content_type"),
json.dumps(item.get("genres", [])), item.get("rating"),
item.get("network", ""), item.get("description", ""),
item.get("premiere_date", ""), item.get("thumbnail", ""),
now, now),
)
conn.execute(
"INSERT INTO catalog_changes (entity_id, title, change_type, detected_at) VALUES (?, ?, 'added', ?)",
(eid, item.get("title"), now),
)
added_titles.append(item.get("title", eid))
else:
conn.execute(
"UPDATE shows SET last_seen = ? WHERE entity_id = ?", (now, eid)
)
# Detect removals
for eid, title in known.items():
if eid not in current:
conn.execute(
"INSERT INTO catalog_changes (entity_id, title, change_type, detected_at) VALUES (?, ?, 'removed', ?)",
(eid, title, now),
)
conn.execute(
"UPDATE shows SET status = 'removed' WHERE entity_id = ?", (eid,)
)
removed_titles.append(title)
# Record snapshot stats
total = len(current)
movies = sum(1 for i in current.values() if i.get("content_type") == "movie")
series = sum(1 for i in current.values() if i.get("content_type") in ("series", "tv"))
conn.execute(
"INSERT INTO snapshots (snapshot_date, total_shows, total_movies, total_series) VALUES (?, ?, ?, ?)",
(now[:10], total, movies, series)
)
conn.commit()
print(f"Catalog update: +{len(added_titles)} added, -{len(removed_titles)} removed")
return {"added": added_titles, "removed": removed_titles}
Enriching New Items with Detail Data
When new titles appear, trigger a detail-page enrichment run to capture full metadata:
def enrich_new_items(
conn: sqlite3.Connection,
scraper: HuluScraper,
added_ids: list,
limit: int = 20,
):
"""Scrape detail pages for newly discovered items."""
cursor = conn.execute(
"SELECT entity_id, title FROM shows WHERE entity_id IN ({}) AND status='active'".format(
",".join("?" * len(added_ids))
),
added_ids,
)
rows = cursor.fetchall()
for eid, title in rows[:limit]:
url = f"https://www.hulu.com/series/{eid}"
print(f" Enriching: {title}")
try:
details = scraper.scrape_show_page(url)
conn.execute(
"""UPDATE shows
SET genres = ?, rating = ?, season_count = ?, description = ?
WHERE entity_id = ?""",
(
json.dumps(details.get("genres", [])),
details.get("rating", ""),
details.get("season_count"),
details.get("description", ""),
eid,
)
)
conn.commit()
except Exception as e:
print(f" Failed to enrich {title}: {e}")
Running the Pipeline
def daily_catalog_update(proxy: str = None, db_path: str = "hulu_catalog.db"):
"""Run a full catalog scan and record changes."""
scraper = HuluScraper(proxy=proxy, delay=3.0)
conn = init_hulu_db(db_path)
print("Discovering catalog...")
items = scraper.discover_full_catalog()
print(f"Found {len(items)} total items")
print("Checking for changes...")
changes = update_catalog(conn, items)
# Enrich new items with detailed metadata
if changes["added"]:
print(f"Enriching {len(changes['added'])} new titles...")
# Get entity_ids for added titles
added_ids = [
item["entity_id"] for item in items
if item.get("title") in changes["added"]
]
enrich_new_items(conn, scraper, added_ids[:20])
# Report catalog health
total_active = conn.execute(
"SELECT COUNT(*) FROM shows WHERE status='active'"
).fetchone()[0]
total_movies = conn.execute(
"SELECT COUNT(*) FROM shows WHERE status='active' AND content_type='movie'"
).fetchone()[0]
print(f"\nCatalog status:")
print(f" Active titles: {total_active}")
print(f" Movies: {total_movies}")
print(f" Series: {total_active - total_movies}")
conn.close()
print("Done.")
def query_recent_changes(db_path: str = "hulu_catalog.db", days: int = 7) -> dict:
"""Query the database for catalog changes over the past N days."""
conn = sqlite3.connect(db_path)
from datetime import timedelta
since = (datetime.utcnow() - timedelta(days=days)).isoformat()
added = conn.execute(
"SELECT title, detected_at FROM catalog_changes WHERE change_type='added' AND detected_at > ?",
(since,)
).fetchall()
removed = conn.execute(
"SELECT title, detected_at FROM catalog_changes WHERE change_type='removed' AND detected_at > ?",
(since,)
).fetchall()
conn.close()
return {
"period_days": days,
"added": [{"title": r[0], "date": r[1][:10]} for r in added],
"removed": [{"title": r[0], "date": r[1][:10]} for r in removed],
}
if __name__ == "__main__":
PROXY = "http://YOUR_USER:[email protected]:9000"
daily_catalog_update(proxy=PROXY)
# Print recent changes
changes = query_recent_changes(days=7)
print(f"\nLast 7 days: {len(changes['added'])} added, {len(changes['removed'])} removed")
for item in changes["added"][:5]:
print(f" + {item['title']} ({item['date']})")
for item in changes["removed"][:5]:
print(f" - {item['title']} ({item['date']})")
Comparing Catalogs Across Streaming Services
The real competitive intelligence comes from comparing multiple services. With a unified schema, you can track Hulu against Netflix, Max, and Peacock:
def build_cross_service_schema(db_path: str = "streaming_catalog.db") -> sqlite3.Connection:
"""Multi-service catalog tracking schema."""
conn = sqlite3.connect(db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS titles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT NOT NULL,
external_id TEXT,
title TEXT,
content_type TEXT,
genres TEXT,
rating TEXT,
premiere_date TEXT,
status TEXT DEFAULT 'active',
first_seen TEXT,
last_seen TEXT,
UNIQUE(service, external_id)
);
CREATE TABLE IF NOT EXISTS service_changes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT,
title TEXT,
change_type TEXT,
detected_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_titles_service ON titles(service, status);
CREATE INDEX IF NOT EXISTS idx_titles_name ON titles(title);
""")
conn.commit()
return conn
def find_exclusives(conn: sqlite3.Connection, service: str) -> list:
"""Find titles available on one service but no others."""
return conn.execute("""
SELECT t1.title, t1.content_type, t1.genres
FROM titles t1
WHERE t1.service = ?
AND t1.status = 'active'
AND NOT EXISTS (
SELECT 1 FROM titles t2
WHERE t2.title = t1.title
AND t2.service != t1.service
AND t2.status = 'active'
)
ORDER BY t1.title
""", (service,)).fetchall()
Error Handling and Reliability
Production catalog tracking needs graceful error handling:
import time
import random
import httpx  # the retry decorator catches httpx exceptions below
from functools import wraps
def retry_on_failure(max_attempts: int = 3, base_delay: float = 5.0):
"""Decorator for automatic retry with exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except (httpx.HTTPStatusError, httpx.TimeoutException) as e:
if attempt == max_attempts - 1:
raise
delay = base_delay * (2 ** attempt) + random.uniform(0, 2)
print(f"Attempt {attempt+1} failed ({e}), retrying in {delay:.1f}s...")
time.sleep(delay)
return wrapper
return decorator
@retry_on_failure(max_attempts=3, base_delay=10.0)
def safe_scrape_hub(scraper: HuluScraper, path: str) -> list:
"""Hub page scraping with retry."""
return scraper.scrape_hub_page(path)
Practical Notes
Hulu is US-only. You need a US IP address. If you're scraping from outside the US, your proxy must be US-based or you'll just get redirected to the "not available in your region" page. ThorData's residential pool has extensive US coverage, which is exactly what you need here.
Browse pages are your best source. Individual show pages have more detail, but the browse pages give you catalog breadth. Scrape browse pages daily for catalog tracking, and only hit individual show pages when you detect something new.
Content rotation is real. Shows appear and disappear within the same month. Licensed content from NBC, ABC, and other networks has complex windowing agreements. If you're not tracking daily, you'll miss short availability windows entirely.
Don't hammer the site. One request every 3-5 seconds is fine. Hulu's catalog is maybe 5,000-8,000 titles total — you can cover the browse pages in under an hour at a reasonable pace. There's no need to rush and risk getting your IP flagged.
The "expiring soon" hub is gold. Hulu actually publishes which titles are leaving the platform soon at /hub/expiring-soon. Scraping this daily gives you a 30-day advance warning of catalog changes — useful for recommendation tools or alert services.
Data Analysis: Catalog Intelligence
def analyze_catalog(db_path: str = "hulu_catalog.db") -> dict:
"""Generate catalog health and trend statistics."""
conn = sqlite3.connect(db_path)
# Overall size
total = conn.execute("SELECT COUNT(*) FROM shows WHERE status='active'").fetchone()[0]
# By content type
by_type = dict(conn.execute(
"SELECT content_type, COUNT(*) FROM shows WHERE status='active' GROUP BY content_type"
).fetchall())
# Addition/removal rate over 30 days
from datetime import timedelta
since = (datetime.utcnow() - timedelta(days=30)).isoformat()
added_30d = conn.execute(
"SELECT COUNT(*) FROM catalog_changes WHERE change_type='added' AND detected_at > ?", (since,)
).fetchone()[0]
removed_30d = conn.execute(
"SELECT COUNT(*) FROM catalog_changes WHERE change_type='removed' AND detected_at > ?", (since,)
).fetchone()[0]
# Catalog churn rate
churn_rate = round((added_30d + removed_30d) / max(total, 1) * 100, 1)
conn.close()
return {
"total_active": total,
"by_content_type": by_type,
"added_last_30_days": added_30d,
"removed_last_30_days": removed_30d,
"catalog_churn_rate_pct": churn_rate,
}
The catalog change data is what makes this worthwhile. Streaming comparison sites charge subscription fees for exactly this kind of information. Build the dataset consistently and it becomes genuinely valuable — especially when you layer in metadata like genre distribution, network sourcing, and availability window lengths.
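Availability window lengths, for example, fall straight out of the first_seen/last_seen columns once titles start cycling off the catalog. A sketch using SQLite's julianday() for the date math — only meaningful after the tracker has run long enough to observe both the arrival and departure of titles:

def average_availability_window(db_path: str = "hulu_catalog.db") -> float:
    """Average number of days a removed title stayed in the catalog, per tracking data."""
    conn = sqlite3.connect(db_path)
    row = conn.execute(
        """SELECT AVG(julianday(last_seen) - julianday(first_seen))
           FROM shows
           WHERE status = 'removed'"""
    ).fetchone()
    conn.close()
    return round(row[0], 1) if row[0] is not None else 0.0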