
Scraping Roblox Game Statistics and Player Data with Python (2026)

Roblox hosts over 40 million user-created experiences and reports 80+ million daily active users. The platform exposes a reasonably complete set of REST APIs for game data — visit counts, concurrent players, game passes, badges, thumbnails — that don't require authentication for public reads. If you're building a game analytics tool, competitor tracker, market research pipeline, or game recommendation system, these APIs are a solid starting point.

The catch: rate limits are real, IP-based, and inconsistently documented. This guide covers what's actually available, how to hit it with Python, what to watch for when scaling up, and how to build a production-grade collection system with SQLite storage.

Platform Architecture: What You Need to Know

Roblox exposes data through a collection of subdomain APIs rather than a single unified endpoint. Understanding this architecture saves time when debugging:

API Subdomain           Purpose
apis.roblox.com         Universe ID lookup, general utilities
games.roblox.com        Game details, search, game passes, badges
thumbnails.roblox.com   Game icons, thumbnails, avatar renders
users.roblox.com        User profiles, usernames
groups.roblox.com       Group details, group games
badges.roblox.com       Badge metadata
catalog.roblox.com      Asset marketplace
economy.roblox.com      Asset pricing, reseller data

These subdomain APIs are separate from www.roblox.com and generally do not require authentication for public data. Crucially, they also bypass most of Cloudflare's protections that the main website has. Always hit subdomain APIs directly rather than scraping HTML pages.
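To make that concrete, the batch endpoints used throughout this guide take a list of IDs as repeated query parameters. A minimal sketch of the URL shape using only the standard library (httpx produces the same encoding when you pass a list as a param value, so the later examples just pass lists):

```python
from urllib.parse import urlencode

def games_batch_url(universe_ids: list[int]) -> str:
    """Build the batch game-details URL; doseq=True turns the list
    into repeated universeIds parameters, one per ID."""
    query = urlencode({"universeIds": universe_ids}, doseq=True)
    return f"https://games.roblox.com/v1/games?{query}"

print(games_batch_url([3940149465, 301549609]))
```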

What Data Is Available

Roblox organizes experiences around two ID types:

- Place ID: the number in a game's public URL (roblox.com/games/<placeId>/...). An experience can contain multiple places but always has one root place.
- Universe ID: the identifier the data APIs expect, which refers to the experience as a whole.

With the universe ID you can pull:

- Game details: name, description, creator, genre, total visits, concurrent players, favorites
- Game passes and their Robux prices
- Badges and their award statistics
- Icons and thumbnails

Rate Limits and Anti-Bot Measures

Roblox doesn't fight scrapers as aggressively as consumer e-commerce sites, but the limits are real:

  1. Per-IP rate limiting — Most API endpoints allow roughly 60-100 requests per minute per IP. The games and thumbnails endpoints are more lenient; user-lookup endpoints hit limits faster.
  2. 429 responses with Retry-After — When you exceed limits, responses include Retry-After headers. Always respect these.
  3. Authentication not required — Read-only public endpoints (game details, thumbnails, user profiles) don't require a .ROBLOSECURITY cookie.
  4. Cloudflare on www.roblox.com — The main website runs Cloudflare. The subdomain APIs (games.roblox.com, etc.) generally don't, which is why hitting them directly sidesteps most fingerprinting issues.
  5. Silent throttling — Very high request rates may result in empty responses rather than explicit 429s. If you start getting empty data arrays on valid universe IDs, back off.
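Behaviors 2 and 5 can be folded into a single retry-delay helper. A sketch, assuming you pass in the raw Retry-After header value; the exponential fallback for silent throttling is a pragmatic choice, not documented Roblox behavior:

```python
import random
from typing import Optional

def backoff_seconds(attempt: int, retry_after: Optional[str] = None) -> float:
    """Seconds to sleep before retrying a throttled request.

    Prefer the server's Retry-After value when it's numeric; otherwise
    use capped exponential backoff with jitter, which also covers
    silent throttling (empty data arrays on a 200 response).
    """
    if retry_after is not None:
        try:
            return float(retry_after)
        except ValueError:
            pass  # HTTP-date form of Retry-After: fall through to backoff
    base = min(30 * (2 ** attempt), 300)   # 30s, 60s, 120s, ... capped at 5 min
    return base + random.uniform(0, 5)     # jitter to de-synchronize workers
```

Call it as `time.sleep(backoff_seconds(attempt, resp.headers.get("Retry-After")))` in place of fixed waits.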

When to Use Proxies

For single-use analysis, you can usually collect thousands of records without proxies by:

- Respecting the 12-second interval between requests
- Batching 100 universe IDs per request where supported
- Distributing collection across multiple hours
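The first of those practices is easy to get wrong in ad-hoc scripts, so it's worth factoring out. A sketch of a pacing generator (the function name and 12-second default are mine, matching the interval the pipeline below uses):

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def paced(items: Iterable[T], interval: float = 12.0) -> Iterator[T]:
    """Yield items one at a time, sleeping `interval` seconds between
    consecutive items (no sleep before the first or after the last)."""
    first = True
    for item in items:
        if not first:
            time.sleep(interval)
        first = False
        yield item
```

Iterating batches through `paced(batches)` replaces scattered `time.sleep(12)` calls with one throttle point.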

When collecting data across tens of thousands of games on a schedule, distributing requests across ThorData residential proxies keeps per-IP rates well under limits. Their pool supports sticky sessions if you need consistent IPs for paginated requests:

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "gate.thordata.net"
THORDATA_PORT = 9000

def make_proxy(country: str = "us", session_id: str = None) -> str:
    user = f"{THORDATA_USER}-country-{country}"
    if session_id:
        user += f"-session-{session_id}"
    return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"

Setup

uv pip install httpx

No API key required for public endpoints. All requests below are unauthenticated.

Mapping Place IDs to Universe IDs

This is always the first step. Roblox's public URLs expose the place ID; the API wants the universe ID.

import httpx
import time
from typing import Optional

def place_to_universe(
    place_ids: list[int],
    proxy: Optional[str] = None,
) -> dict[int, int]:
    """
    Convert place IDs to universe IDs.

    Accepts up to 100 place IDs per call.
    Returns {place_id: universe_id}
    """
    url = "https://apis.roblox.com/universes/v1/places/multiget/universe-ids"
    client_kwargs = {"timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url, params={"ids": place_ids})

    if resp.status_code == 429:
        retry_after = int(resp.headers.get("Retry-After", 30))
        print(f"Rate limited on place-to-universe, waiting {retry_after}s")
        time.sleep(retry_after)
        return place_to_universe(place_ids, proxy=proxy)

    resp.raise_for_status()

    return {
        item["placeId"]: item["universeId"]
        for item in resp.json().get("universeIds", [])
    }


# Example: convert popular game place IDs
place_ids = [6872265039, 606849621, 5030456452, 142823291, 1818]
mapping = place_to_universe(place_ids)
print("Place → Universe mapping:")
for pid, uid in mapping.items():
    print(f"  {pid} → {uid}")

Fetching Game Details (Batch)

The games endpoint accepts up to 100 universe IDs per request. This is the primary workhorse for data collection.

def get_game_details(
    universe_ids: list[int],
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Fetch experience details for up to 100 universe IDs.

    Key fields: id, rootPlaceId, name, description, creator,
    genre, visits, playing, maxPlayers, favoritedCount,
    created, updated
    """
    url = "https://games.roblox.com/v1/games"
    client_kwargs = {"timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url, params={"universeIds": universe_ids})

    if resp.status_code == 429:
        retry_after = int(resp.headers.get("Retry-After", 30))
        print(f"Rate limited, waiting {retry_after}s")
        time.sleep(retry_after)
        return get_game_details(universe_ids, proxy=proxy)

    if resp.status_code != 200:
        print(f"Error {resp.status_code} fetching game details")
        return []

    results = []
    for game in resp.json().get("data", []):
        creator = game.get("creator", {}) or {}
        results.append({
            "universe_id": game["id"],
            "root_place_id": game.get("rootPlaceId"),
            "name": game.get("name"),
            "description": (game.get("description") or "")[:500],
            "creator_id": creator.get("id"),
            "creator_name": creator.get("name"),
            "creator_type": creator.get("type"),  # "User" or "Group"
            "creator_has_verified_badge": creator.get("hasVerifiedBadge", False),
            "genre": game.get("genre"),
            "genre_l2": game.get("genre_l2"),
            "visits": game.get("visits", 0),
            "playing": game.get("playing", 0),
            "max_players": game.get("maxPlayers"),
            "favorited_count": game.get("favoritedCount", 0),
            "created": game.get("created"),
            "updated": game.get("updated"),
            "studio_access_to_apis_allowed": game.get("studioAccessToApisAllowed", False),
            "create_vip_servers_allowed": game.get("createVipServersAllowed", False),
        })

    return results


# Fetch details for multiple games
universe_ids = [3940149465, 301549609, 5030456452]
games = get_game_details(universe_ids)
for g in games:
    print(f"{g['name']}: {g['visits']:,} total visits, {g['playing']} currently online")

Searching Games

Discover games by keyword or browse trending/top charts:

def search_games(
    keyword: str = None,
    sort_type: str = "PlayerCount",
    game_filter: str = "0",
    limit: int = 50,
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Search or browse Roblox experiences.

    sort_type: Relevance | PlayerCount | Favorited | Visits | RobuxEarned | Default
    game_filter: 0=all, 1=featured, 2=popular, 3=spotlight
    """
    url = "https://games.roblox.com/v1/games/list"

    all_results = []
    page_token = ""

    while len(all_results) < limit:
        params = {
            "model.sortToken": page_token,
            "model.gameFilter": game_filter,
            "model.sortDefinition": sort_type,
            "model.startRows": len(all_results),
            "model.maxRows": min(48, limit - len(all_results)),
        }
        if keyword:
            params["model.keyword"] = keyword
            params["model.pageContext.isSeeAllPage"] = "true"

        client_kwargs = {"timeout": 15}
        if proxy:
            client_kwargs["proxies"] = {"all://": proxy}

        with httpx.Client(**client_kwargs) as client:
            resp = client.get(url, params=params)

        if resp.status_code == 429:
            time.sleep(int(resp.headers.get("Retry-After", 30)))
            continue

        if resp.status_code != 200:
            break

        data = resp.json()
        games = data.get("games", [])
        if not games:
            break

        for g in games:
            all_results.append({
                "universe_id": g.get("universeId"),
                "name": g.get("name"),
                "player_count": g.get("playerCount"),
                "total_up_votes": g.get("totalUpVotes"),
                "total_down_votes": g.get("totalDownVotes"),
                "approval_rating": (
                    round(g.get("totalUpVotes", 0) /
                          max(g.get("totalUpVotes", 0) + g.get("totalDownVotes", 0), 1) * 100, 1)
                    if g.get("totalUpVotes") is not None else None
                ),
                "thumbnail_url": g.get("gameDescription"),
            })

        page_token = data.get("nextPageExclusiveStartKey", "")
        if not page_token:
            break

    return all_results[:limit]


# Find top games by player count
top_games = search_games(sort_type="PlayerCount", limit=50)
for g in top_games[:5]:
    print(f"{g['name']}: {g['player_count']:,} online, {g.get('approval_rating', '?')}% approval")

Fetching Game Icons and Thumbnails

def get_game_icons(
    universe_ids: list[int],
    size: str = "512x512",
    proxy: Optional[str] = None,
) -> dict[int, str]:
    """
    Fetch game icon URLs for multiple universe IDs.

    size: 50x50 | 128x128 | 256x256 | 512x512
    Returns {universe_id: image_url}
    """
    url = "https://thumbnails.roblox.com/v1/games/icons"
    params = {
        "universeIds": universe_ids,
        "returnPolicy": "PlaceHolder",
        "size": size,
        "format": "Png",
        "isCircular": "false",
    }

    client_kwargs = {"timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url, params=params)

    if resp.status_code != 200:
        return {}

    return {
        item["targetId"]: item["imageUrl"]
        for item in resp.json().get("data", [])
        if item.get("state") == "Completed" and item.get("imageUrl")
    }


def get_game_thumbnails(
    universe_ids: list[int],
    proxy: Optional[str] = None,
) -> dict[int, list[str]]:
    """
    Fetch multiple thumbnail URLs per game.
    Returns {universe_id: [url1, url2, ...]}
    """
    url = "https://thumbnails.roblox.com/v1/games/multiget/thumbnails"
    params = {
        "universeIds": universe_ids,
        "countPerUniverse": 1,
        "defaults": "true",
        "size": "768x432",
        "format": "Png",
        "isCircular": "false",
    }

    client_kwargs = {"timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url, params=params)

    if resp.status_code != 200:
        return {}

    result = {}
    for item in resp.json().get("data", []):
        uid = item.get("universeId")
        thumbnails = [
            t["imageUrl"]
            for t in item.get("thumbnails", [])
            if t.get("state") == "Completed"
        ]
        if uid and thumbnails:
            result[uid] = thumbnails

    return result

Fetching Game Passes

def get_game_passes(
    universe_id: int,
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Fetch all game passes for an experience.

    Game passes are in-game purchasable items. Their prices in Robux
    are a key revenue signal — games with many high-priced passes
    are optimized for monetization.
    """
    url = f"https://games.roblox.com/v1/games/{universe_id}/game-passes"
    params = {"limit": 100, "sortOrder": "Asc"}
    passes = []

    client_kwargs = {"timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        while True:
            resp = client.get(url, params=params)

            if resp.status_code == 429:
                time.sleep(int(resp.headers.get("Retry-After", 30)))
                continue
            if resp.status_code != 200:
                break

            data = resp.json()
            for p in data.get("data", []):
                passes.append({
                    "id": p["id"],
                    "name": p.get("name"),
                    "display_name": p.get("displayName") or p.get("name"),
                    "price": p.get("price"),  # Robux, None if not for sale
                    "seller_id": p.get("sellerId"),
                    "is_for_sale": p.get("price") is not None,
                })

            cursor = data.get("nextPageCursor")
            if not cursor:
                break
            params["cursor"] = cursor

    return passes


def estimate_game_monetization(
    universe_id: int,
    proxy: Optional[str] = None,
) -> dict:
    """
    Estimate a game's monetization depth from game passes.

    More passes + higher prices = more aggressive monetization.
    """
    passes = get_game_passes(universe_id, proxy=proxy)
    priced_passes = [p for p in passes if p.get("price") is not None and p["price"] > 0]
    prices = [p["price"] for p in priced_passes]

    if not prices:
        return {"pass_count": len(passes), "monetization_score": 0}

    return {
        "pass_count": len(passes),
        "priced_pass_count": len(priced_passes),
        "min_pass_price": min(prices),
        "max_pass_price": max(prices),
        "avg_pass_price": round(sum(prices) / len(prices), 1),
        "total_priced_passes": len(priced_passes),
        "monetization_score": round(len(priced_passes) * sum(prices) / max(len(prices), 1) / 100, 2),
        "passes": priced_passes[:5],  # top 5 for reference
    }

Fetching Game Badges

def get_game_badges(
    universe_id: int,
    limit: int = 100,
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Fetch badges for an experience.

    Win rate (statistics.winRatePercentage) indicates how difficult or
    common a badge is. Low win-rate badges signal deep content.
    """
    url = f"https://badges.roblox.com/v1/universes/{universe_id}/badges"
    params = {"limit": min(limit, 100), "sortOrder": "Desc"}
    badges = []

    client_kwargs = {"timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        while len(badges) < limit:
            resp = client.get(url, params=params)

            if resp.status_code == 429:
                time.sleep(int(resp.headers.get("Retry-After", 30)))
                continue
            if resp.status_code != 200:
                break

            data = resp.json()
            for b in data.get("data", []):
                stats = b.get("statistics", {}) or {}
                badges.append({
                    "id": b["id"],
                    "name": b.get("name"),
                    "description": (b.get("description") or "")[:200],
                    "win_rate_pct": stats.get("winRatePercentage"),
                    "awarded_count": stats.get("awardedCount"),
                    "created": b.get("created"),
                    "updated": b.get("updated"),
                    "enabled": b.get("enabled", True),
                })

            cursor = data.get("nextPageCursor")
            if not cursor:
                break
            params["cursor"] = cursor

    return badges[:limit]

Fetching Creator (User) Info

def get_user_info(user_id: int, proxy: Optional[str] = None) -> dict:
    """Fetch public profile for a Roblox user ID."""
    client_kwargs = {"timeout": 10}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(f"https://users.roblox.com/v1/users/{user_id}")

    if resp.status_code != 200:
        return {}

    data = resp.json()
    return {
        "user_id": data["id"],
        "username": data["name"],
        "display_name": data.get("displayName"),
        "description": (data.get("description") or "")[:300],
        "created": data.get("created"),
        "is_banned": data.get("isBanned", False),
        "has_verified_badge": data.get("hasVerifiedBadge", False),
    }


def get_users_batch(user_ids: list[int], proxy: Optional[str] = None) -> list[dict]:
    """Fetch multiple user profiles in one request (up to 100)."""
    url = "https://users.roblox.com/v1/users"
    client_kwargs = {"timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        resp = client.post(
            url,
            json={"userIds": user_ids[:100], "excludeBannedUsers": False},
        )

    if resp.status_code != 200:
        return []

    return [
        {
            "user_id": u["id"],
            "username": u["name"],
            "display_name": u.get("displayName"),
            "has_verified_badge": u.get("hasVerifiedBadge", False),
        }
        for u in resp.json().get("data", [])
    ]


def get_user_games(
    user_id: int,
    limit: int = 50,
    proxy: Optional[str] = None,
) -> list[dict]:
    """Fetch games created by a specific user."""
    url = f"https://games.roblox.com/v2/users/{user_id}/games"
    params = {"limit": min(limit, 50), "sortOrder": "Desc"}
    games = []

    client_kwargs = {"timeout": 15}
    if proxy:
        client_kwargs["proxies"] = {"all://": proxy}

    with httpx.Client(**client_kwargs) as client:
        while len(games) < limit:
            resp = client.get(url, params=params)

            if resp.status_code == 429:
                time.sleep(int(resp.headers.get("Retry-After", 30)))
                continue
            if resp.status_code != 200:
                break

            data = resp.json()
            for g in data.get("data", []):
                games.append({
                    "universe_id": g.get("id"),
                    "root_place_id": g.get("rootPlace", {}).get("id"),
                    "name": g.get("name"),
                    "description": (g.get("description") or "")[:300],
                    "plays": g.get("placeVisits", 0),
                    "created": g.get("created"),
                    "updated": g.get("updated"),
                })

            cursor = data.get("nextPageCursor")
            if not cursor:
                break
            params["cursor"] = cursor

    return games[:limit]

Async Batch Collection with Rate Limiting

For bulk collection, async HTTP improves throughput while keeping you inside the rate limits:

import asyncio
import httpx
from typing import Optional

async def fetch_game_batch_async(
    client: httpx.AsyncClient,
    universe_ids: list[int],
) -> list[dict]:
    """Fetch game details for a batch of universe IDs asynchronously."""
    url = "https://games.roblox.com/v1/games"
    resp = await client.get(url, params={"universeIds": universe_ids})

    if resp.status_code == 429:
        retry_after = int(resp.headers.get("Retry-After", 30))
        await asyncio.sleep(retry_after)
        return await fetch_game_batch_async(client, universe_ids)

    resp.raise_for_status()
    return resp.json().get("data", [])


async def collect_all_games_async(
    all_universe_ids: list[int],
    requests_per_minute: int = 60,
    proxy: Optional[str] = None,
) -> list[dict]:
    """
    Collect data for large sets of universe IDs asynchronously.

    Groups IDs into batches of 100 (the API max) and paces requests
    to stay at or under requests_per_minute.
    """
    delay = 60.0 / requests_per_minute
    batch_size = 100
    all_results = []

    client_kwargs = {}
    if proxy:
        client_kwargs["proxies"] = proxy

    async with httpx.AsyncClient(**client_kwargs, timeout=20) as client:
        for i in range(0, len(all_universe_ids), batch_size):
            batch = all_universe_ids[i:i + batch_size]

            try:
                data = await fetch_game_batch_async(client, batch)
                all_results.extend(data)
                print(f"Batch {i//batch_size + 1}: {len(data)} games (total: {len(all_results)})")
            except httpx.HTTPStatusError as e:
                print(f"Batch {i//batch_size + 1} failed: {e}")
            except Exception as e:
                print(f"Unexpected error on batch {i//batch_size + 1}: {e}")

            await asyncio.sleep(delay)

    return all_results


def collect_games_sync(
    universe_ids: list[int],
    proxy: Optional[str] = None,
) -> list[dict]:
    """Synchronous wrapper for the async collector."""
    return asyncio.run(collect_all_games_async(universe_ids, proxy=proxy))

Storing in SQLite

import sqlite3
from datetime import datetime, timezone

def init_db(path: str = "roblox_games.db") -> sqlite3.Connection:
    """Initialize the Roblox game analytics database."""
    conn = sqlite3.connect(path)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS games (
            universe_id INTEGER PRIMARY KEY,
            root_place_id INTEGER,
            name TEXT,
            description TEXT,
            creator_id INTEGER,
            creator_name TEXT,
            creator_type TEXT,
            genre TEXT,
            visits INTEGER DEFAULT 0,
            playing INTEGER DEFAULT 0,
            max_players INTEGER,
            favorited_count INTEGER DEFAULT 0,
            created TEXT,
            updated TEXT,
            icon_url TEXT,
            first_seen TEXT,
            last_scraped TEXT
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS visit_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            universe_id INTEGER NOT NULL,
            visits INTEGER NOT NULL,
            playing INTEGER,
            favorited_count INTEGER,
            recorded_at TEXT NOT NULL
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS game_passes (
            id INTEGER PRIMARY KEY,
            universe_id INTEGER NOT NULL,
            name TEXT,
            price INTEGER,
            is_for_sale INTEGER DEFAULT 0,
            FOREIGN KEY (universe_id) REFERENCES games(universe_id)
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS creators (
            user_id INTEGER PRIMARY KEY,
            username TEXT,
            display_name TEXT,
            created TEXT,
            has_verified_badge INTEGER DEFAULT 0,
            is_banned INTEGER DEFAULT 0,
            last_scraped TEXT
        )
    """)

    conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_universe ON visit_snapshots(universe_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_time ON visit_snapshots(recorded_at)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_games_visits ON games(visits DESC)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_games_playing ON games(playing DESC)")

    conn.commit()
    return conn


def save_game(conn: sqlite3.Connection, game: dict, icon_url: str = None):
    """Save game data and record a visit snapshot."""
    now = datetime.now(timezone.utc).isoformat()

    # Check if this is a new game or an update
    existing = conn.execute(
        "SELECT visits FROM games WHERE universe_id=?", (game["universe_id"],)
    ).fetchone()

    # Always record snapshot
    conn.execute("""
        INSERT INTO visit_snapshots (universe_id, visits, playing, favorited_count, recorded_at)
        VALUES (?,?,?,?,?)
    """, (
        game["universe_id"],
        game.get("visits", 0),
        game.get("playing", 0),
        game.get("favorited_count", 0),
        now,
    ))

    # Upsert game record
    conn.execute("""
        INSERT INTO games
        (universe_id, root_place_id, name, description, creator_id, creator_name,
         creator_type, genre, visits, playing, max_players, favorited_count,
         created, updated, icon_url, first_seen, last_scraped)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        ON CONFLICT(universe_id) DO UPDATE SET
            name=excluded.name,
            description=excluded.description,
            creator_name=excluded.creator_name,
            genre=excluded.genre,
            visits=excluded.visits,
            playing=excluded.playing,
            max_players=excluded.max_players,
            favorited_count=excluded.favorited_count,
            updated=excluded.updated,
            icon_url=COALESCE(excluded.icon_url, games.icon_url),
            last_scraped=excluded.last_scraped
    """, (
        game["universe_id"],
        game.get("root_place_id"),
        game.get("name"),
        game.get("description"),
        game.get("creator_id"),
        game.get("creator_name"),
        game.get("creator_type"),
        game.get("genre"),
        game.get("visits", 0),
        game.get("playing", 0),
        game.get("max_players"),
        game.get("favorited_count", 0),
        game.get("created"),
        game.get("updated"),
        icon_url,
        now if existing is None else None,  # first_seen only set on insert
        now,
    ))

    conn.commit()


def save_game_passes(conn: sqlite3.Connection, universe_id: int, passes: list[dict]):
    """Save game passes to database."""
    for p in passes:
        conn.execute("""
            INSERT OR REPLACE INTO game_passes (id, universe_id, name, price, is_for_sale)
            VALUES (?,?,?,?,?)
        """, (p["id"], universe_id, p.get("name"), p.get("price"), 1 if p.get("is_for_sale") else 0))
    conn.commit()

Analytics Queries

def find_trending_games(conn: sqlite3.Connection, days: int = 1) -> list[dict]:
    """Find games with highest visit growth in the last N days."""
    from datetime import datetime, timedelta, timezone
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()

    rows = conn.execute("""
        SELECT
            g.universe_id, g.name, g.creator_name,
            s_now.visits as visits_now,
            s_old.visits as visits_before,
            (s_now.visits - COALESCE(s_old.visits, 0)) as visit_gain,
            s_now.playing
        FROM games g
        JOIN (
            SELECT universe_id, visits, playing
            FROM visit_snapshots vs1
            WHERE recorded_at = (SELECT MAX(recorded_at) FROM visit_snapshots WHERE universe_id = vs1.universe_id)
        ) s_now ON s_now.universe_id = g.universe_id
        LEFT JOIN (
            SELECT universe_id, visits
            FROM visit_snapshots vs2
            WHERE recorded_at <= ?
              AND recorded_at = (SELECT MAX(recorded_at) FROM visit_snapshots WHERE universe_id = vs2.universe_id AND recorded_at <= ?)
        ) s_old ON s_old.universe_id = g.universe_id
        WHERE visit_gain > 0
        ORDER BY visit_gain DESC
        LIMIT 20
    """, (cutoff, cutoff)).fetchall()

    return [
        {
            "universe_id": r[0],
            "name": r[1],
            "creator": r[2],
            "total_visits": r[3],
            "visit_gain": r[5],
            "currently_playing": r[6],
        }
        for r in rows
    ]


def genre_stats(conn: sqlite3.Connection) -> list[dict]:
    """Analyze game statistics by genre."""
    rows = conn.execute("""
        SELECT genre,
               COUNT(*) as game_count,
               SUM(visits) as total_visits,
               AVG(visits) as avg_visits,
               MAX(playing) as peak_concurrent,
               AVG(playing) as avg_concurrent
        FROM games
        WHERE genre IS NOT NULL
        GROUP BY genre
        ORDER BY total_visits DESC
    """).fetchall()

    return [
        {
            "genre": r[0],
            "game_count": r[1],
            "total_visits": r[2],
            "avg_visits": round(r[3] or 0),
            "peak_concurrent": r[4],
            "avg_concurrent": round(r[5] or 0),
        }
        for r in rows
    ]


def top_monetizing_creators(conn: sqlite3.Connection) -> list[dict]:
    """Rank creators by total visits across all their games."""
    rows = conn.execute("""
        SELECT creator_id, creator_name, creator_type,
               COUNT(*) as game_count,
               SUM(visits) as total_visits,
               MAX(visits) as top_game_visits,
               SUM(playing) as total_concurrent
        FROM games
        WHERE creator_id IS NOT NULL
        GROUP BY creator_id
        ORDER BY total_visits DESC
        LIMIT 20
    """).fetchall()

    return [
        {
            "creator_id": r[0],
            "creator_name": r[1],
            "creator_type": r[2],
            "game_count": r[3],
            "total_visits": r[4],
            "top_game_visits": r[5],
            "total_concurrent": r[6],
        }
        for r in rows
    ]

Complete Pipeline

if __name__ == "__main__":
    conn = init_db()

    print("=== Phase 1: Collect top games by player count ===")
    top_games = search_games(sort_type="PlayerCount", limit=200)
    print(f"Found {len(top_games)} games")

    universe_ids = [g["universe_id"] for g in top_games if g.get("universe_id")]

    # Batch-fetch game details
    for i in range(0, len(universe_ids), 100):
        batch = universe_ids[i:i+100]
        details = get_game_details(batch)

        # Get icons for this batch
        icons = get_game_icons(batch)

        for game in details:
            uid = game["universe_id"]
            save_game(conn, game, icon_url=icons.get(uid))

        print(f"Batch {i//100 + 1}: stored {len(details)} games")
        time.sleep(12)

    print("\n=== Phase 2: Collect game passes for monetization analysis ===")
    # Only fetch passes for top games by visits
    top_by_visits = conn.execute(
        "SELECT universe_id FROM games ORDER BY visits DESC LIMIT 50"
    ).fetchall()

    for row in top_by_visits:
        uid = row[0]
        passes = get_game_passes(uid)
        save_game_passes(conn, uid, passes)
        monetization = estimate_game_monetization(uid)
        if monetization.get("priced_pass_count", 0) > 0:
            print(f"  Universe {uid}: {monetization['priced_pass_count']} paid passes, avg {monetization['avg_pass_price']} Robux")
        time.sleep(12)

    print("\n=== Analytics ===")
    print("\nTop 5 genres by total visits:")
    for g in genre_stats(conn)[:5]:
        print(f"  {g['genre']}: {g['total_visits']:,} visits, {g['game_count']} games")

    print("\nTop 5 trending games today:")
    for g in find_trending_games(conn, days=1)[:5]:
        print(f"  {g['name']}: +{g['visit_gain']:,} visits, {g['currently_playing']} online")

    print("\nTop 5 creators by total visits:")
    for c in top_monetizing_creators(conn)[:5]:
        print(f"  {c['creator_name']}: {c['total_visits']:,} total visits across {c['game_count']} games")

    conn.close()

Scaling Considerations

A few things to plan for when you move past a few hundred games:

Batch all universe ID lookups — Always send up to 100 IDs per request rather than one at a time. You hit the same rate limit either way, but get 100x throughput.
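A tiny helper keeps the batch size in one place; a sketch (the 100-ID cap matches the games, universe-lookup, and user endpoints above):

```python
from typing import Iterator

def chunked(ids: list[int], size: int = 100) -> Iterator[list[int]]:
    """Split IDs into batches of at most `size`, preserving order."""
    for i in range(0, len(ids), size):
        yield ids[i:i + size]
```

Then `for batch in chunked(universe_ids): ...` replaces the manual index arithmetic used in the pipeline.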

Track updated timestamps — Roblox returns the last-modified date on each experience. Cache your results and only re-fetch games that have been updated since your last run:

def needs_update(conn: sqlite3.Connection, universe_id: int, updated: str) -> bool:
    """Check if a game needs re-fetching based on updated timestamp."""
    row = conn.execute(
        "SELECT updated FROM games WHERE universe_id=?", (universe_id,)
    ).fetchone()
    if row is None:
        return True
    return row[0] != updated
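Here's the check exercised against an in-memory database; the one-column games table is a stand-in for the full schema defined earlier, and the function is repeated (condensed) so the snippet runs standalone:

```python
import sqlite3

def needs_update(conn: sqlite3.Connection, universe_id: int, updated: str) -> bool:
    """Re-fetch when the game is unknown or its `updated` timestamp changed."""
    row = conn.execute(
        "SELECT updated FROM games WHERE universe_id=?", (universe_id,)
    ).fetchone()
    return row is None or row[0] != updated

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE games (universe_id INTEGER PRIMARY KEY, updated TEXT)")
conn.execute("INSERT INTO games VALUES (1, '2026-01-01T00:00:00Z')")

print(needs_update(conn, 1, "2026-01-01T00:00:00Z"))  # False: unchanged
print(needs_update(conn, 1, "2026-02-01T00:00:00Z"))  # True: timestamp moved
print(needs_update(conn, 2, "2026-01-01T00:00:00Z"))  # True: never seen
```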

Thumbnail fetching is a separate pipeline: thumbnails.roblox.com has its own rate limit. Batch thumbnail collection separately from game details collection.

Proxy rotation for bulk jobs: ThorData residential proxies keep per-IP request rates well under limits when collecting across tens of thousands of games. Their pool supports sticky sessions for paginated requests where IP consistency matters:

def collect_with_proxy_rotation(universe_ids: list[int]) -> list[dict]:
    """Distribute game collection across proxy IPs."""
    results = []
    batch_size = 100

    for i in range(0, len(universe_ids), batch_size):
        batch = universe_ids[i:i + batch_size]
        # Fresh proxy per batch
        proxy = make_proxy(country="us")
        details = get_game_details(batch, proxy=proxy)
        results.extend(details)
        time.sleep(1)  # get_game_details is synchronous, so a plain sleep suffices

    return results

Key Takeaways