← Back to blog

How to Scrape Tumblr Data in 2026 (API + Web Scraping)

Tumblr remains a surprisingly rich data source in 2026. The platform hosts billions of posts spanning art, fandom content, long-form writing, and niche communities that exist nowhere else. Combining the official API with targeted web scraping gives solid coverage for datasets, blog archives, and reblog chain analysis.

Why Scrape Tumblr? Practical Use Cases

Tumblr occupies a unique niche in the social media landscape. In practice, people use Tumblr data to build research datasets, archive blogs, and analyze how posts spread through reblog chains.

Getting a Tumblr API Key

Tumblr's API v2 requires an OAuth consumer key for read-only access. You do not need full OAuth for most scraping tasks — the consumer key alone works for public data.

  1. Create a Tumblr account (or use an existing one).
  2. Go to https://www.tumblr.com/oauth/apps and register a new application.
  3. Fill in any values for the required fields — the name and description don't matter for read-only access.
  4. Copy your Consumer Key (also called the API key).

Tumblr throttles read requests at roughly 1,000 calls per hour per key. Keep a few keys in rotation for higher throughput.

Complete Working Script: Blog Scraper

Save this as tumblr_scraper.py — it handles blog info, post pagination, media extraction, and export:

#!/usr/bin/env python3
"""Tumblr blog scraper using API v2 + web fallback.

Usage:
    pip install httpx beautifulsoup4
    python3 tumblr_scraper.py staff.tumblr.com [max_posts]

The first argument is the blog to scrape (default: staff.tumblr.com); the
optional second argument caps the number of posts fetched (default: 100).
Results are written to tumblr_<blog>.json in the current directory.
"""

import httpx
import time
import json
import csv
import sys
from datetime import datetime

# Consumer key sent as the ``api_key`` query parameter on every API request.
API_KEY = "your_consumer_key_here"  # Get from tumblr.com/oauth/apps
# Root URL that every API endpoint path in this script is appended to.
BASE = "https://api.tumblr.com/v2"


def get_blog_info(blog: str) -> dict:
    """Return the ``blog`` metadata object from the blog's ``/info`` endpoint.

    ``raise_for_status`` is called on the response, so an HTTP error
    status propagates to the caller as an exception.
    """
    response = httpx.get(
        f"{BASE}/blog/{blog}/info",
        params={"api_key": API_KEY},
        timeout=15,
    )
    response.raise_for_status()
    payload = response.json()
    return payload["response"]["blog"]


def get_blog_posts(
    blog: str,
    post_type: str = None,
    limit: int = 20,
    offset: int = 0,
    tag: str = None,
) -> dict:
    """Return one page of results from the blog's ``/posts`` endpoint.

    ``post_type`` and ``tag`` are sent as the ``type`` and ``tag`` query
    parameters only when they are truthy. The returned value is the
    ``response`` object of the JSON payload; an HTTP error status
    propagates as an exception via ``raise_for_status``.
    """
    query = {"api_key": API_KEY, "limit": limit, "offset": offset}
    optional_filters = (("type", post_type), ("tag", tag))
    query.update({key: value for key, value in optional_filters if value})

    reply = httpx.get(f"{BASE}/blog/{blog}/posts", params=query, timeout=15)
    reply.raise_for_status()
    return reply.json()["response"]


def paginate_blog(
    blog: str,
    post_type: str = None,
    tag: str = None,
    max_posts: int = None,
) -> list[dict]:
    """Collect posts from a blog by walking the offset-paginated endpoint.

    Args:
        blog: Blog identifier, passed through to ``get_blog_posts``.
        post_type: Optional post-type filter.
        tag: Optional tag filter.
        max_posts: Stop once this many posts are collected. A falsy value
            (``None`` or ``0``) means "no cap".

    Returns:
        The raw post dicts in the order the API returned them, truncated
        to ``max_posts`` when a cap is given.
    """
    page_size = 20  # largest page this script ever requests
    all_posts = []
    offset = 0

    while True:
        # Ask only for what is still needed so the last page does not
        # spend a request on posts that would be thrown away.
        limit = page_size
        if max_posts:
            limit = min(page_size, max_posts - len(all_posts))

        data = get_blog_posts(
            blog, post_type=post_type, limit=limit, offset=offset, tag=tag
        )
        posts = data.get("posts", [])
        total = data.get("total_posts", 0)

        # An empty page is the end-of-blog signal.
        if not posts:
            break

        all_posts.extend(posts)
        offset += len(posts)

        # Report progress for every page, including the final one.
        print(f"  Fetched {len(all_posts)}/{total} posts...")

        if max_posts and len(all_posts) >= max_posts:
            all_posts = all_posts[:max_posts]
            break

        time.sleep(0.4)  # stay well under rate limit

    return all_posts


def extract_media(post: dict) -> list[dict]:
    """Extract all media references from a raw API post.

    Args:
        post: A post dict. Only the ``photo``, ``video`` and ``audio``
            types yield media; any other type returns an empty list.

    Returns:
        A list of dicts, each carrying a ``type`` key:

        * ``image`` - ``url``, ``width``, ``height`` (first ``alt_sizes``
          entry of each photo)
        * ``video`` - ``url``, ``duration``
        * ``embed`` - ``html`` (YouTube/Vimeo player markup)
        * ``audio`` - ``url``, ``artist``, ``track_name``
    """
    media = []
    ptype = post.get("type")

    if ptype == "photo":
        for photo in post.get("photos") or []:
            sizes = photo.get("alt_sizes") or []
            if not sizes:
                continue
            # The first entry is treated as the highest resolution.
            best = sizes[0]
            url = best.get("url")
            if not url:
                # A size entry without a URL is useless; skip it rather
                # than abort the whole post.
                continue
            media.append({
                "type": "image",
                "url": url,
                "width": best.get("width", 0),
                "height": best.get("height", 0),
            })

    elif ptype == "video":
        video_url = post.get("video_url")
        if video_url:
            media.append({
                "type": "video",
                "url": video_url,
                "duration": post.get("duration", 0),
            })
        # Also check for embedded players
        players = post.get("player")
        if isinstance(players, list):
            for entry in players:
                if not isinstance(entry, dict):
                    continue
                embed = entry.get("embed_code")
                # NOTE(review): embed_code is not guaranteed to be a string
                # (a boolean false has been observed for unavailable
                # players - confirm against the API docs). Substring tests
                # on a non-string would raise TypeError.
                if not isinstance(embed, str):
                    continue
                if "youtube.com" in embed or "vimeo.com" in embed:
                    media.append({
                        "type": "embed",
                        "html": embed,
                    })

    elif ptype == "audio":
        audio_url = post.get("audio_url")
        if audio_url:
            media.append({
                "type": "audio",
                "url": audio_url,
                "artist": post.get("artist", ""),
                "track_name": post.get("track_name", ""),
            })

    return media


def process_post(post: dict) -> dict:
    """Flatten a raw API post into the record shape used by this script.

    Missing text fields default to ``""``, counters to ``0`` and ``tags``
    to ``[]``. ``media`` comes from ``extract_media``. ``is_reblog`` is
    true whenever ``reblogged_from_id`` is present and not ``None``.
    """
    parent_id = post.get("reblogged_from_id")

    record = {
        "id": post.get("id"),
        "type": post.get("type"),
        "url": post.get("post_url", ""),
    }
    for field in ("slug", "date"):
        record[field] = post.get(field, "")
    record["timestamp"] = post.get("timestamp", 0)
    record["tags"] = post.get("tags", [])
    record["note_count"] = post.get("note_count", 0)
    for field in ("summary", "title", "body", "caption"):
        record[field] = post.get(field, "")
    record["media"] = extract_media(post)
    record["is_reblog"] = parent_id is not None
    record["reblogged_from"] = post.get("reblogged_from_name")
    record["reblog_root"] = post.get("reblogged_root_name")
    record["source_url"] = post.get("source_url", "")
    return record


def main():
    """Command-line entry point: scrape one blog and save it as JSON.

    Reads ``sys.argv[1]`` (blog name or URL, default ``staff.tumblr.com``)
    and ``sys.argv[2]`` (maximum number of posts, default 100), prints a
    summary to stdout, and writes ``tumblr_<blog>.json`` to the current
    directory.
    """
    blog = sys.argv[1] if len(sys.argv) > 1 else "staff.tumblr.com"
    max_posts = int(sys.argv[2]) if len(sys.argv) > 2 else 100

    # Clean up blog identifier: drop the scheme and anything after the
    # host so a pasted URL with a trailing slash or path still works.
    blog = blog.replace("https://", "").replace("http://", "")
    blog = blog.split("/", 1)[0]
    if "." not in blog:
        blog = f"{blog}.tumblr.com"

    print(f"Scraping {blog}...\n")

    # Get blog info. Only the fetch is guarded, so a formatting problem
    # while printing can no longer discard metadata that was fetched.
    try:
        info = get_blog_info(blog)
    except Exception as e:
        print(f"  Could not fetch blog info: {e}")
        info = {}
    else:
        print(f"  Blog: {info.get('title', blog)}")
        print(f"  Description: {(info.get('description') or '')[:100]}")
        print(f"  Total posts: {info.get('posts') or 0:,}")
        print(f"  Updated: {info.get('updated', '')}")
        print()

    # Fetch posts
    raw_posts = paginate_blog(blog, max_posts=max_posts)
    posts = [process_post(p) for p in raw_posts]

    # Print summary
    print(f"\n{'='*60}")
    print(f"Scraped {len(posts)} posts from {blog}")
    print(f"{'='*60}\n")

    # Stats
    types = {}
    total_notes = 0
    total_media = 0
    reblog_count = 0

    for p in posts:
        types[p["type"]] = types.get(p["type"], 0) + 1
        total_notes += p["note_count"] or 0
        total_media += len(p["media"])
        if p["is_reblog"]:
            reblog_count += 1

    print("Post types:")
    for ptype, count in sorted(
        types.items(), key=lambda x: x[1], reverse=True
    ):
        print(f"  {ptype}: {count}")

    # An empty result set must not divide by zero.
    reblog_pct = reblog_count / len(posts) * 100 if posts else 0.0

    print(f"\nTotal notes (likes + reblogs): {total_notes:,}")
    print(f"Media files found: {total_media}")
    print(f"Reblogs: {reblog_count} "
          f"({reblog_pct:.1f}%)")

    # Top posts
    top = sorted(posts, key=lambda p: p["note_count"] or 0,
                 reverse=True)[:5]
    print("\nTop 5 posts by engagement:")
    for i, p in enumerate(top, 1):
        # summary/title may be None (key present with a null value), so
        # coerce before slicing.
        summary = (
            (p["summary"] or "")[:60]
            or (p["title"] or "")[:60]
            or "(no text)"
        )
        print(f"  {i}. {p['note_count'] or 0:>8,} notes | "
              f"{p['type']:>6} | {summary}")

    # Save
    outfile = f"tumblr_{blog.split('.')[0]}.json"
    with open(outfile, "w", encoding="utf-8") as f:
        json.dump({
            "blog": info,
            "posts": posts,
            "scraped_at": datetime.now().isoformat(),
        }, f, indent=2, ensure_ascii=False)
    print(f"\nSaved to {outfile}")


# Run the scraper only when this file is executed as a script, so that
# importing it for its helper functions does not start a scrape.
if __name__ == "__main__":
    main()

Example Output

Scraping staff.tumblr.com...

  Blog: Tumblr Staff
  Description: Humans and bots working together to bring you the best of Tumblr.
  Total posts: 4,230
  Updated: 2026-03-29 18:42:00 GMT

  Fetched 20/4230 posts...
  Fetched 40/4230 posts...
  Fetched 60/4230 posts...
  Fetched 80/4230 posts...
  Fetched 100/4230 posts...

============================================================
Scraped 100 posts from staff.tumblr.com
============================================================

Post types:
  text: 52
  photo: 31
  video: 12
  link: 5

Total notes (likes + reblogs): 1,847,200
Media files found: 48
Reblogs: 15 (15.0%)

Top 5 posts by engagement:
  1.  245,000 notes |   text | We're bringing back the old editor. Here's why.
  2.  189,400 notes |  photo | Year in Review 2025: Your most reblogged posts
  3.  142,800 notes |   text | New feature: post scheduling is here
  4.   98,500 notes |  video | Behind the scenes at Tumblr HQ
  5.   76,200 notes |  photo | Tumblrween 2025 winners announced

Saved to tumblr_staff.json

Following Reblog Chains

Every reblog carries reblogged_root_id (the ID of the original post) and reblogged_from_id (the ID of the direct parent); original posts have neither field. To reconstruct a chain, walk backwards by fetching each parent post by ID.

def fetch_post_by_id(blog: str, post_id: int) -> dict | None:
    """Look up a single post on *blog* by its ID.

    Returns the first post in the response, or ``None`` when the response
    contains no posts. Any exception raised while requesting or decoding
    (network failure, HTTP error status, unexpected payload shape) is
    swallowed and also reported as ``None``.
    """
    endpoint = f"{BASE}/blog/{blog}/posts"
    query = {"api_key": API_KEY, "id": post_id}
    try:
        response = httpx.get(endpoint, params=query, timeout=15)
        response.raise_for_status()
        matches = response.json()["response"]["posts"]
        if not matches:
            return None
        return matches[0]
    except Exception:
        return None


def trace_reblog_chain(post: dict) -> list[dict]:
    """Follow ``reblogged_from_*`` links from *post* back towards its origin.

    Each parent is fetched with ``fetch_post_by_id``. The walk ends when
    the current post has no parent ID, has no parent blog name, points at
    an ID that was already visited, or when the parent cannot be fetched
    (a stub entry with ``status: deleted_or_private`` is recorded then).

    Returns the chain ordered oldest first, ending with *post* itself.
    """
    newest_first = [post]
    visited = {post["id"]}
    node = post

    while True:
        parent_id = node.get("reblogged_from_id")
        if not parent_id:
            break

        parent_blog = node.get("reblogged_from_name", "")
        if not parent_blog or parent_id in visited:
            break

        parent = fetch_post_by_id(parent_blog, parent_id)
        if parent:
            newest_first.append(parent)
            visited.add(parent_id)
            node = parent
            time.sleep(0.3)  # pause between lookups
            continue

        # Parent could not be retrieved: record a stub and stop walking.
        newest_first.append({
            "id": parent_id,
            "blog": parent_blog,
            "status": "deleted_or_private",
        })
        break

    return newest_first[::-1]

Reblog Chain Output Example

chain = trace_reblog_chain(reblogged_post)
# Returns (oldest to newest):
[
    {
        "id": 7291034,
        "blog_name": "originalartist",
        "type": "photo",
        "note_count": 145000,
        "tags": ["my art", "digital art", "fantasy"],
        "date": "2026-01-15 10:30:00 GMT"
    },
    {
        "id": 7291035,
        "blog_name": "artcurator",
        "note_count": 23000,
        "reblogged_from_name": "originalartist",
        "tags": ["signal boost", "artists on tumblr"],
        "date": "2026-01-15 14:22:00 GMT"
    },
    {
        "id": 7291040,
        "blog_name": "fantasyart",
        "note_count": 8900,
        "reblogged_from_name": "artcurator",
        "tags": ["fantasy art", "queue"],
        "date": "2026-01-16 09:15:00 GMT"
    }
]

Note that deleted or private posts in the chain will return empty results. The API does not surface content that has been removed, which is where web scraping becomes necessary.

Web Scraping Fallback with BeautifulSoup

For deleted posts, content behind Tumblr's NSFW flag, or older paginated content that the API handles inconsistently, scraping the rendered HTML is your fallback. Tumblr's blog pages follow predictable URL patterns: https://{blog}.tumblr.com/page/{n}.

from bs4 import BeautifulSoup

# Browser-like request headers used for HTML page fetches. The User-Agent
# string identifies as desktop Chrome 124 on 64-bit Windows.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "en-US,en;q=0.9",
}


def scrape_blog_page(
    blog: str, page: int = 1, proxies: dict = None
) -> list[dict]:
    """Scrape posts from the rendered HTML of one blog page.

    Args:
        blog: Blog name, with or without the ``.tumblr.com`` suffix.
        page: Page number used in the ``/page/{n}`` URL.
        proxies: Optional mapping of URL pattern to proxy URL, e.g.
            ``{"https://": "http://user:pass@host:port"}``. A falsy proxy
            URL disables proxying for that pattern.

    Returns:
        One dict per ``<article data-post-id>`` element with the keys
        ``id``, ``type``, ``images``, ``text``, ``tags``, ``note_count``
        and ``url``.

    An HTTP error status propagates as an exception via
    ``raise_for_status``.
    """
    import re  # function-scope so this snippet runs on its own

    # Accept both "name" and "name.tumblr.com" so the same identifier
    # works here and with the API helpers.
    name = blog.removesuffix(".tumblr.com")
    url = f"https://{name}.tumblr.com/page/{page}"

    client_kwargs = {
        "headers": HEADERS,
        "timeout": 20,
        "follow_redirects": True,
    }
    if proxies:
        # httpx deprecated the ``proxies=`` client argument in 0.26 and
        # removed it in 0.28; per-pattern proxy routing is expressed with
        # transport mounts instead.
        client_kwargs["mounts"] = {
            pattern: (
                httpx.HTTPTransport(proxy=httpx.Proxy(proxy_url))
                if proxy_url else None
            )
            for pattern, proxy_url in proxies.items()
        }

    with httpx.Client(**client_kwargs) as client:
        resp = client.get(url)
        resp.raise_for_status()

    soup = BeautifulSoup(resp.text, "html.parser")
    posts = []

    for article in soup.select("article[data-post-id]"):
        post_id = article.get("data-post-id")
        post_type = article.get("data-type", "unknown")

        # Keep only images served from Tumblr's media hosts.
        images = [
            img["src"]
            for img in article.select("img[src]")
            if "media.tumblr.com" in img.get("src", "")
        ]
        text_blocks = [
            p.get_text(strip=True) for p in article.select("p")
        ]

        # Get tags from tag links
        tags = [
            a.get_text(strip=True).lstrip("#")
            for a in article.select("a[href*='/tagged/']")
        ]

        # Get note count. The pattern requires a leading digit so that a
        # stray "," can never reach int() as an empty string.
        note_count = 0
        note_el = article.select_one("[class*='note_count']")
        if note_el:
            match = re.search(r"\d[\d,]*", note_el.get_text())
            if match:
                note_count = int(match.group().replace(",", ""))

        posts.append({
            "id": post_id,
            "type": post_type,
            "images": images,
            "text": " ".join(text_blocks),
            "tags": tags,
            "note_count": note_count,
            "url": f"https://{name}.tumblr.com/post/{post_id}",
        })

    return posts


def scrape_full_blog_web(
    blog: str, max_pages: int = 10, proxies: dict = None
) -> list[dict]:
    """Scrape consecutive pages of a blog via the rendered HTML.

    Args:
        blog: Blog name, passed through to ``scrape_blog_page``.
        max_pages: Highest page number to request (pages start at 1).
        proxies: Optional proxy mapping, passed through unchanged.

    Returns:
        Posts from every page in page order. Scraping stops early at the
        first page that yields no posts.
    """
    import random  # function-scope so this snippet runs on its own

    all_posts = []

    for page_num in range(1, max_pages + 1):
        if page_num > 1:
            # Randomized pause between page requests only; there is
            # nothing to wait for after the final page.
            time.sleep(random.uniform(2.0, 4.0))

        print(f"  Scraping page {page_num}...")
        posts = scrape_blog_page(blog, page=page_num,
                                 proxies=proxies)

        if not posts:
            print(f"  No posts on page {page_num} — stopping")
            break

        all_posts.extend(posts)

    return all_posts

Web Scraping Output Example

posts = scrape_blog_page("artblog")
# Returns:
[
    {
        "id": "738291045",
        "type": "photo",
        "images": [
            "https://64.media.tumblr.com/abc123/s1280x1920/image.jpg",
            "https://64.media.tumblr.com/def456/s1280x1920/image2.jpg"
        ],
        "text": "New piece I finished today. Watercolor on cold press paper, 11x14.",
        "tags": ["my art", "watercolor", "landscape"],
        "note_count": 3420,
        "url": "https://artblog.tumblr.com/post/738291045"
    }
]

Searching by Tag

Tag search is one of the more useful endpoints for dataset collection. It returns recent posts across all blogs for a given tag.

import random


def search_tag(
    tag: str, before: int = None, max_results: int = 200
) -> list[dict]:
    """Collect posts carrying *tag* from the cross-blog ``/tagged`` endpoint.

    Pages backwards in time: after each page, the ``timestamp`` of its
    last post becomes the next request's ``before`` value. Stops on an
    empty page, on a last post without a timestamp, or once at least
    ``max_results`` posts are collected; the result is then truncated to
    ``max_results``.
    """
    collected = []
    cursor = before

    while len(collected) < max_results:
        query = {"api_key": API_KEY, "tag": tag, "limit": 20}
        if cursor:
            query["before"] = cursor

        response = httpx.get(f"{BASE}/tagged", params=query, timeout=15)
        response.raise_for_status()
        batch = response.json()["response"]
        if not batch:
            break

        collected += batch

        # The oldest post of this page anchors the next, older page.
        cursor = batch[-1].get("timestamp")
        if not cursor:
            break

        time.sleep(0.4)

    return collected[:max_results]


# Example: search for "digital art" posts
# NOTE: these are module-level statements, so they run as soon as the
# snippet is executed and trigger live API requests through search_tag.
results = search_tag("digital art", max_results=100)
print(f"Found {len(results)} posts tagged #digital art")
# Show the first five results: blog name, note count, and the first 50
# characters of the summary.
for p in results[:5]:
    blog = p.get("blog_name", "unknown")
    notes = p.get("note_count", 0)
    summary = p.get("summary", "")[:50]
    print(f"  {blog}: {notes:,} notes — {summary}")

Tag Search Output

Found 100 posts tagged #digital art

  artcurator: 12,400 notes — New character design commission for @client
  painterly-dreams: 8,900 notes — Speed painting process video (3 hours → 30 sec)
  digitalartist_k: 5,200 notes — Tutorial: how I paint realistic eyes
  studioghoul: 3,800 notes — Inktober 2025 collection — all 31 pieces
  pixelwitch: 2,100 notes — Landscape study from reference photo

Downloading Media Files

For archival or dataset building, you'll want to download the actual media files:

import os
from pathlib import Path
from urllib.parse import urlparse


def download_media(
    posts: list[dict], output_dir: str = "tumblr_media"
) -> dict:
    """Download all media from processed posts.

    Args:
        posts: Processed post dicts, each with an ``id`` and a ``media``
            list whose entries carry ``type`` and ``url``.
        output_dir: Destination directory; created (including parents)
            if it does not exist.

    Returns:
        A dict with ``downloaded``, ``skipped`` and ``failed`` counters.

    Files are named ``<post id>_<media type><ext>``. When a post holds
    several items of the same type, the second and later ones get a
    ``_2``, ``_3``, ... suffix so they do not collide with the first.
    Files that already exist are counted as skipped, and a failed
    download is reported and counted without aborting the run.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    stats = {"downloaded": 0, "skipped": 0, "failed": 0}

    with httpx.Client(timeout=30, follow_redirects=True) as client:
        for post in posts:
            seen_per_type = {}  # media type -> items seen in this post

            for media in post.get("media", []):
                url = media.get("url")
                if not url or media.get("type") == "embed":
                    continue

                kind = media.get("type", "file")
                index = seen_per_type.get(kind, 0) + 1
                seen_per_type[kind] = index

                # Generate filename from URL. The first item keeps the
                # unsuffixed name so existing archives are still
                # recognised as already downloaded.
                parsed = urlparse(url)
                ext = os.path.splitext(parsed.path)[1] or ".jpg"
                suffix = "" if index == 1 else f"_{index}"
                filename = f"{post['id']}_{kind}{suffix}{ext}"
                filepath = os.path.join(output_dir, filename)

                if os.path.exists(filepath):
                    stats["skipped"] += 1
                    continue

                try:
                    resp = client.get(url)
                    resp.raise_for_status()
                    with open(filepath, "wb") as f:
                        f.write(resp.content)
                    stats["downloaded"] += 1
                except Exception as e:
                    # Best effort: one bad file must not stop the rest.
                    print(f"  Failed: {filename} — {e}")
                    stats["failed"] += 1

                time.sleep(0.2)

    print("\nMedia download complete:")
    print(f"  Downloaded: {stats['downloaded']}")
    print(f"  Skipped (exists): {stats['skipped']}")
    print(f"  Failed: {stats['failed']}")
    return stats

Anti-Bot Measures and Rate Limiting

Tumblr enforces rate limits at two layers. The API applies a per-key hourly cap. The web layer uses Cloudflare, which challenges IPs that request too quickly, use datacenter ranges, or lack realistic browser headers.

API Rate Limits

| Limit               | Value             | What Happens    |
| ------------------- | ----------------- | --------------- |
| Requests per hour   | ~1,000 per key    | 429 response    |
| Requests per second | ~5 sustained      | Temporary block |
| Posts per request   | Max 20            | Silently capped |
| Tag search depth    | ~1,000 posts back | Empty responses |

Practical Mitigations

  1. Space API requests 300-500ms apart. Burst requests get queued or dropped.

  2. Rotate API keys for high-volume work. Register 3-5 apps under different accounts.

  3. For web scraping at scale, residential proxies are necessary. Datacenter IPs get flagged by Cloudflare quickly. ThorData provides residential proxy pools that rotate by request, handling Cloudflare's IP reputation checks without requiring you to manage your own infrastructure.

# Using proxies with httpx
# Placeholder credentials and host: substitute your provider's values.
proxies = {
    "http://": "http://user:pass@proxy.example.com:9000",
    "https://": "http://user:pass@proxy.example.com:9000",
}

# httpx removed the ``proxies=`` client argument in 0.28 (deprecated in
# 0.26); per-scheme proxy routing is expressed with transport mounts.
mounts = {
    pattern: httpx.HTTPTransport(proxy=httpx.Proxy(proxy_url))
    for pattern, proxy_url in proxies.items()
}

url = "https://staff.tumblr.com/page/1"  # example target page

with httpx.Client(mounts=mounts, timeout=20) as client:
    resp = client.get(url, headers=HEADERS)

  4. Vary your User-Agent string across requests at volume.

  5. NSFW content requires the requesting OAuth account to have adult content enabled in its preferences. The API key alone won't surface flagged posts.

  6. Handle 429s gracefully:

def api_request_with_backoff(url: str, params: dict,
                              max_retries: int = 3) -> dict:
    """Make an API request, retrying with exponential backoff on HTTP 429.

    Args:
        url: Full endpoint URL.
        params: Query parameters for the request.
        max_retries: Total number of attempts before giving up.

    Returns:
        The decoded JSON body of the first successful (2xx) response.

    Raises:
        RuntimeError: Every attempt was answered with 429 (or
            ``max_retries`` is not positive).
        Exception: Any other non-success status is raised immediately by
            ``raise_for_status``; it is not retried.
    """
    for attempt in range(max_retries):
        resp = httpx.get(url, params=params, timeout=15)

        if resp.status_code == 429:
            if attempt == max_retries - 1:
                # Out of attempts: fail now instead of sleeping first.
                break
            wait = 2 ** (attempt + 1) * 10  # 20s, 40s, 80s, ...
            print(f"  Rate limited — waiting {wait}s "
                  f"(attempt {attempt + 1}/{max_retries})")
            time.sleep(wait)
            continue

        # Raises on any error status; otherwise the response is a success
        # and is returned even if the code is a 2xx other than 200.
        resp.raise_for_status()
        return resp.json()

    raise RuntimeError(f"Failed after {max_retries} retries")

Building a Tag Analytics Dashboard

A practical application — analyze tag popularity and co-occurrence for a blog:

from collections import Counter


def analyze_tags(posts: list[dict]) -> dict:
    """Analyze tag usage patterns across posts.

    Tags are compared case-insensitively (lower-cased) and counted at
    most once per post, so a post tagged both ``Art`` and ``art`` adds one
    post and one note total to ``art`` and creates no self-pair.

    Args:
        posts: Post dicts with optional ``tags`` (list of str) and
            ``note_count`` (int) keys.

    Returns:
        A dict with:

        * ``tag_counts`` - the 50 most frequent tags -> number of posts
        * ``tag_engagement`` - every tag -> average notes per post
        * ``co_occurrence`` - the 20 most frequent ``"a+b"`` tag pairs ->
          number of posts containing both

    Also prints a short report to stdout.
    """
    from itertools import combinations  # function-scope: snippet stays self-contained

    tag_counts = Counter()
    tag_notes = Counter()  # tag -> total notes
    co_occurrence = Counter()

    for post in posts:
        # Lower-case, then drop repeats while keeping first-seen order.
        tags = list(dict.fromkeys(
            t.lower() for t in post.get("tags") or []
        ))
        notes = post.get("note_count") or 0

        for tag in tags:
            tag_counts[tag] += 1
            tag_notes[tag] += notes

        # Track which tags appear together; sorting makes (a, b) and
        # (b, a) the same key.
        for pair in combinations(tags, 2):
            co_occurrence[tuple(sorted(pair))] += 1

    # Calculate avg engagement per tag
    tag_engagement = {
        tag: tag_notes[tag] / count
        for tag, count in tag_counts.items()
    }

    print("Top 10 tags by frequency:")
    for tag, count in tag_counts.most_common(10):
        avg = tag_engagement[tag]
        print(f"  #{tag}: {count} posts, "
              f"avg {avg:,.0f} notes/post")

    print("\nTop 5 tag pairs (co-occurrence):")
    for (a, b), count in co_occurrence.most_common(5):
        print(f"  #{a} + #{b}: {count} posts together")

    print("\nHighest engagement tags (min 3 posts):")
    engaged = sorted(
        ((t, e) for t, e in tag_engagement.items()
         if tag_counts[t] >= 3),
        key=lambda item: item[1],
        reverse=True,
    )
    for tag, avg_notes in engaged[:5]:
        print(f"  #{tag}: avg {avg_notes:,.0f} notes "
              f"({tag_counts[tag]} posts)")

    return {
        "tag_counts": dict(tag_counts.most_common(50)),
        "tag_engagement": tag_engagement,
        "co_occurrence": {
            f"{a}+{b}": c
            for (a, b), c in co_occurrence.most_common(20)
        },
    }

Tag Analytics Output

Top 10 tags by frequency:
  #my art: 45 posts, avg 2,340 notes/post
  #digital art: 38 posts, avg 1,890 notes/post
  #fanart: 29 posts, avg 4,120 notes/post
  #wip: 22 posts, avg 890 notes/post
  #commission: 18 posts, avg 1,200 notes/post
  #tutorial: 15 posts, avg 5,600 notes/post
  #process video: 12 posts, avg 3,400 notes/post
  #character design: 11 posts, avg 2,100 notes/post
  #artists on tumblr: 10 posts, avg 1,780 notes/post
  #queue: 8 posts, avg 1,100 notes/post

Top 5 tag pairs (co-occurrence):
  #my art + #digital art: 28 posts together
  #fanart + #my art: 18 posts together
  #digital art + #artists on tumblr: 9 posts together
  #commission + #my art: 8 posts together
  #tutorial + #digital art: 7 posts together

Highest engagement tags (min 3 posts):
  #tutorial: avg 5,600 notes (15 posts)
  #fanart: avg 4,120 notes (29 posts)
  #process video: avg 3,400 notes (12 posts)
  #my art: avg 2,340 notes (45 posts)
  #character design: avg 2,100 notes (11 posts)

Summary

Tumblr's API v2 is one of the more developer-friendly social media APIs still available — a consumer key gets you public blog data, post pagination, tag search, and reblog chain traversal without OAuth. Combine that with BeautifulSoup for content the API doesn't surface (deleted posts, NSFW-flagged content, inconsistent pagination), and you can build comprehensive datasets.

Key numbers to remember: 1,000 API calls per hour per key, 20 posts per request, 300-500ms between calls to stay safe. For web scraping at volume, residential proxies handle Cloudflare. Store raw API responses before processing — Tumblr posts are mutable, and timestamped snapshots matter for longitudinal research.

The API reference lives at https://www.tumblr.com/docs/en/api/v2. The data-post-id and data-type attributes on article elements have been reliable scraping anchors for several years.