Scraping Twitter Spaces: Metadata, Participants and Topics (2026)
Twitter Spaces has become a serious signal layer for media monitoring, influencer research, and real-time topic tracking. A scheduled Space with 5,000 listeners tells you something about the moment that no tweet thread can — who organized it, who spoke, what the crowd came for. For analysts tracking brand sentiment, journalists mapping influence networks, or researchers watching how narratives form in real time, Spaces metadata is underused and surprisingly accessible through the API.
This guide covers the Twitter API v2 Spaces endpoints in detail, monitoring strategies, SQLite storage, proxy integration, and practical gotchas for 2026.
What Data Is Available
The Twitter API v2 surfaces the following for Spaces:
- Space title and description — set by the host at creation
- Host information — user ID, handle, follower count (via user lookup)
- Speaker list — invited and actual speakers with their user IDs
- Participant count — live listener count during the broadcast
- Scheduled start time — for upcoming Spaces
- Actual start and end times — for completed Spaces
- Duration — calculated from start/end fields
- State — `live`, `scheduled`, or `ended`
- Recording availability — whether a replay is posted
- Topics — official topic tags assigned by the host (returned as IDs, mappable to labels)
- Language — auto-detected primary language
- Ticketed status — whether the Space requires a ticket to attend
What you cannot get: the actual audio, a real-time transcript, or historical participant lists after the Space ends. The API is event-oriented — you need to poll or subscribe during the window.
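Put together, a single Space object from the API looks roughly like this. Field names follow the v2 schema listed above; the IDs and values are invented for illustration:

```python
# Illustrative Space object: field names per the v2 schema, values invented
sample_space = {
    "id": "1zqKVXPQhvZJB",
    "title": "Weekly AI roundtable",
    "state": "ended",
    "host_ids": ["1001"],                    # always a single-element array
    "speaker_ids": ["1001", "1002"],         # includes the host
    "participant_count": 4870,               # populated while live
    "topic_ids": ["847895172357959680"],     # IDs, not labels
    "lang": "en",
    "is_ticketed": False,
    "scheduled_start": "2026-01-15T18:00:00.000Z",
    "started_at": "2026-01-15T18:00:19.000Z",
    "ended_at": "2026-01-15T19:12:44.000Z",
}
```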
Authentication and Rate Limits
Authentication is mandatory for all Spaces endpoints; there is no public unauthenticated JSON fallback. At minimum, you need a Bearer token from an app registered at developer.twitter.com.
Rate limits in 2026:
| Tier | Monthly read | Spaces endpoints | Rate window |
|---|---|---|---|
| Free | 500K tweets | Not included | — |
| Basic ($100/mo) | Included | 10 req/15min per app | 15 minutes |
| Pro ($5,000/mo) | Included | 300 req/15min | 15 minutes |
| Enterprise | Negotiated | Firehose access | — |
Per-IP throttling is a separate layer from these documented limits. If you're running multiple apps or rotating tokens from the same IP range, you'll hit soft blocks before hitting documented rate limits. Residential proxies from ThorData help distribute this load across multiple IPs.
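Whatever tier you are on, it pays to pace requests client-side rather than bouncing off 429s. A minimal sketch of a sliding-window limiter, where the request budget and window size are parameters you would set to match your tier:

```python
import asyncio
import time

class RateLimiter:
    """Client-side pacing: allow at most `max_requests` per `window` seconds."""

    def __init__(self, max_requests: int, window: float = 900.0):
        self.max_requests = max_requests
        self.window = window
        self.timestamps: list[float] = []  # send times within the current window
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            # Drop send times that have aged out of the window
            self.timestamps = [t for t in self.timestamps if now - t < self.window]
            if len(self.timestamps) >= self.max_requests:
                # Sleep until the oldest request leaves the window
                await asyncio.sleep(self.window - (now - self.timestamps[0]))
            self.timestamps.append(time.monotonic())
```

Call `await limiter.acquire()` before each API request; for example, `RateLimiter(10, 900)` keeps a worker inside the Basic tier's 10 requests per 15 minutes.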
Core API Client Setup
import httpx
import asyncio
import json
import time
from typing import Optional
BEARER_TOKEN = "YOUR_BEARER_TOKEN"
BASE_URL = "https://api.twitter.com/2"
HEADERS = {"Authorization": f"Bearer {BEARER_TOKEN}"}
# Standard Space fields to request in every call
SPACE_FIELDS = ",".join([
"title", "host_ids", "created_at", "started_at", "ended_at",
"state", "participant_count", "topic_ids", "lang", "is_ticketed",
"scheduled_start", "subscriber_count", "speaker_ids", "invited_user_ids",
])
USER_FIELDS = "name,username,public_metrics,description,location,verified"
async def api_get(endpoint: str, params: dict, proxy: str = None) -> dict:
"""Make an authenticated GET request to the Twitter API v2."""
async with httpx.AsyncClient(
proxy=proxy,
timeout=20,
headers=HEADERS,
) as client:
r = await client.get(f"{BASE_URL}/{endpoint}", params=params)
if r.status_code == 429:
reset_time = int(r.headers.get("x-rate-limit-reset", time.time() + 60))
wait = max(reset_time - time.time(), 1)
print(f"Rate limited. Waiting {wait:.0f}s...")
await asyncio.sleep(wait)
r = await client.get(f"{BASE_URL}/{endpoint}", params=params)
r.raise_for_status()
return r.json()
Searching for Spaces
async def search_spaces(query: str, max_results: int = 10,
proxy: str = None) -> dict:
"""Search for Spaces by keyword."""
params = {
"query": query,
"max_results": min(max_results, 100), # API max is 100
"space.fields": SPACE_FIELDS,
"expansions": "host_ids,speaker_ids,invited_user_ids",
"user.fields": USER_FIELDS,
}
return await api_get("spaces/search", params, proxy)
async def get_space(space_id: str, proxy: str = None) -> dict:
"""Fetch full metadata for a single Space by ID."""
params = {
"space.fields": SPACE_FIELDS,
"expansions": "host_ids,speaker_ids,invited_user_ids",
"user.fields": USER_FIELDS,
}
return await api_get(f"spaces/{space_id}", params, proxy)
async def get_spaces_by_ids(space_ids: list[str], proxy: str = None) -> dict:
"""Batch fetch up to 100 Spaces by their IDs."""
params = {
"ids": ",".join(space_ids[:100]),
"space.fields": SPACE_FIELDS,
"expansions": "host_ids,speaker_ids",
"user.fields": USER_FIELDS,
}
return await api_get("spaces", params, proxy)
async def get_user_spaces(user_id: str, proxy: str = None) -> dict:
    """Get live or scheduled Spaces created by a specific user."""
    params = {
        "user_ids": user_id,
        "space.fields": SPACE_FIELDS,
        "expansions": "host_ids",
        "user.fields": USER_FIELDS,
    }
    return await api_get("spaces/by/creator_ids", params, proxy)
Parsing API Responses
The v2 response envelope — a `data` payload plus an `includes` object — can be tricky to work with. Here's a helper that joins them:
def parse_spaces_response(response: dict) -> list[dict]:
"""
Parse a spaces API response, joining user data from includes.
Returns a flat list of space dicts with host/speaker names resolved.
"""
spaces = response.get("data", [])
if isinstance(spaces, dict):
spaces = [spaces] # Single space lookup returns dict, not list
# Build user lookup from includes
users = {
u["id"]: u
for u in response.get("includes", {}).get("users", [])
}
result = []
for space in spaces:
# Resolve host
host_id = (space.get("host_ids") or [None])[0]
host = users.get(host_id, {})
# Resolve speakers
speaker_ids = space.get("speaker_ids", [])
speakers = [users.get(sid, {"id": sid}) for sid in speaker_ids]
# Calculate duration if possible
duration_minutes = None
if space.get("started_at") and space.get("ended_at"):
from datetime import datetime
fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
try:
started = datetime.strptime(space["started_at"], fmt)
ended = datetime.strptime(space["ended_at"], fmt)
duration_minutes = (ended - started).total_seconds() / 60
except ValueError:
pass
result.append({
"id": space["id"],
"title": space.get("title", ""),
"state": space.get("state"),
"lang": space.get("lang"),
"is_ticketed": space.get("is_ticketed", False),
"participant_count": space.get("participant_count"),
"subscriber_count": space.get("subscriber_count"),
"topic_ids": space.get("topic_ids", []),
"created_at": space.get("created_at"),
"started_at": space.get("started_at"),
"ended_at": space.get("ended_at"),
"scheduled_start": space.get("scheduled_start"),
"duration_minutes": duration_minutes,
"host_id": host_id,
"host_username": host.get("username"),
"host_name": host.get("name"),
"host_followers": host.get("public_metrics", {}).get("followers_count"),
"host_verified": host.get("verified", False),
"speaker_count": len(speaker_ids),
"speakers": [
{
"id": s.get("id"),
"username": s.get("username"),
"followers": s.get("public_metrics", {}).get("followers_count"),
}
for s in speakers
],
})
return result
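To see the `data` + `includes` join in isolation, here is the core pattern from the parser run against a synthetic response (the IDs and usernames are invented):

```python
# Synthetic v2-style response: IDs and names invented for illustration
response = {
    "data": [{
        "id": "1abc", "title": "AI roundtable", "state": "ended",
        "host_ids": ["42"], "speaker_ids": ["42", "77"],
        "participant_count": 310,
    }],
    "includes": {"users": [
        {"id": "42", "username": "host_acct", "public_metrics": {"followers_count": 9000}},
        {"id": "77", "username": "guest_acct", "public_metrics": {"followers_count": 1200}},
    ]},
}

# The join: index users by ID, then resolve host/speaker references
users = {u["id"]: u for u in response["includes"]["users"]}
space = response["data"][0]
host = users[space["host_ids"][0]]
speakers = [users[sid]["username"] for sid in space["speaker_ids"]]
print(host["username"], speakers)  # host_acct ['host_acct', 'guest_acct']
```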
Getting Ticket Buyers
There is no endpoint that lists ordinary listeners. The closest thing is the buyers endpoint, which returns the users who purchased tickets to a ticketed Space (Pro tier required):
async def get_space_buyers(space_id: str, proxy: str = None) -> Optional[list[dict]]:
"""Get participant list. Requires Pro tier or above."""
params = {"user.fields": USER_FIELDS}
try:
response = await api_get(f"spaces/{space_id}/buyers", params, proxy)
users = response.get("data", [])
return [
{
"id": u["id"],
"username": u.get("username"),
"name": u.get("name"),
"followers": u.get("public_metrics", {}).get("followers_count"),
"following": u.get("public_metrics", {}).get("following_count"),
}
for u in users
]
except httpx.HTTPStatusError as e:
if e.response.status_code == 403:
print("Participant list requires Pro tier ($5,000/mo)")
return None
raise
Extracting Topics and Trends
Twitter returns topic IDs, not labels. Build your own mapping:
from collections import Counter
import re
# Known topic ID mappings (build and expand this over time)
TOPIC_ID_MAP = {
"847895172357959680": "Technology",
"781974596148793345": "Crypto & Blockchain",
"839123456789012345": "Politics",
"913534554358464512": "Sports",
"131272609": "Music",
"10058":"Finance",
"848920587458609152": "Science",
"745273178788196352": "News",
"683762048983572480": "Entertainment",
}
def resolve_topics(topic_ids: list[str]) -> list[str]:
"""Map topic IDs to human-readable labels."""
return [TOPIC_ID_MAP.get(tid, f"topic:{tid}") for tid in (topic_ids or [])]
def build_topic_map_from_data(spaces: list[dict]) -> dict:
"""
Learn topic ID -> label mappings from a corpus of spaces.
Useful when you have spaces from known categories.
"""
# This is a manual process: scrape spaces you know belong to a category
# and record which topic IDs appear, then build the mapping
topic_counts = Counter()
for space in spaces:
for tid in space.get("topic_ids", []):
topic_counts[tid] += 1
return dict(topic_counts.most_common())
def extract_keywords(spaces: list[dict], top_n: int = 50) -> Counter:
"""Pull high-frequency terms from Space titles."""
stopwords = {
"the", "a", "an", "in", "on", "for", "and", "of", "to", "with",
"is", "are", "at", "by", "or", "we", "our", "your", "this", "that",
"from", "about", "how", "what", "will", "have", "has",
}
counts: Counter = Counter()
for space in spaces:
title = space.get("title", "") or ""
words = re.findall(r"\b[a-zA-Z]{4,}\b", title.lower())
counts.update(w for w in words if w not in stopwords)
return counts
def aggregate_by_host_reach(spaces: list[dict]) -> list[dict]:
"""Sort Spaces by total host + speaker follower reach."""
enriched = []
for space in spaces:
total_reach = (space.get("host_followers") or 0)
for speaker in space.get("speakers", []):
total_reach += (speaker.get("followers") or 0)
enriched.append({
**space,
"total_reach": total_reach,
})
return sorted(enriched, key=lambda x: x["total_reach"], reverse=True)
SQLite Storage
For sustained monitoring, store everything in SQLite:
import sqlite3
from datetime import datetime
def init_spaces_db(db_path: str = "twitter_spaces.db") -> sqlite3.Connection:
"""Initialize SQLite database for Twitter Spaces data."""
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS spaces (
id TEXT PRIMARY KEY,
title TEXT,
state TEXT,
lang TEXT,
is_ticketed INTEGER DEFAULT 0,
participant_count INTEGER,
subscriber_count INTEGER,
topic_ids TEXT, -- JSON array of IDs
topics TEXT, -- JSON array of resolved labels
created_at TEXT,
started_at TEXT,
ended_at TEXT,
scheduled_start TEXT,
duration_minutes REAL,
host_id TEXT,
host_username TEXT,
host_name TEXT,
host_followers INTEGER,
host_verified INTEGER DEFAULT 0,
speaker_count INTEGER,
speakers TEXT, -- JSON array
keywords TEXT, -- JSON array extracted from title
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS search_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
query TEXT NOT NULL,
spaces_found INTEGER,
ran_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS participants (
id INTEGER PRIMARY KEY AUTOINCREMENT,
space_id TEXT NOT NULL,
user_id TEXT NOT NULL,
username TEXT,
name TEXT,
followers INTEGER,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(space_id, user_id),
FOREIGN KEY (space_id) REFERENCES spaces(id)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_spaces_state ON spaces(state)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_spaces_host ON spaces(host_username)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_spaces_started ON spaces(started_at)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_spaces_participants ON spaces(participant_count)")
conn.commit()
return conn
def save_space(conn: sqlite3.Connection, space: dict) -> bool:
"""Save or update a Space record. Returns True if new."""
existing = conn.execute(
"SELECT 1 FROM spaces WHERE id = ?", (space["id"],)
).fetchone()
title_words = re.findall(r"\b[a-zA-Z]{4,}\b", (space.get("title") or "").lower())
keywords = list(set(title_words))[:20]
if existing:
conn.execute(
"""UPDATE spaces SET
state = ?, participant_count = ?, subscriber_count = ?,
ended_at = ?, duration_minutes = ?, last_updated = CURRENT_TIMESTAMP
WHERE id = ?""",
(
space.get("state"),
space.get("participant_count"),
space.get("subscriber_count"),
space.get("ended_at"),
space.get("duration_minutes"),
space["id"],
)
)
conn.commit()
return False
else:
conn.execute(
"""INSERT INTO spaces
(id, title, state, lang, is_ticketed, participant_count, subscriber_count,
topic_ids, topics, created_at, started_at, ended_at, scheduled_start,
duration_minutes, host_id, host_username, host_name, host_followers,
host_verified, speaker_count, speakers, keywords)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
space["id"], space.get("title"), space.get("state"),
space.get("lang"), 1 if space.get("is_ticketed") else 0,
space.get("participant_count"), space.get("subscriber_count"),
json.dumps(space.get("topic_ids", [])),
json.dumps(resolve_topics(space.get("topic_ids", []))),
space.get("created_at"), space.get("started_at"),
space.get("ended_at"), space.get("scheduled_start"),
space.get("duration_minutes"),
space.get("host_id"), space.get("host_username"),
space.get("host_name"), space.get("host_followers"),
1 if space.get("host_verified") else 0,
space.get("speaker_count"),
json.dumps(space.get("speakers", [])),
json.dumps(keywords),
)
)
conn.commit()
return True
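The SELECT-then-branch pattern above works, but SQLite (3.24+) also supports a single-statement upsert via `INSERT ... ON CONFLICT`, which avoids the gap between the existence check and the write. A minimal sketch against a cut-down table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE spaces (
        id TEXT PRIMARY KEY,
        title TEXT,
        state TEXT,
        participant_count INTEGER,
        last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

def upsert_space(conn: sqlite3.Connection, space: dict) -> None:
    # Insert a new row, or refresh the mutable fields if the id already exists
    conn.execute(
        """INSERT INTO spaces (id, title, state, participant_count)
           VALUES (:id, :title, :state, :participant_count)
           ON CONFLICT(id) DO UPDATE SET
               state = excluded.state,
               participant_count = excluded.participant_count,
               last_updated = CURRENT_TIMESTAMP""",
        space,
    )
    conn.commit()

upsert_space(conn, {"id": "1", "title": "Demo", "state": "live", "participant_count": 50})
upsert_space(conn, {"id": "1", "title": "Demo", "state": "ended", "participant_count": 480})
row = conn.execute("SELECT state, participant_count FROM spaces WHERE id = '1'").fetchone()
print(row)  # ('ended', 480)
```

The `excluded` pseudo-table refers to the row that would have been inserted, so the second call updates `state` and `participant_count` in place.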
Monitoring Scheduled Spaces
Poll the search endpoint on a schedule and watch for Spaces whose `state` is `scheduled`:
from pathlib import Path
SEEN_FILE = Path("seen_spaces.json")
def load_seen() -> set:
if SEEN_FILE.exists():
return set(json.loads(SEEN_FILE.read_text()))
return set()
def save_seen(seen: set) -> None:
SEEN_FILE.write_text(json.dumps(list(seen)))
async def monitor_topic(
query: str,
db_path: str = "twitter_spaces.db",
poll_interval: int = 300,
proxy: str = None,
) -> None:
"""Continuously monitor for new Spaces on a topic."""
conn = init_spaces_db(db_path)
seen = load_seen()
print(f"Monitoring Spaces for: '{query}' (polling every {poll_interval}s)")
while True:
try:
raw = await search_spaces(query, max_results=50, proxy=proxy)
spaces = parse_spaces_response(raw)
new_count = 0
for space in spaces:
is_new = save_space(conn, space)
if is_new and space["id"] not in seen:
seen.add(space["id"])
new_count += 1
state = space.get("state")
if state == "scheduled":
print(f"[NEW SCHEDULED] {space.get('title')}")
print(f" Host: @{space.get('host_username')} ({space.get('host_followers', 0):,} followers)")
print(f" Starts: {space.get('scheduled_start')}")
elif state == "live":
print(f"[LIVE] {space.get('title')}")
print(f" Listeners: {space.get('participant_count', 'unknown')}")
save_seen(seen)
# Log run
conn.execute(
"INSERT INTO search_runs (query, spaces_found) VALUES (?, ?)",
(query, len(spaces))
)
conn.commit()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Found {len(spaces)} spaces ({new_count} new)")
except httpx.HTTPStatusError as e:
print(f"API error {e.response.status_code}: {e.response.text[:200]}")
if e.response.status_code == 429:
await asyncio.sleep(60)
except Exception as e:
print(f"Unexpected error: {e}")
await asyncio.sleep(poll_interval)
async def monitor_multiple_topics(
queries: list[str],
db_path: str = "twitter_spaces.db",
proxy: str = None,
) -> None:
"""Monitor multiple topics concurrently with staggered polling."""
tasks = []
for i, query in enumerate(queries):
# Stagger start times to avoid simultaneous API calls
stagger = i * 30
async def run_monitor(q=query, s=stagger):
await asyncio.sleep(s)
await monitor_topic(q, db_path=db_path, proxy=proxy)
tasks.append(run_monitor())
await asyncio.gather(*tasks)
Proxy Configuration
Running multiple keyword searches or monitoring dozens of hosts pushes against per-app rate limits quickly. ThorData's residential proxies are useful in two scenarios:
- Distributing API requests when you have multiple token pools across different IPs
- Supplementary web scraping for data points the API does not expose (like the Spaces tab on a user profile)
PROXY = "http://USER:[email protected]:9000"
# Example: concurrent multi-keyword monitoring through proxied client
async def search_with_semaphore(
queries: list[str],
max_concurrent: int = 3,
proxy: str = None,
) -> list[dict]:
"""Search multiple queries with concurrency control."""
semaphore = asyncio.Semaphore(max_concurrent)
all_spaces = []
async def bounded_search(query: str):
async with semaphore:
raw = await search_spaces(query, proxy=proxy)
spaces = parse_spaces_response(raw)
print(f"'{query}': {len(spaces)} spaces")
all_spaces.extend(spaces)
await asyncio.sleep(2) # Small delay between requests
await asyncio.gather(*[bounded_search(q) for q in queries])
return all_spaces
# Usage
MONITORING_QUERIES = [
"AI regulation",
"crypto trading",
"startup fundraising",
"tech layoffs",
"product launch",
]
async def main():
proxy = "http://USER:[email protected]:9000"
spaces = await search_with_semaphore(MONITORING_QUERIES, proxy=proxy)
print(f"Total: {len(spaces)} spaces across {len(MONITORING_QUERIES)} topics")
# Save to database
conn = init_spaces_db()
for space in spaces:
save_space(conn, space)
conn.close()
asyncio.run(main())
Analytics Queries
Once data accumulates in your SQLite database:
def get_top_spaces_by_reach(db_path: str, min_participants: int = 100,
state: str = "ended", limit: int = 20) -> list:
"""Find highest-reach ended Spaces."""
conn = sqlite3.connect(db_path)
rows = conn.execute(
"""SELECT title, host_username, host_followers, participant_count,
duration_minutes, started_at, topics
FROM spaces
WHERE state = ? AND participant_count >= ?
ORDER BY participant_count DESC
LIMIT ?""",
(state, min_participants, limit)
).fetchall()
conn.close()
return rows
def trending_keywords_last_24h(db_path: str) -> list:
"""Get most frequent keywords from spaces in the last 24 hours."""
conn = sqlite3.connect(db_path)
rows = conn.execute(
"""SELECT keywords FROM spaces
WHERE first_seen >= datetime('now', '-1 day')""",
).fetchall()
conn.close()
all_keywords = Counter()
for (kw_json,) in rows:
try:
keywords = json.loads(kw_json or "[]")
all_keywords.update(keywords)
except json.JSONDecodeError:
pass
return all_keywords.most_common(30)
def host_leaderboard(db_path: str, min_spaces: int = 3) -> list:
"""Rank hosts by average participant count."""
conn = sqlite3.connect(db_path)
rows = conn.execute(
"""SELECT host_username, host_followers,
COUNT(*) as space_count,
AVG(participant_count) as avg_participants,
MAX(participant_count) as max_participants
FROM spaces
WHERE host_username IS NOT NULL
AND participant_count IS NOT NULL
GROUP BY host_username
HAVING COUNT(*) >= ?
ORDER BY avg_participants DESC
LIMIT 25""",
(min_spaces,)
).fetchall()
conn.close()
return rows
Practical Tips and Gotchas
- Ended Spaces disappear fast. Once a Space ends and the recording is not saved, the metadata often becomes unavailable within a few hours. Build collection into your monitoring loop, not as a backfill step.
- Participant counts are live only. The `participant_count` field is populated during the broadcast. After the Space ends it is often null or zero. Capture it while live if you need it.
- Speaker IDs vs. host IDs. The `speaker_ids` field includes everyone who spoke, including the host. `host_ids` is always a single-element array. Do not assume the host is the most prominent speaker.
- Topic IDs are not in the official docs. Query a known Space in a known category and note the IDs returned — build your own mapping table. The IDs are stable across runs.
- Residential proxies from ThorData are worth it if you're running a multi-keyword, multi-host monitoring setup. Per-IP soft blocks kick in well before documented rate limits on some endpoints.
- Use `asyncio.gather` carefully. Firing 10 simultaneous requests to the Spaces API will trigger rate limiting. Throttle with a semaphore or stagger with small delays between requests.
- Bearer token vs. OAuth 2.0. For read-only Spaces data, Bearer token (app-only auth) is sufficient and simpler. OAuth is only needed if you want to act on behalf of a user account.
- Pagination is limited. The Spaces search endpoint does not support deep pagination — you get at most a few hundred results per query. For comprehensive collection, run the same query multiple times throughout the day.
- State transitions matter. Track state changes (scheduled -> live -> ended) to capture the full lifecycle. A Space you first see as "scheduled" should be re-queried once it goes live to capture participant counts.
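The lifecycle tracking in that last tip can be sketched as a small in-memory transition detector: compare each poll's state against the last known one and act on changes. A minimal sketch (the shape of the input dicts matches the parser output above; everything else is illustrative):

```python
# Track state transitions across polling cycles (scheduled -> live -> ended)
last_states: dict[str, str] = {}

def detect_transitions(spaces: list[dict]) -> list[tuple[str, str, str]]:
    """Return (space_id, old_state, new_state) for every Space that changed state."""
    transitions = []
    for space in spaces:
        sid, state = space["id"], space.get("state")
        old = last_states.get(sid)
        if old is not None and old != state:
            transitions.append((sid, old, state))
        last_states[sid] = state
    return transitions

# First poll: nothing to compare against yet
detect_transitions([{"id": "s1", "state": "scheduled"}])
# Second poll: s1 has gone live; re-query it now to capture participant_count
print(detect_transitions([{"id": "s1", "state": "live"}]))  # [('s1', 'scheduled', 'live')]
```

In the monitoring loop, a `scheduled -> live` transition is the cue to fetch the Space again for its live `participant_count`.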