← Back to blog

Scraping Twitter Spaces: Metadata, Participants and Topics (2026)

Scraping Twitter Spaces: Metadata, Participants and Topics (2026)

Twitter Spaces has become a serious signal layer for media monitoring, influencer research, and real-time topic tracking. A scheduled Space with 5,000 listeners tells you something about the moment that no tweet thread can — who organized it, who spoke, what the crowd came for. For analysts tracking brand sentiment, journalists mapping influence networks, or researchers watching how narratives form in real time, Spaces metadata is underused and surprisingly accessible through the API.

This guide covers the Twitter API v2 Spaces endpoints in detail, monitoring strategies, SQLite storage, proxy integration, and practical gotchas for 2026.

What Data Is Available

The Twitter API v2 surfaces the following for Spaces: the title, lifecycle state (scheduled, live, or ended), language, host/speaker/invited-user IDs, participant and subscriber counts, topic IDs, ticketing status, and the created/scheduled/started/ended timestamps — exactly the fields requested in SPACE_FIELDS below.

What you cannot get: the actual audio, a real-time transcript, or historical participant lists after the Space ends. The API is event-oriented — you need to poll or subscribe during the window.

Authentication and Rate Limits

Twitter's API authentication is mandatory for all Spaces endpoints. There is no public JSON trick. You need at minimum a Bearer token from a registered app at developer.twitter.com.

Rate limits in 2026:

| Tier | Monthly read | Spaces endpoints | Rate window |
| --- | --- | --- | --- |
| Free | 500K tweets | Not included | — |
| Basic ($100/mo) | Included | 10 req/15min per app | 15 minutes |
| Pro ($5,000/mo) | Included | 300 req/15min | 15 minutes |
| Enterprise | Negotiated | Firehose access | — |

Per-IP throttling is a separate layer from these documented limits. If you're running multiple apps or rotating tokens from the same IP range, you'll hit soft blocks before hitting documented rate limits. Residential proxies from ThorData help distribute this load across multiple IPs.

Core API Client Setup

import httpx
import asyncio
import json
import time
from typing import Optional

# App-only Bearer token from a registered app at developer.twitter.com
BEARER_TOKEN = "YOUR_BEARER_TOKEN"
BASE_URL = "https://api.twitter.com/2"
HEADERS = {"Authorization": f"Bearer {BEARER_TOKEN}"}

# Standard Space fields to request in every call
SPACE_FIELDS = ",".join([
    "title", "host_ids", "created_at", "started_at", "ended_at",
    "state", "participant_count", "topic_ids", "lang", "is_ticketed",
    "scheduled_start", "subscriber_count", "speaker_ids", "invited_user_ids",
])

# User fields requested for expanded host/speaker/participant objects
USER_FIELDS = "name,username,public_metrics,description,location,verified"


async def api_get(endpoint: str, params: dict, proxy: Optional[str] = None) -> dict:
    """Make an authenticated GET request to the Twitter API v2.

    Args:
        endpoint: Path below BASE_URL, e.g. "spaces/search".
        params: Query-string parameters for the request.
        proxy: Optional proxy URL passed straight to httpx.

    Returns:
        The decoded JSON response body.

    Raises:
        httpx.HTTPStatusError: For any non-2xx response, including a
            second 429 after the single rate-limit retry below.
    """
    async with httpx.AsyncClient(
        proxy=proxy,
        timeout=20,
        headers=HEADERS,
    ) as client:
        r = await client.get(f"{BASE_URL}/{endpoint}", params=params)

        if r.status_code == 429:
            # x-rate-limit-reset is a Unix timestamp; default to ~60s if the
            # header is missing. Sleep at least 1s to avoid a tight retry.
            reset_time = int(r.headers.get("x-rate-limit-reset", time.time() + 60))
            wait = max(reset_time - time.time(), 1)
            print(f"Rate limited. Waiting {wait:.0f}s...")
            await asyncio.sleep(wait)
            # Single retry only; a second 429 surfaces via raise_for_status.
            r = await client.get(f"{BASE_URL}/{endpoint}", params=params)

        r.raise_for_status()
        return r.json()

Searching for Spaces

async def search_spaces(query: str, max_results: int = 10,
                        proxy: str = None) -> dict:
    """Run a keyword search against the Spaces search endpoint."""
    request_params = {
        "query": query,
        # A single search page is capped at 100 results by the API.
        "max_results": min(max_results, 100),
        "space.fields": SPACE_FIELDS,
        "expansions": "host_ids,speaker_ids,invited_user_ids",
        "user.fields": USER_FIELDS,
    }
    return await api_get("spaces/search", request_params, proxy)


async def get_space(space_id: str, proxy: str = None) -> dict:
    """Retrieve complete metadata for one Space, identified by its ID."""
    return await api_get(
        f"spaces/{space_id}",
        {
            "space.fields": SPACE_FIELDS,
            "expansions": "host_ids,speaker_ids,invited_user_ids",
            "user.fields": USER_FIELDS,
        },
        proxy,
    )


async def get_spaces_by_ids(space_ids: list[str], proxy: str = None) -> dict:
    """Batch fetch up to 100 Spaces by their IDs.

    Args:
        space_ids: Space IDs to look up. Only the first 100 are sent,
            matching the API's per-request maximum.
        proxy: Optional proxy URL.

    Returns:
        Raw API response dict (data + includes).

    Raises:
        ValueError: If space_ids is empty — previously this sent an
            empty "ids" parameter, which the API rejects with a 400.
    """
    if not space_ids:
        raise ValueError("space_ids must contain at least one ID")
    params = {
        "ids": ",".join(space_ids[:100]),  # API max is 100 IDs per call
        "space.fields": SPACE_FIELDS,
        "expansions": "host_ids,speaker_ids",
        "user.fields": USER_FIELDS,
    }
    return await api_get("spaces", params, proxy)


async def get_user_spaces(user_id: str, proxy: str = None) -> dict:
    """Get Spaces created or co-hosted by a specific user.

    Args:
        user_id: Numeric Twitter user ID (not the @handle).
        proxy: Optional proxy URL.

    Returns:
        Raw API response dict.
    """
    # Previously "user_ids" was merged into params at the call site and the
    # endpoint was a placeholder-free f-string; build one dict and use a
    # plain literal instead.
    params = {
        "user_ids": user_id,
        "space.fields": SPACE_FIELDS,
        "expansions": "host_ids",
        "user.fields": USER_FIELDS,
    }
    return await api_get("spaces/by/creator_ids", params, proxy)

Parsing API Responses

The v2 response envelope — a `data` payload plus an `includes` object carrying expanded user records — can be tricky to work with. Here's a helper:

def parse_spaces_response(response: dict) -> list[dict]:
    """
    Parse a spaces API response, joining user data from includes.

    Handles both single-space lookups (where "data" is a dict) and list
    endpoints (where "data" is a list). Returns a flat list of space
    dicts with host/speaker identities resolved from includes.users.
    """
    # Hoisted to the function top: the original re-imported datetime on
    # every loop iteration.
    from datetime import datetime

    spaces = response.get("data", [])
    if isinstance(spaces, dict):
        spaces = [spaces]  # Single space lookup returns dict, not list

    # Build user lookup from includes
    users = {
        u["id"]: u
        for u in response.get("includes", {}).get("users", [])
    }

    result = []
    for space in spaces:
        # Resolve host: the first entry of host_ids is treated as primary.
        host_id = (space.get("host_ids") or [None])[0]
        host = users.get(host_id, {})

        # Resolve speakers; unknown IDs fall back to a stub {"id": ...}.
        speaker_ids = space.get("speaker_ids", [])
        speakers = [users.get(sid, {"id": sid}) for sid in speaker_ids]

        # Calculate duration only when both boundary timestamps exist.
        duration_minutes = None
        if space.get("started_at") and space.get("ended_at"):
            fmt = "%Y-%m-%dT%H:%M:%S.%fZ"  # e.g. 2026-01-01T00:00:00.000Z
            try:
                started = datetime.strptime(space["started_at"], fmt)
                ended = datetime.strptime(space["ended_at"], fmt)
                duration_minutes = (ended - started).total_seconds() / 60
            except ValueError:
                pass  # Unexpected timestamp format; leave duration unset

        result.append({
            "id": space["id"],
            "title": space.get("title", ""),
            "state": space.get("state"),
            "lang": space.get("lang"),
            "is_ticketed": space.get("is_ticketed", False),
            "participant_count": space.get("participant_count"),
            "subscriber_count": space.get("subscriber_count"),
            "topic_ids": space.get("topic_ids", []),
            "created_at": space.get("created_at"),
            "started_at": space.get("started_at"),
            "ended_at": space.get("ended_at"),
            "scheduled_start": space.get("scheduled_start"),
            "duration_minutes": duration_minutes,
            "host_id": host_id,
            "host_username": host.get("username"),
            "host_name": host.get("name"),
            "host_followers": host.get("public_metrics", {}).get("followers_count"),
            "host_verified": host.get("verified", False),
            "speaker_count": len(speaker_ids),
            "speakers": [
                {
                    "id": s.get("id"),
                    "username": s.get("username"),
                    "followers": s.get("public_metrics", {}).get("followers_count"),
                }
                for s in speakers
            ],
        })

    return result

Getting Participant Lists

The buyers endpoint returns attendee data (per its name, for ticketed Spaces these are ticket purchasers; Pro tier required):

async def get_space_buyers(space_id: str, proxy: str = None) -> Optional[list[dict]]:
    """Fetch the participant list for a Space. Needs Pro tier or above."""
    try:
        response = await api_get(
            f"spaces/{space_id}/buyers", {"user.fields": USER_FIELDS}, proxy
        )
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 403:
            # Access-tier failure: report and signal "unavailable" with None.
            print("Participant list requires Pro tier ($5,000/mo)")
            return None
        raise

    participants = []
    for user in response.get("data", []):
        metrics = user.get("public_metrics", {})
        participants.append({
            "id": user["id"],
            "username": user.get("username"),
            "name": user.get("name"),
            "followers": metrics.get("followers_count"),
            "following": metrics.get("following_count"),
        })
    return participants

Resolving Topic IDs

Twitter returns topic IDs, not labels. Build your own mapping:

from collections import Counter
import re

# Known topic ID mappings (build and expand this over time)
TOPIC_ID_MAP = {
    "847895172357959680": "Technology",
    "781974596148793345": "Crypto & Blockchain",
    "839123456789012345": "Politics",
    "913534554358464512": "Sports",
    "131272609": "Music",
    "10058": "Finance",
    "848920587458609152": "Science",
    "745273178788196352": "News",
    "683762048983572480": "Entertainment",
}


def resolve_topics(topic_ids: list[str]) -> list[str]:
    """Translate topic IDs into human-readable labels.

    Unknown IDs are kept, prefixed with "topic:", so no data is lost.
    A None/empty input yields an empty list.
    """
    labels = []
    for tid in topic_ids or []:
        labels.append(TOPIC_ID_MAP.get(tid, f"topic:{tid}"))
    return labels


def build_topic_map_from_data(spaces: list[dict]) -> dict:
    """
    Count topic-ID occurrences across a corpus of spaces.

    Note: despite the name, this does NOT learn labels on its own — the
    workflow is manual. Scrape spaces you know belong to a category, run
    this to see which topic IDs dominate, then record the resulting
    ID -> label pairs in TOPIC_ID_MAP.

    Args:
        spaces: Parsed space dicts (see parse_spaces_response).

    Returns:
        Dict of topic_id -> occurrence count, ordered most common first.
    """
    topic_counts = Counter()
    for space in spaces:
        topic_counts.update(space.get("topic_ids", []))
    return dict(topic_counts.most_common())


def extract_keywords(spaces: list[dict], top_n: int = 50) -> Counter:
    """Pull high-frequency terms from Space titles.

    Bug fix: the original accepted top_n but never used it, returning
    every term; the counter is now trimmed to the top_n most common.

    Args:
        spaces: Parsed space dicts with "title" keys.
        top_n: Maximum number of distinct keywords to return.

    Returns:
        Counter of keyword -> frequency, limited to top_n entries.
    """
    stopwords = {
        "the", "a", "an", "in", "on", "for", "and", "of", "to", "with",
        "is", "are", "at", "by", "or", "we", "our", "your", "this", "that",
        "from", "about", "how", "what", "will", "have", "has",
    }
    counts: Counter = Counter()
    for space in spaces:
        title = space.get("title", "") or ""
        # Only runs of 4+ ASCII letters count as keywords; most stopwords
        # are shorter, the set above catches the 4+-letter stragglers.
        words = re.findall(r"\b[a-zA-Z]{4,}\b", title.lower())
        counts.update(w for w in words if w not in stopwords)
    # Honor top_n (previously ignored) while preserving the Counter type.
    return Counter(dict(counts.most_common(top_n)))


def aggregate_by_host_reach(spaces: list[dict]) -> list[dict]:
    """Sort Spaces by combined host + speaker follower counts, descending."""

    def _reach(space: dict) -> int:
        # Missing follower counts (None) are treated as zero.
        total = space.get("host_followers") or 0
        total += sum(s.get("followers") or 0 for s in space.get("speakers", []))
        return total

    ranked = [{**space, "total_reach": _reach(space)} for space in spaces]
    ranked.sort(key=lambda item: item["total_reach"], reverse=True)
    return ranked

SQLite Storage

For sustained monitoring, store everything in SQLite:

import sqlite3
from datetime import datetime


def init_spaces_db(db_path: str = "twitter_spaces.db") -> sqlite3.Connection:
    """Open (creating if needed) the Spaces database and return the connection."""
    conn = sqlite3.connect(db_path)
    # WAL mode lets readers proceed while the monitor writes.
    conn.execute("PRAGMA journal_mode=WAL")

    ddl = [
        """
        CREATE TABLE IF NOT EXISTS spaces (
            id TEXT PRIMARY KEY,
            title TEXT,
            state TEXT,
            lang TEXT,
            is_ticketed INTEGER DEFAULT 0,
            participant_count INTEGER,
            subscriber_count INTEGER,
            topic_ids TEXT,  -- JSON array of IDs
            topics TEXT,     -- JSON array of resolved labels
            created_at TEXT,
            started_at TEXT,
            ended_at TEXT,
            scheduled_start TEXT,
            duration_minutes REAL,
            host_id TEXT,
            host_username TEXT,
            host_name TEXT,
            host_followers INTEGER,
            host_verified INTEGER DEFAULT 0,
            speaker_count INTEGER,
            speakers TEXT,  -- JSON array
            keywords TEXT,  -- JSON array extracted from title
            first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS search_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            query TEXT NOT NULL,
            spaces_found INTEGER,
            ran_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS participants (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            space_id TEXT NOT NULL,
            user_id TEXT NOT NULL,
            username TEXT,
            name TEXT,
            followers INTEGER,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(space_id, user_id),
            FOREIGN KEY (space_id) REFERENCES spaces(id)
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_spaces_state ON spaces(state)",
        "CREATE INDEX IF NOT EXISTS idx_spaces_host ON spaces(host_username)",
        "CREATE INDEX IF NOT EXISTS idx_spaces_started ON spaces(started_at)",
        "CREATE INDEX IF NOT EXISTS idx_spaces_participants ON spaces(participant_count)",
    ]
    for statement in ddl:
        conn.execute(statement)

    conn.commit()
    return conn


def save_space(conn: sqlite3.Connection, space: dict) -> bool:
    """Insert a new Space row, or refresh the mutable fields of a known one.

    Returns True when the Space was not previously in the database.
    """
    row = conn.execute(
        "SELECT 1 FROM spaces WHERE id = ?", (space["id"],)
    ).fetchone()
    is_new = row is None

    if not is_new:
        # Known Space: only the fields that change over its lifecycle.
        conn.execute(
            """UPDATE spaces SET
               state = ?, participant_count = ?, subscriber_count = ?,
               ended_at = ?, duration_minutes = ?, last_updated = CURRENT_TIMESTAMP
               WHERE id = ?""",
            (
                space.get("state"),
                space.get("participant_count"),
                space.get("subscriber_count"),
                space.get("ended_at"),
                space.get("duration_minutes"),
                space["id"],
            ),
        )
    else:
        # Derive up to 20 unique 4+-letter keywords from the title.
        title_words = re.findall(r"\b[a-zA-Z]{4,}\b", (space.get("title") or "").lower())
        keywords = list(set(title_words))[:20]
        conn.execute(
            """INSERT INTO spaces
               (id, title, state, lang, is_ticketed, participant_count, subscriber_count,
                topic_ids, topics, created_at, started_at, ended_at, scheduled_start,
                duration_minutes, host_id, host_username, host_name, host_followers,
                host_verified, speaker_count, speakers, keywords)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                space["id"], space.get("title"), space.get("state"),
                space.get("lang"), 1 if space.get("is_ticketed") else 0,
                space.get("participant_count"), space.get("subscriber_count"),
                json.dumps(space.get("topic_ids", [])),
                json.dumps(resolve_topics(space.get("topic_ids", []))),
                space.get("created_at"), space.get("started_at"),
                space.get("ended_at"), space.get("scheduled_start"),
                space.get("duration_minutes"),
                space.get("host_id"), space.get("host_username"),
                space.get("host_name"), space.get("host_followers"),
                1 if space.get("host_verified") else 0,
                space.get("speaker_count"),
                json.dumps(space.get("speakers", [])),
                json.dumps(keywords),
            ),
        )
    conn.commit()
    return is_new
Monitoring Scheduled Spaces

Poll the search endpoint on a schedule and filter by state: scheduled:

from pathlib import Path

# On-disk persistence of already-announced Space IDs, so restarts don't re-alert
SEEN_FILE = Path("seen_spaces.json")


def load_seen() -> set:
    """Load the persisted set of seen Space IDs (empty if no state file yet)."""
    if not SEEN_FILE.exists():
        return set()
    return set(json.loads(SEEN_FILE.read_text()))


def save_seen(seen: set) -> None:
    """Persist the seen-ID set to SEEN_FILE as a JSON array."""
    serialized = json.dumps(list(seen))
    SEEN_FILE.write_text(serialized)


async def monitor_topic(
    query: str,
    db_path: str = "twitter_spaces.db",
    poll_interval: int = 300,
    proxy: Optional[str] = None,
) -> None:
    """Continuously monitor for new Spaces on a topic.

    Polls the search endpoint every poll_interval seconds, persists every
    Space to SQLite, and announces newly discovered scheduled/live Spaces.
    Runs forever; cancel the task to stop.

    Args:
        query: Keyword query passed to the Spaces search endpoint.
        db_path: SQLite database file.
        poll_interval: Seconds between polls.
        proxy: Optional proxy URL.
    """
    conn = init_spaces_db(db_path)
    seen = load_seen()
    print(f"Monitoring Spaces for: '{query}' (polling every {poll_interval}s)")

    while True:
        try:
            raw = await search_spaces(query, max_results=50, proxy=proxy)
            spaces = parse_spaces_response(raw)

            new_count = 0
            for space in spaces:
                is_new = save_space(conn, space)
                if is_new and space["id"] not in seen:
                    seen.add(space["id"])
                    new_count += 1

                    state = space.get("state")
                    if state == "scheduled":
                        print(f"[NEW SCHEDULED] {space.get('title')}")
                        # Bug fix: host_followers can be present but None
                        # (missing public_metrics), and the original
                        # `.get(..., 0)` default never fires then — formatting
                        # None with ':,' raised TypeError. `or 0` covers it.
                        print(f"  Host: @{space.get('host_username')} ({space.get('host_followers') or 0:,} followers)")
                        print(f"  Starts: {space.get('scheduled_start')}")
                    elif state == "live":
                        print(f"[LIVE] {space.get('title')}")
                        print(f"  Listeners: {space.get('participant_count', 'unknown')}")

            save_seen(seen)

            # Record this polling run for later auditing.
            conn.execute(
                "INSERT INTO search_runs (query, spaces_found) VALUES (?, ?)",
                (query, len(spaces))
            )
            conn.commit()

            print(f"[{datetime.now().strftime('%H:%M:%S')}] Found {len(spaces)} spaces ({new_count} new)")

        except httpx.HTTPStatusError as e:
            print(f"API error {e.response.status_code}: {e.response.text[:200]}")
            if e.response.status_code == 429:
                await asyncio.sleep(60)

        except Exception as e:
            # Best-effort loop: log and keep polling rather than crash.
            print(f"Unexpected error: {e}")

        await asyncio.sleep(poll_interval)


async def monitor_multiple_topics(
    queries: list[str],
    db_path: str = "twitter_spaces.db",
    proxy: str = None,
) -> None:
    """Monitor several topics at once, staggering each monitor's start."""

    async def delayed_monitor(query: str, delay: int) -> None:
        # Offsetting start times keeps the API calls from landing together.
        await asyncio.sleep(delay)
        await monitor_topic(query, db_path=db_path, proxy=proxy)

    coros = [delayed_monitor(q, i * 30) for i, q in enumerate(queries)]
    await asyncio.gather(*coros)

Proxy Configuration

Running multiple keyword searches or monitoring dozens of hosts pushes against per-app rate limits quickly. ThorData's residential proxies are useful in two scenarios:

  1. Distributing API requests when you have multiple token pools across different IPs
  2. Supplementary web scraping for data points the API does not expose (like the Spaces tab on a user profile)
# Proxy endpoint template — replace USER/PASS with real credentials
PROXY = "http://USER:[email protected]:9000"

# Example: concurrent multi-keyword monitoring through proxied client
# Example: concurrent multi-keyword monitoring through proxied client
async def search_with_semaphore(
    queries: list[str],
    max_concurrent: int = 3,
    proxy: str = None,
) -> list[dict]:
    """Run several keyword searches concurrently, bounded by a semaphore."""
    gate = asyncio.Semaphore(max_concurrent)
    collected: list[dict] = []

    async def run_one(query: str) -> None:
        async with gate:
            raw = await search_spaces(query, proxy=proxy)
            parsed = parse_spaces_response(raw)
            print(f"'{query}': {len(parsed)} spaces")
            collected.extend(parsed)
            await asyncio.sleep(2)  # Small delay between requests

    await asyncio.gather(*(run_one(q) for q in queries))
    return collected


# Usage
# Example keyword set for multi-topic monitoring
MONITORING_QUERIES = [
    "AI regulation",
    "crypto trading",
    "startup fundraising",
    "tech layoffs",
    "product launch",
]

async def main():
    """Run one multi-keyword search sweep and persist the results."""
    proxy = "http://USER:[email protected]:9000"
    spaces = await search_with_semaphore(MONITORING_QUERIES, proxy=proxy)
    print(f"Total: {len(spaces)} spaces across {len(MONITORING_QUERIES)} topics")

    # Save to database. try/finally fixes a leak: previously the connection
    # stayed open if any save_space call raised.
    conn = init_spaces_db()
    try:
        for space in spaces:
            save_space(conn, space)
    finally:
        conn.close()

asyncio.run(main())

Analytics Queries

Once data accumulates in your SQLite database:

def get_top_spaces_by_reach(db_path: str, min_participants: int = 100,
                              state: str = "ended", limit: int = 20) -> list:
    """Find highest-reach ended Spaces.

    Args:
        db_path: SQLite database file created by init_spaces_db.
        min_participants: Minimum participant_count to include.
        state: Space lifecycle state to filter on (default "ended").
        limit: Maximum rows returned.

    Returns:
        Row tuples (title, host_username, host_followers,
        participant_count, duration_minutes, started_at, topics),
        ordered by participant_count descending.
    """
    conn = sqlite3.connect(db_path)
    # try/finally fixes a leak: the connection stayed open if the query raised.
    try:
        return conn.execute(
            """SELECT title, host_username, host_followers, participant_count,
                      duration_minutes, started_at, topics
               FROM spaces
               WHERE state = ? AND participant_count >= ?
               ORDER BY participant_count DESC
               LIMIT ?""",
            (state, min_participants, limit)
        ).fetchall()
    finally:
        conn.close()


def trending_keywords_last_24h(db_path: str) -> list:
    """Get most frequent keywords from spaces seen in the last 24 hours.

    Args:
        db_path: SQLite database file created by init_spaces_db.

    Returns:
        List of (keyword, count) tuples, most common first (top 30).
    """
    conn = sqlite3.connect(db_path)
    # try/finally fixes a leak: the connection stayed open if the query raised.
    try:
        rows = conn.execute(
            """SELECT keywords FROM spaces
               WHERE first_seen >= datetime('now', '-1 day')""",
        ).fetchall()
    finally:
        conn.close()

    all_keywords = Counter()
    for (kw_json,) in rows:
        try:
            all_keywords.update(json.loads(kw_json or "[]"))
        except json.JSONDecodeError:
            pass  # Skip rows whose keyword JSON is corrupt

    return all_keywords.most_common(30)


def host_leaderboard(db_path: str, min_spaces: int = 3) -> list:
    """Rank hosts by average participant count.

    Args:
        db_path: SQLite database file created by init_spaces_db.
        min_spaces: Minimum number of Spaces a host must have to appear.

    Returns:
        Up to 25 row tuples (host_username, host_followers, space_count,
        avg_participants, max_participants), best average first.
    """
    conn = sqlite3.connect(db_path)
    # try/finally fixes a leak: the connection stayed open if the query raised.
    try:
        return conn.execute(
            """SELECT host_username, host_followers,
                      COUNT(*) as space_count,
                      AVG(participant_count) as avg_participants,
                      MAX(participant_count) as max_participants
               FROM spaces
               WHERE host_username IS NOT NULL
                 AND participant_count IS NOT NULL
               GROUP BY host_username
               HAVING COUNT(*) >= ?
               ORDER BY avg_participants DESC
               LIMIT 25""",
            (min_spaces,)
        ).fetchall()
    finally:
        conn.close()

Practical Tips and Gotchas