← Back to blog

How to Scrape Steam Workshop Mods in 2026: Subscribers, Ratings & Update History

How to Scrape Steam Workshop Mods in 2026: Subscribers, Ratings & Update History

Steam Workshop hosts tens of millions of user-created items — mods, maps, cosmetics, scenarios — across thousands of games. If you're tracking mod popularity trends, building a mod recommendation tool, researching modding ecosystems, or monitoring a game's community health, Workshop data is surprisingly rich and largely accessible through official APIs.

The main interface is IPublishedFileService, a Steam Web API service that covers everything from subscriber counts to update history. There are gaps — comments and changelogs require scraping the Workshop pages directly — but for most use cases the API gets you most of the way there.

What Data Is Available

Through the API and Workshop pages combined, you can pull subscriber counts (current and lifetime), favorites, views, vote data, tags, creation and update timestamps, file sizes, comment counts — and, from the pages themselves, changelogs and comment threads.

Subscriber count is probably the most useful signal. It's a real-time measure of how many Steam accounts have the mod installed, not just downloaded once. Tracking it over time gives you an adoption velocity curve.

Who Uses Workshop Data

Before writing code, it's worth knowing what people actually build with this data: mod recommendation tools, popularity trend trackers, modding ecosystem research, and dashboards that monitor a game's community health.

Steam Web API: IPublishedFileService

Get a free API key at steamcommunity.com/dev/apikey. There's no approval process for basic Workshop access.

The two endpoints you'll use most are GetDetails, which fetches full metadata for known item IDs, and QueryFiles, which discovers items for a given game.

Both are documented in the Steamworks API reference.

Fetching Mod Details

GetDetails accepts multiple publishedfileids in a single call, which makes batch querying straightforward.

import requests
import time
import json

STEAM_API_KEY = "YOUR_API_KEY"  # obtain at steamcommunity.com/dev/apikey
BASE_URL = "https://api.steampowered.com"  # Steam Web API host


def get_workshop_details(file_ids: list[int],
                          return_vote_data: bool = True) -> list[dict]:
    """Fetch full metadata for a list of Workshop item IDs.

    Args:
        file_ids: Published-file IDs to look up. The API accepts up to
            100 per call; pass larger lists through batch_get_details.
        return_vote_data: Include votes_up / votes_down / score fields.

    Returns:
        One flattened dict per item that resolved successfully; missing
        or private items (result != 1) are silently skipped.

    Raises:
        requests.HTTPError: On a non-2xx API response.
    """
    url = f"{BASE_URL}/IPublishedFileService/GetDetails/v1/"

    params = {
        "key": STEAM_API_KEY,
        "includeadditionalpreviews": 1,
        "includechildren": 1,
        "includetags": 1,
        "includekvtags": 1,
        "includevotes": 1 if return_vote_data else 0,
        "short_description": 0,
        "includeforsaledata": 0,
        "includemetadata": 1,
        "return_playtime_stats": 0,
        "appid": 0,  # 0 = any app
        "strip_description_bbcode": 1,
    }

    # The API expects indexed array parameters: publishedfileids[0], [1], ...
    for i, fid in enumerate(file_ids):
        params[f"publishedfileids[{i}]"] = fid

    resp = requests.get(url, params=params, timeout=20)
    resp.raise_for_status()

    items = resp.json().get("response", {}).get("publishedfiledetails", [])

    # result == 1 marks a resolvable public item; anything else is
    # deleted, private, or otherwise unavailable.
    return [_parse_workshop_item(item)
            for item in items
            if item.get("result") == 1]


def _parse_workshop_item(item: dict) -> dict:
    """Flatten one raw publishedfiledetails entry into our record shape."""
    vote = item.get("vote_data", {})
    return {
        "file_id": item["publishedfileid"],
        "app_id": item.get("creator_app_id"),
        "consumer_app_id": item.get("consumer_app_id"),
        "title": item.get("title"),
        # `or ""` also guards against an explicit null description in
        # the JSON, which .get(..., "") would pass through as None.
        "description": (item.get("description") or "")[:1000],
        "creator_steam_id": item.get("creator"),
        "created": item.get("time_created"),
        "updated": item.get("time_updated"),
        "file_size": item.get("file_size"),
        "preview_url": item.get("preview_url"),
        "url": f"https://steamcommunity.com/sharedfiles/filedetails/?id={item['publishedfileid']}",
        "subscriptions": item.get("subscriptions", 0),
        "favorited": item.get("favorited", 0),
        "lifetime_subscriptions": item.get("lifetime_subscriptions", 0),
        "lifetime_favorited": item.get("lifetime_favorited", 0),
        "views": item.get("views", 0),
        "num_comments": item.get("num_comments_public", 0),
        "tags": [t["tag"] for t in item.get("tags", [])],
        "votes_up": vote.get("votes_up", 0),
        "votes_down": vote.get("votes_down", 0),
        "vote_score": vote.get("score", 0),
        "num_children": item.get("num_children", 0),
    }

The vote_data field returns votes up, votes down, and a normalized score. It's useful for quality filtering — a high-subscriber mod with a low vote score is likely controversial or has quality issues.

Querying Workshop Items for a Game

To discover mods for a specific game rather than fetching known IDs, use QueryFiles:

def query_workshop(app_id: int, query_type: int = 1,
                    page: int = 1, per_page: int = 100,
                    required_tags: list[str] | None = None) -> dict:
    """
    Query Workshop items for a specific game.

    query_type values:
        0  = ranked by vote (top rated)
        1  = ranked by publication date (newest)
        3  = ranked by trend (trending)
        12 = ranked by total subscriptions (all-time most subscribed)

    Returns a dict with `total` (matching items across all pages),
    `items` (this page's raw publishedfiledetails), and `has_more`.

    Raises:
        requests.HTTPError: On a non-2xx API response.
    """
    url = f"{BASE_URL}/IPublishedFileService/QueryFiles/v1/"
    # The API caps page size at 100. Use the clamped value everywhere,
    # including the has_more computation below — computing it with the
    # raw per_page would overstate progress whenever per_page > 100.
    effective_per_page = min(per_page, 100)
    params = {
        "key": STEAM_API_KEY,
        "query_type": query_type,
        "page": page,
        "numperpage": effective_per_page,
        "appid": app_id,
        "return_vote_data": 1,
        "return_tags": 1,
        "return_children": 0,
        "return_short_description": 1,
        "return_previews": 0,
        "return_metadata": 1,
    }

    if required_tags:
        for i, tag in enumerate(required_tags):
            params[f"requiredtags[{i}]"] = tag

    resp = requests.get(url, params=params, timeout=20)
    resp.raise_for_status()
    data = resp.json().get("response", {})

    return {
        "total": data.get("total", 0),
        "items": data.get("publishedfiledetails", []),
        "has_more": data.get("total", 0) > page * effective_per_page,
    }


def get_top_mods(app_id: int, limit: int = 500,
                  query_type: int = 12) -> list[dict]:
    """Get top Workshop mods by subscriber count for a game."""
    collected: list[dict] = []
    page_num = 1

    while len(collected) < limit:
        page_data = query_workshop(app_id, query_type=query_type,
                                   page=page_num)
        items = page_data["items"]
        if not items:
            break

        collected += items
        print(f"Page {page_num}: {len(items)} items "
              f"(total collected: {len(collected)})")

        if not page_data.get("has_more"):
            break

        page_num += 1
        time.sleep(1.5)  # well under 200/5min rate limit

    # May overshoot on the final page; trim to the requested limit.
    return collected[:limit]


def discover_mods_by_tag(app_id: int, tag: str,
                          limit: int = 200) -> list[dict]:
    """Find Workshop items for a game with a specific tag."""
    found: list[dict] = []
    current_page = 1

    # Page through most-subscribed (query_type=12) items carrying the tag
    # until we have enough or the API runs out of results.
    while len(found) < limit:
        batch = query_workshop(
            app_id, query_type=12, page=current_page, required_tags=[tag]
        )["items"]
        if not batch:
            break
        found.extend(batch)
        current_page += 1
        time.sleep(1.5)

    return found[:limit]

Rate Limits and Anti-Bot Measures

The Steam Web API enforces roughly 200 requests per 5 minutes per API key. Hit that ceiling and you get 429 responses. At 1-2 seconds between calls you'll never come close.

The Workshop web pages are a different story. If you're hitting Workshop item pages to scrape changelogs or comments, Steam throttles aggressively at the IP level around 200 page requests per 5 minutes. Beyond that you start seeing redirects to a captcha or outright 429s.

Spreading requests across multiple IPs is the reliable solution. A residential proxy service works well here because Steam is more suspicious of datacenter IPs — residential IPs match what a real user looks like. ThorData offers residential proxies across 195+ countries with per-request IP rotation, which keeps your Workshop page scraping well below the per-IP threshold even when crawling at scale.

import random

PROXY_USER = "YOUR_USER"
PROXY_PASS = "YOUR_PASS"
PROXY_HOST = "proxy.thordata.com"
PROXY_PORT = 9000


def make_proxy(country: str = None) -> dict:
    """Return a requests-style proxies dict for the rotating residential
    gateway; optional `country` requests geo-targeting via a URL suffix."""
    endpoint = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
    if country:
        endpoint = f"{endpoint}?country={country}"
    return {"http": endpoint, "https": endpoint}


def proxied_get(url: str, params: dict = None,
                headers: dict = None, country: str = None) -> requests.Response:
    """Make a GET request through a residential proxy.

    Caller-supplied headers override the browser-like defaults.
    """
    merged_headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/125.0.0.0 Safari/537.36"
        ),
        "Accept-Language": "en-US,en;q=0.9",
    }
    if headers:
        merged_headers.update(headers)

    return requests.get(
        url,
        params=params,
        headers=merged_headers,
        proxies=make_proxy(country),
        timeout=20,
    )

Scraping Workshop Pages for Supplementary Data

Some data isn't in the API at all. Changelogs and comment threads require hitting the Workshop item page directly.

from bs4 import BeautifulSoup

def scrape_changelog(file_id: int, proxies: dict = None) -> list[dict]:
    """Scrape changelog entries from a Workshop item's changelog page.

    Returns at most 25 entries, each with a `date` and truncated `notes`.
    """
    page_url = (f"https://steamcommunity.com/sharedfiles/"
                f"filedetails/changelog/{file_id}")
    request_headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36"
        ),
        "Accept-Language": "en-US,en;q=0.9",
    }

    response = requests.get(page_url, headers=request_headers,
                            proxies=proxies, timeout=20)
    response.raise_for_status()
    page = BeautifulSoup(response.text, "html.parser")

    # Steam has shipped two markup variants for changelogs; the selectors
    # cover both. Entries missing either a date or body are skipped.
    entries = []
    for node in page.select(".changelog_entry, .workshopItemChange"):
        when = node.select_one(".changelog_header, .workshopItemChangeDate")
        body = node.select_one(".changelog_body, .workshopItemChangeDesc")
        if when and body:
            entries.append({
                "date": when.get_text(strip=True),
                "notes": body.get_text(strip=True)[:1000],
            })

    return entries[:25]


def get_collection_members(collection_id: int) -> list[int]:
    """Get all item IDs in a Workshop collection via ISteamRemoteStorage.

    Args:
        collection_id: Published-file ID of the collection itself.

    Returns:
        Child item IDs as ints; empty list when the collection is
        missing, private, or has no children.

    Raises:
        requests.HTTPError: On a non-2xx API response.
    """
    url = f"{BASE_URL}/ISteamRemoteStorage/GetCollectionDetails/v1/"
    payload = {
        "collectioncount": 1,
        "publishedfileids[0]": collection_id,
    }
    resp = requests.post(url, data=payload, timeout=20)
    # Fail loudly on HTTP errors instead of crashing on non-JSON bodies.
    resp.raise_for_status()

    collections = (
        resp.json()
        .get("response", {})
        .get("collectiondetails", [])
    )
    if not collections:
        return []

    children = collections[0].get("children", [])
    # Skip malformed child entries rather than raising KeyError.
    return [int(c["publishedfileid"]) for c in children
            if "publishedfileid" in c]


def get_creator_items(steam_id: str,
                       app_id: int = None) -> list[dict]:
    """Get all Workshop items published by a specific Steam user.

    Pages through GetUserFiles (100 items per page) until the API
    returns an empty page, so creators with more than 100 items are
    fully covered — the previous version silently returned only the
    first page.

    Args:
        steam_id: 64-bit Steam ID of the creator.
        app_id: Restrict to one game; None/0 means any app.

    Raises:
        requests.HTTPError: On a non-2xx API response.
    """
    url = f"{BASE_URL}/IPublishedFileService/GetUserFiles/v1/"
    all_files: list[dict] = []
    page = 1

    while True:
        params = {
            "key": STEAM_API_KEY,
            "steamid": steam_id,
            "appid": app_id or 0,
            "numperpage": 100,
            "page": page,
            "return_vote_data": 1,
            "return_tags": 1,
        }
        resp = requests.get(url, params=params, timeout=20)
        resp.raise_for_status()
        data = resp.json().get("response", {})

        batch = data.get("publishedfiledetails", [])
        if not batch:
            break
        all_files.extend(batch)

        # Stop once we've collected everything the API reports.
        if len(all_files) >= data.get("total", 0):
            break
        page += 1
        time.sleep(1.5)  # stay under the API rate limit

    return all_files

Batch Querying

GetDetails handles up to 100 file IDs per request. Chunk your ID lists to take advantage of this — it's much faster than individual calls.

def batch_get_details(file_ids: list[int],
                       chunk_size: int = 100) -> list[dict]:
    """Fetch details for a large list of file IDs in batches.

    Failed chunks are logged and skipped rather than aborting the run.
    """
    collected: list[dict] = []
    total = len(file_ids)

    for offset in range(0, total, chunk_size):
        batch_ids = file_ids[offset:offset + chunk_size]
        try:
            collected.extend(get_workshop_details(batch_ids))
            print(f"Fetched {len(collected)}/{total} mods")
        except requests.HTTPError as e:
            print(f"HTTP error on chunk {offset//chunk_size}: {e}")

        time.sleep(2)

    return collected

With a 2-second delay between batches of 100, you can pull details on 3,000 mods in about 60 seconds — well under the rate limit.

Storing Results in SQLite

import sqlite3

def init_db(db_path: str = "workshop.db") -> sqlite3.Connection:
    """Create the Workshop schema (if missing) and return an open connection.

    Tables:
        mods                  -- latest snapshot, one row per Workshop item
        changelogs            -- scraped changelog entries, deduped by date
        subscription_history  -- append-only subscriber-count snapshots
    """
    connection = sqlite3.connect(db_path)
    connection.executescript("""
        CREATE TABLE IF NOT EXISTS mods (
            file_id TEXT PRIMARY KEY,
            app_id INTEGER,
            consumer_app_id INTEGER,
            title TEXT,
            creator_steam_id TEXT,
            created INTEGER,
            updated INTEGER,
            file_size INTEGER,
            subscriptions INTEGER DEFAULT 0,
            lifetime_subscriptions INTEGER DEFAULT 0,
            favorited INTEGER DEFAULT 0,
            lifetime_favorited INTEGER DEFAULT 0,
            views INTEGER DEFAULT 0,
            num_comments INTEGER DEFAULT 0,
            votes_up INTEGER DEFAULT 0,
            votes_down INTEGER DEFAULT 0,
            vote_score REAL DEFAULT 0,
            tags TEXT,
            description TEXT,
            preview_url TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS changelogs (
            file_id TEXT,
            entry_date TEXT,
            notes TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (file_id, entry_date)
        );

        CREATE TABLE IF NOT EXISTS subscription_history (
            file_id TEXT,
            subscriptions INTEGER,
            recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (file_id, recorded_at)
        );

        CREATE INDEX IF NOT EXISTS idx_mods_app ON mods(app_id);
        CREATE INDEX IF NOT EXISTS idx_mods_subs ON mods(subscriptions);
        CREATE INDEX IF NOT EXISTS idx_mods_updated ON mods(updated);
    """)
    connection.commit()
    return connection


def save_mod(conn: sqlite3.Connection, mod: dict) -> None:
    """Save or update a mod record and append a subscription snapshot.

    Args:
        conn: Open connection with the schema from init_db().
        mod: Flattened record as produced by get_workshop_details();
            only "file_id" is required, every other key has a default.
    """
    conn.execute("""
        INSERT OR REPLACE INTO mods
        (file_id, app_id, consumer_app_id, title, creator_steam_id,
         created, updated, file_size, subscriptions,
         lifetime_subscriptions, favorited, lifetime_favorited,
         views, num_comments, votes_up, votes_down, vote_score,
         tags, description, preview_url)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, (
        mod["file_id"], mod.get("app_id"), mod.get("consumer_app_id"),
        mod.get("title"), mod.get("creator_steam_id"),
        mod.get("created"), mod.get("updated"), mod.get("file_size"),
        mod.get("subscriptions", 0), mod.get("lifetime_subscriptions", 0),
        mod.get("favorited", 0), mod.get("lifetime_favorited", 0),
        mod.get("views", 0), mod.get("num_comments", 0),
        mod.get("votes_up", 0), mod.get("votes_down", 0),
        mod.get("vote_score", 0),
        json.dumps(mod.get("tags", [])),
        # `or ""` guards against an explicit None description value,
        # which .get(..., "") would pass through and crash the slice.
        (mod.get("description") or "")[:1000],
        mod.get("preview_url"),
    ))

    # Append-only history row; OR IGNORE dedupes same-second snapshots
    # (recorded_at defaults to CURRENT_TIMESTAMP, second resolution).
    conn.execute("""
        INSERT OR IGNORE INTO subscription_history (file_id, subscriptions)
        VALUES (?,?)
    """, (mod["file_id"], mod.get("subscriptions", 0)))

    conn.commit()


def get_subscription_growth(conn: sqlite3.Connection,
                              file_id: str) -> list[dict]:
    """Get subscription count history for a mod, oldest snapshot first."""
    cursor = conn.execute("""
        SELECT subscriptions, recorded_at
        FROM subscription_history
        WHERE file_id = ?
        ORDER BY recorded_at
    """, (file_id,))

    history = []
    for subs, recorded in cursor:
        history.append({"subscriptions": subs, "date": recorded})
    return history

Tracking updated timestamps over time lets you build an update frequency signal — useful for identifying actively maintained mods versus abandoned ones.

Complete Crawl for a Game

Here's a complete run for Garry's Mod (app ID 4000):

def crawl_game_workshop(app_id: int, game_name: str,
                         limit: int = 500,
                         include_changelogs: bool = False) -> None:
    """Full Workshop crawl for a specific game.

    Discovers the top `limit` mods via QueryFiles, batch-fetches full
    details, stores everything in workshop.db, optionally scrapes
    changelogs for the 50 most-subscribed mods, then prints a summary.

    Args:
        app_id: Steam app ID of the game (e.g. 4000 for Garry's Mod).
        game_name: Human-readable name, used only for console output.
        limit: Maximum number of mods to collect.
        include_changelogs: Also scrape changelog pages (slower; goes
            through the residential proxy).
    """
    conn = init_db()

    print(f"Crawling Workshop for {game_name} (app {app_id})")
    print("Fetching top mods by subscriber count...")

    # Discover IDs first, then batch-fetch the full records.
    top_items = get_top_mods(app_id, limit=limit)
    file_ids = [int(item.get("publishedfileid", 0))
                for item in top_items
                if item.get("publishedfileid")]

    print(f"Discovered {len(file_ids)} file IDs")
    print("Batch fetching full details...")

    detailed = batch_get_details(file_ids)

    for mod in detailed:
        save_mod(conn, mod)

    print(f"Saved {len(detailed)} mods to workshop.db")

    # Optionally fetch changelogs for the most popular mods.
    if include_changelogs:
        print("Fetching changelogs for top 50 mods...")
        proxies = make_proxy("us")
        top_50 = sorted(
            detailed, key=lambda m: m.get("subscriptions", 0), reverse=True
        )[:50]

        for mod in top_50:
            fid = mod["file_id"]
            try:
                entries = scrape_changelog(int(fid), proxies=proxies)
                for entry in entries:
                    conn.execute("""
                        INSERT OR IGNORE INTO changelogs
                        (file_id, entry_date, notes)
                        VALUES (?,?,?)
                    """, (fid, entry["date"], entry["notes"]))
                conn.commit()
                # Title can be None on delisted items; guard the slice.
                print(f"  {(mod['title'] or '')[:40]}: "
                      f"{len(entries)} changelog entries")
            except Exception as e:
                print(f"  Changelog error for {fid}: {e}")

            # Randomized delay keeps page scraping under Steam's
            # per-IP throttling threshold.
            time.sleep(random.uniform(3, 6))

    # Summary. The `app_id` column stores creator_app_id (the app the
    # item was made with), so filter on consumer_app_id — the game the
    # mod is actually published for.
    print(f"\n=== Top 10 Mods for {game_name} ===")
    for row in conn.execute("""
        SELECT title, subscriptions, lifetime_subscriptions,
               votes_up, votes_down
        FROM mods
        WHERE consumer_app_id = ?
        ORDER BY subscriptions DESC
        LIMIT 10
    """, (app_id,)):
        ratio = (
            row[3] / max(row[3] + row[4], 1) * 100
            if row[3] else 0
        )
        print(f"  {row[1]:>8,} subs | {ratio:.0f}% pos | {(row[0] or '')[:40]}")

    conn.close()


# Example run
# NOTE: executing this module performs a full network crawl (Steam Web
# API calls plus proxied Workshop-page scraping) as a side effect.
crawl_game_workshop(
    app_id=4000,  # Garry's Mod
    game_name="Garry's Mod",
    limit=200,
    include_changelogs=True,
)

Steam's Subscriber Agreement restricts automated access, but IPublishedFileService is a documented public API explicitly provided for developer use. Pulling mod metadata through it is well within intended usage — tools like GMAD and mod management platforms have done exactly this for years.

Don't scrape user profile data at scale, don't re-host mod files, and don't hammer Workshop pages without rate limiting. The API rate limits exist for a reason; stay under them and there's no issue. Comments and changelogs are public content, but scrape them with the same restraint you'd apply to any community site.

For any serious data collection project, check the specific game's modding policies too — a few publishers have additional restrictions on Workshop content.

Key Takeaways

Analyzing Modding Ecosystem Health

Beyond individual mod data, the Workshop dataset enables ecosystem-level analysis:

def analyze_game_ecosystem(conn: sqlite3.Connection,
                             app_id: int,
                             game_name: str = "") -> None:
    """Print ecosystem health metrics for a game's Workshop.

    Covers overall subscription stats, the ten most common tags, and a
    breakdown of how recently mods were updated. Read-only on `mods`.

    Args:
        conn: Connection with the schema from init_db().
        app_id: Game to analyze (matched against mods.app_id).
        game_name: Optional label for the header; falls back to app_id.
    """
    print(f"=== Workshop Ecosystem: {game_name or app_id} ===\n")

    now = int(time.time())  # hoisted so all cutoffs share one clock read

    # Overall stats. SUM/AVG/MAX return NULL when no rows match, which
    # would crash the numeric format specs below — substitute 0.
    stats = conn.execute("""
        SELECT COUNT(*) as total_mods,
               SUM(subscriptions) as total_subs,
               AVG(subscriptions) as avg_subs,
               MAX(subscriptions) as max_subs,
               COUNT(CASE WHEN updated > ? THEN 1 END) as active_30d
        FROM mods
        WHERE app_id = ?
    """, (now - 30 * 86400, app_id)).fetchone()

    if stats:
        print(f"Total mods: {stats[0]:,}")
        print(f"Total subscriptions: {stats[1] or 0:,}")
        print(f"Avg subscriptions/mod: {stats[2] or 0:,.0f}")
        print(f"Most subscribed mod: {stats[3] or 0:,}")
        print(f"Updated in last 30 days: {stats[4]:,}")

    # Top tags (tags are stored as a JSON array string).
    print("\nTop tags by mod count:")
    tag_rows = conn.execute(
        "SELECT tags FROM mods WHERE app_id = ?", (app_id,)
    ).fetchall()

    tag_counts: dict[str, int] = {}
    for (tags_json,) in tag_rows:
        try:
            for tag in json.loads(tags_json or "[]"):
                tag_counts[tag] = tag_counts.get(tag, 0) + 1
        except json.JSONDecodeError:
            pass  # tolerate rows with malformed tag JSON

    for tag, count in sorted(
        tag_counts.items(), key=lambda x: -x[1]
    )[:10]:
        print(f"  {tag:20}: {count:4} mods")

    # Update frequency distribution, bucketed by recency.
    print("\nMod update activity:")
    for category, count in conn.execute("""
        SELECT
            CASE
                WHEN updated > ? THEN 'Active (< 30 days)'
                WHEN updated > ? THEN 'Recent (30-90 days)'
                WHEN updated > ? THEN 'Aging (90-365 days)'
                ELSE 'Abandoned (> 1 year)'
            END as category,
            COUNT(*) as count
        FROM mods WHERE app_id = ?
        GROUP BY category
        ORDER BY MIN(updated) DESC
    """, (
        now - 30 * 86400,
        now - 90 * 86400,
        now - 365 * 86400,
        app_id,
    )):
        print(f"  {category:25}: {count:4} mods")


def find_underserved_niches(conn: sqlite3.Connection,
                             app_id: int) -> None:
    """Find tag categories with high demand but few quality mods."""
    print("\n=== Potential Underserved Niches ===\n")

    # Many mods sharing a tag but averaging few subscriptions suggests
    # demand without quality options.
    print("Tags with high mod count but low avg subscriptions:")
    print("(suggests market opportunity for quality content)")
    print()

    # Tags live in a JSON array column, so aggregate in Python.
    rows = conn.execute("""
        SELECT tags, subscriptions, votes_up, votes_down
        FROM mods WHERE app_id = ?
    """, (app_id,)).fetchall()

    per_tag: dict[str, dict] = {}
    for tags_json, subs, up, down in rows:
        try:
            parsed_tags = json.loads(tags_json or "[]")
        except json.JSONDecodeError:
            continue  # skip rows with malformed tag JSON
        for tag in parsed_tags:
            bucket = per_tag.setdefault(tag, {"subs": [], "quality": []})
            bucket["subs"].append(subs or 0)
            votes_total = (up or 0) + (down or 0)
            if votes_total > 0:
                bucket["quality"].append((up or 0) / votes_total)

    # Require at least 5 mods per tag before drawing conclusions.
    candidates = []
    for tag, bucket in per_tag.items():
        if len(bucket["subs"]) < 5:
            continue
        mean_subs = sum(bucket["subs"]) / len(bucket["subs"])
        mean_quality = (
            sum(bucket["quality"]) / len(bucket["quality"])
            if bucket["quality"] else 0.5
        )
        candidates.append({
            "tag": tag,
            "mod_count": len(bucket["subs"]),
            "avg_subs": mean_subs,
            "avg_quality": mean_quality,
        })

    # Rank: lots of mods relative to average subscriptions first.
    for n in sorted(
        candidates,
        key=lambda x: x["mod_count"] / max(x["avg_subs"], 1),
        reverse=True,
    )[:10]:
        print(f"  {n['tag']:25}: {n['mod_count']:3} mods, "
              f"{n['avg_subs']:,.0f} avg subs, "
              f"{n['avg_quality']:.0%} positive votes")
Tracking Subscription Velocity

Subscription velocity (growth rate) is often more valuable than raw subscriber count for identifying breakout mods early:

def compute_velocity(conn: sqlite3.Connection,
                      file_id: str,
                      days: int = 7) -> float | None:
    """Compute subscription growth over the last N days."""
    rows = conn.execute("""
        SELECT subscriptions, recorded_at
        FROM subscription_history
        WHERE file_id = ?
        ORDER BY recorded_at DESC
        LIMIT 2
    """, (file_id,)).fetchall()

    if len(rows) < 2:
        return None

    recent_subs = rows[0][0]
    older_subs = rows[1][0]
    return recent_subs - older_subs


def find_breakout_mods(conn: sqlite3.Connection,
                        app_id: int,
                        min_velocity: int = 100) -> list[dict]:
    """Find mods with rapid subscription growth.

    Args:
        conn: Connection with mods + subscription_history tables.
        app_id: Game whose mods are scanned (matched on mods.app_id).
        min_velocity: Minimum subscriber gain to qualify.

    Returns:
        Qualifying mods sorted by velocity, highest first.
    """
    file_ids = [
        row[0] for row in conn.execute(
            "SELECT file_id FROM mods WHERE app_id = ?", (app_id,)
        )
    ]

    breakouts = []
    for fid in file_ids:
        velocity = compute_velocity(conn, fid)
        # `is not None` rather than truthiness: a legitimate velocity
        # of 0 must still be compared against min_velocity.
        if velocity is not None and velocity >= min_velocity:
            mod = conn.execute(
                "SELECT title, subscriptions FROM mods WHERE file_id = ?",
                (fid,)
            ).fetchone()
            if mod:
                breakouts.append({
                    "file_id": fid,
                    "title": mod[0],
                    "current_subs": mod[1],
                    "velocity": velocity,
                })

    return sorted(breakouts, key=lambda x: -x["velocity"])

Monitoring this daily for your target games gives you early signals about which mods are gaining momentum before they hit the top 10 lists.