← Back to blog

How to Scrape Medium Articles in 2026: Content, Tags & User Profiles

How to Scrape Medium Articles in 2026: Content, Tags & User Profiles

Medium hosts millions of articles across technology, business, science, and culture. For content analysis, trend research, or building reading recommendation systems, Medium's data is valuable — full article text, clap counts, read times, user follower counts, and tag taxonomies.

Medium deprecated its public API in 2023, but the platform leaks structured data through several channels: clean URL formats that return parseable JSON, an unofficial GraphQL API, and direct HTML scraping for content not available through JSON endpoints. This guide covers all of them.

What Data Medium Exposes

Medium articles and profiles contain:

  - Article content: title, subtitle, full body text, tags, and image URLs
  - Engagement metrics: clap counts, voter counts, and response counts
  - Reading metadata: read time, word count, publish/update timestamps, and detected language
  - Author data: name, username, bio, follower/following counts, and membership status
  - Publication data: name, slug, and follower counts

Medium's Anti-Scraping Defenses in 2026

Medium has moderate but layered protections:

  1. Paywall enforcement: Member-only articles return truncated content (about 40% of popular articles). The paywall check is server-side — you can't bypass it with JavaScript tricks.
  2. Rate limiting: Aggressive throttling after 50-80 requests per minute. Returns 429 responses with Retry-After headers.
  3. Bot fingerprinting: Medium tracks browser fingerprints on suspicious traffic. Inconsistent headers or high-frequency access patterns trigger CAPTCHA challenges.
  4. Dynamic rendering: Articles load via React hydration. The initial HTML contains the full text, but some metadata requires parsing the embedded Apollo state.
  5. Cloudflare protection: Standard Cloudflare JS challenges on flagged IPs — particularly targeting datacenter ranges that make many requests.
  6. gRPC API: Medium's mobile app uses a gRPC API that's harder to call from Python than their JSON API.

Setting Up Your Environment

pip install httpx beautifulsoup4 fake-useragent lxml

(sqlite3 ships with Python's standard library — it does not need to be installed via pip.)

For Cloudflare bypass (when needed):

pip install curl-cffi

Method 1: The ?format=json Trick (Most Reliable)

Medium articles and profile pages can return structured JSON by appending ?format=json to any URL. Medium prefixes the response with ])}while(1);</x> as an anti-XSSI measure — strip that prefix and you get clean, deeply nested JSON with all article data:

import httpx
import json
import time
import random
import re
from fake_useragent import UserAgent
from curl_cffi import requests as cffi_requests

# Rotating User-Agent source; each access to ua.random yields a fresh browser UA string.
ua = UserAgent()
# Anti-XSSI prefix Medium prepends to every ?format=json response; must be stripped before json.loads.
JSON_HIJACK_PREFIX = "])}while(1);</x>"

# ThorData residential proxy — required for Cloudflare protection
# https://thordata.partnerstack.com/partner/0a0x4nzq (or [Oxylabs](https://oxylabs.go2cloud.org/aff_c?offer_id=7&aff_id=2066&url_id=174))
PROXY = "http://USERNAME:[email protected]:7777"

def fetch_medium_json(url: str, proxy: str = None) -> dict:
    """Fetch a Medium URL with ?format=json appended and return parsed JSON.

    Handles article, profile, tag, and publication URLs. Strips Medium's
    anti-XSSI prefix before parsing. Retries up to three times, honoring
    Retry-After on HTTP 429. Never raises: every failure is reported as a
    dict containing an "error" key.
    """
    # Preserve any existing query string when appending format=json.
    target = url + ("&" if "?" in url else "?") + "format=json"

    browser_headers = {
        "User-Agent": ua.random,
        "Accept": "application/json",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate",
        "Referer": "https://medium.com/",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
    }

    for attempt in range(3):
        try:
            if proxy:
                # curl-cffi impersonates a real Chrome TLS fingerprint, which
                # is what gets requests past Cloudflare on flagged IPs.
                session = cffi_requests.Session(impersonate="chrome124")
                session.proxies = {"http": proxy, "https": proxy}
                resp = session.get(target, headers=browser_headers, timeout=20)
            else:
                with httpx.Client(headers=browser_headers, follow_redirects=True, timeout=20) as client:
                    resp = client.get(target)

            if resp.status_code == 429:
                # The server tells us how long to back off; default to 30s.
                wait_seconds = int(resp.headers.get("Retry-After", 30))
                print(f"Rate limited. Waiting {wait_seconds}s...")
                time.sleep(wait_seconds)
                continue

            if resp.status_code != 200:
                return {"error": f"HTTP {resp.status_code}", "url": url}

            body = resp.text
            if body.startswith(JSON_HIJACK_PREFIX):
                body = body[len(JSON_HIJACK_PREFIX):]

            return json.loads(body)

        except json.JSONDecodeError as e:
            # A non-JSON body will not improve on retry; bail out immediately.
            return {"error": f"JSON parse failed: {e}", "url": url}
        except Exception as e:
            if attempt == 2:
                return {"error": str(e), "url": url}
            time.sleep(5 * (attempt + 1))

    return {"error": "Max retries exceeded", "url": url}


def _extract_author(post: dict) -> dict:
    """Build the author sub-dict from a post's `creator` record.

    Returns {} when creator is missing or not a dict (the payload sometimes
    holds an ID reference instead of an inline record).
    """
    creator = post.get("creator", {})
    if not isinstance(creator, dict):
        return {}
    # Hoist the socialStats lookup once instead of re-traversing per field.
    stats = creator.get("socialStats")
    if not isinstance(stats, dict):
        stats = {}
    return {
        "id": creator.get("userId"),
        "name": creator.get("name"),
        "username": creator.get("username"),
        "bio": creator.get("bio"),
        "follower_count": stats.get("followerCount"),
        "following_count": stats.get("followingCount"),
        "is_writer_program": creator.get("isWriterProgramEnrolled", False),
        "medium_member": creator.get("isMediumMember", False),
    }


def _extract_publication(post: dict):
    """Return publication info for posts published in a collection, else None."""
    collection = post.get("collection")
    if not (isinstance(collection, dict) and collection):
        return None
    return {
        "id": collection.get("id"),
        "name": collection.get("name"),
        "slug": collection.get("slug"),
        "follower_count": collection.get("followersCount", 0),
    }


def _extract_body(post: dict):
    """Extract (body_text, image_urls) from the post's paragraph model.

    Returns None when the content/bodyModel structures are absent or
    malformed, so the caller can omit the body fields entirely.
    """
    content = post.get("content", {})
    if not isinstance(content, dict):
        return None
    body_model = content.get("bodyModel", {})
    if not isinstance(body_model, dict):
        return None

    body_parts = []
    image_urls = []
    for p in body_model.get("paragraphs", []):
        p_type = p.get("type")
        text = p.get("text", "")

        # Skip images (type 4), horizontal rules (6), and embedded media (11),
        # but harvest image URLs from their metadata on the way past.
        if p_type in (4, 6, 11):
            if p.get("metadata", {}).get("originalWidth"):
                img_id = p.get("metadata", {}).get("id")
                if img_id:
                    image_urls.append(f"https://miro.medium.com/v2/resize:fit:1400/{img_id}")
            continue

        if not text:
            continue
        if p_type == 8:      # PRE / code block
            body_parts.append(f"```\n{text}\n```")
        elif p_type == 3:    # Header
            body_parts.append(f"\n## {text}\n")
        elif p_type == 13:   # Small header
            body_parts.append(f"\n### {text}\n")
        elif p_type == 9:    # Quote / pullquote
            body_parts.append(f"\n> {text}\n")
        else:
            body_parts.append(text)

    return "\n\n".join(body_parts), image_urls


def scrape_medium_article(article_url: str, proxy: str = None) -> dict:
    """
    Scrape a Medium article for full content and metadata.

    Works with:
      - medium.com/@author/article-slug
      - medium.com/publication/article-slug
      - medium.com/p/article-id
      - custom domain articles (e.g., towardsdatascience.com/article-slug)

    Returns a dict of article metadata (plus body_text/body_length/image_urls
    when the payload includes a body model), or a dict with an "error" key.
    """
    data = fetch_medium_json(article_url, proxy)
    if "error" in data:
        return data

    payload = data.get("payload", {})
    post = payload.get("value", {})
    if not post:
        # Some URLs return the post under payload["post"] instead.
        post = payload.get("post")
        if not post:
            return {"error": "No post data found in response", "url": article_url}

    content = post.get("content")
    article = {
        "url": article_url,
        "id": post.get("id"),
        "title": post.get("title"),
        "subtitle": content.get("subtitle") if isinstance(content, dict) else None,
        "slug": post.get("slug"),
        "clap_count": post.get("clapCount", 0),
        "voter_count": post.get("voterCount", 0),
        # NOTE(review): divides readingTime by 60 — assumes the payload
        # reports seconds; verify, since Medium has historically reported
        # reading time in minutes.
        "read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
        "read_time_raw": post.get("readingTime"),
        "word_count": post.get("wordCount"),
        "published_at": post.get("firstPublishedAt"),
        "updated_at": post.get("latestPublishedAt"),
        "response_count": post.get("responsesCount", 0),
        "is_paywalled": post.get("isLockedPreviewOnly", False) or post.get("memberOnly", False),
        "is_boosted": post.get("isBoosted", False),
        "language": post.get("detectedLanguage"),
        "tags": [t.get("slug") for t in post.get("tags", []) if isinstance(t, dict)],
        "canonical_url": post.get("canonicalUrl"),
        "license": post.get("license"),
    }

    article["author"] = _extract_author(post)
    article["publication"] = _extract_publication(post)

    body = _extract_body(post)
    if body is not None:
        article["body_text"], image_urls = body
        article["body_length"] = len(article["body_text"])
        article["image_urls"] = image_urls[:10]  # First 10 images

    return article


def scrape_medium_article_html(article_url: str, proxy: str = None) -> dict:
    """
    Fallback: scrape an article from its rendered HTML.

    Use when ?format=json returns an empty body (some custom domain
    articles). Returns title + body text only, flagged source="html_fallback".
    """
    # Fix: send browser-like headers on BOTH paths. The proxied path
    # previously sent none at all — exactly the kind of header
    # inconsistency that triggers Medium's bot fingerprinting.
    headers = {"User-Agent": ua.random, "Accept-Language": "en-US,en;q=0.9"}

    if proxy:
        session = cffi_requests.Session(impersonate="chrome124")
        session.proxies = {"http": proxy, "https": proxy}
        resp = session.get(article_url, headers=headers, timeout=20)
    else:
        with httpx.Client(follow_redirects=True, timeout=20) as client:
            resp = client.get(article_url, headers=headers)

    if resp.status_code != 200:
        return {"error": f"HTTP {resp.status_code}"}

    from bs4 import BeautifulSoup
    soup = BeautifulSoup(resp.text, "lxml")

    # Title: taken from the first <h1> in the rendered page.
    title = None
    h1 = soup.find("h1")
    if h1:
        title = h1.get_text(strip=True)

    # Body: prefer the semantic <article> tag, fall back to class heuristics.
    article_el = soup.find("article")
    if not article_el:
        article_el = soup.find("div", {"class": re.compile(r"article|content|post")})

    body_text = ""
    if article_el:
        # Remove nav, footer, paywall prompts, and buttons before flattening.
        for el in article_el.select("nav, footer, [class*='footer'], [class*='paywall'], button"):
            el.decompose()
        body_text = article_el.get_text(separator="\n", strip=True)

    return {
        "url": article_url,
        "title": title,
        "body_text": body_text,
        "source": "html_fallback",
    }

Method 2: Tag Feed Scraping

Medium organizes content by tags with up to 5 tags per article. Each tag has a "latest" and "trending" feed:

def scrape_tag_feed(
    tag: str,
    proxy: str = None,
    include_paywalled: bool = True,
) -> list[dict]:
    """
    Scrape Medium's tag feed for articles.

    tag examples: 'python', 'machine-learning', 'startup', 'design'

    Returns article summaries sorted by clap count (descending); [] on error.
    Paywalled articles are included unless include_paywalled is False.
    """
    url = f"https://medium.com/tag/{tag}"
    data = fetch_medium_json(url, proxy)

    if "error" in data:
        return []

    payload = data.get("payload", {})

    # All post summaries live in payload["references"]["Post"], keyed by id.
    # (A previous revision also concatenated panda/streamItems item lists
    # into a local `items` that was never used — and whose conditional had
    # an operator-precedence bug — so that dead code has been removed.)
    posts_dict = payload.get("references", {}).get("Post", {})

    articles = []
    for post_id, post in posts_dict.items():
        if not isinstance(post, dict):
            continue

        creator = post.get("creator", {})
        is_paywalled = post.get("isLockedPreviewOnly", False) or post.get("memberOnly", False)

        if not include_paywalled and is_paywalled:
            continue

        article = {
            "id": post_id,
            "title": post.get("title"),
            "subtitle": post.get("content", {}).get("subtitle") if isinstance(post.get("content"), dict) else None,
            "clap_count": post.get("clapCount", 0),
            "read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
            "published_at": post.get("firstPublishedAt"),
            "author_name": creator.get("name") if isinstance(creator, dict) else None,
            "author_username": creator.get("username") if isinstance(creator, dict) else None,
            "is_paywalled": is_paywalled,
            "response_count": post.get("responsesCount", 0),
            "voter_count": post.get("voterCount", 0),
            "tags": [t.get("slug") for t in post.get("tags", []) if isinstance(t, dict)],
            "url": f"https://medium.com/p/{post_id}",
        }

        # Prefer the canonical author-slug URL when we can build it.
        slug = post.get("slug")
        if slug and isinstance(creator, dict) and creator.get("username"):
            article["url"] = f"https://medium.com/@{creator['username']}/{slug}"

        articles.append(article)

    # Sort by clap count (highest first)
    return sorted(articles, key=lambda a: a.get("clap_count", 0), reverse=True)


def scrape_tag_top_articles(
    tag: str,
    period: str = "year",
    proxy: str = None,
) -> list[dict]:
    """
    Get top articles for a tag filtered by time period.

    period options: year, month, week, day
    Returns summaries sorted by clap count (highest first); [] on error.
    """
    url = f"https://medium.com/tag/{tag}/top/{period}"
    data = fetch_medium_json(url, proxy)

    if "error" in data:
        return []

    # Post summaries are keyed by id under references.Post.
    # (Removed unused local `panda_items` — it was fetched and never read.)
    refs = data.get("payload", {}).get("references", {}).get("Post", {})

    articles = []
    for post_id, post in refs.items():
        if not isinstance(post, dict):
            continue
        creator = post.get("creator", {})
        articles.append({
            "id": post_id,
            "title": post.get("title"),
            "clap_count": post.get("clapCount", 0),
            "read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
            "author": creator.get("name") if isinstance(creator, dict) else None,
            "is_paywalled": post.get("isLockedPreviewOnly", False),
            "url": f"https://medium.com/p/{post_id}",
        })

    return sorted(articles, key=lambda a: a.get("clap_count", 0), reverse=True)

Method 3: User Profile Scraping

Scrape a writer's full profile and article list:

def scrape_user_profile(username: str, proxy: str = None) -> dict:
    """
    Scrape a Medium user's profile, including their articles and stats.

    username: the @username without the @ symbol
    Returns the profile dict, or the error dict when the fetch fails.
    """
    data = fetch_medium_json(f"https://medium.com/@{username}", proxy)
    if "error" in data:
        return data

    payload = data.get("payload", {})
    # The user record usually sits under "user"; fall back to "value".
    user = payload.get("user", {}) or payload.get("value", {})

    social = user.get("socialStats")
    followers = social.get("followerCount") if isinstance(social, dict) else 0
    following = social.get("followingCount") if isinstance(social, dict) else 0

    profile = {
        "username": username,
        "user_id": user.get("userId"),
        "name": user.get("name"),
        "bio": user.get("bio"),
        "image_url": None,
        "follower_count": followers,
        "following_count": following,
        "is_writer_program": user.get("isWriterProgramEnrolled", False),
        "is_medium_member": user.get("isMediumMember", False),
        "is_suspended": user.get("isSuspended", False),
        "custom_domain": user.get("customDomain"),
        "articles": [],
    }

    # Profile image is served from miro.medium.com by image id.
    image_id = user.get("imageId")
    if image_id:
        profile["image_url"] = f"https://miro.medium.com/v2/resize:fill:96:96/{image_id}"

    # The user's recent posts ride along under references.Post.
    for post_id, post in payload.get("references", {}).get("Post", {}).items():
        if not isinstance(post, dict):
            continue
        profile["articles"].append({
            "id": post_id,
            "title": post.get("title"),
            "clap_count": post.get("clapCount", 0),
            "published_at": post.get("firstPublishedAt"),
            "read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
            "is_paywalled": post.get("isLockedPreviewOnly", False),
            "response_count": post.get("responsesCount", 0),
            "tags": [t.get("slug") for t in post.get("tags", []) if isinstance(t, dict)],
            "url": f"https://medium.com/p/{post_id}",
        })

    profile["articles"].sort(key=lambda a: a.get("clap_count", 0), reverse=True)
    profile["total_articles"] = len(profile["articles"])
    profile["total_claps"] = sum(a.get("clap_count", 0) for a in profile["articles"])

    return profile


def get_user_latest_articles(
    username: str,
    max_articles: int = 50,
    proxy: str = None,
) -> list[dict]:
    """
    Get a user's latest articles by paginating their profile feed.

    The ?format=json on the profile page only returns ~10 articles —
    page through /latest to get more.

    Bug fix: the original built the paginated URL but never passed it to
    fetch_medium_json, so every iteration re-fetched page 1 and appended
    duplicates. We now request the paginated URL and also de-duplicate by
    post id as a safety net.
    """
    all_articles = []
    seen_ids = set()
    page = 1

    while len(all_articles) < max_articles:
        # fetch_medium_json appends &format=json since "?" is present.
        page_url = f"https://medium.com/@{username}/latest?page={page}&limit=10"
        data = fetch_medium_json(page_url, proxy)

        if "error" in data or not data:
            break

        payload = data.get("payload", {})
        refs = payload.get("references", {}).get("Post", {})

        if not refs:
            break

        for post_id, post in refs.items():
            if not isinstance(post, dict) or post_id in seen_ids:
                continue
            seen_ids.add(post_id)
            all_articles.append({
                "id": post_id,
                "title": post.get("title"),
                "clap_count": post.get("clapCount", 0),
                "published_at": post.get("firstPublishedAt"),
                "read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
                "is_paywalled": post.get("isLockedPreviewOnly", False),
                "url": f"https://medium.com/p/{post_id}",
            })

        # Stop when the server reports no further page.
        if not payload.get("paging", {}).get("nextPageToken"):
            break

        page += 1
        time.sleep(random.uniform(2, 5))

    return all_articles[:max_articles]

Method 4: Medium's GraphQL API

For recommendation data and personalized feeds, Medium's frontend uses a GraphQL API:

def fetch_surfacing_recommendations(
    tag: str,
    first: int = 25,
    after: str = "",
    proxy: str = None,
) -> list[dict]:
    """
    Fetch recommended articles from Medium's GraphQL API.

    This is the same endpoint the Medium web app uses for tag feeds.
    Returns article summaries; [] on error.

    Consistency fix: results now include "clap_count" (the key used by
    scrape_tag_feed and read by save_article) in addition to the original
    "claps" key, which is kept for backward compatibility. Previously,
    articles from this fallback were saved with 0 claps.
    """
    url = "https://medium.com/_/graphql"

    # This query mirrors what Medium's frontend sends for tag pages
    query = {
        "operationName": "TopicFeedQuery",
        "variables": {
            "tagSlug": tag,
            "first": first,
            "after": after,
        },
        "query": """
        query TopicFeedQuery($tagSlug: String!, $first: Int, $after: String) {
            tagFromSlug(tagSlug: $tagSlug) {
                name
                postCount
                followerCount
                viewerEdge {
                    feedItems(first: $first, after: $after) {
                        pageInfo { hasNextPage endCursor }
                        edges {
                            node {
                                feedId
                                post {
                                    id
                                    title
                                    clapCount
                                    readingTime
                                    memberOnly
                                    firstPublishedAt
                                    creator {
                                        name
                                        username
                                        imageId
                                    }
                                    tags { name slug }
                                }
                            }
                        }
                    }
                }
            }
        }
        """
    }

    headers = {
        "User-Agent": ua.random,
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Referer": f"https://medium.com/tag/{tag}",
        "Graphql-Operation": "TopicFeedQuery",
    }

    for attempt in range(3):
        try:
            if proxy:
                session = cffi_requests.Session(impersonate="chrome124")
                session.proxies = {"http": proxy, "https": proxy}
                resp = session.post(url, json=query, headers=headers, timeout=20)
            else:
                with httpx.Client(headers=headers, timeout=20) as client:
                    resp = client.post(url, json=query)

            if resp.status_code == 429:
                # Exponential backoff: 30s, 60s, 120s.
                time.sleep(30 * (2 ** attempt))
                continue

            if resp.status_code != 200:
                return []

            data = resp.json()
            tag_data = data.get("data", {}).get("tagFromSlug", {})
            edges = (
                tag_data.get("viewerEdge", {})
                .get("feedItems", {})
                .get("edges", [])
            )

            articles = []
            for edge in edges:
                post = edge.get("node", {}).get("post", {})
                if not post:
                    continue

                creator = post.get("creator", {})
                articles.append({
                    "id": post.get("id"),
                    "title": post.get("title"),
                    "claps": post.get("clapCount"),
                    "clap_count": post.get("clapCount", 0),
                    "read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
                    "is_paywalled": post.get("memberOnly", False),
                    "author": creator.get("name") if isinstance(creator, dict) else None,
                    "author_username": creator.get("username") if isinstance(creator, dict) else None,
                    "tags": [t.get("slug") for t in (post.get("tags") or [])],
                    "published_at": post.get("firstPublishedAt"),
                    "url": f"https://medium.com/p/{post.get('id')}",
                })

            return articles

        except Exception as e:
            if attempt == 2:
                print(f"GraphQL error: {e}")
                return []
            time.sleep(5 * (attempt + 1))

    return []

Building a Trend Research Pipeline

Here's a complete pipeline that scrapes trending articles across multiple tags and stores them for analysis:

import sqlite3
from datetime import datetime

def setup_medium_database(db_path: str) -> sqlite3.Connection:
    """Create the SQLite schema for Medium article data (if absent) and
    return an open connection with WAL journaling enabled."""
    conn = sqlite3.connect(db_path)
    # WAL lets readers proceed while the scraper is writing.
    conn.execute("PRAGMA journal_mode=WAL")

    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS articles (
            id TEXT PRIMARY KEY,
            title TEXT,
            subtitle TEXT,
            url TEXT,
            clap_count INTEGER DEFAULT 0,
            voter_count INTEGER DEFAULT 0,
            response_count INTEGER DEFAULT 0,
            read_time_minutes REAL,
            word_count INTEGER,
            published_at INTEGER,
            is_paywalled INTEGER DEFAULT 0,
            is_boosted INTEGER DEFAULT 0,
            language TEXT,
            author_name TEXT,
            author_username TEXT,
            author_follower_count INTEGER,
            publication_name TEXT,
            tags TEXT,
            body_text TEXT,
            body_length INTEGER,
            source_tag TEXT,
            scraped_at TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS tag_trends (
            tag TEXT,
            article_id TEXT,
            rank_position INTEGER,
            scraped_at TEXT,
            PRIMARY KEY (tag, article_id, scraped_at)
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_articles_claps ON articles(clap_count DESC)",
        "CREATE INDEX IF NOT EXISTS idx_articles_published ON articles(published_at)",
        "CREATE INDEX IF NOT EXISTS idx_articles_tag ON articles(source_tag)",
    )
    for statement in schema_statements:
        conn.execute(statement)
    conn.commit()

    return conn

def save_article(conn: sqlite3.Connection, article: dict, source_tag: str = None):
    """Insert or replace one article row.

    Accepts both shapes produced by this module:
      - full articles from scrape_medium_article (nested "author" dict), and
      - feed summaries from scrape_tag_feed (flat author_name/author_username).

    Bug fix: the original used article.get("author", {}), whose {} default
    made isinstance(author, dict) always true, so the flat author_name /
    author_username fallbacks were unreachable and feed articles were saved
    with NULL authors. We now fall back whenever the author dict is missing
    or empty.

    The caller is responsible for conn.commit().
    """
    now = datetime.now().isoformat()
    author = article.get("author")
    has_author = isinstance(author, dict) and bool(author)
    pub = article.get("publication")

    conn.execute("""
        INSERT OR REPLACE INTO articles
        (id, title, subtitle, url, clap_count, voter_count, response_count,
         read_time_minutes, word_count, published_at, is_paywalled, is_boosted,
         language, author_name, author_username, author_follower_count,
         publication_name, tags, body_text, body_length, source_tag, scraped_at)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, (
        article.get("id"),
        article.get("title"),
        article.get("subtitle"),
        article.get("url"),
        article.get("clap_count", 0),
        article.get("voter_count", 0),
        article.get("response_count", 0),
        article.get("read_time"),
        article.get("word_count"),
        article.get("published_at"),
        int(article.get("is_paywalled", False)),
        int(article.get("is_boosted", False)),
        article.get("language"),
        author.get("name") if has_author else article.get("author_name"),
        author.get("username") if has_author else article.get("author_username"),
        author.get("follower_count") if has_author else None,
        pub.get("name") if isinstance(pub, dict) else None,
        json.dumps(article.get("tags", [])),
        article.get("body_text"),
        article.get("body_length", 0),
        source_tag,
        now,
    ))

def scrape_tag_trends(
    tags: list[str],
    include_body: bool = False,
    proxy: str = None,
    db_path: str = "medium_trends.db",
) -> dict:
    """
    Scrape trending articles across multiple tags into SQLite.

    If include_body=True, fetches full article text (slower, more requests).
    Returns a dict mapping each tag to the number of articles saved.
    """
    conn = setup_medium_database(db_path)
    run_timestamp = datetime.now().isoformat()
    saved_per_tag = {}

    for tag in tags:
        print(f"\nScraping tag: #{tag}")

        # Primary source is the JSON tag feed; GraphQL is the fallback.
        articles = scrape_tag_feed(tag, proxy=proxy) or fetch_surfacing_recommendations(tag, proxy=proxy)

        if not articles:
            print(f"  No articles found for #{tag}")
            saved_per_tag[tag] = 0
            continue

        print(f"  Found {len(articles)} articles")
        saved = 0

        for rank, summary in enumerate(articles[:50], start=1):  # Cap at 50 per tag
            article_id = summary.get("id")
            if not article_id:
                continue

            # Optionally hydrate with the full body (skipping paywalled posts).
            if include_body and not summary.get("is_paywalled"):
                summary_url = summary.get("url")
                if summary_url:
                    time.sleep(random.uniform(3, 7))
                    full = scrape_medium_article(summary_url, proxy=proxy)
                    if "body_text" in full:
                        summary.update(full)

            save_article(conn, summary, source_tag=tag)

            # Record the feed rank so trends can be compared across runs.
            conn.execute(
                "INSERT OR REPLACE INTO tag_trends VALUES (?,?,?,?)",
                (tag, article_id, rank, run_timestamp)
            )
            saved += 1

        conn.commit()
        saved_per_tag[tag] = saved
        print(f"  Saved {saved} articles")

        # Pause between tags
        time.sleep(random.uniform(5, 12))

    conn.close()
    return saved_per_tag

# Research content trends across tech topics
tech_tags = [
    "python", "machine-learning", "artificial-intelligence",
    "javascript", "startup", "data-science", "programming",
    "software-engineering",
]

stats = scrape_tag_trends(
    tech_tags,
    include_body=False,  # Set True for full article text (much slower)
    proxy=PROXY,
    db_path="medium_tech_trends.db",
)

print(f"\nScraping complete: {sum(stats.values())} total articles")

Proxy Configuration and Anti-Bot Bypass

Medium's Cloudflare integration blocks most datacenter IPs. From an AWS or GCP IP, you'll see 403s or JS challenges within the first few requests.

ThorData's residential proxy pool works well for Medium:

import random

def get_medium_proxy(session_id: str = None) -> str:
    """
    Get a proxy URL for Medium scraping.
    Use sticky sessions for scraping multiple pages from the same user session.
    """
    base = "http://USERNAME:PASSWORD"
    host = "gate.thordata.com:7777"

    if session_id:
        # Sticky session: same exit IP for this session
        return f"{base}-session-{session_id}@{host}"

    # Rotating: new IP each request
    return f"{base}@{host}"

# Sticky session for a tag + article scraping run
session_id = str(random.randint(10000, 99999))
proxy = get_medium_proxy(session_id)

# Pull the tag feed through the sticky proxy session.
articles = scrape_tag_feed("artificial-intelligence", proxy=proxy)
print(f"Found {len(articles)} articles in #artificial-intelligence")

# Preview the five most-clapped results; flag member-only posts.
for a in articles[:5]:
    paywall = " [MEMBER]" if a.get("is_paywalled") else ""
    print(f"  {a.get('clap_count', 0):>6} claps | {a.get('title', '')[:60]}{paywall}")

# Polite pause before any follow-up requests.
time.sleep(random.uniform(3, 8))

Once you have data in SQLite, you can identify trending content patterns:

import sqlite3

def analyze_tag_trends(db_path: str, tag: str) -> None:
    """Print a performance summary for one tag's scraped articles.

    Bug fix: guards against NULL columns. read_time_minutes can be NULL per
    row, and the aggregate queries return NULL when no rows match — the
    original crashed formatting those values with the :.0f and :, specs.
    """
    conn = sqlite3.connect(db_path)

    # Top articles by claps
    print(f"\nTop Articles in #{tag}:")
    cursor = conn.execute("""
        SELECT title, author_name, clap_count, read_time_minutes,
               is_paywalled, response_count
        FROM articles
        WHERE source_tag = ? AND clap_count > 0
        ORDER BY clap_count DESC
        LIMIT 10
    """, (tag,))

    for row in cursor:
        paywall = " [M]" if row[4] else ""
        # read_time_minutes may be NULL (e.g. feed rows saved without a body).
        read_time = f"{row[3]:.0f}min" if row[3] is not None else "?min"
        title = (row[0] or "")[:50]
        print(f"  {row[2]:>6} claps | {read_time} | {title}{paywall}")
        print(f"           by {row[1]} | {row[5]} responses")

    # Optimal read time — all aggregates are NULL when no rows qualify.
    avg = conn.execute("""
        SELECT
            ROUND(AVG(read_time_minutes), 1) as avg_read_time,
            ROUND(AVG(CASE WHEN clap_count > 1000 THEN read_time_minutes END), 1) as viral_read_time,
            MAX(clap_count) as max_claps,
            COUNT(*) as total
        FROM articles WHERE source_tag = ? AND read_time_minutes IS NOT NULL
    """, (tag,)).fetchone()

    max_claps = f"{avg[2]:,}" if avg[2] is not None else "n/a"
    print(f"\n  Average read time: {avg[0]} min")
    print(f"  Average for 1000+ clap articles: {avg[1]} min")
    print(f"  Max claps in dataset: {max_claps}")
    print(f"  Total articles: {avg[3]}")

    # Paywall ratio
    paywall_stats = conn.execute("""
        SELECT
            SUM(is_paywalled) as paywalled,
            COUNT(*) as total,
            ROUND(100.0 * SUM(is_paywalled) / COUNT(*), 1) as pct
        FROM articles WHERE source_tag = ?
    """, (tag,)).fetchone()
    print(f"  Paywalled: {paywall_stats[0]}/{paywall_stats[1]} ({paywall_stats[2]}%)")

    conn.close()

Medium's Terms of Service prohibit automated access and scraping. Paywalled articles are especially sensitive — accessing paywalled content without a membership may violate terms. Guidelines:

  - Keep request rates low and honor 429 Retry-After backoff signals
  - Don't store or redistribute paywalled article content
  - Scrape only publicly visible data, and prefer official exports or APIs where they exist
  - Get legal advice before using scraped Medium data commercially

Key Takeaways