How to Scrape Medium Articles in 2026: Content, Tags & User Profiles
Medium hosts millions of articles across technology, business, science, and culture. For content analysis, trend research, or building reading recommendation systems, Medium's data is valuable — full article text, clap counts, read times, user follower counts, and tag taxonomies.
Medium deprecated its public API in 2023, but the platform leaks structured data through several channels: clean URL formats that return parseable JSON, an unofficial GraphQL API, and direct HTML scraping for content not available through JSON endpoints. This guide covers all of them.
What Data Medium Exposes
Medium articles and profiles contain:
- Article content: title, subtitle, full body text (HTML), embedded images and code blocks
- Metadata: publish date, read time, clap count, response count, boost status
- Author data: name, username, bio, follower count, following count, member status
- Tags: associated topic tags (up to 5 per article), tag follower counts
- Responses: threaded comments with author info and clap counts
- Publications: name, follower count, associated writers, article lists
- Tag feeds: trending and latest articles per topic, recommended content
- Series/Lists: curated reading lists by authors
Medium's Anti-Scraping Defenses in 2026
Medium has moderate but layered protections:
- Paywall enforcement: Member-only articles return truncated content (about 40% of popular articles). The paywall check is server-side — you can't bypass it with JavaScript tricks.
- Rate limiting: Aggressive throttling after 50-80 requests per minute. Returns 429 responses with `Retry-After` headers.
- Bot fingerprinting: Medium tracks browser fingerprints on suspicious traffic. Inconsistent headers or high-frequency access patterns trigger CAPTCHA challenges.
- Dynamic rendering: Articles load via React hydration. The initial HTML contains the full text, but some metadata requires parsing the embedded Apollo state.
- Cloudflare protection: Standard Cloudflare JS challenges on flagged IPs — particularly targeting datacenter ranges that make many requests.
- gRPC API: Medium's mobile app uses a gRPC API that's harder to call from Python than their JSON API.
Setting Up Your Environment
pip install httpx beautifulsoup4 fake-useragent lxml
(sqlite3 ships with the Python standard library — no install needed.)
For Cloudflare bypass (when needed):
pip install curl-cffi
Method 1: The ?format=json Trick (Most Reliable)
Medium articles and profile pages can return structured JSON by appending `?format=json` to any URL. Medium prefixes the response with `])}while(1);</x>` as an anti-XSSI measure — strip that prefix and you get clean, deeply nested JSON with all article data:
import httpx
import json
import time
import random
import re
from fake_useragent import UserAgent
from curl_cffi import requests as cffi_requests
ua = UserAgent()
JSON_HIJACK_PREFIX = "])}while(1);</x>"
# ThorData residential proxy — required for Cloudflare protection
# https://thordata.partnerstack.com/partner/0a0x4nzq (or [Oxylabs](https://oxylabs.go2cloud.org/aff_c?offer_id=7&aff_id=2066&url_id=174))
PROXY = "http://USERNAME:[email protected]:7777"
def fetch_medium_json(url: str, proxy: str = None) -> dict:
"""
Fetch a Medium URL and return parsed JSON.
Works for: article URLs, profile URLs, tag URLs, publication URLs.
"""
# Append format=json
separator = "&" if "?" in url else "?"
json_url = f"{url}{separator}format=json"
headers = {
"User-Agent": ua.random,
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate",
"Referer": "https://medium.com/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
}
for attempt in range(3):
try:
if proxy:
session = cffi_requests.Session(impersonate="chrome124")
session.proxies = {"http": proxy, "https": proxy}
resp = session.get(json_url, headers=headers, timeout=20)
else:
with httpx.Client(headers=headers, follow_redirects=True, timeout=20) as client:
resp = client.get(json_url)
if resp.status_code == 429:
retry_after = int(resp.headers.get("Retry-After", 30))
print(f"Rate limited. Waiting {retry_after}s...")
time.sleep(retry_after)
continue
if resp.status_code != 200:
return {"error": f"HTTP {resp.status_code}", "url": url}
text = resp.text
if text.startswith(JSON_HIJACK_PREFIX):
text = text[len(JSON_HIJACK_PREFIX):]
return json.loads(text)
except json.JSONDecodeError as e:
return {"error": f"JSON parse failed: {e}", "url": url}
except Exception as e:
if attempt == 2:
return {"error": str(e), "url": url}
time.sleep(5 * (attempt + 1))
return {"error": "Max retries exceeded", "url": url}
def scrape_medium_article(article_url: str, proxy: str = None) -> dict:
"""
Scrape a Medium article for full content and metadata.
Works with:
- medium.com/@author/article-slug
- medium.com/publication/article-slug
- medium.com/p/article-id
- custom domain articles (e.g., towardsdatascience.com/article-slug)
"""
data = fetch_medium_json(article_url, proxy)
if "error" in data:
return data
payload = data.get("payload", {})
post = payload.get("value", {})
if not post:
# Some URLs return the post in a different location
posts = payload.get("post")
if posts:
post = posts
else:
return {"error": "No post data found in response", "url": article_url}
# Extract article metadata
article = {
"url": article_url,
"id": post.get("id"),
"title": post.get("title"),
"subtitle": post.get("content", {}).get("subtitle") if isinstance(post.get("content"), dict) else None,
"slug": post.get("slug"),
"clap_count": post.get("clapCount", 0),
"voter_count": post.get("voterCount", 0),
"read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
"read_time_raw": post.get("readingTime"),
"word_count": post.get("wordCount"),
"published_at": post.get("firstPublishedAt"),
"updated_at": post.get("latestPublishedAt"),
"response_count": post.get("responsesCount", 0),
"is_paywalled": post.get("isLockedPreviewOnly", False) or post.get("memberOnly", False),
"is_boosted": post.get("isBoosted", False),
"language": post.get("detectedLanguage"),
"tags": [t.get("slug") for t in post.get("tags", []) if isinstance(t, dict)],
"canonical_url": post.get("canonicalUrl"),
"license": post.get("license"),
}
# Extract author info
creator = post.get("creator", {})
if isinstance(creator, dict):
article["author"] = {
"id": creator.get("userId"),
"name": creator.get("name"),
"username": creator.get("username"),
"bio": creator.get("bio"),
"follower_count": creator.get("socialStats", {}).get("followerCount") if isinstance(creator.get("socialStats"), dict) else None,
"following_count": creator.get("socialStats", {}).get("followingCount") if isinstance(creator.get("socialStats"), dict) else None,
"is_writer_program": creator.get("isWriterProgramEnrolled", False),
"medium_member": creator.get("isMediumMember", False),
}
else:
article["author"] = {}
# Extract publication info
collection = post.get("collection")
if isinstance(collection, dict) and collection:
article["publication"] = {
"id": collection.get("id"),
"name": collection.get("name"),
"slug": collection.get("slug"),
"follower_count": collection.get("followersCount", 0),
}
else:
article["publication"] = None
# Extract body text from content paragraphs
content = post.get("content", {})
if isinstance(content, dict):
body_model = content.get("bodyModel", {})
if isinstance(body_model, dict):
paragraphs = body_model.get("paragraphs", [])
body_parts = []
image_urls = []
for p in paragraphs:
p_type = p.get("type")
text = p.get("text", "")
# Skip images (type 4), embedded media (type 11), and horizontal rules (type 6)
if p_type in (4, 6, 11):
# But still collect image URLs
if p.get("metadata", {}).get("originalWidth"):
img_id = p.get("metadata", {}).get("id")
if img_id:
image_urls.append(f"https://miro.medium.com/v2/resize:fit:1400/{img_id}")
continue
if text:
# Handle code blocks (type 8 = PRE/code block)
if p_type == 8:
body_parts.append(f"```\n{text}\n```")
elif p_type == 3: # Header
body_parts.append(f"\n## {text}\n")
elif p_type == 13: # Small header
body_parts.append(f"\n### {text}\n")
elif p_type == 9: # Quote/pullquote
body_parts.append(f"\n> {text}\n")
else:
body_parts.append(text)
article["body_text"] = "\n\n".join(body_parts)
article["body_length"] = len(article["body_text"])
article["image_urls"] = image_urls[:10] # First 10 images
return article
def scrape_medium_article_html(article_url: str, proxy: str = None) -> dict:
"""
Fallback: scrape article from rendered HTML.
Use when ?format=json returns empty body (some custom domain articles).
"""
if proxy:
session = cffi_requests.Session(impersonate="chrome124")
session.proxies = {"http": proxy, "https": proxy}
resp = session.get(article_url, timeout=20)
else:
with httpx.Client(follow_redirects=True, timeout=20) as client:
resp = client.get(
article_url,
headers={"User-Agent": ua.random, "Accept-Language": "en-US,en;q=0.9"}
)
if resp.status_code != 200:
return {"error": f"HTTP {resp.status_code}"}
from bs4 import BeautifulSoup
soup = BeautifulSoup(resp.text, "lxml")
# Title
title = None
h1 = soup.find("h1")
if h1:
title = h1.get_text(strip=True)
# Article body
article_el = soup.find("article")
if not article_el:
article_el = soup.find("div", {"class": re.compile(r"article|content|post")})
body_text = ""
if article_el:
# Remove nav, footer, ads
for el in article_el.select("nav, footer, [class*='footer'], [class*='paywall'], button"):
el.decompose()
body_text = article_el.get_text(separator="\n", strip=True)
return {
"url": article_url,
"title": title,
"body_text": body_text,
"source": "html_fallback",
}
Method 2: Tag Feed Scraping
Medium organizes content by tags with up to 5 tags per article. Each tag has a "latest" and "trending" feed:
def scrape_tag_feed(
tag: str,
proxy: str = None,
include_paywalled: bool = True,
) -> list[dict]:
"""
Scrape Medium's tag feed for articles.
tag examples: 'python', 'machine-learning', 'startup', 'design'
"""
url = f"https://medium.com/tag/{tag}"
data = fetch_medium_json(url, proxy)
if "error" in data:
return []
payload = data.get("payload", {})
articles = []
# Articles come from multiple locations in the payload
refs = payload.get("references", {})
posts_dict = refs.get("Post", {})
# Also check collection items (publication articles in the tag)
    stream = payload.get("streamItems")
    # streamItems may be absent; fall back to an empty list without
    # dropping the panda items (the original conditional swallowed both)
    items = (
        payload.get("panda", {}).get("items", []) +
        (stream.get("items", []) if isinstance(stream, dict) else [])
    )
# Process referenced posts
for post_id, post in posts_dict.items():
if not isinstance(post, dict):
continue
creator = post.get("creator", {})
is_paywalled = post.get("isLockedPreviewOnly", False) or post.get("memberOnly", False)
if not include_paywalled and is_paywalled:
continue
article = {
"id": post_id,
"title": post.get("title"),
"subtitle": post.get("content", {}).get("subtitle") if isinstance(post.get("content"), dict) else None,
"clap_count": post.get("clapCount", 0),
"read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
"published_at": post.get("firstPublishedAt"),
"author_name": creator.get("name") if isinstance(creator, dict) else None,
"author_username": creator.get("username") if isinstance(creator, dict) else None,
"is_paywalled": is_paywalled,
"response_count": post.get("responsesCount", 0),
"voter_count": post.get("voterCount", 0),
"tags": [t.get("slug") for t in post.get("tags", []) if isinstance(t, dict)],
"url": f"https://medium.com/p/{post_id}",
}
# Get canonical URL if available
slug = post.get("slug")
if slug and isinstance(creator, dict) and creator.get("username"):
article["url"] = f"https://medium.com/@{creator['username']}/{slug}"
articles.append(article)
# Sort by clap count (highest first)
return sorted(articles, key=lambda a: a.get("clap_count", 0), reverse=True)
def scrape_tag_top_articles(
tag: str,
period: str = "year",
proxy: str = None,
) -> list[dict]:
"""
Get top articles for a tag filtered by time period.
period options: year, month, week, day
"""
url = f"https://medium.com/tag/{tag}/top/{period}"
data = fetch_medium_json(url, proxy)
if "error" in data:
return []
payload = data.get("payload", {})
refs = payload.get("references", {}).get("Post", {})
articles = []
for post_id, post in refs.items():
if not isinstance(post, dict):
continue
creator = post.get("creator", {})
articles.append({
"id": post_id,
"title": post.get("title"),
"clap_count": post.get("clapCount", 0),
"read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
"author": creator.get("name") if isinstance(creator, dict) else None,
"is_paywalled": post.get("isLockedPreviewOnly", False),
"url": f"https://medium.com/p/{post_id}",
})
return sorted(articles, key=lambda a: a.get("clap_count", 0), reverse=True)
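The tag feed and the top-articles endpoint frequently return overlapping posts, and the pipeline later in this guide also falls back to GraphQL for the same tag. A small dedupe-by-id helper (a hypothetical utility, not part of Medium's API) keeps the first occurrence of each post when merging feeds:

```python
def dedupe_articles(*feeds: list[dict]) -> list[dict]:
    """Merge article lists, keeping the first occurrence of each post id."""
    seen: set[str] = set()
    merged: list[dict] = []
    for feed in feeds:
        for article in feed:
            aid = article.get("id")
            if aid and aid not in seen:
                seen.add(aid)
                merged.append(article)
    return merged
```

Call it as `dedupe_articles(scrape_tag_feed(tag), scrape_tag_top_articles(tag))` before saving, so `INSERT OR REPLACE` doesn't silently overwrite rows with partial data from a second feed.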
Method 3: User Profile Scraping
Scrape a writer's full profile and article list:
def scrape_user_profile(username: str, proxy: str = None) -> dict:
"""
Scrape a Medium user's profile including their articles and stats.
username: the @username without the @ symbol
"""
url = f"https://medium.com/@{username}"
data = fetch_medium_json(url, proxy)
if "error" in data:
return data
payload = data.get("payload", {})
user = payload.get("user", {})
if not user:
# Try alternate structure
user = payload.get("value", {})
profile = {
"username": username,
"user_id": user.get("userId"),
"name": user.get("name"),
"bio": user.get("bio"),
"image_url": None,
"follower_count": user.get("socialStats", {}).get("followerCount") if isinstance(user.get("socialStats"), dict) else 0,
"following_count": user.get("socialStats", {}).get("followingCount") if isinstance(user.get("socialStats"), dict) else 0,
"is_writer_program": user.get("isWriterProgramEnrolled", False),
"is_medium_member": user.get("isMediumMember", False),
"is_suspended": user.get("isSuspended", False),
"custom_domain": user.get("customDomain"),
"articles": [],
}
# Extract profile image
image_id = user.get("imageId")
if image_id:
profile["image_url"] = f"https://miro.medium.com/v2/resize:fill:96:96/{image_id}"
# Extract articles from references
refs = payload.get("references", {}).get("Post", {})
for post_id, post in refs.items():
if not isinstance(post, dict):
continue
profile["articles"].append({
"id": post_id,
"title": post.get("title"),
"clap_count": post.get("clapCount", 0),
"published_at": post.get("firstPublishedAt"),
"read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
"is_paywalled": post.get("isLockedPreviewOnly", False),
"response_count": post.get("responsesCount", 0),
"tags": [t.get("slug") for t in post.get("tags", []) if isinstance(t, dict)],
"url": f"https://medium.com/p/{post_id}",
})
profile["articles"].sort(key=lambda a: a.get("clap_count", 0), reverse=True)
profile["total_articles"] = len(profile["articles"])
profile["total_claps"] = sum(a.get("clap_count", 0) for a in profile["articles"])
return profile
def get_user_latest_articles(
username: str,
max_articles: int = 50,
proxy: str = None,
) -> list[dict]:
"""
Get a user's latest articles by paginating their profile feed.
The ?format=json on the profile page only returns ~10 articles —
use the stream API to get more.
"""
all_articles = []
page = 1
while len(all_articles) < max_articles:
        # Medium's user stream endpoint (fetch_medium_json appends format=json itself)
        url = f"https://medium.com/@{username}/latest?page={page}&limit=10"
        data = fetch_medium_json(url, proxy)
if "error" in data or not data:
break
payload = data.get("payload", {})
refs = payload.get("references", {}).get("Post", {})
if not refs:
break
for post_id, post in refs.items():
if isinstance(post, dict):
all_articles.append({
"id": post_id,
"title": post.get("title"),
"clap_count": post.get("clapCount", 0),
"published_at": post.get("firstPublishedAt"),
"read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
"is_paywalled": post.get("isLockedPreviewOnly", False),
"url": f"https://medium.com/p/{post_id}",
})
if not payload.get("paging", {}).get("nextPageToken"):
break
page += 1
time.sleep(random.uniform(2, 5))
return all_articles[:max_articles]
Method 4: Medium's GraphQL API
For recommendation data and personalized feeds, Medium's frontend uses a GraphQL API:
def fetch_surfacing_recommendations(
tag: str,
first: int = 25,
after: str = "",
proxy: str = None,
) -> list[dict]:
"""
Fetch recommended articles from Medium's GraphQL API.
This is the same endpoint the Medium web app uses for tag feeds.
"""
url = "https://medium.com/_/graphql"
# This query mirrors what Medium's frontend sends for tag pages
query = {
"operationName": "TopicFeedQuery",
"variables": {
"tagSlug": tag,
"first": first,
"after": after,
},
"query": """
query TopicFeedQuery($tagSlug: String!, $first: Int, $after: String) {
tagFromSlug(tagSlug: $tagSlug) {
name
postCount
followerCount
viewerEdge {
feedItems(first: $first, after: $after) {
pageInfo { hasNextPage endCursor }
edges {
node {
feedId
post {
id
title
clapCount
readingTime
memberOnly
firstPublishedAt
creator {
name
username
imageId
}
tags { name slug }
}
}
}
}
}
}
}
"""
}
headers = {
"User-Agent": ua.random,
"Accept": "application/json",
"Content-Type": "application/json",
"Referer": f"https://medium.com/tag/{tag}",
"Graphql-Operation": "TopicFeedQuery",
}
for attempt in range(3):
try:
if proxy:
session = cffi_requests.Session(impersonate="chrome124")
session.proxies = {"http": proxy, "https": proxy}
resp = session.post(url, json=query, headers=headers, timeout=20)
else:
with httpx.Client(headers=headers, timeout=20) as client:
resp = client.post(url, json=query)
if resp.status_code == 429:
time.sleep(30 * (2 ** attempt))
continue
if resp.status_code != 200:
return []
data = resp.json()
tag_data = data.get("data", {}).get("tagFromSlug", {})
edges = (
tag_data.get("viewerEdge", {})
.get("feedItems", {})
.get("edges", [])
)
articles = []
for edge in edges:
post = edge.get("node", {}).get("post", {})
if not post:
continue
creator = post.get("creator", {})
articles.append({
"id": post.get("id"),
"title": post.get("title"),
"claps": post.get("clapCount"),
"read_time": round(post.get("readingTime", 0) / 60, 1) if post.get("readingTime") else None,
"is_paywalled": post.get("memberOnly", False),
"author": creator.get("name") if isinstance(creator, dict) else None,
"author_username": creator.get("username") if isinstance(creator, dict) else None,
"tags": [t.get("slug") for t in (post.get("tags") or [])],
"published_at": post.get("firstPublishedAt"),
"url": f"https://medium.com/p/{post.get('id')}",
})
return articles
except Exception as e:
if attempt == 2:
print(f"GraphQL error: {e}")
return []
time.sleep(5 * (attempt + 1))
return []
Building a Trend Research Pipeline
Here's a complete pipeline that scrapes trending articles across multiple tags and stores them for analysis:
import sqlite3
from datetime import datetime
def setup_medium_database(db_path: str) -> sqlite3.Connection:
"""Create SQLite schema for Medium article data."""
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS articles (
id TEXT PRIMARY KEY,
title TEXT,
subtitle TEXT,
url TEXT,
clap_count INTEGER DEFAULT 0,
voter_count INTEGER DEFAULT 0,
response_count INTEGER DEFAULT 0,
read_time_minutes REAL,
word_count INTEGER,
published_at INTEGER,
is_paywalled INTEGER DEFAULT 0,
is_boosted INTEGER DEFAULT 0,
language TEXT,
author_name TEXT,
author_username TEXT,
author_follower_count INTEGER,
publication_name TEXT,
tags TEXT,
body_text TEXT,
body_length INTEGER,
source_tag TEXT,
scraped_at TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS tag_trends (
tag TEXT,
article_id TEXT,
rank_position INTEGER,
scraped_at TEXT,
PRIMARY KEY (tag, article_id, scraped_at)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_articles_claps ON articles(clap_count DESC)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_articles_published ON articles(published_at)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_articles_tag ON articles(source_tag)")
conn.commit()
return conn
def save_article(conn: sqlite3.Connection, article: dict, source_tag: str = None):
"""Save an article to the database."""
now = datetime.now().isoformat()
author = article.get("author", {})
pub = article.get("publication")
conn.execute("""
INSERT OR REPLACE INTO articles
(id, title, subtitle, url, clap_count, voter_count, response_count,
read_time_minutes, word_count, published_at, is_paywalled, is_boosted,
language, author_name, author_username, author_follower_count,
publication_name, tags, body_text, body_length, source_tag, scraped_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
article.get("id"),
article.get("title"),
article.get("subtitle"),
article.get("url"),
article.get("clap_count", 0),
article.get("voter_count", 0),
article.get("response_count", 0),
article.get("read_time"),
article.get("word_count"),
article.get("published_at"),
int(article.get("is_paywalled", False)),
int(article.get("is_boosted", False)),
article.get("language"),
author.get("name") if isinstance(author, dict) else article.get("author_name"),
author.get("username") if isinstance(author, dict) else article.get("author_username"),
author.get("follower_count") if isinstance(author, dict) else None,
pub.get("name") if isinstance(pub, dict) else None,
json.dumps(article.get("tags", [])),
article.get("body_text"),
article.get("body_length", 0),
source_tag,
now,
))
def scrape_tag_trends(
tags: list[str],
include_body: bool = False,
proxy: str = None,
db_path: str = "medium_trends.db",
) -> dict:
"""
Scrape trending articles across multiple tags.
If include_body=True, fetches full article text (slower, more requests).
"""
conn = setup_medium_database(db_path)
now = datetime.now().isoformat()
stats = {}
for tag in tags:
print(f"\nScraping tag: #{tag}")
articles = scrape_tag_feed(tag, proxy=proxy)
if not articles:
# Try GraphQL fallback
articles = fetch_surfacing_recommendations(tag, proxy=proxy)
if not articles:
print(f" No articles found for #{tag}")
stats[tag] = 0
continue
print(f" Found {len(articles)} articles")
saved = 0
for i, article in enumerate(articles[:50]): # Cap at 50 per tag
article_id = article.get("id")
if not article_id:
continue
# Fetch full content if requested (and article isn't paywalled)
if include_body and not article.get("is_paywalled"):
article_url = article.get("url")
if article_url:
time.sleep(random.uniform(3, 7))
full_article = scrape_medium_article(article_url, proxy=proxy)
if "body_text" in full_article:
article.update(full_article)
save_article(conn, article, source_tag=tag)
# Record ranking position for trend analysis
conn.execute(
"INSERT OR REPLACE INTO tag_trends VALUES (?,?,?,?)",
(tag, article_id, i + 1, now)
)
saved += 1
conn.commit()
stats[tag] = saved
print(f" Saved {saved} articles")
# Pause between tags
time.sleep(random.uniform(5, 12))
conn.close()
return stats
# Research content trends across tech topics
tech_tags = [
"python", "machine-learning", "artificial-intelligence",
"javascript", "startup", "data-science", "programming",
"software-engineering",
]
stats = scrape_tag_trends(
tech_tags,
include_body=False, # Set True for full article text (much slower)
proxy=PROXY,
db_path="medium_tech_trends.db",
)
print(f"\nScraping complete: {sum(stats.values())} total articles")
Proxy Configuration and Anti-Bot Bypass
Medium's Cloudflare integration blocks most datacenter IPs. From an AWS or GCP IP, you'll see 403s or JS challenges within the first few requests.
ThorData's residential proxy pool works well for Medium:
import random
def get_medium_proxy(session_id: str = None) -> str:
"""
Get a proxy URL for Medium scraping.
Use sticky sessions for scraping multiple pages from the same user session.
"""
base = "http://USERNAME:PASSWORD"
host = "gate.thordata.com:7777"
if session_id:
# Sticky session: same exit IP for this session
return f"{base}-session-{session_id}@{host}"
# Rotating: new IP each request
return f"{base}@{host}"
# Sticky session for a tag + article scraping run
session_id = str(random.randint(10000, 99999))
proxy = get_medium_proxy(session_id)
articles = scrape_tag_feed("artificial-intelligence", proxy=proxy)
print(f"Found {len(articles)} articles in #artificial-intelligence")
for a in articles[:5]:
paywall = " [MEMBER]" if a.get("is_paywalled") else ""
print(f" {a.get('clap_count', 0):>6} claps | {a.get('title', '')[:60]}{paywall}")
time.sleep(random.uniform(3, 8))
Analyzing Medium Trends
Once you have data in SQLite, you can identify trending content patterns:
import sqlite3
def analyze_tag_trends(db_path: str, tag: str) -> None:
"""Analyze what content performs best for a given tag."""
conn = sqlite3.connect(db_path)
# Top articles by claps
print(f"\nTop Articles in #{tag}:")
cursor = conn.execute("""
SELECT title, author_name, clap_count, read_time_minutes,
is_paywalled, response_count
FROM articles
WHERE source_tag = ? AND clap_count > 0
ORDER BY clap_count DESC
LIMIT 10
""", (tag,))
    for row in cursor:
        paywall = " [M]" if row[4] else ""
        # read_time_minutes can be NULL; guard before formatting
        read_time = f"{row[3]:.0f}min" if row[3] is not None else "?"
        print(f"  {row[2]:>6} claps | {read_time} | {row[0][:50]}{paywall}")
        print(f"         by {row[1]} | {row[5]} responses")
# Optimal read time
avg = conn.execute("""
SELECT
ROUND(AVG(read_time_minutes), 1) as avg_read_time,
ROUND(AVG(CASE WHEN clap_count > 1000 THEN read_time_minutes END), 1) as viral_read_time,
MAX(clap_count) as max_claps,
COUNT(*) as total
FROM articles WHERE source_tag = ? AND read_time_minutes IS NOT NULL
""", (tag,)).fetchone()
print(f"\n Average read time: {avg[0]} min")
print(f" Average for 1000+ clap articles: {avg[1]} min")
print(f" Max claps in dataset: {avg[2]:,}")
print(f" Total articles: {avg[3]}")
# Paywall ratio
paywall_stats = conn.execute("""
SELECT
SUM(is_paywalled) as paywalled,
COUNT(*) as total,
ROUND(100.0 * SUM(is_paywalled) / COUNT(*), 1) as pct
FROM articles WHERE source_tag = ?
""", (tag,)).fetchone()
print(f" Paywalled: {paywall_stats[0]}/{paywall_stats[1]} ({paywall_stats[2]}%)")
conn.close()
Legal and Ethical Notes
Medium's Terms of Service prohibit automated access and scraping. Paywalled articles are especially sensitive — accessing paywalled content without a membership may violate terms. Guidelines:
- Only scrape publicly visible (non-paywalled) content for research
- Don't republish Medium articles or article text as your own content
- Respect `Retry-After` headers when rate limited
- Don't use scraped data to build a competing reading platform
- Cache aggressively — article content doesn't change after publication
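The last point — cache aggressively — can be as simple as a SQLite table keyed by article id, checked before every network fetch. A minimal sketch (the table name and helper are illustrative, separate from the pipeline schema above):

```python
import json
import sqlite3

def cached_fetch(conn: sqlite3.Connection, article_id: str, fetch_fn) -> dict:
    """Return a cached article if present, otherwise fetch once and store it."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS article_cache (id TEXT PRIMARY KEY, payload TEXT)"
    )
    row = conn.execute(
        "SELECT payload FROM article_cache WHERE id = ?", (article_id,)
    ).fetchone()
    if row:
        return json.loads(row[0])
    article = fetch_fn(article_id)
    conn.execute(
        "INSERT OR REPLACE INTO article_cache VALUES (?, ?)",
        (article_id, json.dumps(article)),
    )
    conn.commit()
    return article
```

Since article content is immutable after publication, there is no TTL to manage — only mutable counters like claps need periodic refreshes.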
Key Takeaways
- `?format=json` is the foundation: Append it to any Medium URL to get structured JSON. Strip the XSSI prefix (`])}while(1);</x>`) before parsing.
- Tag feeds are the entry point: Start with tag feeds to get article IDs, then fetch individual articles for full content.
- ~40% paywall rate on popular content: Your scraper should flag paywalled articles and respect the paywall. Non-paywalled articles are sufficient for most trend analysis.
- Delays of 3-8 seconds: Medium's rate limiting kicks in around 50-80 requests per minute. Spread requests with random delays.
- Residential proxies are mandatory: ThorData handles Medium's Cloudflare checks without browser automation overhead.
- GraphQL as fallback: When `?format=json` returns incomplete data, Medium's GraphQL endpoint at `/_/graphql` provides the same content with a more predictable schema.
- Cache by article ID: Article content doesn't change after publication. Cache with permanent or very long TTLs.