← Back to blog

How to Scrape Rotten Tomatoes Reviews, Scores & Tomatometer Data in Python (2026)

How to Scrape Rotten Tomatoes Reviews, Scores & Tomatometer Data in Python (2026)

Rotten Tomatoes is one of the most scraped entertainment sites on the web — and for good reason. Critic scores, audience ratings, consensus blurbs, and individual reviews are all sitting in structured HTML. There's no public API anymore (the old one was killed years ago), so scraping is how you get this data.

The catch: Rotten Tomatoes renders some content with JavaScript. The main movie page loads scores in the initial HTML, but full review lists and audience score breakdowns require JS execution. That means you need two tools — BeautifulSoup for the static parts, and Playwright for the dynamic ones.

This guide covers static HTML parsing, Playwright for JS-rendered reviews, anti-detection techniques, Cloudflare bypass strategies, proxy setup, SQLite storage, and a complete batch collection pipeline.

Setup

pip install beautifulsoup4 lxml requests playwright httpx
playwright install chromium

For Cloudflare bypass:

pip install playwright-stealth

Understanding What's Static vs. Dynamic

Before writing any scraping code, it helps to know the split: scores, the consensus blurb, and basic movie metadata arrive in the initial HTML response, while full critic and audience review lists are only rendered after JavaScript executes.

This distinction determines which tool to use. For collecting scores at scale, requests + BeautifulSoup is 10x faster than Playwright. For review text, Playwright is unavoidable.

Scraping Movie Scores (Static HTML)

The Tomatometer score and audience score are in the initial HTML response:

import requests
from bs4 import BeautifulSoup
import time
import json
import random
import sqlite3
from datetime import datetime, timezone

# Browser-like request headers shared by all static (requests/httpx) calls.
# A realistic User-Agent plus Accept/Accept-Language reduces the chance of
# an immediate Cloudflare challenge on the first hit.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/126.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
}


def get_movie_scores(slug: str, session: requests.Session | None = None) -> dict:
    """Fetch Tomatometer and audience scores for a movie from static HTML.

    Args:
        slug: Rotten Tomatoes movie slug, e.g. "the_shawshank_redemption".
        session: Optional requests.Session to reuse connections across calls.

    Returns:
        Dict with scores, score states, consensus, review counts, and the
        movie-info metadata table. Missing values default to "N/A" or "".

    Raises:
        requests.HTTPError: On non-2xx responses (e.g. 403 Cloudflare block).
    """
    url = f"https://www.rottentomatoes.com/m/{slug}"
    s = session or requests.Session()

    resp = s.get(url, headers=HEADERS, timeout=15)
    resp.raise_for_status()

    soup = BeautifulSoup(resp.text, "lxml")

    # Primary score container — a web component whose attributes carry the
    # scores. Two tag names are tried because RT has shipped both across
    # redesigns.
    score_board = soup.select_one("media-scorecard, score-board-deprecated")

    tomatometer = "N/A"
    audience = "N/A"
    tm_state = "N/A"  # "certified-fresh", "fresh", "rotten"
    as_state = "N/A"  # "upright", "spilled"

    if score_board:
        tomatometer = score_board.get("tomatometerscore", "N/A")
        audience = score_board.get("audiencescore", "N/A")
        tm_state = score_board.get("tomatometerstate", "N/A")
        as_state = score_board.get("audiencestate", "N/A")

    # Critics-consensus blurb (selector fallback for older markup).
    consensus_el = soup.select_one(
        "[data-qa='score-panel-critics-consensus'], "
        ".what-to-know__section-body"
    )
    consensus = consensus_el.get_text(strip=True) if consensus_el else ""

    # Movie metadata rows (Rating, Genre, Runtime, ...), label -> value.
    info = {}
    for row in soup.select("[data-qa='movie-info-item'], li.info-item"):
        label_el = row.select_one("[data-qa='movie-info-item-label'], b")
        value_el = row.select_one("[data-qa='movie-info-item-value'], span.info-item-value, a")
        if label_el and value_el:
            key = label_el.get_text(strip=True).rstrip(":")
            info[key] = value_el.get_text(strip=True)

    # Review counts (kept as raw display strings, e.g. "250+ Reviews").
    tm_count_el = soup.select_one("[data-qa='tomatometer-review-count']")
    as_count_el = soup.select_one("[data-qa='audience-rating-count']")

    return {
        "slug": slug,
        "url": url,
        "tomatometer": tomatometer,
        "tomatometer_state": tm_state,
        "audience_score": audience,
        "audience_state": as_state,
        "consensus": consensus,
        "critic_review_count": tm_count_el.get_text(strip=True) if tm_count_el else "",
        "audience_rating_count": as_count_el.get_text(strip=True) if as_count_el else "",
        "info": info,
    }


# Example
# Demo: fetch and print one movie's scores (performs a live HTTP request).
movie = get_movie_scores("the_shawshank_redemption")
print(f"Tomatometer: {movie['tomatometer']}% ({movie['tomatometer_state']})")
print(f"Audience Score: {movie['audience_score']}%")
print(f"Consensus: {movie['consensus'][:100]}")
for k, v in movie["info"].items():
    print(f"  {k}: {v}")

Scraping Critic Reviews (JS-Rendered)

Individual critic reviews load dynamically. Playwright handles this:

from playwright.sync_api import sync_playwright

# pip install playwright-stealth
# Stealth is optional: without it scraping still works, but headless
# detection (navigator.webdriver, etc.) is more likely to trigger blocks.
try:
    from playwright_stealth import stealth_sync
    HAS_STEALTH = True
except ImportError:
    HAS_STEALTH = False


def get_critic_reviews(
    slug: str,
    max_pages: int = 5,
    proxy: dict | None = None,
) -> list[dict]:
    """Scrape critic reviews with Playwright (the review list is JS-rendered).

    Args:
        slug: Movie slug, e.g. "oppenheimer_2023".
        max_pages: Maximum number of review pages to collect.
        proxy: Optional Playwright proxy dict ("server"/"username"/"password").

    Returns:
        Review dicts: critic, publication, quote, date, score, review_url,
        fresh/rotten booleans, and the 1-based page number.
    """
    reviews = []
    url = f"https://www.rottentomatoes.com/m/{slug}/reviews"

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            context_kwargs = {
                "user_agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/126.0.0.0 Safari/537.36"
                ),
                "viewport": {"width": 1920, "height": 1080},
                "locale": "en-US",
            }
            if proxy:
                context_kwargs["proxy"] = proxy

            context = browser.new_context(**context_kwargs)
            page = context.new_page()

            # Apply stealth patches before the first navigation so headless
            # signals are masked from the start.
            if HAS_STEALTH:
                stealth_sync(page)

            page.goto(url, wait_until="networkidle", timeout=30000)
            time.sleep(random.uniform(2, 4))

            for page_num in range(max_pages):
                try:
                    page.wait_for_selector("[data-qa='review-item']", timeout=10000)
                except Exception:
                    # No review cards rendered — stop paginating.
                    break

                soup = BeautifulSoup(page.content(), "lxml")

                for card in soup.select("[data-qa='review-item']"):
                    critic_el = card.select_one("[data-qa='review-critic']")
                    pub_el = card.select_one("[data-qa='review-publication']")
                    quote_el = card.select_one("[data-qa='review-quote']")
                    date_el = card.select_one("[data-qa='review-date']")
                    score_el = card.select_one("[data-qa='review-score']")
                    link_el = card.select_one("[data-qa='review-link']")

                    reviews.append({
                        "critic": critic_el.get_text(strip=True) if critic_el else "",
                        "publication": pub_el.get_text(strip=True) if pub_el else "",
                        "quote": quote_el.get_text(strip=True) if quote_el else "",
                        "date": date_el.get_text(strip=True) if date_el else "",
                        "score": score_el.get_text(strip=True) if score_el else "",
                        "review_url": link_el.get("href", "") if link_el else "",
                        "fresh": card.select_one("[data-qa='review-icon-fresh']") is not None,
                        "rotten": card.select_one("[data-qa='review-icon-rotten']") is not None,
                        "page": page_num + 1,
                    })

                # Don't load a page past the last one we intend to collect
                # (the original clicked "next" even on the final iteration).
                if page_num == max_pages - 1:
                    break

                next_btn = page.query_selector(
                    "rt-button[data-qa='next-btn'], "
                    "button[data-qa='next-btn']"
                )
                if not next_btn:
                    break

                next_btn.click()
                time.sleep(random.uniform(2, 4))
        finally:
            # Always release the browser, even when navigation/selectors fail.
            browser.close()

    return reviews


# Demo: scrape a few review pages (launches a headless browser) and
# summarize fresh vs. rotten counts.
reviews = get_critic_reviews("oppenheimer_2023", max_pages=3)
fresh = sum(1 for r in reviews if r["fresh"])
rotten = sum(1 for r in reviews if r["rotten"])
print(f"Scraped {len(reviews)} reviews: {fresh} fresh, {rotten} rotten")
for r in reviews[:5]:
    icon = "[F]" if r["fresh"] else "[R]"
    print(f"  {icon} {r['critic']} ({r['publication']}): {r['quote'][:80]}")

Scraping Audience Reviews

Audience reviews are also JS-rendered and paginated separately:

def get_audience_reviews(slug: str, max_pages: int = 3, proxy: dict | None = None) -> list[dict]:
    """Scrape audience (user) reviews, which are JS-rendered and paginated.

    Args:
        slug: Movie slug.
        max_pages: Maximum number of review pages to collect.
        proxy: Optional Playwright proxy dict.

    Returns:
        Dicts with user, score, text, and date for each audience review.
    """
    reviews = []
    url = f"https://www.rottentomatoes.com/m/{slug}/reviews?type=user"

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            ctx = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36"
                ),
                **({"proxy": proxy} if proxy else {}),
            )
            page = ctx.new_page()
            if HAS_STEALTH:
                stealth_sync(page)

            page.goto(url, wait_until="networkidle", timeout=30000)

            for page_idx in range(max_pages):
                time.sleep(random.uniform(2, 3))
                soup = BeautifulSoup(page.content(), "lxml")

                for card in soup.select("[data-qa='audience-review-item']"):
                    user_el = card.select_one("[data-qa='audience-reviewer']")
                    score_el = card.select_one("[data-qa='audience-reviewer-score']")
                    text_el = card.select_one("[data-qa='audience-review-body']")
                    date_el = card.select_one("[data-qa='audience-review-date']")

                    reviews.append({
                        "user": user_el.get_text(strip=True) if user_el else "",
                        "score": score_el.get_text(strip=True) if score_el else "",
                        "text": text_el.get_text(strip=True) if text_el else "",
                        "date": date_el.get_text(strip=True) if date_el else "",
                    })

                # Stop before clicking past the final requested page
                # (the original clicked "next" even on the last iteration).
                if page_idx == max_pages - 1:
                    break
                next_btn = page.query_selector("rt-button[data-qa='next-btn']")
                if not next_btn:
                    break
                next_btn.click()
        finally:
            # Release the browser even if a navigation or selector fails.
            browser.close()

    return reviews

Browsing and Searching Movies

The search endpoint works with static HTML:

def search_movies(query: str, session: requests.Session | None = None) -> list[dict]:
    """Search Rotten Tomatoes for movies by title (static HTML endpoint).

    Args:
        query: Title text to search for.
        session: Optional requests.Session to reuse connections.

    Returns:
        Result dicts: title, year, tomatometer, slug, url.

    Raises:
        requests.HTTPError: On non-2xx responses. (The original swallowed
        these — a 403 challenge page silently parsed into zero results.)
    """
    url = "https://www.rottentomatoes.com/search"
    s = session or requests.Session()
    resp = s.get(url, params={"search": query}, headers=HEADERS, timeout=15)
    resp.raise_for_status()  # fail loudly instead of parsing an error page
    soup = BeautifulSoup(resp.text, "lxml")

    results = []
    for item in soup.select("search-page-media-row[type='movie']"):
        name_el = item.select_one("a[data-qa='info-name']")
        href = name_el["href"] if name_el and name_el.has_attr("href") else ""
        slug = href.rstrip("/").split("/m/")[-1] if "/m/" in href else ""

        results.append({
            "title": name_el.get_text(strip=True) if name_el else "",
            "year": item.get("releaseyear", ""),
            "tomatometer": item.get("tomatometerscore", ""),
            "slug": slug,
            "url": f"https://www.rottentomatoes.com{href}" if href else "",
        })
    return results


def get_movies_by_category(category: str = "top-100-movies") -> list[str]:
    """Scrape a category page for movie slugs (first-seen order, deduped).

    Categories: top-100-movies, coming-soon, new-movies-and-tv, best-movies-on-netflix

    Returns:
        Unique movie slugs in the order they first appear on the page.

    Raises:
        requests.HTTPError: On non-2xx responses (previously an error page
        would silently yield an empty list).
    """
    url = f"https://www.rottentomatoes.com/{category}"
    resp = requests.get(url, headers=HEADERS, timeout=15)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "lxml")

    slugs: list[str] = []
    seen: set[str] = set()  # O(1) dedup instead of O(n) list membership
    for link in soup.select("a[href*='/m/']"):
        href = link.get("href", "")
        # Extract slug from URL like /m/movie_slug
        parts = href.split("/m/")
        if len(parts) > 1:
            slug = parts[1].rstrip("/").split("/")[0]
            if slug and slug not in seen:
                seen.add(slug)
                slugs.append(slug)
    return slugs

Anti-Bot Measures and Cloudflare Bypass

Rotten Tomatoes uses Cloudflare, and they've gotten more aggressive. Here's what you'll encounter:

Rate limiting. More than 20-30 requests per minute from the same IP triggers a challenge page — 403 or a Cloudflare "checking your browser" interstitial.

Browser fingerprinting. Playwright in headless mode has detectable signals: the navigator.webdriver flag, missing plugins, canvas fingerprint differences. playwright-stealth patches these:

from playwright_stealth import stealth_sync

# Apply after creating the page, before any navigation
# (patches navigator.webdriver, plugin lists, and similar headless tells).
stealth_sync(page)

IP reputation. Residential IPs work. Datacenter IPs (cloud VMs, VPS servers) get flagged within the first few hits. If you're running from a cloud VM, your requests get challenged almost immediately.

For serious volume, rotating residential proxies are essential. ThorData's residential proxy network routes each request through a different residential IP, making Cloudflare see organic-looking traffic:

# Playwright proxy configuration — the shape expected by new_context(proxy=...).
proxy_config = {
    "server": "http://proxy.thordata.com:9000",
    "username": "YOUR_USER",
    "password": "YOUR_PASS",
}

# NOTE(review): snippet assumes `browser` is an already-launched Playwright
# Browser from the surrounding code.
context = browser.new_context(
    proxy=proxy_config,
    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
)

For static HTML requests via httpx:

import httpx

# Proxy-authenticated client for static-HTML requests.
# URL format: http://USERNAME:PASSWORD@HOST:PORT — the published snippet's
# credentials/host were mangled by an email-obfuscation filter.
client = httpx.Client(
    proxy="http://YOUR_USER:YOUR_PASS@proxy.thordata.com:9000",
    headers=HEADERS,
    timeout=20,
)
# NOTE(review): snippet assumes `slug` is defined by the surrounding code.
resp = client.get(f"https://www.rottentomatoes.com/m/{slug}")

Request spacing. Even with proxies, add 2-5 second delays between page loads. There's no reason to hammer the site and risk getting your proxy pool flagged.

Retry and Error Handling

Rotten Tomatoes selectors shift periodically. Build defensive scraping:

def safe_get_text(element, fallback: str = "") -> str:
    """Return an element's stripped text, or *fallback* if missing/empty."""
    if element is None:
        return fallback
    text = element.get_text(strip=True)
    return text if text else fallback


def fetch_with_retry(
    url: str,
    session: requests.Session,
    max_attempts: int = 5,
) -> requests.Response:
    """GET *url* with exponential backoff.

    Retries on HTTP 429 and on transient request errors (including non-2xx
    statuses surfaced by ``raise_for_status``). A 403 — Cloudflare's
    challenge status — aborts immediately with RuntimeError, since retrying
    from the same IP is pointless; rotate the proxy instead.
    """
    for attempt in range(max_attempts):
        try:
            response = session.get(url, headers=HEADERS, timeout=15)
            if response.status_code == 429:
                backoff = 2 ** attempt + random.uniform(0, 1)
                print(f"Rate limited. Waiting {backoff:.1f}s...")
                time.sleep(backoff)
                continue
            if response.status_code == 403:
                # Not retryable from this IP — surface to the caller.
                raise RuntimeError(f"403 Cloudflare block at {url}")
            response.raise_for_status()
            return response
        except requests.RequestException:
            if attempt == max_attempts - 1:
                raise
            time.sleep(2 ** attempt)
    raise RuntimeError("Max retries exceeded")

SQLite Storage

For building a film score database or NLP training corpus:

def init_rt_db(db_path: str = "rottentomatoes.db") -> sqlite3.Connection:
    """Initialize (or open) the SQLite database for Rotten Tomatoes data.

    Creates the movies, critic_reviews, and audience_reviews tables plus
    lookup indexes. critic_reviews now carries a UNIQUE constraint so that
    re-scraping a movie cannot insert duplicate reviews — the insert path
    (insert_reviews) relies on IntegrityError for dedup, which previously
    could never fire because no uniqueness constraint existed.

    Note: CREATE TABLE IF NOT EXISTS leaves pre-existing databases (created
    without the constraint) unchanged.

    Returns:
        An open sqlite3.Connection with WAL journaling enabled.
    """
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")  # readers don't block the writer

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS movies (
            slug TEXT PRIMARY KEY,
            url TEXT,
            tomatometer TEXT,
            tomatometer_state TEXT,
            audience_score TEXT,
            audience_state TEXT,
            consensus TEXT,
            info_json TEXT,
            scraped_at TEXT DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS critic_reviews (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            slug TEXT NOT NULL,
            critic TEXT,
            publication TEXT,
            quote TEXT,
            date TEXT,
            score TEXT,
            review_url TEXT,
            fresh INTEGER,
            page INTEGER,
            scraped_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (slug) REFERENCES movies(slug),
            UNIQUE (slug, critic, publication, quote)
        );

        CREATE TABLE IF NOT EXISTS audience_reviews (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            slug TEXT NOT NULL,
            reviewer TEXT,
            score TEXT,
            text TEXT,
            date TEXT,
            scraped_at TEXT DEFAULT (datetime('now')),
            FOREIGN KEY (slug) REFERENCES movies(slug)
        );

        CREATE INDEX IF NOT EXISTS idx_reviews_slug ON critic_reviews(slug);
        CREATE INDEX IF NOT EXISTS idx_audience_slug ON audience_reviews(slug);
        CREATE INDEX IF NOT EXISTS idx_movies_tm ON movies(tomatometer);
    """)
    conn.commit()
    return conn


def upsert_movie(conn: sqlite3.Connection, movie: dict) -> None:
    """Insert a movie row, or refresh every scraped column if it exists.

    The conflict clause updates ALL mutable columns — the original omitted
    tomatometer_state, audience_state, url, and info_json, so repeat scrapes
    left stale score states and metadata alongside fresh scores.

    Args:
        conn: Open connection with the movies table present.
        movie: Dict shaped like get_movie_scores() output.
    """
    conn.execute("""
        INSERT INTO movies
            (slug, url, tomatometer, tomatometer_state, audience_score,
             audience_state, consensus, info_json, scraped_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(slug) DO UPDATE SET
            url=excluded.url,
            tomatometer=excluded.tomatometer,
            tomatometer_state=excluded.tomatometer_state,
            audience_score=excluded.audience_score,
            audience_state=excluded.audience_state,
            consensus=excluded.consensus,
            info_json=excluded.info_json,
            scraped_at=excluded.scraped_at
    """, (
        movie["slug"], movie["url"], movie["tomatometer"],
        movie.get("tomatometer_state"), movie["audience_score"],
        movie.get("audience_state"), movie["consensus"],
        json.dumps(movie.get("info", {})),
        datetime.now(timezone.utc).isoformat(),
    ))
    conn.commit()


def insert_reviews(conn: sqlite3.Connection, slug: str, reviews: list[dict]) -> int:
    """Bulk-insert critic reviews for *slug*, skipping duplicates.

    Rows that violate a uniqueness constraint raise IntegrityError and are
    silently skipped.

    Returns:
        The number of rows actually written.
    """
    written = 0
    for review in reviews:
        row = (
            slug, review["critic"], review["publication"], review["quote"],
            review["date"], review.get("score"), review.get("review_url"),
            int(review["fresh"]), review.get("page", 1),
        )
        try:
            conn.execute("""
                INSERT INTO critic_reviews
                    (slug, critic, publication, quote, date, score, review_url, fresh, page)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, row)
            written += 1
        except sqlite3.IntegrityError:
            pass  # duplicate review — keep the existing row
    conn.commit()
    return written

Batch Collection Pipeline

Putting it all together for a list of movies:

def collect_movies(slugs: list[str], db_path: str = "rottentomatoes.db") -> dict:
    """Fetch and store scores for each slug, skipping ones already in the DB.

    Returns:
        Stats dict with "ok", "failed", and "skipped" counts.
    """
    conn = init_rt_db(db_path)
    session = requests.Session()
    stats = {"ok": 0, "failed": 0, "skipped": 0}
    total = len(slugs)

    for idx, slug in enumerate(slugs, start=1):
        # Skip movies we've already scraped.
        already = conn.execute(
            "SELECT scraped_at FROM movies WHERE slug = ?", (slug,)
        ).fetchone()
        if already is not None:
            stats["skipped"] += 1
            continue

        try:
            movie = get_movie_scores(slug, session=session)
            upsert_movie(conn, movie)
        except Exception as exc:
            stats["failed"] += 1
            print(f"[{idx}/{total}] FAILED {slug}: {exc}")
        else:
            stats["ok"] += 1
            print(f"[{idx}/{total}] {slug}: {movie['tomatometer']}% / {movie['audience_score']}%")

        # Polite delay with jitter
        time.sleep(random.uniform(1.5, 3.5))

    conn.close()
    return stats


# Collect data for a list of films
film_slugs = [
    "the_shawshank_redemption",
    "the_godfather",
    "pulp_fiction",
    "schindler_s_list",
    "oppenheimer_2023",
    "parasite_2019",
    "get_out",
    "hereditary_2018",
    "everything_everywhere_all_at_once",
    "the_menu_2022",
]

# Runs the batch pipeline (live HTTP requests with polite delays between them).
stats = collect_movies(film_slugs)
print(f"\nResults: {stats['ok']} collected, {stats['failed']} failed, {stats['skipped']} skipped")

Analyzing the Data

def query_top_rated(conn: sqlite3.Connection, min_tm: int = 90) -> list[dict]:
    """Return movies whose Tomatometer meets *min_tm*, highest score first."""
    cursor = conn.execute("""
        SELECT slug, tomatometer, audience_score, consensus
        FROM movies
        WHERE CAST(tomatometer AS INTEGER) >= ?
        ORDER BY CAST(tomatometer AS INTEGER) DESC
    """, (min_tm,))
    return [
        {"slug": slug, "tomatometer": tm, "audience": aud, "consensus": blurb}
        for slug, tm, aud, blurb in cursor
    ]


def most_controversial(conn: sqlite3.Connection) -> list[dict]:
    """Top 20 titles by absolute critic/audience score gap (either direction)."""
    cursor = conn.execute("""
        SELECT slug, tomatometer, audience_score,
               ABS(CAST(tomatometer AS INTEGER) - CAST(audience_score AS INTEGER)) AS gap
        FROM movies
        WHERE tomatometer != 'N/A' AND audience_score != 'N/A'
          AND CAST(tomatometer AS INTEGER) > 0
          AND CAST(audience_score AS INTEGER) > 0
        ORDER BY gap DESC
        LIMIT 20
    """)
    results = []
    for slug, tm, aud, gap in cursor:
        results.append({"slug": slug, "tomatometer": tm, "audience": aud, "gap": gap})
    return results

What to Watch Out For

Selectors change. Rotten Tomatoes redesigns their markup every few months. The data-qa attributes are more stable than class names, but they still shift. Build your scraper to fail loudly when a selector returns nothing rather than silently collecting empty data.

Respect robots.txt. Rotten Tomatoes allows crawling of movie pages but restricts some paths. Check /robots.txt before scraping a new section.

Legal note. Scraping publicly available information is generally legal under the hiQ v. LinkedIn precedent, but don't republish Rotten Tomatoes content wholesale. Aggregate scores, build datasets for analysis, feed ML models — that's standard fair use territory. Don't clone their review database and publish it as your own.

The Rotten Tomatoes scraping pipeline: static HTML for scores, Playwright for reviews, residential proxies for volume, and solid error handling for when the markup inevitably changes. Start with a few movies to verify selectors, then scale up carefully.

Building a Comparison Dataset

Once you have scores for a large catalog, analytical queries become straightforward:

def genre_score_analysis(conn: sqlite3.Connection) -> None:
    """Print aggregate Tomatometer vs. audience-score statistics.

    Guards against an empty dataset — the original divided by zero when no
    movies had numeric scores.

    Args:
        conn: Open connection with a populated movies table.
    """
    rows = conn.execute("""
        SELECT slug, tomatometer, audience_score
        FROM movies
        WHERE tomatometer != 'N/A' AND audience_score != 'N/A'
          AND CAST(tomatometer AS INTEGER) > 0
          AND CAST(audience_score AS INTEGER) > 0
    """).fetchall()

    if not rows:
        print("Dataset: 0 movies")
        return

    critic_scores = [int(r[1]) for r in rows]
    audience_scores = [int(r[2]) for r in rows]
    gaps = [abs(t - a) for t, a in zip(critic_scores, audience_scores)]
    n = len(rows)

    print(f"Dataset: {n} movies")
    print(f"Avg Tomatometer: {sum(critic_scores)/n:.1f}")
    print(f"Avg Audience Score: {sum(audience_scores)/n:.1f}")
    print(f"Avg Critic/Audience Gap: {sum(gaps)/n:.1f} points")
    print(f"Most controversial (gap > 40): {sum(1 for g in gaps if g > 40)} movies")


def export_reviews_for_nlp(conn: sqlite3.Connection, filename: str = "reviews.jsonl") -> int:
    """Export substantive critic reviews (> 50 chars) as JSONL.

    Each line is {"movie", "critic", "publication", "text", "label"} with
    label "fresh" or "rotten" — ready for sentiment-classification work.

    Args:
        conn: Open connection with movies and critic_reviews populated.
        filename: Output path for the JSONL file.

    Returns:
        The number of records written.
    """
    rows = conn.execute("""
        SELECT m.slug, cr.critic, cr.publication, cr.quote, cr.fresh
        FROM critic_reviews cr
        JOIN movies m ON m.slug = cr.slug
        WHERE cr.quote != '' AND LENGTH(cr.quote) > 50
        ORDER BY m.slug, cr.fresh DESC
    """).fetchall()

    with open(filename, "w", encoding="utf-8") as f:
        for slug, critic, publication, quote, fresh in rows:
            record = {
                "movie": slug,
                "critic": critic,
                "publication": publication,
                "text": quote,
                "label": "fresh" if fresh else "rotten",
            }
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

    # Bug fix: the message previously printed a literal placeholder
    # instead of the output path.
    print(f"Exported {len(rows)} reviews to {filename}")
    return len(rows)

TV Show Scraping

Rotten Tomatoes also covers TV series. The URL pattern differs slightly:

def get_tv_scores(show_slug: str, session: requests.Session = None) -> dict:
    """Fetch critic and audience scores for a TV series.

    Unknown shows (HTTP 404) return {"error": "Not found", ...} rather than
    raising; other HTTP errors propagate.
    """
    s = session or requests.Session()
    url = f"https://www.rottentomatoes.com/tv/{show_slug}"
    resp = s.get(url, headers=HEADERS, timeout=15)

    if resp.status_code == 404:
        return {"error": "Not found", "slug": show_slug}
    resp.raise_for_status()

    soup = BeautifulSoup(resp.text, "lxml")
    board = soup.select_one("media-scorecard, score-board-deprecated")

    result = {
        "slug": show_slug,
        "url": url,
        "type": "tv",
        "tomatometer": "N/A",
        "audience_score": "N/A",
        "consensus": "",
    }
    if board:
        result["tomatometer"] = board.get("tomatometerscore", "N/A")
        result["audience_score"] = board.get("audiencescore", "N/A")

    blurb = soup.select_one("[data-qa='score-panel-critics-consensus']")
    if blurb:
        result["consensus"] = blurb.get_text(strip=True)
    return result


# TV show slugs use underscores
tv_shows = [
    "the_wire", "breaking_bad", "succession", "chernobyl",
    "the_bear", "severance", "white_lotus", "andor"
]
# Fetch each show's scores with a jittered polite delay between requests.
for slug in tv_shows:
    result = get_tv_scores(slug)
    print(f"  {slug}: {result.get('tomatometer', 'N/A')}% critics / "
          f"{result.get('audience_score', 'N/A')}% audience")
    time.sleep(random.uniform(1.5, 3.0))

Tracking Score Changes Over Time

Tomatometer scores change as more critics file reviews, especially in the first weeks after release. A polling approach captures this:

def track_score_trajectory(
    slug: str,
    conn: sqlite3.Connection,
    poll_interval_hours: float = 12,
    polls: int = 14,
) -> list[dict]:
    """Poll a movie's scores repeatedly to record its Tomatometer trajectory.

    Each successful poll upserts the latest scores into the movies table and
    appends a timestamped snapshot to the returned history. Failed polls are
    logged and skipped. Sleeps *poll_interval_hours* between polls (but not
    after the final one).
    """
    session = requests.Session()
    history: list[dict] = []

    for poll_idx in range(polls):
        try:
            data = get_movie_scores(slug, session=session)
            snapshot = {
                "slug": slug,
                "tomatometer": data["tomatometer"],
                "audience_score": data["audience_score"],
                "polled_at": datetime.now(timezone.utc).isoformat(),
            }
            history.append(snapshot)
            conn.execute("""
                INSERT INTO movies
                    (slug, url, tomatometer, audience_score, consensus, scraped_at)
                VALUES (?, ?, ?, ?, ?, ?)
                ON CONFLICT(slug) DO UPDATE SET
                    tomatometer=excluded.tomatometer,
                    audience_score=excluded.audience_score,
                    scraped_at=excluded.scraped_at
            """, (
                slug, data["url"], data["tomatometer"],
                data["audience_score"], data.get("consensus", ""),
                snapshot["polled_at"],
            ))
            conn.commit()
            print(f"  Poll {poll_idx+1}: {data['tomatometer']}% TM / {data['audience_score']}% audience")
        except Exception as exc:
            print(f"  Poll {poll_idx+1} failed: {exc}")

        if poll_idx < polls - 1:
            time.sleep(poll_interval_hours * 3600)

    return history