Scraping Twitch Clips, VODs, and Viewer Data with Python (2026)
Twitch generates more real-time behavioral data than almost any other platform on the internet. Millions of viewers, thousands of concurrent streams, chat messages flying at hundreds per second, clips being created and shared every few minutes — and underneath it all sits a dataset that most analysts never fully tap.
The use cases that drive serious Twitch data work in 2026 are varied and all legitimately valuable:
Esports analytics. Competitive gaming organizations track which players generate the most clip virality, which tournament moments drive peak concurrent viewers, and how viewership trends correlate with in-game performance. A team's VOD archive is a searchable record of everything that happened on stream.
Content creator performance tracking. Multi-channel networks and management companies monitor dozens of streamers simultaneously — growth rates, average concurrent viewers, clip generation rate, VOD completion rates. The patterns predict sponsorship value better than follower count alone.
Brand sponsorship valuation. Brands paying for stream integrations need actual viewership data, not self-reported numbers. Scraping viewer counts at regular intervals across a campaign gives them ground-truth CPM calculations.
Gaming trend analysis. Game publishers watch category viewer counts obsessively. A new game's Twitch debut tells the industry more about player interest than any survey. Tracking which games gain viewers, which lose them, and when — at hourly granularity — is predictive intelligence.
Community sentiment via chat. Chat is Twitch's most distinctive asset. Real-time crowd reaction to in-game moments, product announcements, and controversy plays out in a torrent of messages and emotes. NLP on chat logs gives you something closer to real-time sentiment than any polling method.
Clip virality research. Which clips spread outside Twitch? Which stay internal? Tracking a clip's view count trajectory over its first 24 hours reveals the mechanics of virality in the gaming content ecosystem.
VOD content indexing. Long-form VOD archives are an underutilized content corpus. Researchers studying gaming culture, language use, and community dynamics have years of recorded material available if they can programmatically index and retrieve it.
This guide covers both Twitch's official Helix API and the undocumented GraphQL endpoint the website uses internally. Every script is complete, built on httpx, and written with proper type hints and dataclasses. The anti-detection section addresses what actually runs in production environments in 2026.
What Data Is Available
The combination of Helix API and GQL gives you access to:
| Data Type | Source | Key Fields |
|---|---|---|
| Clips | Helix + GQL | ID, title, creator, game, views, duration, thumbnail, VOD offset, created_at |
| VODs | Helix + GQL | ID, title, duration, views, stream_date, type, muted_segments, chapters |
| Live streams | Helix | viewer_count, game, title, language, tags, started_at |
| Channel info | Helix | follower_count, broadcaster_type, description, created_at |
| Chat messages | IRC/TMI | username, message text, emotes, badges, timestamp |
| Category stats | Helix | viewer_count per game, top games ranked |
| Teams | Helix | team members, member stats |
| Schedule | Helix | upcoming broadcasts, segment titles |
| Clips recommendation score | GQL only | internal virality scoring |
| VOD chapters | GQL only | chapter markers, game changes within a VOD |
| BTTV/FFZ/7TV emotes | Third-party API | emote names, usage counts |
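The third-party emote services in the last row expose simple public APIs of their own. A hedged sketch for pulling a channel's emote sets; the endpoint paths are assumptions based on the public BTTV and 7TV APIs, so verify them against each service's current docs before relying on this:

```python
# Assumed endpoints for the public BTTV and 7TV APIs.
BTTV_CACHED_USER = "https://api.betterttv.net/3/cached/users/twitch/{twitch_id}"
SEVENTV_USER = "https://7tv.io/v3/users/twitch/{twitch_id}"


def emote_endpoints(twitch_user_id: str) -> dict:
    """Build per-service emote URLs for a numeric Twitch user ID."""
    return {
        "bttv": BTTV_CACHED_USER.format(twitch_id=twitch_user_id),
        "7tv": SEVENTV_USER.format(twitch_id=twitch_user_id),
    }


if __name__ == "__main__":
    import httpx

    # Replace "123456" with a real numeric Twitch user ID.
    for service, url in emote_endpoints("123456").items():
        resp = httpx.get(url, timeout=10)
        print(service, resp.status_code)
```

The numeric user ID comes from the Helix `/users` endpoint covered in Part 2.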
Anti-Bot Architecture
Before writing a line of scraping code, understand what you're working with.
OAuth enforcement. Every Helix request requires a valid Bearer token. No anonymous access. You need a registered application at dev.twitch.tv with a client_id and client_secret. App-level tokens (client credentials grant) work for read-only data collection without user authorization.
Rate limiting. Helix enforces 800 requests per minute per token. The response headers Ratelimit-Remaining and Ratelimit-Reset tell you exactly where you stand. Exceeding limits returns 429 and can trigger temporary token suspension. The reset is a Unix timestamp, not a duration.
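Because Ratelimit-Reset is a Unix timestamp, the correct wait is a subtraction against the current time, not a sleep of the header's raw value. A minimal sketch of that bookkeeping:

```python
import time
from typing import Optional


def seconds_until_reset(headers: dict, now: Optional[float] = None) -> float:
    """How long to sleep before the Helix bucket refills (0 if not exhausted)."""
    remaining = int(headers.get("Ratelimit-Remaining", "1"))
    if remaining > 0:
        return 0.0
    # Ratelimit-Reset is an absolute Unix timestamp, not a duration.
    reset_at = float(headers.get("Ratelimit-Reset", "0"))
    now = time.time() if now is None else now
    return max(0.0, reset_at - now)
```

The token manager in Part 1 applies the same logic per token across a pool.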
GQL Client-Integrity tokens. Since late 2024, the GraphQL endpoint at gql.twitch.tv requires a Client-Integrity header on most queries. This is a signed JWT generated by Twitch's front-end JavaScript. Without it, many GQL queries return 401 or 403. The signing process rotates, which is why working implementations use cached integrity tokens obtained via headless browser, or fall back to the subset of GQL queries that still accept anonymous access.
Fastly CDN fingerprinting. Twitch routes through Fastly, which does request fingerprinting beyond IP reputation — TLS fingerprint (JA3), HTTP/2 settings frames, header ordering. httpx with HTTP/2 enabled produces a more browser-like fingerprint than requests.
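In practice that means running httpx with HTTP/2 enabled and a consistent, browser-like header set. A sketch, assuming the optional h2 dependency is installed (pip install "httpx[http2]"); the exact header values are illustrative:

```python
def browser_headers() -> dict:
    """Headers that roughly mirror what Chrome sends. Keep the UA string
    consistent across every request in a session."""
    return {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept": "application/json",
        "Accept-Language": "en-US,en;q=0.9",
    }


if __name__ == "__main__":
    import httpx

    # http2=True requires the extra: pip install "httpx[http2]"
    client = httpx.Client(http2=True, headers=browser_headers(), timeout=20)
    print(client.get("https://api.twitch.tv/helix/streams").status_code)
```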
IP reputation scoring. Datacenter IPs (AWS, GCP, DigitalOcean) are known to Fastly and receive harsher rate limiting at the edge before requests even reach Twitch's application layer. Residential proxies sidestep this. ThorData offers a residential proxy network with good Twitch compatibility — rotating residential IPs avoid the datacenter reputation penalty.
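Wiring a proxy into httpx is a one-line change at client construction. A sketch with placeholder values; every credential and hostname below is hypothetical, so substitute your provider's details:

```python
def proxy_url(user: str, password: str, host: str, port: int) -> str:
    """Authenticated HTTP proxy URL in the form most providers expect."""
    return f"http://{user}:{password}@{host}:{port}"


if __name__ == "__main__":
    import httpx

    # Placeholder credentials and host below.
    # (httpx releases before 0.26 spell this parameter `proxies`.)
    client = httpx.Client(
        proxy=proxy_url("USERNAME", "PASSWORD", "proxy.example.com", 8000),
        http2=True,
        timeout=20,
    )
```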
User-Agent validation. GQL with a bot-like or missing User-Agent gets deprioritized. Use a real browser UA string and keep it consistent.
Part 1 — OAuth Token Management
The foundation of all Helix API work. This module handles token acquisition, validation, auto-refresh, and pool management for multi-token setups.
"""
twitch_auth.py — OAuth token management for Twitch Helix API
Handles single tokens and multi-token pools with auto-refresh.
"""
import time
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
TOKEN_URL = "https://id.twitch.tv/oauth2/token"
VALIDATE_URL = "https://id.twitch.tv/oauth2/validate"
@dataclass
class TwitchToken:
access_token: str
client_id: str
expires_at: float # Unix timestamp
requests_remaining: int = 800
rate_reset_at: float = 0.0
@property
def is_expired(self) -> bool:
return time.time() >= self.expires_at - 60 # 60-second buffer
@property
def is_rate_limited(self) -> bool:
if self.requests_remaining <= 0:
return time.time() < self.rate_reset_at
return False
def update_rate_limits(self, headers: httpx.Headers) -> None:
remaining = headers.get("Ratelimit-Remaining")
reset = headers.get("Ratelimit-Reset")
if remaining is not None:
self.requests_remaining = int(remaining)
if reset is not None:
self.rate_reset_at = float(reset)
def auth_headers(self) -> dict[str, str]:
return {
"Client-ID": self.client_id,
"Authorization": f"Bearer {self.access_token}",
}
@dataclass
class AppCredential:
client_id: str
client_secret: str
class TwitchTokenManager:
"""
Manages one or more app tokens with auto-refresh and rate limit tracking.
Pass multiple credentials for pool-based rotation.
"""
def __init__(
self,
credentials: list[AppCredential],
cache_path: Optional[Path] = None,
) -> None:
self.credentials = credentials
self.cache_path = cache_path or Path(".twitch_tokens.json")
self.tokens: list[TwitchToken] = []
self._current_idx = 0
self._load_cache()
self._ensure_tokens()
def _load_cache(self) -> None:
if not self.cache_path.exists():
return
try:
data = json.loads(self.cache_path.read_text())
for entry in data:
token = TwitchToken(**entry)
if not token.is_expired:
self.tokens.append(token)
logger.info(f"Loaded {len(self.tokens)} cached token(s)")
except Exception as e:
logger.warning(f"Cache load failed: {e}")
def _save_cache(self) -> None:
data = [
{
"access_token": t.access_token,
"client_id": t.client_id,
"expires_at": t.expires_at,
}
for t in self.tokens
]
self.cache_path.write_text(json.dumps(data, indent=2))
def _fetch_token(self, cred: AppCredential) -> TwitchToken:
resp = httpx.post(
TOKEN_URL,
data={
"client_id": cred.client_id,
"client_secret": cred.client_secret,
"grant_type": "client_credentials",
},
timeout=15,
)
resp.raise_for_status()
body = resp.json()
token = TwitchToken(
access_token=body["access_token"],
client_id=cred.client_id,
expires_at=time.time() + body["expires_in"],
)
logger.info(f"Fetched new token for client {cred.client_id[:8]}...")
return token
def _ensure_tokens(self) -> None:
existing_client_ids = {t.client_id for t in self.tokens}
for cred in self.credentials:
if cred.client_id not in existing_client_ids:
token = self._fetch_token(cred)
self.tokens.append(token)
self._save_cache()
def validate_token(self, token: TwitchToken) -> bool:
"""Call Twitch's validate endpoint — good practice before long jobs."""
try:
resp = httpx.get(
VALIDATE_URL,
headers={"Authorization": f"OAuth {token.access_token}"},
timeout=10,
)
return resp.status_code == 200
except Exception:
return False
    def get_token(self) -> TwitchToken:
        """Return the next usable token, refreshing expired ones."""
        for _ in range(len(self.tokens)):
            idx = self._current_idx % len(self.tokens)
            self._current_idx += 1
            token = self.tokens[idx]
            if token.is_expired:
                cred = next(
                    c for c in self.credentials if c.client_id == token.client_id
                )
                token = self._fetch_token(cred)
                self.tokens[idx] = token
                self._save_cache()
if not token.is_rate_limited:
return token
# All tokens rate-limited — wait for the earliest reset
earliest_reset = min(t.rate_reset_at for t in self.tokens)
wait = max(0, earliest_reset - time.time()) + 1
logger.warning(f"All tokens rate-limited. Waiting {wait:.1f}s")
time.sleep(wait)
return self.get_token()
# --- Usage example ---
def make_manager(client_id: str, client_secret: str) -> TwitchTokenManager:
return TwitchTokenManager(
credentials=[AppCredential(client_id=client_id, client_secret=client_secret)]
)
Part 2 — Channel and User Info Scraper
Broadcaster details, follower count, schedule, teams, and panel data.
"""
twitch_channels.py — Channel and user info collection
"""
import httpx
import logging
from dataclasses import dataclass, field
from typing import Optional
from twitch_auth import TwitchTokenManager
logger = logging.getLogger(__name__)
HELIX = "https://api.twitch.tv/helix"
@dataclass
class ChannelInfo:
broadcaster_id: str
login: str
display_name: str
broadcaster_type: str # "partner", "affiliate", or ""
description: str
profile_image_url: str
created_at: str
follower_count: int = 0
stream_title: str = ""
game_name: str = ""
game_id: str = ""
tags: list[str] = field(default_factory=list)
team_names: list[str] = field(default_factory=list)
@dataclass
class ScheduleSegment:
segment_id: str
start_time: str
end_time: str
title: str
category_name: str
is_recurring: bool
canceled_until: Optional[str]
class ChannelScraper:
def __init__(self, manager: TwitchTokenManager) -> None:
self.manager = manager
self.client = httpx.Client(timeout=20, http2=True)
def _get(self, path: str, params: dict) -> dict:
token = self.manager.get_token()
resp = self.client.get(
f"{HELIX}{path}",
headers=token.auth_headers(),
params=params,
)
token.update_rate_limits(resp.headers)
resp.raise_for_status()
return resp.json()
def get_users(self, logins: list[str]) -> list[ChannelInfo]:
"""Fetch up to 100 users by login name in one call."""
results = []
for batch_start in range(0, len(logins), 100):
batch = logins[batch_start : batch_start + 100]
data = self._get("/users", {"login": batch})
for user in data.get("data", []):
results.append(
ChannelInfo(
broadcaster_id=user["id"],
login=user["login"],
display_name=user["display_name"],
broadcaster_type=user["broadcaster_type"],
description=user["description"],
profile_image_url=user["profile_image_url"],
created_at=user["created_at"],
)
)
return results
def get_follower_count(self, broadcaster_id: str) -> int:
"""Get total follower count for a channel."""
data = self._get(
"/channels/followers",
{"broadcaster_id": broadcaster_id, "first": 1},
)
return data.get("total", 0)
def get_channel_info(self, broadcaster_id: str) -> dict:
"""Get current stream title, game, tags."""
data = self._get("/channels", {"broadcaster_id": broadcaster_id})
items = data.get("data", [])
return items[0] if items else {}
def get_teams(self, broadcaster_id: str) -> list[str]:
"""Get team names the broadcaster belongs to."""
data = self._get("/teams/channel", {"broadcaster_id": broadcaster_id})
return [t["team_name"] for t in data.get("data", [])]
def get_schedule(
self, broadcaster_id: str, max_segments: int = 20
) -> list[ScheduleSegment]:
"""Get upcoming scheduled broadcasts."""
try:
data = self._get(
"/schedule",
{"broadcaster_id": broadcaster_id, "first": min(max_segments, 25)},
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return [] # Channel has no schedule
raise
segments = []
for seg in data.get("data", {}).get("segments", []):
segments.append(
ScheduleSegment(
segment_id=seg["id"],
start_time=seg["start_time"],
end_time=seg["end_time"] or "",
title=seg.get("title") or "",
category_name=(seg.get("category") or {}).get("name", ""),
is_recurring=seg.get("is_recurring", False),
canceled_until=seg.get("canceled_until"),
)
)
return segments
def enrich_channel(self, info: ChannelInfo) -> ChannelInfo:
"""Fill in follower count, current stream info, and teams."""
info.follower_count = self.get_follower_count(info.broadcaster_id)
channel_data = self.get_channel_info(info.broadcaster_id)
info.stream_title = channel_data.get("title", "")
info.game_name = channel_data.get("game_name", "")
info.game_id = channel_data.get("game_id", "")
info.tags = channel_data.get("tags", [])
info.team_names = self.get_teams(info.broadcaster_id)
return info
Part 3 — Clip Scraper
Full pagination, cursor handling, time-range filtering, and sorting by view count.
"""
twitch_clips.py — Clip metadata scraper with full pagination
"""
import time
import logging
import httpx
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Iterator
from twitch_auth import TwitchTokenManager
logger = logging.getLogger(__name__)
HELIX = "https://api.twitch.tv/helix"
@dataclass
class TwitchClip:
clip_id: str
url: str
embed_url: str
broadcaster_id: str
broadcaster_name: str
creator_id: str
creator_name: str
video_id: str # Parent VOD ID (empty if VOD deleted)
game_id: str
game_name: str
title: str
view_count: int
created_at: str
thumbnail_url: str
duration: float # Seconds
vod_offset: Optional[int] # Offset into parent VOD in seconds
language: str
is_featured: bool = False
def parse_datetime_utc(dt_str: str) -> datetime:
return datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
class ClipScraper:
def __init__(self, manager: TwitchTokenManager) -> None:
self.manager = manager
self.client = httpx.Client(timeout=20, http2=True)
def _get_page(
self, params: dict
) -> tuple[list[TwitchClip], Optional[str]]:
token = self.manager.get_token()
resp = self.client.get(
f"{HELIX}/clips",
headers=token.auth_headers(),
params=params,
)
token.update_rate_limits(resp.headers)
if resp.status_code == 429:
reset_at = float(resp.headers.get("Ratelimit-Reset", time.time() + 60))
wait = max(1, reset_at - time.time())
logger.warning(f"Rate limited on clips. Waiting {wait:.1f}s")
time.sleep(wait)
return self._get_page(params)
resp.raise_for_status()
body = resp.json()
clips = [self._parse_clip(c) for c in body.get("data", [])]
cursor = body.get("pagination", {}).get("cursor")
return clips, cursor
def _parse_clip(self, raw: dict) -> TwitchClip:
return TwitchClip(
clip_id=raw["id"],
url=raw["url"],
embed_url=raw["embed_url"],
broadcaster_id=raw["broadcaster_id"],
broadcaster_name=raw["broadcaster_name"],
creator_id=raw["creator_id"],
creator_name=raw["creator_name"],
video_id=raw.get("video_id", ""),
game_id=raw["game_id"],
game_name=raw.get("game_name", ""),
title=raw["title"],
view_count=raw["view_count"],
created_at=raw["created_at"],
thumbnail_url=raw["thumbnail_url"],
duration=raw["duration"],
vod_offset=raw.get("vod_offset"),
language=raw.get("language", ""),
is_featured=raw.get("is_featured", False),
)
def iter_clips_by_broadcaster(
self,
broadcaster_id: str,
started_at: Optional[str] = None,
ended_at: Optional[str] = None,
min_views: int = 0,
max_results: int = 1000,
) -> Iterator[TwitchClip]:
"""
Iterate clips for a broadcaster, newest first.
started_at / ended_at: RFC3339 strings, e.g. "2026-03-01T00:00:00Z"
"""
params: dict = {"broadcaster_id": broadcaster_id, "first": 100}
if started_at:
params["started_at"] = started_at
if ended_at:
params["ended_at"] = ended_at
count = 0
cursor = None
while count < max_results:
if cursor:
params["after"] = cursor
batch, cursor = self._get_page(params)
if not batch:
break
for clip in batch:
if clip.view_count < min_views:
continue
yield clip
count += 1
if count >= max_results:
break
if not cursor:
break
def iter_clips_by_game(
self,
game_id: str,
started_at: Optional[str] = None,
ended_at: Optional[str] = None,
min_views: int = 100,
max_results: int = 500,
) -> Iterator[TwitchClip]:
"""Iterate top clips for a game category."""
params: dict = {"game_id": game_id, "first": 100}
if started_at:
params["started_at"] = started_at
if ended_at:
params["ended_at"] = ended_at
count = 0
cursor = None
while count < max_results:
if cursor:
params["after"] = cursor
batch, cursor = self._get_page(params)
if not batch:
break
for clip in batch:
if clip.view_count < min_views:
continue
yield clip
count += 1
if count >= max_results:
break
if not cursor:
break
def get_clip_by_id(self, clip_id: str) -> Optional[TwitchClip]:
token = self.manager.get_token()
resp = self.client.get(
f"{HELIX}/clips",
headers=token.auth_headers(),
params={"id": clip_id},
)
token.update_rate_limits(resp.headers)
resp.raise_for_status()
data = resp.json().get("data", [])
return self._parse_clip(data[0]) if data else None
Part 4 — VOD Metadata Scraper
Archives, highlights, uploads with duration parsing and muted segment detection.
"""
twitch_vods.py — VOD metadata scraper
Fetches archives, highlights, and uploads with full pagination.
"""
import re
import logging
import httpx
from dataclasses import dataclass, field
from typing import Optional, Iterator
from twitch_auth import TwitchTokenManager
logger = logging.getLogger(__name__)
HELIX = "https://api.twitch.tv/helix"
@dataclass
class MutedSegment:
offset: int # Seconds into VOD where mute starts
duration: int # Seconds of muted audio
@dataclass
class TwitchVOD:
video_id: str
stream_id: str
user_id: str
user_login: str
user_name: str
title: str
description: str
created_at: str
published_at: str
url: str
thumbnail_url: str
viewable: str # "public" or "private"
view_count: int
language: str
vod_type: str # "archive", "highlight", "upload"
duration_raw: str # e.g. "3h24m15s"
duration_secs: int # Parsed to seconds
muted_segments: list[MutedSegment] = field(default_factory=list)
has_muted_audio: bool = False
def parse_duration(duration_str: str) -> int:
"""Convert '3h24m15s' format to total seconds."""
total = 0
for match in re.finditer(r"(\d+)([hms])", duration_str):
value, unit = int(match.group(1)), match.group(2)
if unit == "h":
total += value * 3600
elif unit == "m":
total += value * 60
elif unit == "s":
total += value
return total
class VODScraper:
def __init__(self, manager: TwitchTokenManager) -> None:
self.manager = manager
self.client = httpx.Client(timeout=20, http2=True)
def _parse_vod(self, raw: dict) -> TwitchVOD:
muted = []
for seg in raw.get("muted_segments") or []:
muted.append(MutedSegment(offset=seg["offset"], duration=seg["duration"]))
duration_raw = raw.get("duration", "0s")
return TwitchVOD(
video_id=raw["id"],
stream_id=raw.get("stream_id") or "",
user_id=raw["user_id"],
user_login=raw["user_login"],
user_name=raw["user_name"],
title=raw["title"],
description=raw.get("description", ""),
created_at=raw["created_at"],
published_at=raw["published_at"],
url=raw["url"],
thumbnail_url=raw.get("thumbnail_url", ""),
viewable=raw.get("viewable", "public"),
view_count=raw["view_count"],
language=raw["language"],
vod_type=raw["type"],
duration_raw=duration_raw,
duration_secs=parse_duration(duration_raw),
muted_segments=muted,
has_muted_audio=len(muted) > 0,
)
def iter_vods(
self,
user_id: str,
vod_type: str = "all", # "all", "archive", "highlight", "upload"
max_results: int = 500,
) -> Iterator[TwitchVOD]:
"""
Iterate VODs for a user. vod_type filters by video type.
"""
params: dict = {"user_id": user_id, "first": 100, "type": vod_type}
cursor = None
count = 0
while count < max_results:
if cursor:
params["after"] = cursor
token = self.manager.get_token()
resp = self.client.get(
f"{HELIX}/videos",
headers=token.auth_headers(),
params=params,
)
token.update_rate_limits(resp.headers)
resp.raise_for_status()
body = resp.json()
batch = body.get("data", [])
if not batch:
break
for raw in batch:
yield self._parse_vod(raw)
count += 1
if count >= max_results:
break
cursor = body.get("pagination", {}).get("cursor")
if not cursor:
break
def get_vod_by_id(self, video_id: str) -> Optional[TwitchVOD]:
token = self.manager.get_token()
resp = self.client.get(
f"{HELIX}/videos",
headers=token.auth_headers(),
params={"id": video_id},
)
token.update_rate_limits(resp.headers)
resp.raise_for_status()
data = resp.json().get("data", [])
return self._parse_vod(data[0]) if data else None
def get_muted_summary(self, vod: TwitchVOD) -> str:
if not vod.muted_segments:
return "No muted segments"
total_muted = sum(s.duration for s in vod.muted_segments)
pct = (total_muted / vod.duration_secs * 100) if vod.duration_secs else 0
return (
f"{len(vod.muted_segments)} muted segment(s), "
f"{total_muted}s total ({pct:.1f}% of VOD)"
)
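A common follow-up to muted-segment detection is computing the inverse: which spans of a VOD still have intact audio. A self-contained sketch that works on plain (offset, duration) tuples rather than the MutedSegment dataclass, so it stands alone:

```python
def unmuted_ranges(segments: list, total_secs: int) -> list:
    """Return sorted (start, end) pairs of playback with intact audio.

    segments: (offset, duration) tuples in seconds, in any order.
    """
    ranges = []
    cursor = 0
    for offset, duration in sorted(segments):
        if offset > cursor:
            ranges.append((cursor, offset))
        cursor = max(cursor, offset + duration)
    if cursor < total_secs:
        ranges.append((cursor, total_secs))
    return ranges
```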
Part 5 — Live Stream Monitor
Poll current viewers, detect game and title changes, build a viewer timeline.
"""
twitch_monitor.py — Live stream monitor with viewer timeline
Polls stream state at regular intervals and records changes.
"""
import time
import logging
import sqlite3
import httpx
from dataclasses import dataclass, field
from twitch_auth import TwitchTokenManager
logger = logging.getLogger(__name__)
HELIX = "https://api.twitch.tv/helix"
@dataclass
class StreamSnapshot:
broadcaster_id: str
broadcaster_login: str
timestamp: float
viewer_count: int
game_id: str
game_name: str
title: str
tags: list[str] = field(default_factory=list)
is_mature: bool = False
@dataclass
class StreamEvent:
event_type: str # "game_change", "title_change", "online", "offline"
broadcaster_id: str
timestamp: float
old_value: str
new_value: str
class StreamMonitor:
"""
Monitors one or more live streams. Records viewer counts and detects
game/title changes. Stores everything in SQLite.
"""
def __init__(
self,
manager: TwitchTokenManager,
db_path: str = "stream_monitor.db",
poll_interval: int = 60, # seconds
) -> None:
self.manager = manager
self.db_path = db_path
self.poll_interval = poll_interval
self.client = httpx.Client(timeout=20, http2=True)
self._last_state: dict[str, StreamSnapshot] = {}
self._init_db()
def _init_db(self) -> None:
conn = sqlite3.connect(self.db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS stream_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
broadcaster_id TEXT NOT NULL,
broadcaster_login TEXT NOT NULL,
timestamp REAL NOT NULL,
viewer_count INTEGER NOT NULL,
game_id TEXT,
game_name TEXT,
title TEXT,
tags TEXT
);
CREATE TABLE IF NOT EXISTS stream_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
broadcaster_id TEXT NOT NULL,
event_type TEXT NOT NULL,
timestamp REAL NOT NULL,
old_value TEXT,
new_value TEXT
);
CREATE INDEX IF NOT EXISTS idx_snapshots_broadcaster
ON stream_snapshots(broadcaster_id, timestamp);
""")
conn.commit()
conn.close()
def _get_streams(self, user_logins: list[str]) -> list[StreamSnapshot]:
token = self.manager.get_token()
resp = self.client.get(
f"{HELIX}/streams",
headers=token.auth_headers(),
params={"user_login": user_logins, "first": 100},
)
token.update_rate_limits(resp.headers)
resp.raise_for_status()
snapshots = []
now = time.time()
for s in resp.json().get("data", []):
snapshots.append(
StreamSnapshot(
broadcaster_id=s["user_id"],
broadcaster_login=s["user_login"],
timestamp=now,
viewer_count=s["viewer_count"],
game_id=s.get("game_id", ""),
game_name=s.get("game_name", ""),
title=s["title"],
tags=s.get("tags", []),
is_mature=s.get("is_mature", False),
)
)
return snapshots
def _save_snapshot(self, conn: sqlite3.Connection, snap: StreamSnapshot) -> None:
conn.execute(
"""INSERT INTO stream_snapshots
(broadcaster_id, broadcaster_login, timestamp, viewer_count,
game_id, game_name, title, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
snap.broadcaster_id,
snap.broadcaster_login,
snap.timestamp,
snap.viewer_count,
snap.game_id,
snap.game_name,
snap.title,
",".join(snap.tags),
),
)
def _detect_events(
self, snap: StreamSnapshot
) -> list[StreamEvent]:
events = []
prev = self._last_state.get(snap.broadcaster_id)
if prev is None:
events.append(StreamEvent(
event_type="online",
broadcaster_id=snap.broadcaster_id,
timestamp=snap.timestamp,
old_value="",
new_value=snap.game_name,
))
else:
if prev.game_id != snap.game_id:
events.append(StreamEvent(
event_type="game_change",
broadcaster_id=snap.broadcaster_id,
timestamp=snap.timestamp,
old_value=prev.game_name,
new_value=snap.game_name,
))
if prev.title != snap.title:
events.append(StreamEvent(
event_type="title_change",
broadcaster_id=snap.broadcaster_id,
timestamp=snap.timestamp,
old_value=prev.title,
new_value=snap.title,
))
return events
def _save_events(
self, conn: sqlite3.Connection, events: list[StreamEvent]
) -> None:
for ev in events:
conn.execute(
"""INSERT INTO stream_events
(broadcaster_id, event_type, timestamp, old_value, new_value)
VALUES (?, ?, ?, ?, ?)""",
(ev.broadcaster_id, ev.event_type, ev.timestamp,
ev.old_value, ev.new_value),
)
    def poll_once(self, user_logins: list[str]) -> list[StreamSnapshot]:
        snapshots = self._get_streams(user_logins)
        live_ids = {s.broadcaster_id for s in snapshots}
        conn = sqlite3.connect(self.db_path)
        # Detect channels that have gone offline since the last poll
        for bid, prev in list(self._last_state.items()):
            if bid not in live_ids:
                offline_event = StreamEvent(
                    event_type="offline",
                    broadcaster_id=bid,
                    timestamp=time.time(),
                    old_value=prev.game_name,
                    new_value="",
                )
                self._save_events(conn, [offline_event])
                del self._last_state[bid]
        for snap in snapshots:
            events = self._detect_events(snap)
            self._save_snapshot(conn, snap)
            self._save_events(conn, events)
            self._last_state[snap.broadcaster_id] = snap
            for ev in events:
                logger.info(
                    f"[{snap.broadcaster_login}] {ev.event_type}: "
                    f"{ev.old_value!r} -> {ev.new_value!r}"
                )
        conn.commit()
        conn.close()
        return snapshots
def run_forever(self, user_logins: list[str]) -> None:
"""Poll continuously until interrupted."""
logger.info(
f"Monitoring {len(user_logins)} channel(s) "
f"every {self.poll_interval}s"
)
while True:
try:
snaps = self.poll_once(user_logins)
for s in snaps:
logger.info(
f"{s.broadcaster_login}: {s.viewer_count:,} viewers | "
f"{s.game_name} | {s.title[:60]}"
)
except Exception as e:
logger.error(f"Poll error: {e}")
time.sleep(self.poll_interval)
def get_viewer_timeline(
self,
broadcaster_id: str,
since_timestamp: float,
) -> list[dict]:
"""Return viewer count history as list of {timestamp, viewer_count} dicts."""
conn = sqlite3.connect(self.db_path)
rows = conn.execute(
"""SELECT timestamp, viewer_count FROM stream_snapshots
WHERE broadcaster_id = ? AND timestamp >= ?
ORDER BY timestamp""",
(broadcaster_id, since_timestamp),
).fetchall()
conn.close()
return [{"timestamp": r[0], "viewer_count": r[1]} for r in rows]
Part 6 — GQL Endpoint Usage
The Twitch GraphQL endpoint exposes richer data not available in Helix. It uses persisted queries identified by SHA-256 hashes. The hashes are embedded in the Twitch web app's JavaScript bundles and occasionally rotate.
"""
twitch_gql.py — GraphQL endpoint for richer Twitch data
Uses persisted queries (operation name + SHA-256 hash).
WARNING: Undocumented endpoint. May require Client-Integrity header for some queries.
"""
import json
import logging
import httpx
from dataclasses import dataclass
from typing import Any, Optional
from twitch_auth import TwitchTokenManager
logger = logging.getLogger(__name__)
GQL_URL = "https://gql.twitch.tv/gql"
# Known stable persisted query hashes (as of early 2026)
# These are extracted from Twitch's front-end JS bundles.
GQL_QUERIES = {
"StreamMetadata": "1c719a40e481453e5c48d9bb585d971b8b372f8ead1d0fdc454a6f53d71c5d01",
"ClipsCards__User": "b73ad2bfaecfd30a9e6c28fada15bd97032c83ec77a0440585a9a1c60b0b9f6a",
"VideoMetadata": "45111672eea2e507f153820ab22c19b9e4f09c550c52a2e23c2dd5b28f4df4b3",
"TopClipsForGame": "7bc6ab24e2e5ba3c8f84ed5e83de6ae17b55a48af3e12b2d17aba83c58ae098e",
"ChannelPage_AboutPanel": "6089531acef6a09b69d9d9b9f70df2b35e05fe1c5f8fa23c787d33c559b36a17",
"StreamSummary": "158f5b03e1dfe7e3dd90de35a0a96e2bdb60f7f07aca9c34e1a52ae24f3e1df0",
}
# GQL queries that still work without Client-Integrity (as of Q1 2026)
ANON_COMPATIBLE = {"StreamMetadata", "TopClipsForGame", "StreamSummary"}
@dataclass
class GQLClient:
client_id: str # Use the same client_id as your Helix app
integrity_token: Optional[str] = None # Client-Integrity JWT if available
device_id: Optional[str] = None
def _build_headers(self) -> dict[str, str]:
headers = {
"Client-ID": self.client_id,
"Content-Type": "application/json",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
}
if self.integrity_token:
headers["Client-Integrity"] = self.integrity_token
if self.device_id:
headers["X-Device-Id"] = self.device_id
return headers
def query(
self,
operation_name: str,
variables: dict[str, Any],
http_client: Optional[httpx.Client] = None,
) -> dict:
"""Execute a persisted GQL query by operation name."""
query_hash = GQL_QUERIES.get(operation_name)
if not query_hash:
raise ValueError(f"Unknown GQL operation: {operation_name}")
payload = [
{
"operationName": operation_name,
"variables": variables,
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": query_hash,
}
},
}
]
        # Create a throwaway client only if the caller didn't supply one,
        # and make sure it gets closed afterwards.
        owns_client = http_client is None
        client = http_client or httpx.Client(timeout=20, http2=True)
        try:
            resp = client.post(
                GQL_URL,
                headers=self._build_headers(),
                content=json.dumps(payload),
            )
        finally:
            if owns_client:
                client.close()
        if resp.status_code in (401, 403):
            raise RuntimeError(
                "GQL requires a Client-Integrity token for this query. "
                "Obtain one via headless browser or fall back to the Helix API."
            )
        resp.raise_for_status()
        results = resp.json()
        if isinstance(results, list):
            return results[0].get("data", {})
        return results.get("data", {})
def get_stream_metadata_gql(
gql: GQLClient, channel_login: str, client: httpx.Client
) -> dict:
"""
Fetch stream metadata via GQL — includes fields not in Helix:
clip count, subscriber count estimate, community points status.
"""
data = gql.query(
"StreamMetadata",
{"channelLogin": channel_login},
http_client=client,
)
return data.get("user", {})
def get_top_clips_gql(
gql: GQLClient,
game_slug: str,
time_range: str = "LAST_WEEK", # LAST_DAY, LAST_WEEK, LAST_MONTH, ALL_TIME
limit: int = 20,
client: Optional[httpx.Client] = None,
) -> list[dict]:
"""
Fetch top clips for a game via GQL.
time_range options: LAST_DAY, LAST_WEEK, LAST_MONTH, ALL_TIME
"""
data = gql.query(
"TopClipsForGame",
{
"gameName": game_slug,
"criteria": {"timeRange": time_range, "shouldFilterByDiscoverySetting": False},
"limit": limit,
},
http_client=client,
)
edges = (
data.get("game", {})
.get("clips", {})
.get("edges", [])
)
return [e.get("node", {}) for e in edges]
def get_vod_chapters_gql(
gql: GQLClient, video_id: str, client: Optional[httpx.Client] = None
) -> list[dict]:
"""
Get chapter markers within a VOD (game changes, titled segments).
This data is only available via GQL, not Helix.
"""
data = gql.query(
"VideoMetadata",
{"videoID": video_id, "channelLogin": ""},
http_client=client,
)
moments = (
data.get("video", {})
.get("moments", {})
.get("edges", [])
)
return [
{
"type": m["node"].get("type"),
"position_secs": m["node"].get("positionMilliseconds", 0) // 1000,
"duration_secs": m["node"].get("durationMilliseconds", 0) // 1000,
"description": m["node"].get("description", ""),
"game": (m["node"].get("details") or {}).get("game", {}).get("displayName", ""),
}
for m in moments
]
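A small helper makes the chapter list easier to eyeball. This `format_chapter_timeline` function is not part of the module above, just an illustrative sketch over the dict shape that `get_vod_chapters_gql` returns:

```python
def format_chapter_timeline(chapters: list[dict]) -> list[str]:
    """Render VOD chapter dicts as 'HH:MM:SS  label' lines."""
    lines = []
    for ch in chapters:
        secs = int(ch.get("position_secs", 0))
        hours, rem = divmod(secs, 3600)
        mins, s = divmod(rem, 60)
        # Prefer the game name; fall back to the free-text description
        label = ch.get("game") or ch.get("description", "")
        lines.append(f"{hours:02d}:{mins:02d}:{s:02d}  {label}")
    return lines
```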
Getting a Client-Integrity Token
For GQL queries that require the integrity header, the most reliable approach is a one-time browser automation step to extract the token:
"""
get_integrity_token.py — Extract Client-Integrity token via headless browser
Requires: playwright (pip install playwright && playwright install chromium)
Token is valid for ~1 hour.
"""
import asyncio
import json
from playwright.async_api import async_playwright
async def capture_integrity_token(channel_url: str = "https://www.twitch.tv/xqc") -> str:
integrity_token = None
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
# Intercept GQL requests to capture the integrity header
async def handle_request(request):
nonlocal integrity_token
if "gql.twitch.tv" in request.url:
token = request.headers.get("client-integrity")
if token:
integrity_token = token
page.on("request", handle_request)
await page.goto(channel_url, wait_until="networkidle", timeout=30000)
await browser.close()
if not integrity_token:
raise RuntimeError("No integrity token captured. Check if page loaded correctly.")
return integrity_token
if __name__ == "__main__":
    import time

    token = asyncio.run(capture_integrity_token())
    # Cache for reuse — valid ~1 hour
    with open(".integrity_token", "w") as f:
        json.dump({"token": token, "captured_at": time.time()}, f)
    print(f"Captured integrity token: {token[:40]}...")
Part 7 — Chat Log Collection
Connect to Twitch IRC/TMI for real-time chat. Parse messages, extract emotes, detect spam patterns.
"""
twitch_chat.py — IRC-based Twitch chat collector
Connects to Twitch's IRC gateway (TMI) and records all messages.
Requires a user OAuth token with chat:read scope (not app token).
"""
import re
import time
import socket
import logging
import sqlite3
import threading
from dataclasses import dataclass, field
from typing import Optional, Callable
logger = logging.getLogger(__name__)
IRC_HOST = "irc.chat.twitch.tv"
IRC_PORT = 6667
PING_INTERVAL = 180 # seconds
@dataclass
class ChatMessage:
channel: str
username: str
display_name: str
user_id: str
message: str
timestamp: float
emotes: list[str] = field(default_factory=list)
badges: list[str] = field(default_factory=list)
is_subscriber: bool = False
is_moderator: bool = False
is_vip: bool = False
bits: int = 0
color: str = ""
message_id: str = ""
def parse_irc_tags(raw_tags: str) -> dict[str, str]:
"""Parse @key=value;key=value tag string from Twitch IRC."""
tags = {}
for part in raw_tags.split(";"):
if "=" in part:
k, v = part.split("=", 1)
tags[k] = v
return tags
def parse_emotes_from_tag(emote_tag: str) -> list[str]:
"""Extract emote IDs from the emotes tag value."""
if not emote_tag:
return []
emote_ids = []
for entry in emote_tag.split("/"):
if ":" in entry:
emote_id = entry.split(":")[0]
emote_ids.append(emote_id)
return list(set(emote_ids))
def parse_badges(badge_tag: str) -> list[str]:
if not badge_tag:
return []
return [b.split("/")[0] for b in badge_tag.split(",")]
class TwitchChatCollector:
"""
Collects chat from one or more Twitch channels via IRC.
Stores messages in SQLite. Optionally calls a callback per message.
"""
def __init__(
self,
username: str,
oauth_token: str, # Format: "oauth:xxxxxxxxxxxxxxxxxxxxxxx"
db_path: str = "chat.db",
message_callback: Optional[Callable[[ChatMessage], None]] = None,
) -> None:
self.username = username.lower()
self.oauth_token = oauth_token
self.db_path = db_path
self.message_callback = message_callback
self._sock: Optional[socket.socket] = None
self._running = False
self._channels: set[str] = set()
self._init_db()
def _init_db(self) -> None:
conn = sqlite3.connect(self.db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel TEXT NOT NULL,
username TEXT NOT NULL,
display_name TEXT,
user_id TEXT,
message TEXT NOT NULL,
timestamp REAL NOT NULL,
emotes TEXT,
badges TEXT,
is_subscriber INTEGER DEFAULT 0,
is_moderator INTEGER DEFAULT 0,
is_vip INTEGER DEFAULT 0,
bits INTEGER DEFAULT 0,
color TEXT,
message_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_chat_channel_ts
ON chat_messages(channel, timestamp);
CREATE INDEX IF NOT EXISTS idx_chat_username
ON chat_messages(username);
""")
conn.commit()
conn.close()
def _connect(self) -> None:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((IRC_HOST, IRC_PORT))
self._sock.settimeout(300)
self._send(f"PASS {self.oauth_token}")
self._send(f"NICK {self.username}")
        # Request message tags and command capabilities
        self._send("CAP REQ :twitch.tv/tags twitch.tv/commands")
logger.info("Connected to Twitch IRC")
def _send(self, msg: str) -> None:
if self._sock:
self._sock.sendall(f"{msg}\r\n".encode())
def join(self, channel: str) -> None:
channel = channel.lower().lstrip("#")
self._channels.add(channel)
if self._sock:
self._send(f"JOIN #{channel}")
logger.info(f"Joined #{channel}")
def _parse_privmsg(self, raw_line: str) -> Optional[ChatMessage]:
"""Parse a PRIVMSG line into a ChatMessage."""
        # Format: @tags :nick!nick@nick.tmi.twitch.tv PRIVMSG #channel :message
tag_match = re.match(r"^@([^ ]+) :([^!]+)![^ ]+ PRIVMSG #([^ ]+) :(.+)$", raw_line)
if not tag_match:
return None
raw_tags, nick, channel, message = tag_match.groups()
tags = parse_irc_tags(raw_tags)
badges = parse_badges(tags.get("badges", ""))
return ChatMessage(
channel=channel,
username=nick,
display_name=tags.get("display-name", nick),
user_id=tags.get("user-id", ""),
message=message.rstrip("\r\n"),
timestamp=time.time(),
emotes=parse_emotes_from_tag(tags.get("emotes", "")),
badges=badges,
is_subscriber="subscriber" in badges,
is_moderator=tags.get("mod", "0") == "1",
is_vip="vip" in badges,
bits=int(tags.get("bits", 0)),
color=tags.get("color", ""),
message_id=tags.get("id", ""),
)
def _save_message(self, msg: ChatMessage) -> None:
conn = sqlite3.connect(self.db_path)
conn.execute(
"""INSERT INTO chat_messages
(channel, username, display_name, user_id, message, timestamp,
emotes, badges, is_subscriber, is_moderator, is_vip, bits, color, message_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
msg.channel, msg.username, msg.display_name, msg.user_id,
msg.message, msg.timestamp,
",".join(msg.emotes), ",".join(msg.badges),
int(msg.is_subscriber), int(msg.is_moderator),
int(msg.is_vip), msg.bits, msg.color, msg.message_id,
),
)
conn.commit()
conn.close()
def _ping_loop(self) -> None:
while self._running:
time.sleep(PING_INTERVAL)
if self._running:
self._send("PING :tmi.twitch.tv")
def run(self, channels: list[str]) -> None:
"""Connect and collect chat. Blocks until stopped."""
self._connect()
self._running = True
# Start keepalive thread
ping_thread = threading.Thread(target=self._ping_loop, daemon=True)
ping_thread.start()
for ch in channels:
self.join(ch)
buffer = ""
while self._running:
            try:
                data = self._sock.recv(4096).decode("utf-8", errors="replace")
                if not data:
                    # An empty read means the server closed the connection
                    raise ConnectionError("IRC connection closed by server")
                buffer += data
while "\r\n" in buffer:
line, buffer = buffer.split("\r\n", 1)
if line.startswith("PING"):
self._send("PONG :tmi.twitch.tv")
continue
if "PRIVMSG" in line:
msg = self._parse_privmsg(line)
if msg:
self._save_message(msg)
if self.message_callback:
self.message_callback(msg)
except socket.timeout:
logger.warning("IRC socket timeout — reconnecting")
self._connect()
for ch in self._channels:
self._send(f"JOIN #{ch}")
except Exception as e:
logger.error(f"IRC error: {e}")
if self._running:
time.sleep(5)
try:
self._connect()
for ch in self._channels:
self._send(f"JOIN #{ch}")
except Exception as reconnect_err:
logger.error(f"Reconnect failed: {reconnect_err}")
def stop(self) -> None:
self._running = False
if self._sock:
self._send("QUIT")
self._sock.close()
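The intro to this part mentions spam detection. A minimal copypasta heuristic, sketched here as a standalone function rather than part of `twitch_chat.py`, flags message texts repeated by many distinct users:

```python
from collections import defaultdict

def detect_copypasta(
    messages: list[tuple[str, str]], min_repeats: int = 5
) -> set[str]:
    """
    Given (username, message) pairs, return normalized message texts
    that were sent by at least min_repeats distinct users.
    """
    senders: dict[str, set[str]] = defaultdict(set)
    for username, text in messages:
        senders[text.strip().lower()].add(username)
    return {text for text, users in senders.items() if len(users) >= min_repeats}
```

Feeding it rows from the `chat_messages` table for a time window gives a quick list of chat-wide spam waves.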
Part 8 — BTTV / FFZ / 7TV Emote Usage Tracker
Third-party emotes dominate Twitch chat. Track their frequency separately from native Twitch emotes.
"""
twitch_emotes.py — Third-party emote usage tracker
Tracks BTTV, FFZ, and 7TV emote frequency in collected chat logs.
"""
import re
import sqlite3
import logging
from dataclasses import dataclass, field
from typing import Optional
import httpx
logger = logging.getLogger(__name__)
BTTV_GLOBAL_URL = "https://api.betterttv.net/3/cached/emotes/global"
BTTV_CHANNEL_URL = "https://api.betterttv.net/3/cached/users/twitch/{broadcaster_id}"
FFZ_CHANNEL_URL = "https://api.frankerfacez.com/v1/room/id/{broadcaster_id}"
SEVENTV_CHANNEL_URL = "https://7tv.io/v3/users/twitch/{broadcaster_id}"
SEVENTV_GLOBAL_URL = "https://7tv.io/v3/emote-sets/global"
@dataclass
class ThirdPartyEmote:
name: str
emote_id: str
source: str # "bttv", "ffz", "7tv"
url: str
@dataclass
class EmoteStats:
name: str
source: str
count: int
unique_users: int
def fetch_bttv_emotes(broadcaster_id: str) -> list[ThirdPartyEmote]:
client = httpx.Client(timeout=15)
emotes = []
# Global BTTV emotes
try:
resp = client.get(BTTV_GLOBAL_URL)
resp.raise_for_status()
for e in resp.json():
emotes.append(ThirdPartyEmote(
name=e["code"],
emote_id=e["id"],
source="bttv",
url=f"https://cdn.betterttv.net/emote/{e['id']}/1x",
))
except Exception as e:
logger.warning(f"BTTV global fetch failed: {e}")
# Channel-specific BTTV emotes
try:
resp = client.get(BTTV_CHANNEL_URL.format(broadcaster_id=broadcaster_id))
resp.raise_for_status()
data = resp.json()
for e in data.get("channelEmotes", []) + data.get("sharedEmotes", []):
emotes.append(ThirdPartyEmote(
name=e["code"],
emote_id=e["id"],
source="bttv",
url=f"https://cdn.betterttv.net/emote/{e['id']}/1x",
))
except Exception as e:
logger.warning(f"BTTV channel fetch failed: {e}")
return emotes
def fetch_ffz_emotes(broadcaster_id: str) -> list[ThirdPartyEmote]:
client = httpx.Client(timeout=15)
emotes = []
try:
resp = client.get(FFZ_CHANNEL_URL.format(broadcaster_id=broadcaster_id))
resp.raise_for_status()
data = resp.json()
        for _set_id, emoteset in data.get("sets", {}).items():
for e in emoteset.get("emoticons", []):
urls = e.get("urls", {})
url = urls.get("1") or urls.get("2") or ""
if url and not url.startswith("http"):
url = f"https:{url}"
emotes.append(ThirdPartyEmote(
name=e["name"],
emote_id=str(e["id"]),
source="ffz",
url=url,
))
except Exception as e:
logger.warning(f"FFZ fetch failed: {e}")
return emotes
def fetch_7tv_emotes(broadcaster_id: str) -> list[ThirdPartyEmote]:
client = httpx.Client(timeout=15)
emotes = []
try:
resp = client.get(SEVENTV_CHANNEL_URL.format(broadcaster_id=broadcaster_id))
resp.raise_for_status()
data = resp.json()
emoteset = data.get("emote_set") or {}
        for e in emoteset.get("emotes", []):
            emotes.append(ThirdPartyEmote(
                name=e["name"],
                emote_id=e["id"],
                source="7tv",
                url=f"https://cdn.7tv.app/emote/{e['id']}/1x.webp",
            ))
except Exception as e:
logger.warning(f"7TV fetch failed: {e}")
return emotes
def count_emote_usage(
db_path: str,
channel: str,
emotes: list[ThirdPartyEmote],
since_timestamp: float = 0.0,
) -> list[EmoteStats]:
"""
Scan collected chat messages for third-party emote usage.
Returns emotes sorted by frequency.
"""
emote_map = {e.name: e for e in emotes}
# Build regex that matches whole words (emote names are case-sensitive on Twitch)
if not emote_map:
return []
# Escape and build pattern
pattern = re.compile(
r"\b(" + "|".join(re.escape(name) for name in emote_map.keys()) + r")\b"
)
conn = sqlite3.connect(db_path)
rows = conn.execute(
"""SELECT username, message FROM chat_messages
WHERE channel = ? AND timestamp >= ?""",
(channel.lstrip("#"), since_timestamp),
).fetchall()
conn.close()
counts: dict[str, int] = {}
users: dict[str, set[str]] = {}
    for username, message in rows:
        # findall yields one hit per whole-word match, so count matches directly.
        # Substring counting would overcount emote names nested in other names.
        for emote_name in pattern.findall(message):
            counts[emote_name] = counts.get(emote_name, 0) + 1
            users.setdefault(emote_name, set()).add(username)
stats = [
EmoteStats(
name=name,
source=emote_map[name].source,
count=count,
unique_users=len(users.get(name, set())),
)
for name, count in counts.items()
]
return sorted(stats, key=lambda s: s.count, reverse=True)
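When you fetch BTTV, FFZ, and 7TV sets for the same channel, emote names can collide. One possible merge helper, with `ThirdPartyEmote` repeated so the sketch is self-contained (it mirrors the dataclass defined earlier in this module):

```python
from dataclasses import dataclass

@dataclass
class ThirdPartyEmote:  # mirrors the dataclass above
    name: str
    emote_id: str
    source: str
    url: str

def merge_emote_sets(*emote_lists: list[ThirdPartyEmote]) -> list[ThirdPartyEmote]:
    """
    Merge emote lists into one, deduplicating by name. On a collision the
    earliest list wins, so pass channel sets before global sets if channel
    emotes should take precedence.
    """
    merged: dict[str, ThirdPartyEmote] = {}
    for emotes in emote_lists:
        for e in emotes:
            merged.setdefault(e.name, e)
    return list(merged.values())
```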
Part 9 — Top Clips Aggregator
Find viral clips across multiple channels and games in a time window.
"""
twitch_aggregator.py — Viral clip aggregation across channels and games
Finds the most-viewed clips created within a time window.
"""
import time
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional
import httpx
from twitch_auth import TwitchTokenManager
from twitch_clips import ClipScraper, TwitchClip
logger = logging.getLogger(__name__)
@dataclass
class AggregatedClip:
clip: TwitchClip
rank: int
views_per_day: float # Normalized virality score
def rfc3339_days_ago(days: int) -> str:
dt = datetime.now(timezone.utc) - timedelta(days=days)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
class ClipAggregator:
def __init__(self, manager: TwitchTokenManager) -> None:
self.scraper = ClipScraper(manager)
def top_clips_for_channels(
self,
broadcaster_ids: list[str],
days_back: int = 7,
min_views: int = 500,
max_per_channel: int = 50,
top_n: int = 100,
) -> list[AggregatedClip]:
"""
Collect top clips from multiple channels and return
the top N sorted by view count.
"""
started_at = rfc3339_days_ago(days_back)
ended_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
all_clips: list[TwitchClip] = []
for bid in broadcaster_ids:
logger.info(f"Fetching clips for broadcaster {bid}")
clips = list(
self.scraper.iter_clips_by_broadcaster(
broadcaster_id=bid,
started_at=started_at,
ended_at=ended_at,
min_views=min_views,
max_results=max_per_channel,
)
)
all_clips.extend(clips)
logger.info(f" Got {len(clips)} clips (>{min_views} views)")
# Deduplicate by clip_id
seen = set()
unique_clips = []
for clip in all_clips:
if clip.clip_id not in seen:
seen.add(clip.clip_id)
unique_clips.append(clip)
# Sort by view count descending
unique_clips.sort(key=lambda c: c.view_count, reverse=True)
top = unique_clips[:top_n]
# Compute views-per-day for each clip
now = datetime.now(timezone.utc)
result = []
for rank, clip in enumerate(top, 1):
created = datetime.fromisoformat(clip.created_at.replace("Z", "+00:00"))
age_days = max(0.1, (now - created).total_seconds() / 86400)
vpd = clip.view_count / age_days
result.append(AggregatedClip(clip=clip, rank=rank, views_per_day=vpd))
return result
def top_clips_for_games(
self,
game_ids: list[str],
days_back: int = 7,
min_views: int = 1000,
max_per_game: int = 100,
top_n: int = 50,
) -> list[AggregatedClip]:
"""
Collect top clips across multiple game categories.
"""
started_at = rfc3339_days_ago(days_back)
ended_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
all_clips: list[TwitchClip] = []
for game_id in game_ids:
logger.info(f"Fetching clips for game {game_id}")
clips = list(
self.scraper.iter_clips_by_game(
game_id=game_id,
started_at=started_at,
ended_at=ended_at,
min_views=min_views,
max_results=max_per_game,
)
)
all_clips.extend(clips)
        # Deduplicate by clip_id (keep first occurrence)
        seen = set()
        unique_clips = []
        for clip in all_clips:
            if clip.clip_id not in seen:
                seen.add(clip.clip_id)
                unique_clips.append(clip)
unique_clips.sort(key=lambda c: c.view_count, reverse=True)
top = unique_clips[:top_n]
now = datetime.now(timezone.utc)
return [
AggregatedClip(
clip=c,
rank=i + 1,
views_per_day=c.view_count / max(
0.1,
(now - datetime.fromisoformat(c.created_at.replace("Z", "+00:00"))).total_seconds() / 86400,
),
)
for i, c in enumerate(top)
]
Part 10 — Channel Comparison Tool
Compare metrics across multiple streamers in a single report.
"""
twitch_compare.py — Multi-channel comparison tool
Compares avg viewers, clip output, VOD frequency, and growth across streamers.
"""
import time
import logging
import statistics
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from twitch_auth import TwitchTokenManager
from twitch_channels import ChannelScraper, ChannelInfo
from twitch_clips import ClipScraper
from twitch_vods import VODScraper
logger = logging.getLogger(__name__)
@dataclass
class ChannelMetrics:
channel: ChannelInfo
clip_count_7d: int = 0
top_clip_views_7d: int = 0
avg_clip_views_7d: float = 0.0
vod_count_30d: int = 0
avg_vod_duration_mins: float = 0.0
total_vod_views_30d: int = 0
has_active_schedule: bool = False
team_count: int = 0
class ChannelComparator:
def __init__(self, manager: TwitchTokenManager) -> None:
self.channel_scraper = ChannelScraper(manager)
self.clip_scraper = ClipScraper(manager)
self.vod_scraper = VODScraper(manager)
def compare(self, logins: list[str]) -> list[ChannelMetrics]:
"""
Build a comparison report for the given channel logins.
Pulls clips from last 7 days and VODs from last 30 days.
"""
channels = self.channel_scraper.get_users(logins)
for ch in channels:
self.channel_scraper.enrich_channel(ch)
now = datetime.now(timezone.utc)
started_7d = (now - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
started_30d = (now - timedelta(days=30)).strftime("%Y-%m-%dT%H:%M:%SZ")
ended_at = now.strftime("%Y-%m-%dT%H:%M:%SZ")
results = []
for ch in channels:
logger.info(f"Analyzing {ch.login}...")
metrics = ChannelMetrics(channel=ch)
# Clip metrics (7 days)
clips = list(
self.clip_scraper.iter_clips_by_broadcaster(
broadcaster_id=ch.broadcaster_id,
started_at=started_7d,
ended_at=ended_at,
max_results=200,
)
)
metrics.clip_count_7d = len(clips)
if clips:
view_counts = [c.view_count for c in clips]
metrics.top_clip_views_7d = max(view_counts)
metrics.avg_clip_views_7d = statistics.mean(view_counts)
# VOD metrics (30 days)
vod_durations = []
vod_views = []
cutoff = (now - timedelta(days=30)).timestamp()
for vod in self.vod_scraper.iter_vods(ch.broadcaster_id, max_results=50):
created = datetime.fromisoformat(
vod.created_at.replace("Z", "+00:00")
)
if created.timestamp() < cutoff:
break
vod_durations.append(vod.duration_secs / 60)
vod_views.append(vod.view_count)
metrics.vod_count_30d = len(vod_durations)
metrics.avg_vod_duration_mins = (
statistics.mean(vod_durations) if vod_durations else 0
)
metrics.total_vod_views_30d = sum(vod_views)
# Schedule and teams
schedule = self.channel_scraper.get_schedule(ch.broadcaster_id)
metrics.has_active_schedule = len(schedule) > 0
metrics.team_count = len(ch.team_names)
results.append(metrics)
return results
def print_comparison(self, results: list[ChannelMetrics]) -> None:
print(f"\n{'Channel':<20} {'Followers':>12} {'Clips 7d':>10} "
f"{'Avg Clip Views':>16} {'VODs 30d':>10} "
f"{'VOD Views 30d':>14} {'Schedule':>10}")
print("-" * 100)
for m in sorted(results, key=lambda x: x.channel.follower_count, reverse=True):
print(
f"{m.channel.display_name:<20} "
f"{m.channel.follower_count:>12,} "
f"{m.clip_count_7d:>10} "
f"{m.avg_clip_views_7d:>16,.0f} "
f"{m.vod_count_30d:>10} "
f"{m.total_vod_views_30d:>14,} "
f"{'Yes' if m.has_active_schedule else 'No':>10}"
)
Part 11 — Game and Category Analytics
Track which games are trending, viewer distribution across categories.
"""
twitch_categories.py — Game and category analytics
Tracks viewer distribution across games and spots trending categories.
"""
import time
import logging
import sqlite3
import httpx
from dataclasses import dataclass
from typing import Iterator
from twitch_auth import TwitchTokenManager
logger = logging.getLogger(__name__)
HELIX = "https://api.twitch.tv/helix"
@dataclass
class GameStats:
game_id: str
game_name: str
box_art_url: str
viewer_count: int
stream_count: int
rank: int
@dataclass
class StreamerInCategory:
broadcaster_id: str
broadcaster_login: str
viewer_count: int
title: str
started_at: str
language: str
is_mature: bool
class CategoryAnalyzer:
def __init__(self, manager: TwitchTokenManager, db_path: str = "analytics.db") -> None:
self.manager = manager
self.db_path = db_path
self.client = httpx.Client(timeout=20, http2=True)
self._init_db()
def _init_db(self) -> None:
conn = sqlite3.connect(self.db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS category_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
game_id TEXT NOT NULL,
game_name TEXT NOT NULL,
viewer_count INTEGER NOT NULL,
stream_count INTEGER NOT NULL,
rank INTEGER NOT NULL,
timestamp REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cat_game_ts
ON category_snapshots(game_id, timestamp);
""")
conn.commit()
conn.close()
def _get(self, path: str, params: dict) -> dict:
token = self.manager.get_token()
resp = self.client.get(
f"{HELIX}{path}",
headers=token.auth_headers(),
params=params,
)
token.update_rate_limits(resp.headers)
resp.raise_for_status()
return resp.json()
def get_top_games(self, top_n: int = 50) -> list[GameStats]:
"""Fetch top games by current viewer count."""
games = []
cursor = None
while len(games) < top_n:
params: dict = {"first": min(100, top_n - len(games))}
if cursor:
params["after"] = cursor
body = self._get("/games/top", params)
for i, g in enumerate(body.get("data", []), len(games) + 1):
games.append(GameStats(
game_id=g["id"],
game_name=g["name"],
box_art_url=g.get("box_art_url", ""),
viewer_count=0,
stream_count=0,
rank=i,
))
cursor = body.get("pagination", {}).get("cursor")
if not cursor:
break
# Enrich with stream counts and viewer totals
for game in games:
game.stream_count, game.viewer_count = self._count_streams_for_game(
game.game_id, sample_limit=100
)
return games
def _count_streams_for_game(
self, game_id: str, sample_limit: int = 100
) -> tuple[int, int]:
"""Count streams and sum viewers for a game (sampled)."""
body = self._get("/streams", {"game_id": game_id, "first": sample_limit})
streams = body.get("data", [])
return len(streams), sum(s["viewer_count"] for s in streams)
def snapshot_top_games(self, top_n: int = 50) -> list[GameStats]:
"""Record current top-game state to SQLite."""
games = self.get_top_games(top_n)
now = time.time()
conn = sqlite3.connect(self.db_path)
for g in games:
conn.execute(
"""INSERT INTO category_snapshots
(game_id, game_name, viewer_count, stream_count, rank, timestamp)
VALUES (?, ?, ?, ?, ?, ?)""",
(g.game_id, g.game_name, g.viewer_count, g.stream_count, g.rank, now),
)
conn.commit()
conn.close()
logger.info(f"Snapshotted {len(games)} game categories")
return games
def get_streams_for_game(
self,
game_id: str,
language: str = "",
max_results: int = 100,
) -> Iterator[StreamerInCategory]:
"""Iterate live streams in a specific game category."""
params: dict = {"game_id": game_id, "first": 100}
if language:
params["language"] = language
cursor = None
count = 0
while count < max_results:
if cursor:
params["after"] = cursor
body = self._get("/streams", params)
for s in body.get("data", []):
yield StreamerInCategory(
broadcaster_id=s["user_id"],
broadcaster_login=s["user_login"],
viewer_count=s["viewer_count"],
title=s["title"],
started_at=s["started_at"],
language=s["language"],
is_mature=s.get("is_mature", False),
)
count += 1
if count >= max_results:
break
cursor = body.get("pagination", {}).get("cursor")
if not cursor:
break
def trending_games(self, hours_back: int = 24) -> list[dict]:
"""
Compare current top games to those from hours_back hours ago.
Returns games with biggest viewer count increases.
"""
cutoff = time.time() - hours_back * 3600
conn = sqlite3.connect(self.db_path)
# Most recent snapshot per game
current = conn.execute(
"""SELECT game_id, game_name, viewer_count, rank
FROM category_snapshots
WHERE timestamp = (SELECT MAX(timestamp) FROM category_snapshots)"""
).fetchall()
        # Most recent batch of snapshot rows at or before the cutoff (up to 100 games)
past = conn.execute(
"""SELECT game_id, viewer_count FROM category_snapshots
WHERE timestamp <= ? ORDER BY timestamp DESC LIMIT 100""",
(cutoff,),
).fetchall()
conn.close()
past_map = {r[0]: r[1] for r in past}
trends = []
for game_id, game_name, current_viewers, rank in current:
past_viewers = past_map.get(game_id, 0)
if past_viewers > 0:
change_pct = (current_viewers - past_viewers) / past_viewers * 100
else:
change_pct = 100.0 if current_viewers > 0 else 0.0
trends.append({
"game_id": game_id,
"game_name": game_name,
"current_viewers": current_viewers,
"past_viewers": past_viewers,
"change_pct": change_pct,
"rank": rank,
})
return sorted(trends, key=lambda x: x["change_pct"], reverse=True)
Part 12 — Anti-Detection Deep Dive
Token Pool Management
For sustained collection at scale, run multiple app tokens. Each token has its own 800 req/min limit. With 5 tokens you get 4,000 req/min — enough to monitor hundreds of channels continuously.
# Multi-credential token pool setup
from twitch_auth import TwitchTokenManager, AppCredential
manager = TwitchTokenManager(
credentials=[
AppCredential(client_id="cid1", client_secret="cs1"),
AppCredential(client_id="cid2", client_secret="cs2"),
AppCredential(client_id="cid3", client_secret="cs3"),
]
)
Each call to manager.get_token() round-robins across available, non-rate-limited tokens. When all tokens are exhausted, it waits for the earliest reset.
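The selection logic can be sketched roughly as follows. This is a simplified stand-in, not the actual `TwitchTokenManager` (which also handles token refresh and rate-limit header parsing):

```python
import time

class RoundRobinTokenPool:
    """Minimal sketch of round-robin token selection with budget tracking."""

    def __init__(self, tokens: list[str], limit: int = 800) -> None:
        # Seed each token optimistically with a full budget
        self._state = {t: {"remaining": limit, "reset_at": 0.0} for t in tokens}
        self._order = list(tokens)
        self._idx = 0

    def record_headers(self, token: str, remaining: int, reset_at: float) -> None:
        """Update a token's budget from Ratelimit-Remaining / Ratelimit-Reset."""
        self._state[token] = {"remaining": remaining, "reset_at": reset_at}

    def get_token(self) -> str:
        now = time.time()
        for _ in range(len(self._order)):
            token = self._order[self._idx]
            self._idx = (self._idx + 1) % len(self._order)
            st = self._state[token]
            # Usable if budget remains or its window has already reset
            if st["remaining"] > 0 or now >= st["reset_at"]:
                return token
        # Every token exhausted: block until the earliest reset
        earliest = min(st["reset_at"] for st in self._state.values())
        time.sleep(max(0.0, earliest - now))
        return self.get_token()
```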
Rate Limit Header Parsing
Always parse Twitch's rate limit headers and adapt accordingly:
import time

def adaptive_sleep(remaining: int, reset_at: float) -> None:
    """
    Sleep proportionally based on how close we are to the rate limit.
    100+ requests remaining: no sleep.
    10-99 remaining: 50ms between requests.
    1-9 remaining: 500ms between requests.
    0 remaining: wait until the reset timestamp.
    """
    if remaining <= 0:
        wait = max(1.0, reset_at - time.time())
        time.sleep(wait)
    elif remaining < 10:
        time.sleep(0.5)
    elif remaining < 100:
        time.sleep(0.05)
    # else: no sleep needed
GQL Client-Integrity Challenges
The integrity token system uses a challenge-response flow:
- Twitch's JS calls an internal endpoint to receive a challenge nonce
- The JS solves the challenge using browser fingerprint data (canvas hash, WebGL info, installed fonts, etc.)
- The solved challenge is sent to https://gql.twitch.tv/integrity, which returns a signed JWT
- That JWT is attached as the Client-Integrity header on GQL requests
The JWT is valid for approximately one hour. The most reliable approach for production scrapers is to use Playwright to automate steps 1-3 once per hour (see the get_integrity_token.py script above), then reuse the captured token for all GQL requests until it expires.
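Reusing the cached token might look like this. `TOKEN_TTL_SECS` and the `.integrity_token` file format follow the caching script above, but treat the 55-minute TTL as an assumption (a safety margin under the roughly one-hour validity), not a documented expiry:

```python
import json
import time
from pathlib import Path
from typing import Optional

TOKEN_TTL_SECS = 55 * 60  # refresh a bit before the ~1 hour expiry

def load_cached_integrity_token(path: str = ".integrity_token") -> Optional[str]:
    """Return the cached Client-Integrity token if still fresh, else None."""
    p = Path(path)
    if not p.exists():
        return None
    try:
        data = json.loads(p.read_text())
    except (ValueError, OSError):
        return None
    if time.time() - data.get("captured_at", 0) > TOKEN_TTL_SECS:
        return None
    return data.get("token")
```

When this returns None, rerun the Playwright capture step and rewrite the cache file.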
An alternative is to use the subset of GQL queries that still accept anonymous requests (listed in ANON_COMPATIBLE in the GQL module). These are mostly stream metadata queries — enough for most monitoring use cases without needing the integrity token at all.
Fastly CDN Fingerprinting
Fastly performs TLS fingerprinting (JA3/JA3S) and HTTP/2 settings fingerprinting. httpx with HTTP/2 enabled produces settings frames closer to Chrome than requests + urllib3. For maximum fingerprint compatibility:
import httpx
# HTTP/2 client with browser-realistic settings
client = httpx.Client(
http2=True,
timeout=httpx.Timeout(20.0, connect=10.0),
headers={
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Origin": "https://www.twitch.tv",
"Referer": "https://www.twitch.tv/",
"Sec-Ch-Ua": '"Chromium";v="124", "Google Chrome";v="124"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Windows"',
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
},
)
For TLS fingerprint manipulation beyond what httpx provides, curl_cffi offers libcurl with Chrome-impersonation at the TLS level:
# pip install curl-cffi
from curl_cffi import requests as cffi_requests
# The GQL endpoint only accepts POST requests
response = cffi_requests.post(
    "https://gql.twitch.tv/gql",
    impersonate="chrome124",
    headers=your_headers,
    data=your_payload,
)
IP Reputation and Residential Proxies
Datacenter IPs (AWS us-east-1, DigitalOcean NYC, Hetzner) are known to Fastly's IP intelligence. They receive lower rate limits at the CDN edge, before your requests reach Twitch's application servers. Symptoms: 429s arriving faster than your rate limit headers suggest, or GQL returning 403 on queries that should work.
Residential proxies solve this because the IPs are associated with real ISP subscribers, not cloud providers.
ThorData provides a residential proxy network that works well for Twitch collection:
import httpx
# Placeholder credentials and gateway host: substitute the values from your provider dashboard
PROXY_URL = "http://USERNAME:PASSWORD@PROXY_HOST:9000"
client = httpx.Client(
proxy=PROXY_URL,
http2=True,
timeout=30,
)
For sustained monitoring where you don't want IP rotation mid-session (which can look suspicious), use sticky sessions if your proxy provider supports them — this keeps a consistent IP for the duration of a monitoring window.
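A sticky-session URL is usually built by embedding a session marker in the proxy username. The exact convention varies by provider, so the `-session-<id>` format below is an assumption to adapt to your provider's documentation:

```python
import random
import string
from typing import Optional

def sticky_proxy_url(
    username: str,
    password: str,
    host: str,
    port: int,
    session_id: Optional[str] = None,
) -> str:
    """
    Build a proxy URL with a session marker embedded in the username.
    NOTE: the 'user-session-<id>' convention is provider-specific.
    """
    if session_id is None:
        # Random session id: same id keeps the same exit IP across requests
        session_id = "".join(
            random.choices(string.ascii_lowercase + string.digits, k=8)
        )
    return f"http://{username}-session-{session_id}:{password}@{host}:{port}"
```

Generate one session id per monitoring window, then reuse it for all requests in that window so the exit IP stays stable.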
Request Timing for Sustained Polling
For long-running monitoring jobs, avoid perfectly regular request cadences — they're a pattern Twitch's anomaly detection can flag. Add small jitter:
import random
import time
def jittered_sleep(base_secs: float, jitter_pct: float = 0.2) -> None:
"""Sleep for base_secs ± jitter_pct."""
jitter = base_secs * jitter_pct
time.sleep(base_secs + random.uniform(-jitter, jitter))
Part 13 — SQLite Data Storage Schema
A production schema for storing all collected Twitch data with indexes optimized for common queries.
-- Full schema for Twitch data collection
-- Run this once to set up the database
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA cache_size = -32000; -- 32MB page cache
CREATE TABLE IF NOT EXISTS channels (
broadcaster_id TEXT PRIMARY KEY,
login TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
broadcaster_type TEXT NOT NULL, -- 'partner', 'affiliate', ''
description TEXT,
profile_image_url TEXT,
created_at TEXT,
follower_count INTEGER DEFAULT 0,
last_updated REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS clips (
clip_id TEXT PRIMARY KEY,
url TEXT NOT NULL,
broadcaster_id TEXT NOT NULL,
broadcaster_name TEXT NOT NULL,
creator_id TEXT,
creator_name TEXT,
video_id TEXT,
game_id TEXT,
game_name TEXT,
title TEXT,
view_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
thumbnail_url TEXT,
duration REAL,
vod_offset INTEGER,
language TEXT,
is_featured INTEGER DEFAULT 0,
collected_at REAL NOT NULL,
FOREIGN KEY (broadcaster_id) REFERENCES channels(broadcaster_id)
);
CREATE TABLE IF NOT EXISTS vods (
video_id TEXT PRIMARY KEY,
stream_id TEXT,
user_id TEXT NOT NULL,
user_login TEXT NOT NULL,
title TEXT,
description TEXT,
created_at TEXT NOT NULL,
published_at TEXT,
url TEXT,
thumbnail_url TEXT,
viewable TEXT,
view_count INTEGER DEFAULT 0,
language TEXT,
vod_type TEXT NOT NULL, -- 'archive', 'highlight', 'upload'
duration_raw TEXT,
duration_secs INTEGER DEFAULT 0,
has_muted_audio INTEGER DEFAULT 0,
muted_segments TEXT, -- JSON array
collected_at REAL NOT NULL,
FOREIGN KEY (user_id) REFERENCES channels(broadcaster_id)
);
CREATE TABLE IF NOT EXISTS stream_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
broadcaster_id TEXT NOT NULL,
broadcaster_login TEXT NOT NULL,
timestamp REAL NOT NULL,
viewer_count INTEGER NOT NULL,
game_id TEXT,
game_name TEXT,
title TEXT,
tags TEXT, -- comma-separated
FOREIGN KEY (broadcaster_id) REFERENCES channels(broadcaster_id)
);
CREATE TABLE IF NOT EXISTS stream_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
broadcaster_id TEXT NOT NULL,
event_type TEXT NOT NULL, -- 'online', 'offline', 'game_change', 'title_change'
timestamp REAL NOT NULL,
old_value TEXT,
new_value TEXT
);
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel TEXT NOT NULL,
username TEXT NOT NULL,
display_name TEXT,
user_id TEXT,
message TEXT NOT NULL,
timestamp REAL NOT NULL,
emotes TEXT, -- comma-separated emote IDs
badges TEXT, -- comma-separated badge names
is_subscriber INTEGER DEFAULT 0,
is_moderator INTEGER DEFAULT 0,
is_vip INTEGER DEFAULT 0,
bits INTEGER DEFAULT 0,
color TEXT,
message_id TEXT
);
CREATE TABLE IF NOT EXISTS category_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
game_id TEXT NOT NULL,
game_name TEXT NOT NULL,
viewer_count INTEGER NOT NULL,
stream_count INTEGER NOT NULL,
rank INTEGER NOT NULL,
timestamp REAL NOT NULL
);
-- Performance indexes
CREATE INDEX IF NOT EXISTS idx_clips_broadcaster
ON clips(broadcaster_id, created_at);
CREATE INDEX IF NOT EXISTS idx_clips_game
ON clips(game_id, view_count DESC);
CREATE INDEX IF NOT EXISTS idx_clips_views
ON clips(view_count DESC);
CREATE INDEX IF NOT EXISTS idx_clips_created
ON clips(created_at);
CREATE INDEX IF NOT EXISTS idx_vods_user
ON vods(user_id, created_at);
CREATE INDEX IF NOT EXISTS idx_vods_type
ON vods(vod_type, created_at);
CREATE INDEX IF NOT EXISTS idx_snapshots_broadcaster_ts
ON stream_snapshots(broadcaster_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_chat_channel_ts
ON chat_messages(channel, timestamp);
CREATE INDEX IF NOT EXISTS idx_chat_username
ON chat_messages(username);
CREATE INDEX IF NOT EXISTS idx_chat_subscriber
ON chat_messages(channel, is_subscriber);
CREATE INDEX IF NOT EXISTS idx_category_game_ts
ON category_snapshots(game_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_category_ts
ON category_snapshots(timestamp, rank);
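These indexes only pay off if SQLite actually chooses them for your queries. A quick sanity check with EXPLAIN QUERY PLAN, sketched here against a trimmed-down clips table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE clips (clip_id TEXT PRIMARY KEY, broadcaster_id TEXT,
                    created_at TEXT, view_count INTEGER);
CREATE INDEX idx_clips_broadcaster ON clips(broadcaster_id, created_at);
""")
# EXPLAIN QUERY PLAN reports which index (if any) SQLite picks
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM clips "
    "WHERE broadcaster_id = ? ORDER BY created_at",
    ("71092938",),
).fetchall()
print(plan)  # the detail column should mention idx_clips_broadcaster
conn.close()
```

If the detail column says SCAN instead of SEARCH ... USING INDEX, the query shape doesn't match the index and it's worth reordering the index columns or the WHERE clause.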
Part 14 — Output Examples
Clip JSON
{
"clip_id": "ObedientGoldenSpiderdoomTheTarFu-jh7K2v1COfPC6SnJ",
"url": "https://www.twitch.tv/xqc/clip/ObedientGoldenSpiderdoomTheTarFu-jh7K2v1COfPC6SnJ",
"broadcaster_id": "71092938",
"broadcaster_name": "xQc",
"creator_id": "12345678",
"creator_name": "someuser",
"video_id": "2089432156",
"game_id": "27471",
"game_name": "Minecraft",
"title": "xQc reaction to chat prediction gone wrong",
"view_count": 284931,
"created_at": "2026-03-25T14:22:07Z",
"thumbnail_url": "https://clips-media-assets2.twitch.tv/obedient.../preview-480x272.jpg",
"duration": 28.5,
"vod_offset": 14823,
"language": "en",
"is_featured": false
}
VOD JSON
{
"video_id": "2089432156",
"stream_id": "43021876543",
"user_id": "71092938",
"user_login": "xqc",
"title": "!MINECRAFT HARDCORE DAY 47 | !crosshair !prime",
"created_at": "2026-03-25T10:00:14Z",
"published_at": "2026-03-25T10:00:14Z",
"url": "https://www.twitch.tv/videos/2089432156",
"viewable": "public",
"view_count": 142034,
"language": "en",
"vod_type": "archive",
"duration_raw": "11h47m32s",
"duration_secs": 42452,
"has_muted_audio": true,
"muted_segments": [
{"offset": 8340, "duration": 420},
{"offset": 19800, "duration": 180}
]
}
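The duration_raw field uses Twitch's compact "11h47m32s" format, and duration_secs is derived from it. A minimal parser sketch (the parse_duration helper name is my own, not part of the Helix API):

```python
import re

def parse_duration(raw: str) -> int:
    """Convert Twitch's '11h47m32s' style duration to total seconds.

    Any of the h/m/s components may be absent, e.g. '47m32s' or '58s'.
    """
    match = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?", raw.strip())
    if not match or not any(match.groups()):
        raise ValueError(f"Unrecognized duration: {raw!r}")
    hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return hours * 3600 + minutes * 60 + seconds

print(parse_duration("11h47m32s"))  # 42452, matching the VOD JSON above
```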
Stream Snapshot JSON
{
"broadcaster_id": "71092938",
"broadcaster_login": "xqc",
"timestamp": 1743421200.0,
"viewer_count": 87432,
"game_id": "27471",
"game_name": "Minecraft",
"title": "MINECRAFT HARDCORE DAY 47 | !prime !crosshair",
"tags": ["English", "Competitive"],
"is_mature": false
}
Chat Message JSON
{
"channel": "xqc",
"username": "pogchamp_fan99",
"display_name": "PogChamp_Fan99",
"user_id": "98765432",
"message": "xqcL xqcL xqcL this is insane POGGERS",
"timestamp": 1743421205.3,
"emotes": ["emotesv2_abc123", "301695605"],
"badges": ["subscriber", "bits"],
"is_subscriber": true,
"is_moderator": false,
"is_vip": false,
"bits": 0,
"color": "#FF4500",
"message_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
}
Part 15 — Complete End-to-End Script
This script ties the earlier pieces together: authenticate, find the top channels for a game, pull clips and VODs from the last 7 days, store everything in SQLite, and print summary stats.
"""
twitch_pipeline.py — End-to-end data collection pipeline
Usage: python3 twitch_pipeline.py --game "League of Legends" --days 7
"""
import os
import sys
import json
import time
import logging
import sqlite3
import argparse
from datetime import datetime, timedelta, timezone
import httpx
from twitch_auth import TwitchTokenManager, AppCredential
from twitch_channels import ChannelScraper
from twitch_clips import ClipScraper
from twitch_vods import VODScraper
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s — %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger("pipeline")
HELIX = "https://api.twitch.tv/helix"
DB_PATH = "twitch_pipeline.db"
def init_db(db_path: str) -> None:
conn = sqlite3.connect(db_path)
conn.executescript("""
PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS channels (
broadcaster_id TEXT PRIMARY KEY, login TEXT, display_name TEXT,
broadcaster_type TEXT, follower_count INTEGER, collected_at REAL
);
CREATE TABLE IF NOT EXISTS clips (
clip_id TEXT PRIMARY KEY, broadcaster_id TEXT, broadcaster_name TEXT,
game_name TEXT, title TEXT, view_count INTEGER, created_at TEXT,
duration REAL, vod_offset INTEGER, collected_at REAL
);
CREATE TABLE IF NOT EXISTS vods (
video_id TEXT PRIMARY KEY, user_id TEXT, user_login TEXT,
title TEXT, vod_type TEXT, duration_secs INTEGER, view_count INTEGER,
created_at TEXT, has_muted_audio INTEGER, collected_at REAL
);
CREATE INDEX IF NOT EXISTS idx_clips_views ON clips(view_count DESC);
CREATE INDEX IF NOT EXISTS idx_clips_broadcaster ON clips(broadcaster_id);
""")
conn.commit()
conn.close()
def get_game_id(client: httpx.Client, manager: TwitchTokenManager, game_name: str) -> str:
token = manager.get_token()
resp = client.get(
f"{HELIX}/games",
headers=token.auth_headers(),
params={"name": game_name},
)
token.update_rate_limits(resp.headers)
resp.raise_for_status()
data = resp.json().get("data", [])
if not data:
raise ValueError(f"Game not found: {game_name!r}")
return data[0]["id"]
def get_top_channels_for_game(
client: httpx.Client,
manager: TwitchTokenManager,
game_id: str,
top_n: int = 20,
) -> list[dict]:
"""Get the top N channels currently live in a game category."""
token = manager.get_token()
resp = client.get(
f"{HELIX}/streams",
headers=token.auth_headers(),
params={"game_id": game_id, "first": min(top_n, 100)},
)
token.update_rate_limits(resp.headers)
resp.raise_for_status()
return resp.json().get("data", [])[:top_n]
def save_channel(conn: sqlite3.Connection, ch: dict, follower_count: int) -> None:
conn.execute(
"""INSERT OR REPLACE INTO channels
(broadcaster_id, login, display_name, broadcaster_type, follower_count, collected_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(ch["user_id"], ch["user_login"], ch["user_name"], "", follower_count, time.time()),
)
def save_clip(conn: sqlite3.Connection, clip) -> None:
conn.execute(
"""INSERT OR IGNORE INTO clips
(clip_id, broadcaster_id, broadcaster_name, game_name, title,
view_count, created_at, duration, vod_offset, collected_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
clip.clip_id, clip.broadcaster_id, clip.broadcaster_name,
clip.game_name, clip.title, clip.view_count, clip.created_at,
clip.duration, clip.vod_offset, time.time(),
),
)
def save_vod(conn: sqlite3.Connection, vod) -> None:
conn.execute(
"""INSERT OR IGNORE INTO vods
(video_id, user_id, user_login, title, vod_type, duration_secs,
view_count, created_at, has_muted_audio, collected_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
vod.video_id, vod.user_id, vod.user_login, vod.title,
vod.vod_type, vod.duration_secs, vod.view_count,
vod.created_at, int(vod.has_muted_audio), time.time(),
),
)
def print_summary(db_path: str) -> None:
conn = sqlite3.connect(db_path)
channel_count = conn.execute("SELECT COUNT(*) FROM channels").fetchone()[0]
clip_count = conn.execute("SELECT COUNT(*) FROM clips").fetchone()[0]
vod_count = conn.execute("SELECT COUNT(*) FROM vods").fetchone()[0]
print(f"\n{'='*60}")
print(f" COLLECTION SUMMARY")
print(f"{'='*60}")
print(f" Channels collected : {channel_count}")
print(f" Clips collected : {clip_count}")
print(f" VODs collected : {vod_count}")
if clip_count > 0:
top_clips = conn.execute(
"""SELECT broadcaster_name, title, view_count, created_at
FROM clips ORDER BY view_count DESC LIMIT 5"""
).fetchall()
print(f"\n TOP 5 CLIPS BY VIEW COUNT:")
for i, (name, title, views, created) in enumerate(top_clips, 1):
print(f" {i}. [{name}] {title[:50]} — {views:,} views ({created[:10]})")
if vod_count > 0:
muted_count = conn.execute(
"SELECT COUNT(*) FROM vods WHERE has_muted_audio = 1"
).fetchone()[0]
avg_duration = conn.execute(
"SELECT AVG(duration_secs) FROM vods"
).fetchone()[0] or 0
print(f"\n VOD STATS:")
print(f" Avg duration : {avg_duration/3600:.1f} hours")
print(f" With muted audio: {muted_count}/{vod_count}")
conn.close()
def main() -> None:
parser = argparse.ArgumentParser(description="Twitch data pipeline")
parser.add_argument("--game", required=True, help="Game name to collect data for")
parser.add_argument("--days", type=int, default=7, help="Days back to collect clips")
parser.add_argument("--top-channels", type=int, default=20, help="Top N channels to analyze")
parser.add_argument("--min-clip-views", type=int, default=100, help="Min views per clip")
args = parser.parse_args()
client_id = os.environ.get("TWITCH_CLIENT_ID")
client_secret = os.environ.get("TWITCH_CLIENT_SECRET")
if not client_id or not client_secret:
sys.exit("Set TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET env vars")
init_db(DB_PATH)
manager = TwitchTokenManager(
credentials=[AppCredential(client_id=client_id, client_secret=client_secret)]
)
clip_scraper = ClipScraper(manager)
vod_scraper = VODScraper(manager)
channel_scraper = ChannelScraper(manager)
http_client = httpx.Client(timeout=20, http2=True)
# Step 1: Find the game
logger.info(f"Looking up game: {args.game!r}")
game_id = get_game_id(http_client, manager, args.game)
logger.info(f"Game ID: {game_id}")
# Step 2: Get top channels currently live
logger.info(f"Fetching top {args.top_channels} live channels")
live_streams = get_top_channels_for_game(http_client, manager, game_id, args.top_channels)
logger.info(f"Found {len(live_streams)} live channels")
# Step 3: For each channel, pull clips and VODs
now = datetime.now(timezone.utc)
started_at = (now - timedelta(days=args.days)).strftime("%Y-%m-%dT%H:%M:%SZ")
ended_at = now.strftime("%Y-%m-%dT%H:%M:%SZ")
conn = sqlite3.connect(DB_PATH)
total_clips = 0
total_vods = 0
for stream in live_streams:
broadcaster_id = stream["user_id"]
login = stream["user_login"]
logger.info(f"Processing {login} ({stream['viewer_count']:,} viewers live)")
# Follower count
try:
follower_count = channel_scraper.get_follower_count(broadcaster_id)
except Exception:
follower_count = 0
save_channel(conn, stream, follower_count)
# Clips
clip_batch = list(clip_scraper.iter_clips_by_broadcaster(
broadcaster_id=broadcaster_id,
started_at=started_at,
ended_at=ended_at,
min_views=args.min_clip_views,
max_results=100,
))
for clip in clip_batch:
save_clip(conn, clip)
total_clips += len(clip_batch)
logger.info(f" {len(clip_batch)} clips (>{args.min_clip_views} views, last {args.days}d)")
# VODs (last 30 days, up to 20 per channel)
vod_batch = []
cutoff_ts = (now - timedelta(days=30)).timestamp()
for vod in vod_scraper.iter_vods(broadcaster_id, max_results=20):
created = datetime.fromisoformat(vod.created_at.replace("Z", "+00:00"))
if created.timestamp() < cutoff_ts:
break
save_vod(conn, vod)
vod_batch.append(vod)
total_vods += len(vod_batch)
logger.info(f" {len(vod_batch)} VODs (last 30d)")
conn.commit()
conn.close()
logger.info(f"Done. Collected {total_clips} clips and {total_vods} VODs")
print_summary(DB_PATH)
if __name__ == "__main__":
main()
Run it:
export TWITCH_CLIENT_ID="your_client_id"
export TWITCH_CLIENT_SECRET="your_client_secret"
python3 twitch_pipeline.py --game "League of Legends" --days 7 --top-channels 20
Sample output:
10:14:32 INFO pipeline — Looking up game: 'League of Legends'
10:14:32 INFO pipeline — Game ID: 21779
10:14:33 INFO pipeline — Fetching top 20 live channels
10:14:33 INFO pipeline — Found 20 live channels
10:14:33 INFO pipeline — Processing faker (42,891 viewers live)
10:14:34 INFO pipeline — 87 clips (>100 views, last 7d)
10:14:35 INFO pipeline — 20 VODs (last 30d)
...
============================================================
COLLECTION SUMMARY
============================================================
Channels collected : 20
Clips collected : 1243
VODs collected : 312
TOP 5 CLIPS BY VIEW COUNT:
1. [faker] Faker's 10,000th kill — 891,234 views (2026-03-27)
2. [caedrel] Perfect prediction call — 342,091 views (2026-03-28)
3. [tyler1] Rage quit speedrun — 289,443 views (2026-03-26)
4. [doublelift] 1v5 pentakill — 201,887 views (2026-03-29)
5. [imaqtpie] Chat controls movement — 178,334 views (2026-03-25)
VOD STATS:
Avg duration : 5.3 hours
With muted audio: 47/312
Part 16 — Scheduling Regular Data Collection
For trend analysis you need consistent snapshots over time. Set up cron jobs for continuous collection.
Cron Setup
# Edit crontab: crontab -e
# Category snapshot every 30 minutes
*/30 * * * * cd /home/user/twitch-scraper && python3 snapshot_categories.py >> /var/log/twitch/categories.log 2>&1
# Viewer count monitoring every 2 minutes for tracked channels
*/2 * * * * cd /home/user/twitch-scraper && python3 poll_streams.py >> /var/log/twitch/streams.log 2>&1
# Clip collection once per hour
0 * * * * cd /home/user/twitch-scraper && python3 collect_clips.py --hours-back 2 >> /var/log/twitch/clips.log 2>&1
# Weekly VOD archive for tracked channels
0 6 * * 1 cd /home/user/twitch-scraper && python3 collect_vods.py >> /var/log/twitch/vods.log 2>&1
# Token refresh check every 6 hours (note: cron does not inherit your login
# shell's environment — define TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET at
# the top of the crontab)
0 */6 * * * cd /home/user/twitch-scraper && python3 -c "from twitch_auth import *; make_manager('$TWITCH_CLIENT_ID', '$TWITCH_CLIENT_SECRET')" >> /var/log/twitch/auth.log 2>&1
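One failure mode worth guarding against: the 2-minute poller overrunning its interval when the API is slow, so cron stacks up overlapping runs that double-write snapshots. A minimal single-instance guard using an advisory file lock (POSIX-only; the function name and lock path are my own choices, not part of the scripts above):

```python
import fcntl
import sys

def acquire_single_instance_lock(path: str = "/tmp/poll_streams.lock"):
    """Exit immediately if another copy of this script holds the lock.

    The kernel releases the lock when the process exits, so no cleanup
    is needed even after a crash.
    """
    handle = open(path, "w")
    try:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        sys.exit("Another instance is already running")
    return handle  # keep a reference so the file (and lock) stays open

if __name__ == "__main__":
    lock = acquire_single_instance_lock()
    # ... normal poll_streams.py work goes here ...
```

Call it at the top of each cron-driven script; a second invocation started while the first is still running exits immediately instead of racing it.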
Viewer Trend Queries
After a few days of collection, query your SQLite database to analyze trends:
"""
analyze_trends.py — Viewer trend analysis from collected data
"""
import sqlite3
from datetime import datetime
DB_PATH = "stream_monitor.db"
def peak_viewers_by_day(broadcaster_id: str) -> list[dict]:
conn = sqlite3.connect(DB_PATH)
rows = conn.execute(
"""SELECT
DATE(timestamp, 'unixepoch') AS day,
MAX(viewer_count) AS peak,
AVG(viewer_count) AS avg_viewers,
COUNT(*) AS sample_count
FROM stream_snapshots
WHERE broadcaster_id = ?
GROUP BY day
ORDER BY day""",
(broadcaster_id,),
).fetchall()
conn.close()
return [
{"day": r[0], "peak": r[1], "avg": round(r[2], 0), "samples": r[3]}
for r in rows
]
def game_time_distribution(broadcaster_id: str) -> list[dict]:
"""How many minutes was each game played (approx from snapshots)."""
conn = sqlite3.connect(DB_PATH)
rows = conn.execute(
"""SELECT game_name, COUNT(*) * 2 AS approx_minutes
FROM stream_snapshots
WHERE broadcaster_id = ?
GROUP BY game_name
ORDER BY approx_minutes DESC""",
(broadcaster_id,),
).fetchall()
conn.close()
return [{"game": r[0], "approx_minutes": r[1]} for r in rows]
def category_viewer_trend(game_id: str, limit_snapshots: int = 100) -> list[dict]:
"""Category viewer count over time for trend analysis."""
conn = sqlite3.connect(DB_PATH)
rows = conn.execute(
"""SELECT timestamp, viewer_count, rank
FROM category_snapshots
WHERE game_id = ?
ORDER BY timestamp DESC LIMIT ?""",
(game_id, limit_snapshots),
).fetchall()
conn.close()
return [
{
"timestamp": r[0],
"date": datetime.fromtimestamp(r[0]).strftime("%Y-%m-%d %H:%M"),
"viewer_count": r[1],
"rank": r[2],
}
for r in reversed(rows)
]
def chat_activity_by_hour(channel: str) -> list[dict]:
"""Messages per hour to find peak chat activity times."""
conn = sqlite3.connect(DB_PATH)
rows = conn.execute(
"""SELECT
STRFTIME('%H', timestamp, 'unixepoch') AS hour,
COUNT(*) AS message_count,
COUNT(DISTINCT username) AS unique_chatters,
SUM(is_subscriber) AS subscriber_messages
FROM chat_messages
WHERE channel = ?
GROUP BY hour
ORDER BY hour""",
(channel.lstrip("#"),),
).fetchall()
conn.close()
return [
{
"hour_utc": int(r[0]),
"messages": r[1],
"unique_chatters": r[2],
"subscriber_messages": r[3],
}
for r in rows
]
if __name__ == "__main__":
# Example: show peak viewers by day for a tracked channel
broadcaster_id = "71092938" # xQc
print("Peak viewers by day:")
for row in peak_viewers_by_day(broadcaster_id):
print(f" {row['day']}: peak={row['peak']:,}, avg={row['avg']:,.0f}, samples={row['samples']}")
Part 17 — Ethics and Legal Considerations
Helix API — Official and Clear
The Helix API is Twitch's official developer API. Accessing it requires agreeing to the Twitch Developer Services Agreement. Key relevant terms:
- App tokens can be used for read-only data collection without user consent
- You cannot resell raw API data to third parties without Twitch's permission
- Automated access that degrades service for other users is prohibited
- Rate limits are contractual, not just technical — bypassing them violates ToS
For most analytics, research, and tooling use cases, Helix is entirely appropriate. The ToS is designed to prevent commercial data resale and platform abuse, not academic or operational analytics.
GQL Endpoint — Gray Area
The GraphQL endpoint at gql.twitch.tv is undocumented and intended for internal use by Twitch's own web application. Using it is not explicitly authorized by Twitch's developer terms. In practice:
- The endpoint has been publicly accessible for years and the community openly discusses it
- Many popular Twitch tools (chat analytics platforms, clip tools, schedule trackers) use it
- Twitch has not pursued legal action against GQL usage, but could theoretically cite the CFAA or ToS violation
- The Client-Integrity requirement was added specifically to discourage third-party GQL access
The pragmatic assessment: GQL usage for non-commercial, non-abusive purposes exists in a legal gray zone. The risk profile is low, but it's not zero. For any production commercial application, build on Helix where possible and treat GQL data as supplementary.
Privacy Considerations
- Twitch usernames and public clip data are publicly accessible — no special privacy concern
- Chat logs contain pseudonymous public messages, but bulk collection creates a quasi-surveillance record
- Do not collect private information (emails, IP addresses, payment data) — this isn't accessible through these APIs anyway
- If storing data long-term, consider a retention policy for chat logs
- In the EU, GDPR may apply to chat message data if you're processing it commercially
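A retention policy for chat logs can be as simple as a scheduled DELETE against the chat_messages table defined above. A sketch (the prune_chat_messages helper is my own name):

```python
import sqlite3
import time

def prune_chat_messages(db_path: str, keep_days: int = 90) -> int:
    """Delete chat messages older than keep_days; returns rows removed."""
    cutoff = time.time() - keep_days * 86400
    conn = sqlite3.connect(db_path)
    cur = conn.execute(
        "DELETE FROM chat_messages WHERE timestamp < ?", (cutoff,)
    )
    conn.commit()
    deleted = cur.rowcount
    # Reclaim disk space after a large purge (optional; can be slow)
    conn.execute("VACUUM")
    conn.close()
    return deleted
```

Run it weekly from cron alongside the collectors; 90 days is an arbitrary starting point, not a legal recommendation.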
Part 18 — Troubleshooting
Token Expired (401 on Helix)
httpx.HTTPStatusError: Client error '401 Unauthorized'
The access token has expired or been revoked (app access tokens last roughly 60 days, but revocation can happen at any time). The TwitchTokenManager handles this automatically — call manager.get_token() before every request rather than caching the token object.
Manual check:
curl -s -o /dev/null -w "%{http_code}" \
https://id.twitch.tv/oauth2/validate \
-H "Authorization: OAuth YOUR_TOKEN"
# 200 = valid, 401 = expired
Rate Limited (429)
httpx.HTTPStatusError: Client error '429 Too Many Requests'
Parse Ratelimit-Reset from the response headers — it's a Unix timestamp for when the limit resets. Wait until that timestamp, then retry. Do not retry immediately. The TwitchTokenManager handles this when you call token.update_rate_limits(resp.headers) on every response.
If you're hitting 429 faster than the documented 800 req/min limit, you're likely on a datacenter IP that's being throttled at the Fastly edge layer before requests reach Twitch's application. Switch to residential proxies.
GQL Hash Expired (Persisted Query Not Found)
{"errors": [{"message": "PersistedQueryNotFound"}]}
Twitch has rotated the persisted query hash for that operation. You need to extract the new hash from Twitch's front-end JavaScript:
# Download and search the main Twitch JS bundle for the operation name
curl -s https://www.twitch.tv | grep -o 'assets/[^"]*\.js' | head -5 | while read f; do
curl -s "https://www.twitch.tv/$f" | python3 -c "
import sys, re
content = sys.stdin.read()
# Print every sha256Hash value in the bundle; match it to your
# operation manually (this does not filter by operation name)
for m in re.finditer(r'\"sha256Hash\":\"([a-f0-9]{64})\"', content):
print(m.group(1))
" 2>/dev/null | head -20
done
Alternatively, open Twitch in Chrome DevTools, filter network requests to gql.twitch.tv, and inspect the payload of any GraphQL request to see the current hash for that operation.
Empty Clip Results
If iter_clips_by_broadcaster returns nothing despite a channel having many clips, check:
- Date range too narrow — the Clips API has an upper limit on started_at to ended_at spans; within that limit, try a wider range.
- Wrong broadcaster_id — verify with /users?login=channelname first.
- Broadcaster has no clips — new or niche channels sometimes have zero clips.
- API lag — Clips can take 5-10 minutes to appear in the API after creation.
Pagination Edge Cases
The Helix clips endpoint can return fewer results than the first parameter requested even when more clips exist. Never assume a short (or even empty) page means the end of pagination — always check the cursor field in the pagination object. Only when cursor is absent or empty have you reached the end.
Some endpoints cap total results regardless of pagination: Helix /clips for a broadcaster effectively stops returning results after roughly 1,000 clips in a single query session, even with valid cursors. If you need the full historical archive, split requests by time window (e.g., month-by-month).
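The time-window splitting can be sketched as a generator that yields started_at/ended_at pairs to feed into successive clip requests (month_windows is a name I made up, not a library function):

```python
from datetime import datetime, timedelta, timezone

def month_windows(days_back: int, window_days: int = 30):
    """Yield (started_at, ended_at) RFC3339 pairs covering the period,
    newest window first, so each /clips query stays well under the
    per-query result cap."""
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=days_back)
    while end > start:
        window_start = max(end - timedelta(days=window_days), start)
        yield window_start.strftime(fmt), end.strftime(fmt)
        end = window_start

# Each pair is then passed as started_at/ended_at to a clips request
for started_at, ended_at in month_windows(days_back=90):
    print(started_at, "->", ended_at)
```

For days_back=90 this yields three contiguous 30-day windows; a window-level cursor loop inside each pair then recovers the full archive.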
Chat Collector Disconnects
Twitch's IRC server sends a PING every few minutes and expects a PONG response within the timeout window. The TwitchChatCollector handles this in the receive loop. If you see frequent disconnects, ensure your socket timeout is set longer than the expected PING interval (300s in the implementation above handles this). Also, if you're joining more than 20 channels, Twitch may throttle JOIN commands — add a 0.3-second delay between joins for large channel lists.
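The staggered-JOIN advice as a sketch. Here send_line stands in for whatever writes one raw IRC line to the socket in your collector (this helper is illustrative, not a method of TwitchChatCollector):

```python
import time

JOIN_DELAY = 0.3  # seconds between JOINs to avoid Twitch's JOIN throttle

def join_channels(send_line, channels: list[str]) -> None:
    # send_line writes one raw IRC line, e.g.
    # lambda line: sock.send((line + "\r\n").encode())
    for i, channel in enumerate(channels):
        name = channel.lstrip("#").lower()
        send_line(f"JOIN #{name}")
        if i < len(channels) - 1:
            time.sleep(JOIN_DELAY)

# Example with a stub sender that just records the lines
sent = []
join_channels(sent.append, ["xqc", "#Pokimane", "caedrel"])
print(sent)  # ['JOIN #xqc', 'JOIN #pokimane', 'JOIN #caedrel']
```

Normalizing to lowercase and stripping any leading "#" also keeps the channel names consistent with the channel column in chat_messages.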
GQL Returns 403 After Integrity Token Capture
Integrity tokens are tied to the browser session that generated them. If you capture the token and then make GQL requests from a different IP or with a different TLS fingerprint, Twitch may reject it. For best results:
- Capture the integrity token using the same network and client setup you'll use for requests
- Use the same User-Agent and Client-ID in both the browser session and API calls
- Re-capture the token if requests start failing — it may have expired or been invalidated