
Scraping Patreon Creator Data with Python (2026)


Patreon holds a substantial slice of creator economy data that is genuinely hard to get elsewhere: real patron counts, tier pricing, post cadence, and creator revenue bands. This data is useful for competitive benchmarking, niche audience sizing, and identifying underserved creator segments. Knowing that a creator in a specific niche has 4,000 patrons at a $10 average tier tells you more about audience willingness to pay than any survey.

The platform offers a legitimate API v2, but it is scoped around authenticated creators managing their own campaigns — not broad discovery. For research across arbitrary creators, you need a mix of the API where it applies and targeted page scraping where it does not. This guide covers both approaches with complete Python code, anti-detection strategies, proxy integration, and SQLite storage for building longitudinal datasets.

What Data Is Available

Patreon exposes different data depending on whether you're hitting the API or scraping public pages.

Via the Patreon API v2 (authenticated, your own campaign):

- Campaign attributes: patron count, monthly payment amount, creation name, URL
- Tier structure: titles, prices, per-tier patron counts, user limits
- Member records: pledge amounts, patron status, lifetime support, charge history
- Post metadata: titles, publish dates, comment and like counts

Via public profile pages (unauthenticated scraping):

- Patron count (when the creator chooses to display it)
- Tier titles, prices, and descriptions
- Campaign name, billing model (monthly vs. per-creation), and URL

Via public RSS feeds (unauthenticated):

- Public post titles and publish dates
- Post cadence: average gap between posts, posts per month

The API gives you depth on your own data. Scraping gives you breadth across any creator. RSS gives you cadence data without fighting Cloudflare. Most research use cases need all three.

Understanding Patreon's Revenue Model

Before diving into code, it's worth understanding what the data actually means; it will make your analyses more useful.

Patron counts are public on most creators' pages, but a sizable minority (roughly 20-30%) hide their patron count, showing only tier counts or nothing at all. When a creator hides the count, that itself is a data point: it usually reflects a deliberate positioning choice, so record hidden counts as a category rather than guessing a number.

Tier pricing ranges from $1 "supporter" tiers to $1000+ "executive producer" tiers. The distribution matters: a creator with 10,000 patrons all at $1/month earns less than one with 500 patrons at $25/month. The tier structure reveals the creator's pricing strategy.

Revenue estimates can be calculated as: sum(tier_price * tier_patron_count). This is a lower bound because some patrons pledge custom amounts above the tier minimum, but it tends to track actual revenue closely for most creators, since custom overpledges are usually a small share of total pledges.

Post frequency from RSS tells you about the creator's work cadence — are they a daily emailer, weekly YouTube-style, or monthly long-form? Combined with patron count, it gives you a revenue-per-post efficiency metric.
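That efficiency metric can be sketched in a few lines. The helper below is hypothetical (not part of any Patreon API), just the arithmetic the paragraph describes:

```python
def revenue_per_post(patron_count, avg_pledge_usd, posts_per_month):
    """Rough revenue earned per post: estimated MRR divided by monthly output."""
    if not posts_per_month:
        return None
    return round((patron_count * avg_pledge_usd) / posts_per_month, 2)

# 4,000 patrons at a $10 average pledge, posting weekly (~4.3 posts/month)
print(revenue_per_post(4000, 10.0, 4.3))  # → 9302.33
```

A daily poster with the same audience would earn roughly a seventh of that per post, which is exactly the kind of operating-model difference the metric surfaces.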

Anti-Bot Measures

Patreon runs Cloudflare on most of its surface area. The specific challenges vary:

- Profile pages sit behind bot detection; datacenter IPs get 403s almost immediately
- A fresh IP's first request may hit a JavaScript interstitial whose cookies must be carried on follow-up requests, which is why sticky proxy sessions matter
- RSS feeds are served with much lighter checks and rarely challenge well-behaved clients
- The official API sits outside the challenge flow entirely; it rate-limits with 429 responses instead

ThorData Proxy Integration

For any volume scraping of public pages, residential proxies are necessary. Patreon's Cloudflare configuration blocks datacenter IPs consistently. ThorData's residential proxy network provides IPs from real ISP ranges that pass Cloudflare's checks. Their geo-targeting is useful if you want to pull creator pages that serve region-specific content:

from typing import Optional

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "gate.thordata.net"
THORDATA_PORT = 9000

def make_proxy(country: str = "us", session_id: Optional[str] = None) -> str:
    """
    Build a ThorData residential proxy URL.

    Use session_id for sticky sessions — important for Patreon because
    the Cloudflare challenge sets cookies that subsequent requests must carry.
    """
    user = f"{THORDATA_USER}-country-{country}"
    if session_id:
        user += f"-session-{session_id}"
    return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"

For the official API v2, no proxies are needed — just manage your token and respect backoff on 429s.

Patreon API v2: OAuth Setup

The v2 API requires a creator account and an OAuth client. Register at patreon.com/portal/registration/register-clients. The portal issues a creator access token directly when you register the client; the code below exchanges client credentials for a token instead, and if that grant isn't enabled for your client, fall back to the portal-issued token.

import httpx
import os
import time
from typing import Optional

CLIENT_ID = os.environ.get("PATREON_CLIENT_ID", "")
CLIENT_SECRET = os.environ.get("PATREON_CLIENT_SECRET", "")

def get_creator_token(client_id: str, client_secret: str) -> str:
    """Exchange client credentials for an access token."""
    resp = httpx.post(
        "https://www.patreon.com/api/oauth2/token",
        data={
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=15,
    )
    resp.raise_for_status()
    token_data = resp.json()
    return token_data["access_token"]


def api_get(
    endpoint: str,
    params: Optional[dict] = None,
    token: Optional[str] = None,
    max_retries: int = 3,
) -> dict:
    """Make a Patreon API v2 request with retry logic."""
    url = f"https://www.patreon.com{endpoint}"
    headers = {"Authorization": f"Bearer {token}"}

    for attempt in range(max_retries):
        try:
            resp = httpx.get(url, params=params, headers=headers, timeout=20)

            if resp.status_code == 200:
                return resp.json()
            elif resp.status_code == 429:
                retry_after = int(resp.headers.get("Retry-After", 30))
                print(f"Rate limited. Waiting {retry_after}s (attempt {attempt + 1})")
                time.sleep(retry_after)
            elif resp.status_code == 401:
                raise Exception("Invalid or expired token")
            else:
                print(f"Error {resp.status_code} on {endpoint}: {resp.text[:200]}")
                return {}
        except httpx.TimeoutException:
            print(f"Timeout on {endpoint}, attempt {attempt + 1}")
            time.sleep(2 ** attempt * 5)

    return {}


token = get_creator_token(CLIENT_ID, CLIENT_SECRET)

With this token you can query your own campaign data. For querying other creators' public data via the API, you need their campaign ID, which requires either scraping their page to find the embedded ID or using the undocumented search endpoint.
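One way to locate that embedded campaign ID without hard-coding the JSON path is a regex over the raw page HTML. This is a heuristic sketch; the regex and the sample payload are assumptions, since the embedded structure shifts between Patreon deployments:

```python
import re

def find_campaign_id(html: str):
    """Heuristic: grab the first campaign id from the embedded page JSON.
    A regex fallback survives page restructuring better than a fixed JSON path."""
    m = re.search(r'"campaign"\s*:\s*\{[^{}]*?"id"\s*:\s*"(\d+)"', html)
    return m.group(1) if m else None

# Simplified stand-in for a real __NEXT_DATA__ payload
sample = '<script id="__NEXT_DATA__">{"campaign":{"id":"4136671","type":"campaign"}}</script>'
print(find_campaign_id(sample))  # → 4136671
```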

Fetching Your Own Campaign and Tier Data

def get_my_campaign(token: str) -> dict:
    """
    Retrieve the authenticated creator's campaign with tier and patron data.

    Returns both the campaign attributes and a dict of tiers keyed by tier ID.
    """
    data = api_get(
        "/api/oauth2/v2/campaigns",
        params={
            "include": "tiers,creator,goals",
            "fields[campaign]": (
                "summary,creation_name,patron_count,published_at,url,"
                "monthly_payment_amount,pledge_url,is_monthly,is_charged_immediately,"
                "created_at,main_video_url,image_url"
            ),
            "fields[tier]": (
                "title,description,amount_cents,patron_count,published,"
                "benefits,discord_role_ids,edited_at,created_at,"
                "image_url,requires_shipping,user_limit"
            ),
            "fields[goal]": "amount_cents,completion_percent,created_at,description,reached_at,title",
        },
        token=token,
    )

    if not data or not data.get("data"):
        return {}

    campaign = data["data"][0]
    included = data.get("included", [])

    tiers = {}
    goals = {}

    for item in included:
        if item["type"] == "tier":
            tier_attrs = item["attributes"]
            tiers[item["id"]] = {
                "title": tier_attrs.get("title"),
                "description": tier_attrs.get("description", "")[:200],
                "price_usd": tier_attrs.get("amount_cents", 0) / 100,
                "patron_count": tier_attrs.get("patron_count", 0),
                "published": tier_attrs.get("published", False),
                "user_limit": tier_attrs.get("user_limit"),
            }
        elif item["type"] == "goal":
            goal_attrs = item["attributes"]
            goals[item["id"]] = {
                "title": goal_attrs.get("title"),
                "amount_usd": goal_attrs.get("amount_cents", 0) / 100,
                "completion_pct": goal_attrs.get("completion_percent", 0),
                "reached_at": goal_attrs.get("reached_at"),
            }

    return {
        "campaign": campaign["attributes"],
        "campaign_id": campaign["id"],
        "tiers": tiers,
        "goals": goals,
    }


result = get_my_campaign(token)
campaign = result.get("campaign", {})
print(f"Patron count: {campaign.get('patron_count')}")
print(f"Monthly revenue estimate: ${campaign.get('monthly_payment_amount', 0) / 100:.2f}")
print()
for tier_id, tier in result.get("tiers", {}).items():
    if tier["published"]:
        revenue = tier["price_usd"] * tier["patron_count"]
        print(f"  ${tier['price_usd']:.2f}/mo — {tier['title']} ({tier['patron_count']} patrons, ~${revenue:.0f}/mo)")

Fetching Member Data

The /members endpoint paginates with a cursor. Each member record includes their pledge amount, active tier, and lifetime value.

def get_all_members(token: str, campaign_id: str) -> list[dict]:
    """
    Paginate through all members of a campaign.

    Returns a list of member dicts with pledge amount, tier, and lifetime value.
    """
    url = f"/api/oauth2/v2/campaigns/{campaign_id}/members"
    params = {
        "include": "currently_entitled_tiers,address",
        "fields[member]": (
            "full_name,patron_status,pledge_cadence,"
            "currently_entitled_amount_cents,lifetime_support_cents,"
            "last_charge_date,last_charge_status,pledge_relationship_start"
        ),
        "fields[tier]": "title,amount_cents",
        "page[count]": 100,
    }
    members = []
    cursor = None

    while True:
        if cursor:
            params["page[cursor]"] = cursor

        data = api_get(url, params=params.copy(), token=token)

        if not data:
            break

        for member_data in data.get("data", []):
            attrs = member_data.get("attributes", {})
            members.append({
                "id": member_data.get("id"),
                "patron_status": attrs.get("patron_status"),
                "pledge_cadence": attrs.get("pledge_cadence"),
                "amount_cents": attrs.get("currently_entitled_amount_cents", 0),
                "amount_usd": attrs.get("currently_entitled_amount_cents", 0) / 100,
                "lifetime_support_usd": attrs.get("lifetime_support_cents", 0) / 100,
                "last_charge_date": attrs.get("last_charge_date"),
                "last_charge_status": attrs.get("last_charge_status"),
                "pledge_start": attrs.get("pledge_relationship_start"),
            })

        pagination = data.get("meta", {}).get("pagination", {})
        next_cursor = pagination.get("cursors", {}).get("next")
        if not next_cursor:
            break

        cursor = next_cursor
        time.sleep(0.5)

    return members


def analyze_members(members: list[dict]) -> dict:
    """Derive useful metrics from the member list."""
    active = [m for m in members if m.get("patron_status") == "active_patron"]
    amounts = [m["amount_usd"] for m in active if m["amount_usd"] > 0]

    if not amounts:
        return {}

    amounts.sort()
    n = len(amounts)

    return {
        "total_members": len(members),
        "active_patrons": n,
        "avg_pledge": round(sum(amounts) / n, 2),
        "median_pledge": amounts[n // 2],
        "total_mrr": round(sum(amounts), 2),
        "p25_pledge": amounts[int(n * 0.25)],
        "p75_pledge": amounts[int(n * 0.75)],
        "patrons_under_5": sum(1 for a in amounts if a < 5),
        "patrons_5_to_25": sum(1 for a in amounts if 5 <= a < 25),
        "patrons_25_plus": sum(1 for a in amounts if a >= 25),
    }
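A tiny worked example of why the function tracks both mean and median: with a synthetic pledge list, one large patron pulls the mean well above the median.

```python
# Synthetic pledge amounts, already in USD
amounts = sorted([2.0, 5.0, 5.0, 5.0, 10.0, 10.0, 25.0, 100.0])
n = len(amounts)

print(sum(amounts) / n)        # mean → 20.25
print(amounts[n // 2])         # median → 10.0
print(amounts[int(n * 0.25)])  # p25 → 5.0
print(amounts[int(n * 0.75)])  # p75 → 25.0
```

The single $100 patron doubles the mean relative to the median, so MRR concentration is worth checking before drawing conclusions from averages alone.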

Fetching Post History

def get_campaign_posts(
    token: str,
    campaign_id: str,
    max_posts: int = 100,
) -> list[dict]:
    """
    Fetch published posts from a campaign.

    Returns post metadata including title, publish date, tier access level,
    and comment/like counts.
    """
    url = f"/api/oauth2/v2/campaigns/{campaign_id}/posts"
    params = {
        "fields[post]": (
            "title,published_at,post_type,teaser_text,is_public,"
            "comment_count,like_count,url"
        ),
        "page[count]": min(max_posts, 500),
    }

    data = api_get(url, params=params, token=token)
    posts = []

    for post in data.get("data", []):
        attrs = post.get("attributes", {})
        posts.append({
            "id": post.get("id"),
            "title": attrs.get("title"),
            "published_at": attrs.get("published_at"),
            "post_type": attrs.get("post_type"),
            "is_public": attrs.get("is_public", False),
            "comment_count": attrs.get("comment_count", 0),
            "like_count": attrs.get("like_count", 0),
            "url": attrs.get("url"),
        })

    return posts

Scraping Public Creator Pages

For creators you do not control, you need to scrape their public profile page. Patreon embeds a JSON blob in a <script id="__NEXT_DATA__"> tag that contains the full campaign data including tier prices and visible patron count.

import httpx
from bs4 import BeautifulSoup
import json
from typing import Optional

def scrape_creator_page(
    creator_slug: str,
    proxy: Optional[str] = None,
) -> dict:
    """
    Scrape a public Patreon creator page and extract embedded JSON data.

    creator_slug: The URL slug (e.g., "kurzgesagt" from patreon.com/kurzgesagt)

    Returns the campaign dict from Next.js page data, or {} if blocked.
    """
    url = f"https://www.patreon.com/{creator_slug}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Upgrade-Insecure-Requests": "1",
    }

    client_kwargs = {
        "timeout": 25,
        "headers": headers,
        "follow_redirects": True,
    }
    if proxy:
        # httpx < 1.0 accepts a proxies mapping; newer httpx versions take proxy=proxy instead
        client_kwargs["proxies"] = {"all://": proxy}

    try:
        with httpx.Client(**client_kwargs) as client:
            resp = client.get(url)
    except Exception as e:
        print(f"Request error for {creator_slug}: {e}")
        return {}

    if resp.status_code == 403:
        print(f"Cloudflare block for {creator_slug} — try residential proxy")
        return {}
    if resp.status_code == 404:
        print(f"Creator not found: {creator_slug}")
        return {}
    if resp.status_code != 200:
        print(f"Status {resp.status_code} for {creator_slug}")
        return {}

    soup = BeautifulSoup(resp.text, "html.parser")
    script_tag = soup.find("script", {"id": "__NEXT_DATA__"})

    if not script_tag:
        print(f"No __NEXT_DATA__ for {creator_slug} — likely Cloudflare interstitial")
        return {}

    try:
        data = json.loads(script_tag.string)
    except (json.JSONDecodeError, TypeError) as e:
        print(f"JSON parse error for {creator_slug}: {e}")
        return {}

    # Navigate the Next.js page props structure
    campaign = (
        data.get("props", {})
        .get("pageProps", {})
        .get("bootstrapEnvelope", {})
        .get("pageBootstrap", {})
        .get("campaign", {})
    )

    # Alternative path (Patreon has restructured this a few times)
    if not campaign:
        campaign = (
            data.get("props", {})
            .get("pageProps", {})
            .get("campaign", {})
        )

    return campaign


def extract_creator_summary(creator_slug: str, proxy: Optional[str] = None) -> dict:
    """
    Extract a structured summary from a creator's public Patreon page.

    Returns patron count, tier structure, category, and URL.
    """
    campaign = scrape_creator_page(creator_slug, proxy=proxy)
    if not campaign:
        return {"slug": creator_slug, "error": "scrape_failed"}

    # Extract tiers
    raw_tiers = campaign.get("tiers", []) or campaign.get("included_tiers", [])
    published_tiers = [
        t for t in raw_tiers
        if isinstance(t, dict) and t.get("published", True)
    ]

    tiers = []
    for t in published_tiers:
        # Handle nested attributes format vs flat format
        attrs = t.get("attributes", t)
        price_cents = attrs.get("amount_cents", 0) or attrs.get("price_cents", 0)
        tiers.append({
            "title": attrs.get("title"),
            "price_usd": price_cents / 100 if price_cents else 0,
            "patron_count": attrs.get("patron_count"),
            "description": (attrs.get("description") or "")[:200],
            "user_limit": attrs.get("user_limit"),
        })

    tiers.sort(key=lambda t: t["price_usd"])

    # Extract campaign attributes
    attrs = campaign.get("attributes", campaign)

    return {
        "slug": creator_slug,
        "name": attrs.get("name") or attrs.get("creation_name"),
        "patron_count": attrs.get("patron_count"),
        "creation_name": attrs.get("creation_name"),
        "category": attrs.get("main_video_embed") or attrs.get("creation_name"),
        "url": attrs.get("url") or f"https://www.patreon.com/{creator_slug}",
        "is_monthly": attrs.get("is_monthly", True),
        "tiers": tiers,
        "tier_count": len(tiers),
        "estimated_monthly_revenue": sum(
            (t["price_usd"] * t["patron_count"])
            for t in tiers
            if t["patron_count"] is not None
        ),
    }


# Example with proxy rotation
import random
import string

def scrape_creator_safe(creator_slug: str) -> dict:
    """Scrape a creator page with sticky residential proxy session."""
    session_id = "".join(random.choices(string.ascii_lowercase, k=8))
    proxy = make_proxy(country="us", session_id=session_id)
    return extract_creator_summary(creator_slug, proxy=proxy)

Scraping Multiple Creators in Batch

def scrape_creator_batch(
    slugs: list[str],
    delay_range: tuple = (3, 8),
    country: str = "us",
) -> list[dict]:
    """
    Scrape multiple creator pages with randomized delays and proxy rotation.

    Each creator gets a fresh sticky proxy session.
    """
    results = []

    for slug in slugs:
        print(f"Scraping: {slug}")

        session_id = "".join(random.choices(string.ascii_lowercase, k=8))
        proxy = make_proxy(country=country, session_id=session_id)

        try:
            summary = extract_creator_summary(slug, proxy=proxy)
            if summary and "error" not in summary:
                results.append(summary)
                patrons = summary.get("patron_count") or "hidden"
                revenue = summary.get("estimated_monthly_revenue", 0)
                print(f"  {summary.get('name')}: {patrons} patrons, ~${revenue:.0f}/mo")
            else:
                print(f"  Failed: {summary.get('error', 'unknown')}")
        except Exception as e:
            print(f"  Error: {e}")

        delay = random.uniform(*delay_range)
        time.sleep(delay)

    return results

Post Frequency from RSS

Patreon provides a public RSS feed for each creator at https://www.patreon.com/rss/{creator_slug}. Without an auth key, you still get public post titles and dates.

import httpx
from xml.etree import ElementTree as ET
from datetime import datetime
from typing import Optional

def get_post_frequency(
    creator_slug: str,
    proxy: Optional[str] = None,
) -> dict:
    """
    Fetch RSS feed and calculate post cadence metrics for a creator.

    Returns post count, average days between posts, and posts per month.
    These are valuable for understanding creator output volume.
    """
    url = f"https://www.patreon.com/rss/{creator_slug}"
    client_kwargs = {
        "timeout": 15,
        "headers": {"User-Agent": "Mozilla/5.0 (compatible; FeedReader/1.0)"},
        "follow_redirects": True,
    }
    if proxy:
        # httpx < 1.0 accepts a proxies mapping; newer httpx versions take proxy=proxy instead
        client_kwargs["proxies"] = {"all://": proxy}

    try:
        with httpx.Client(**client_kwargs) as client:
            resp = client.get(url)
    except Exception as e:
        return {"error": str(e)}

    if resp.status_code == 404:
        return {"error": "RSS not available for this creator"}
    if resp.status_code != 200:
        return {"error": f"HTTP {resp.status_code}"}

    try:
        root = ET.fromstring(resp.content)
    except ET.ParseError as e:
        return {"error": f"RSS parse error: {e}"}

    items = root.findall(".//item")
    pub_dates = []
    posts = []

    for item in items:
        pd = item.findtext("pubDate")
        title = item.findtext("title") or ""

        if pd:
            # Try multiple date formats
            for fmt in [
                "%a, %d %b %Y %H:%M:%S %z",
                "%a, %d %b %Y %H:%M:%S +0000",
                "%Y-%m-%dT%H:%M:%S%z",
            ]:
                try:
                    dt = datetime.strptime(pd.strip(), fmt)
                    pub_dates.append(dt)
                    posts.append({"title": title, "date": dt.isoformat()})
                    break
                except ValueError:
                    continue

    if not pub_dates:
        return {
            "post_count": 0,
            "avg_days_between_posts": None,
            "posts_per_month": None,
            "recent_posts": [],
        }

    pub_dates.sort(reverse=True)

    if len(pub_dates) >= 2:
        gaps = [
            (pub_dates[i] - pub_dates[i + 1]).days
            for i in range(len(pub_dates) - 1)
        ]
        avg_gap = sum(gaps) / len(gaps)
        posts_per_month = round(30 / avg_gap, 1) if avg_gap > 0 else None
    else:
        avg_gap = None
        posts_per_month = None

    return {
        "post_count": len(pub_dates),
        "most_recent_post": pub_dates[0].isoformat() if pub_dates else None,
        "oldest_fetched_post": pub_dates[-1].isoformat() if pub_dates else None,
        "avg_days_between_posts": round(avg_gap, 1) if avg_gap else None,
        "posts_per_month": posts_per_month,
        "recent_posts": posts[:5],
    }


# Combine page scrape + RSS in one call
def full_creator_profile(creator_slug: str, proxy: Optional[str] = None) -> dict:
    """Build a complete profile combining page data and RSS feed analysis."""
    summary = extract_creator_summary(creator_slug, proxy=proxy)
    time.sleep(2)
    frequency = get_post_frequency(creator_slug, proxy=proxy)

    return {
        **summary,
        "post_frequency": frequency,
        "revenue_per_post": (
            round(summary.get("estimated_monthly_revenue", 0) /
                  frequency.get("posts_per_month", 1), 2)
            if frequency.get("posts_per_month", 0) > 0 else None
        ),
    }

SQLite Storage and Schema

import sqlite3

def init_db(db_path: str = "patreon_creators.db") -> sqlite3.Connection:
    """Initialize the creator tracking database."""
    conn = sqlite3.connect(db_path)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS creators (
            slug TEXT PRIMARY KEY,
            name TEXT,
            creation_name TEXT,
            url TEXT,
            is_monthly INTEGER DEFAULT 1,
            first_seen TEXT,
            last_updated TEXT
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS patron_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            slug TEXT NOT NULL,
            patron_count INTEGER,
            estimated_mrr REAL,
            tier_count INTEGER,
            captured_at TEXT NOT NULL
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS tiers (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            slug TEXT NOT NULL,
            title TEXT,
            price_usd REAL,
            patron_count INTEGER,
            user_limit INTEGER,
            captured_at TEXT NOT NULL
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS post_stats (
            slug TEXT PRIMARY KEY,
            post_count INTEGER,
            avg_days_between_posts REAL,
            posts_per_month REAL,
            most_recent_post TEXT,
            captured_at TEXT NOT NULL
        )
    """)

    conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_slug ON patron_snapshots(slug)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_time ON patron_snapshots(captured_at)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_tiers_slug ON tiers(slug)")

    conn.commit()
    return conn


def save_creator(conn: sqlite3.Connection, profile: dict):
    """Save creator data and snapshot patron count."""
    now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    slug = profile.get("slug", "")

    # Upsert creator record
    conn.execute("""
        INSERT INTO creators (slug, name, creation_name, url, is_monthly, first_seen, last_updated)
        VALUES (?,?,?,?,?,?,?)
        ON CONFLICT(slug) DO UPDATE SET
            name=excluded.name,
            creation_name=excluded.creation_name,
            last_updated=excluded.last_updated
    """, (
        slug,
        profile.get("name"),
        profile.get("creation_name"),
        profile.get("url"),
        1 if profile.get("is_monthly", True) else 0,
        now,
        now,
    ))

    # Patron count snapshot
    conn.execute("""
        INSERT INTO patron_snapshots (slug, patron_count, estimated_mrr, tier_count, captured_at)
        VALUES (?,?,?,?,?)
    """, (
        slug,
        profile.get("patron_count"),
        profile.get("estimated_monthly_revenue"),
        profile.get("tier_count", 0),
        now,
    ))

    # Save current tiers
    for tier in profile.get("tiers", []):
        conn.execute("""
            INSERT INTO tiers (slug, title, price_usd, patron_count, user_limit, captured_at)
            VALUES (?,?,?,?,?,?)
        """, (slug, tier.get("title"), tier.get("price_usd"), tier.get("patron_count"), tier.get("user_limit"), now))

    # Save RSS stats
    freq = profile.get("post_frequency", {})
    if freq and "error" not in freq:
        conn.execute("""
            INSERT OR REPLACE INTO post_stats
            (slug, post_count, avg_days_between_posts, posts_per_month, most_recent_post, captured_at)
            VALUES (?,?,?,?,?,?)
        """, (
            slug,
            freq.get("post_count"),
            freq.get("avg_days_between_posts"),
            freq.get("posts_per_month"),
            freq.get("most_recent_post"),
            now,
        ))

    conn.commit()

Analytics and Insights

def patron_growth_trend(conn: sqlite3.Connection, slug: str, days: int = 30) -> list[dict]:
    """Track patron count changes over time for a creator."""
    rows = conn.execute("""
        SELECT patron_count, estimated_mrr, captured_at
        FROM patron_snapshots
        WHERE slug = ? AND patron_count IS NOT NULL
        ORDER BY captured_at ASC
    """, (slug,)).fetchall()

    if len(rows) < 2:
        return []

    trend = []
    for i in range(1, len(rows)):
        prev, curr = rows[i-1], rows[i]
        patron_delta = (curr[0] or 0) - (prev[0] or 0)
        mrr_delta = (curr[1] or 0) - (prev[1] or 0)
        trend.append({
            "date": curr[2],
            "patrons": curr[0],
            "patron_gain": patron_delta,
            "mrr": curr[1],
            "mrr_gain": round(mrr_delta, 2),
        })

    return trend
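With two or more snapshots, week-over-week growth falls out directly. A self-contained sketch with made-up snapshot data:

```python
# (captured_at, patron_count) pairs, e.g. from weekly snapshots
snapshots = [("2026-01-01", 1200), ("2026-01-08", 1260), ("2026-01-15", 1302)]

for (_, prev), (date, curr) in zip(snapshots, snapshots[1:]):
    gain = curr - prev
    pct = gain / prev * 100
    print(f"{date}: +{gain} patrons ({pct:.1f}%)")
# → 2026-01-08: +60 patrons (5.0%)
# → 2026-01-15: +42 patrons (3.3%)
```

A decelerating percentage with steady absolute gains, as here, is the typical shape for a maturing campaign.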


def find_high_efficiency_creators(conn: sqlite3.Connection) -> list[dict]:
    """
    Find creators with high revenue-per-post — efficient monetization.
    High RPP = fewer posts, high patron engagement.
    """
    rows = conn.execute("""
        SELECT c.slug, c.name,
               s.estimated_mrr, s.patron_count,
               p.posts_per_month,
               CASE WHEN p.posts_per_month > 0
                    THEN s.estimated_mrr / p.posts_per_month
                    ELSE NULL END as revenue_per_post
        FROM creators c
        JOIN (
            SELECT slug, estimated_mrr, patron_count
            FROM patron_snapshots ps1
            WHERE captured_at = (SELECT MAX(captured_at) FROM patron_snapshots WHERE slug = ps1.slug)
        ) s ON s.slug = c.slug
        JOIN post_stats p ON p.slug = c.slug
        WHERE s.estimated_mrr > 100
          AND p.posts_per_month > 0
        ORDER BY revenue_per_post DESC
        LIMIT 20
    """).fetchall()

    return [
        {
            "slug": r[0],
            "name": r[1],
            "estimated_mrr": round(r[2], 2),
            "patron_count": r[3],
            "posts_per_month": r[4],
            "revenue_per_post": round(r[5], 2) if r[5] else None,
        }
        for r in rows
    ]


def niche_comparison(conn: sqlite3.Connection) -> list[dict]:
    """Compare patron economics by creation type/niche."""
    rows = conn.execute("""
        SELECT c.creation_name,
               COUNT(*) as creator_count,
               AVG(s.patron_count) as avg_patrons,
               AVG(s.estimated_mrr) as avg_mrr,
               AVG(s.estimated_mrr / NULLIF(s.patron_count, 0)) as avg_revenue_per_patron
        FROM creators c
        JOIN (
            SELECT slug, estimated_mrr, patron_count
            FROM patron_snapshots ps1
            WHERE captured_at = (SELECT MAX(captured_at) FROM patron_snapshots WHERE slug = ps1.slug)
        ) s ON s.slug = c.slug
        WHERE c.creation_name IS NOT NULL
          AND s.patron_count > 0
        GROUP BY c.creation_name
        HAVING creator_count >= 3
        ORDER BY avg_revenue_per_patron DESC
    """).fetchall()

    return [
        {
            "niche": r[0],
            "creator_count": r[1],
            "avg_patrons": round(r[2] or 0, 1),
            "avg_mrr": round(r[3] or 0, 2),
            "avg_revenue_per_patron": round(r[4] or 0, 2),
        }
        for r in rows
    ]

Revenue Estimation From Tier Data

def estimate_revenue(profile: dict) -> dict:
    """
    Estimate creator revenue bounds from tier data.

    Returns lower bound (minimum pledges) and upper estimate
    (accounting for custom pledge amounts above tier minimums).
    """
    tiers = profile.get("tiers", [])
    patron_count = profile.get("patron_count") or 0

    # Only tiers where we have patron counts
    tiers_with_data = [
        t for t in tiers
        if t.get("patron_count") is not None and t.get("price_usd", 0) > 0
    ]

    # Lower bound: sum of tier_price * tier_patron_count
    lower_bound = sum(
        t["price_usd"] * t["patron_count"]
        for t in tiers_with_data
    )

    # Patrons we haven't accounted for (no tier data or free tier)
    accounted_patrons = sum(t.get("patron_count", 0) for t in tiers_with_data)
    unaccounted = patron_count - accounted_patrons

    # Average tier price for unaccounted patrons
    if tiers_with_data:
        avg_tier = sum(t["price_usd"] for t in tiers_with_data) / len(tiers_with_data)
    else:
        avg_tier = 5.0  # reasonable default

    estimated_total = lower_bound + (max(unaccounted, 0) * avg_tier)

    return {
        "lower_bound_mrr": round(lower_bound, 2),
        "estimated_mrr": round(estimated_total, 2),
        "accounted_patron_count": accounted_patrons,
        "total_patron_count": patron_count,
    }
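As a standalone sanity check of the bounds arithmetic above, using made-up tier data:

```python
tiers = [
    {"price_usd": 3.0, "patron_count": 500},
    {"price_usd": 10.0, "patron_count": 120},
    {"price_usd": 25.0, "patron_count": 30},
]
total_patrons = 700  # 50 patrons are not attributed to any visible tier

lower_bound = sum(t["price_usd"] * t["patron_count"] for t in tiers)
accounted = sum(t["patron_count"] for t in tiers)
avg_tier = sum(t["price_usd"] for t in tiers) / len(tiers)
estimate = lower_bound + max(total_patrons - accounted, 0) * avg_tier

print(lower_bound)         # → 3450.0
print(round(estimate, 2))  # → 4083.33
```

The spread between the two numbers widens as more patrons go unattributed, which is a useful uncertainty signal in its own right.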

Full Pipeline Example

if __name__ == "__main__":
    conn = init_db()

    # Example creator slugs to monitor
    # Build this list from Patreon's discovery pages or your own research
    CREATORS_TO_MONITOR = [
        "kurzgesagt",
        "cgpgrey",
        "computerphile",
        "3blue1brown",
    ]

    print("=== Scraping creator profiles ===")
    for slug in CREATORS_TO_MONITOR:
        print(f"\n{slug}:")
        profile = full_creator_profile(slug, proxy=make_proxy(country="us"))

        if "error" not in profile:
            save_creator(conn, profile)
            patrons = profile.get("patron_count") or "hidden"
            mrr = profile.get("estimated_monthly_revenue", 0)
            rpp = profile.get("revenue_per_post")
            print(f"  Patrons: {patrons} | ~${mrr:.0f}/mo | ${rpp:.2f}/post" if rpp else f"  Patrons: {patrons} | ~${mrr:.0f}/mo")
        else:
            print(f"  Failed: {profile.get('error')}")

        time.sleep(random.uniform(4, 8))

    print("\n=== High-efficiency creators ===")
    efficient = find_high_efficiency_creators(conn)
    for c in efficient[:5]:
        print(f"  {c['name']}: ${c['revenue_per_post']:.0f}/post ({c['patron_count']} patrons)")

    conn.close()

Legal and Ethical Considerations

Patreon's ToS restricts automated scraping. The data on public creator pages is publicly accessible — any visitor can see patron counts and tier prices. The API terms require that you only access data from your own campaigns via the official API.

Key guidelines:

- Never attempt to access patron personal data through scraping (it's not exposed anyway)
- Don't build a commercial Patreon creator database for resale
- Respect the rate limits and don't hammer the platform
- Use the RSS feed for post frequency data — it's a legitimate syndication mechanism
- Store data for your own analysis; don't republish raw Patreon profile data

The estimated revenue floor and post frequency together give you a creator efficiency metric — revenue per post — that reveals operating models at a glance. Combined with patron growth rate from weekly snapshots, you get a live signal of which niches and content formats are attracting sustainable creator income. That's genuinely useful intelligence that doesn't exist in aggregated form anywhere else.