Extracting Twitch Data in 2026: Streams, Clips, Channels, and VODs via the Helix API
Twitch generates an enormous amount of real-time data — hundreds of thousands of live streams at any moment, millions of clips, chat logs, subscriber counts, and viewing metrics. If you're building a streaming analytics tool, tracking esports viewership, studying online communities, or doing market research on the gaming industry, Twitch's Helix API is your entry point.
Unlike platforms that force you to reverse-engineer undocumented endpoints, Twitch provides a proper documented API. The tricky parts are the OAuth requirements, the surprisingly strict rate limits once you push past casual use, and the gaps in what the API exposes (chat data and historical viewership require separate approaches). This guide covers it all.
What Data the Helix API Exposes
Streams (live data):
- Viewer count (real-time)
- Stream title
- Game/category name and ID
- Started-at timestamp
- Language
- Tags (up to 10 per stream)
- Is-mature flag
- Thumbnail URL

Clips:
- Title, URL, creator name
- Broadcaster name
- View count
- Duration in seconds
- Created-at timestamp
- Game ID
- Thumbnail URL

Channels:
- Broadcaster login and display name
- Current game
- Stream title
- Language
- Profile image URL
- Account creation date
- Description / bio

VODs (videos):
- Title, URL, type (archive, highlight, upload)
- Duration (e.g., "2h30m15s")
- View count
- Created-at and published-at timestamps
- Language
- Thumbnail URL template

Games/Categories:
- Name, ID
- Box art URL
- Tags

What the API does NOT expose:
- Historical viewership data beyond the current viewer count (no trend data over time)
- Subscriber counts or subscription revenue (requires broadcaster OAuth, not an app token)
- Chat message history (requires a WebSocket/IRC connection)
- Banned users or moderation logs (requires broadcaster OAuth)
- Bits/donation amounts (private by default)
Getting API Access
Twitch requires you to register an application before making any API calls. Here's the complete setup:
- Go to the Twitch Developer Console (requires a Twitch account)
- Register a new application — name it anything, set the OAuth Redirect URL to http://localhost, select "Other" for the category, and accept the terms
- After creation, you get a Client ID displayed immediately
- Click "New Secret" to generate a Client Secret (save this — it won't be shown again)
For data extraction without accessing user-specific private data, use the Client Credentials OAuth flow to get an App Access Token:
import requests
import time
import json
from typing import Optional
class TwitchClient:
"""
Full-featured Twitch Helix API client with automatic token refresh,
rate limit tracking, and retry logic.
"""
AUTH_URL = "https://id.twitch.tv/oauth2/token"
BASE_URL = "https://api.twitch.tv/helix"
def __init__(self, client_id: str, client_secret: str):
self.client_id = client_id
self.client_secret = client_secret
self.token: Optional[str] = None
self.token_expires_at: float = 0
self.rate_remaining: int = 800
self.rate_limit: int = 800
self.rate_reset_at: float = 0
self._request_count: int = 0
def _get_token(self) -> str:
"""Get or refresh the app access token."""
if self.token and time.time() < self.token_expires_at - 300:
return self.token
resp = requests.post(
self.AUTH_URL,
params={
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "client_credentials",
},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
self.token = data["access_token"]
self.token_expires_at = time.time() + data["expires_in"]
print(f"Token obtained, expires in {data['expires_in']}s")
return self.token
def get(self, endpoint: str, params: dict = None) -> dict:
"""
Make an authenticated GET request to the Helix API.
Handles token management, rate limit tracking, and 429 backoff automatically.
"""
token = self._get_token()
url = f"{self.BASE_URL}/{endpoint}"
headers = {
"Authorization": f"Bearer {token}",
"Client-Id": self.client_id,
}
for attempt in range(3):
resp = requests.get(
url, headers=headers, params=params or {}, timeout=15
)
# Update rate limit state from response headers
self.rate_remaining = int(resp.headers.get("Ratelimit-Remaining", self.rate_remaining))
self.rate_limit = int(resp.headers.get("Ratelimit-Limit", self.rate_limit))
reset_header = resp.headers.get("Ratelimit-Reset", "0")
self.rate_reset_at = float(reset_header)
self._request_count += 1
# Proactive throttle when getting close to limit
if self.rate_remaining < 20:
wait_until = self.rate_reset_at
wait_time = max(0, wait_until - time.time()) + 1
print(f"Approaching rate limit ({self.rate_remaining} remaining). "
f"Waiting {wait_time:.0f}s for reset...")
time.sleep(wait_time)
if resp.status_code == 429:
reset_time = float(resp.headers.get("Ratelimit-Reset", time.time() + 60))
wait = max(reset_time - time.time(), 5)
print(f"Rate limited (attempt {attempt + 1}/3). Waiting {wait:.0f}s...")
time.sleep(wait)
continue
if resp.status_code == 401:
# Token may have been revoked — force refresh
self.token = None
self.token_expires_at = 0
token = self._get_token()
headers["Authorization"] = f"Bearer {token}"
continue
resp.raise_for_status()
return resp.json()
raise Exception(f"Failed after 3 attempts: {endpoint}")
def paginate(
self,
endpoint: str,
params: dict,
max_results: int,
data_key: str = "data",
) -> list[dict]:
"""
Fetch all results from a paginated endpoint up to max_results.
Uses cursor-based pagination via the 'after' parameter.
"""
results = []
cursor = None
while len(results) < max_results:
page_params = {**params, "first": min(100, max_results - len(results))}
if cursor:
page_params["after"] = cursor
data = self.get(endpoint, page_params)
batch = data.get(data_key, [])
if not batch:
break
results.extend(batch)
cursor = data.get("pagination", {}).get("cursor")
if not cursor:
break
time.sleep(0.15) # light pacing between pages
return results[:max_results]
Getting Live Stream Data
The streams endpoint is the core of Twitch data extraction:
def get_live_streams(
client: TwitchClient,
game_id: str = None,
language: str = None,
user_logins: list[str] = None,
max_results: int = 500,
) -> list[dict]:
"""
Get currently live streams.
Can filter by game_id, language, or specific user logins.
Returns streams sorted by viewer count descending.
"""
params = {}
if game_id:
params["game_id"] = game_id
if language:
params["language"] = language
if user_logins:
# API accepts up to 100 login names per request
params["user_login"] = user_logins[:100]
raw_streams = client.paginate("streams", params, max_results)
streams = []
for s in raw_streams:
streams.append({
"id": s["id"],
"user_id": s["user_id"],
"user_login": s["user_login"],
"user_name": s["user_name"],
"game_id": s["game_id"],
"game_name": s["game_name"],
"title": s["title"],
"viewer_count": s["viewer_count"],
"started_at": s["started_at"],
"language": s["language"],
"thumbnail_url": s.get("thumbnail_url", ""),
"tags": s.get("tags", []),
"is_mature": s.get("is_mature", False),
})
return sorted(streams, key=lambda x: x["viewer_count"], reverse=True)
# Example: top 200 English streams in the "Just Chatting" category
client = TwitchClient("YOUR_CLIENT_ID", "YOUR_CLIENT_SECRET")
streams = get_live_streams(client, language="en", max_results=200)
for s in streams[:5]:
print(f"{s['user_name']:30s} {s['viewer_count']:>8,} viewers [{s['game_name']}]")
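With a snapshot in hand, a quick aggregation shows how viewers split across categories; `viewers_by_game` here is a small illustrative helper, not part of the API client:

```python
from collections import Counter

def viewers_by_game(streams: list[dict]) -> list[tuple[str, int]]:
    """Sum viewer counts per category from a list of stream records."""
    totals: Counter = Counter()
    for s in streams:
        totals[s.get("game_name") or "Unknown"] += s.get("viewer_count", 0)
    return totals.most_common()  # sorted by total viewers, descending
```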
Fetching Popular Clips
Clips are short highlights that viewers and streamers create. They're great for identifying viral moments, tracking what content resonates, and measuring creator engagement:
from datetime import datetime, timedelta, timezone
def get_clips(
client: TwitchClient,
broadcaster_id: str = None,
game_id: str = None,
days_back: int = 7,
max_results: int = 100,
started_at: str = None,
ended_at: str = None,
) -> list[dict]:
"""
Get popular clips from a channel or game.
Specify broadcaster_id for a channel's clips, or game_id for a category.
"""
if not started_at:
started_at = (
datetime.now(timezone.utc) - timedelta(days=days_back)
).isoformat()
if not ended_at:
ended_at = datetime.now(timezone.utc).isoformat()
params = {
"started_at": started_at,
"ended_at": ended_at,
}
    # The clips endpoint treats broadcaster_id, game_id, and id as
    # mutually exclusive — exactly one must be supplied
    if bool(broadcaster_id) == bool(game_id):
        raise ValueError("Provide exactly one of broadcaster_id or game_id")
    if broadcaster_id:
        params["broadcaster_id"] = broadcaster_id
    else:
        params["game_id"] = game_id
raw_clips = client.paginate("clips", params, max_results)
clips = []
for c in raw_clips:
clips.append({
"id": c["id"],
"url": c["url"],
"embed_url": c["embed_url"],
"title": c["title"],
"broadcaster_id": c["broadcaster_id"],
"broadcaster_name": c["broadcaster_name"],
"creator_name": c["creator_name"],
"view_count": c["view_count"],
"duration": c["duration"],
"created_at": c["created_at"],
"game_id": c["game_id"],
"thumbnail_url": c["thumbnail_url"],
"language": c.get("language", ""),
})
return sorted(clips, key=lambda x: x["view_count"], reverse=True)
# Top clips from Valorant in the last 7 days
# Valorant game_id: 516575
clips = get_clips(client, game_id="516575", days_back=7, max_results=50)
for c in clips[:5]:
print(f"{c['broadcaster_name']:25s} {c['view_count']:>8,} views {c['title'][:50]}")
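Raw view counts favor older clips, so normalizing by age gives a rough virality signal. A sketch (`clip_velocity` is an illustrative name; Helix returns `created_at` as an ISO-8601 string with a trailing Z):

```python
from datetime import datetime, timezone

def clip_velocity(clip: dict, now: datetime = None) -> float:
    """Views per hour since the clip was created, floored at one hour."""
    now = now or datetime.now(timezone.utc)
    created = datetime.fromisoformat(clip["created_at"].replace("Z", "+00:00"))
    hours = max((now - created).total_seconds() / 3600, 1.0)
    return clip["view_count"] / hours
```

Sorting by this instead of `view_count` surfaces clips that are currently spreading rather than old evergreens.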
Channel Information and Detailed Metadata
Batch-fetch channel info for up to 100 channels per request:
def get_user_info(
client: TwitchClient,
logins: list[str] = None,
ids: list[str] = None,
) -> list[dict]:
"""
Get user/channel information by login names or user IDs.
Up to 100 per request.
"""
params = {}
if logins:
params["login"] = logins[:100]
if ids:
params["id"] = ids[:100]
data = client.get("users", params)
users = []
for u in data.get("data", []):
users.append({
"id": u["id"],
"login": u["login"],
"display_name": u["display_name"],
"type": u["type"],
"broadcaster_type": u["broadcaster_type"],
"description": u["description"],
"profile_image_url": u["profile_image_url"],
"created_at": u["created_at"],
})
return users
def get_channel_info(
client: TwitchClient,
broadcaster_ids: list[str],
) -> list[dict]:
"""
Get channel-specific data (current game, title, language).
Requires broadcaster IDs, not logins.
"""
channels = []
for i in range(0, len(broadcaster_ids), 100):
batch = broadcaster_ids[i:i+100]
data = client.get("channels", {"broadcaster_id": batch})
for ch in data.get("data", []):
channels.append({
"broadcaster_id": ch["broadcaster_id"],
"broadcaster_login": ch["broadcaster_login"],
"broadcaster_name": ch["broadcaster_name"],
"game_id": ch["game_id"],
"game_name": ch["game_name"],
"title": ch["title"],
"delay": ch.get("delay", 0),
"tags": ch.get("tags", []),
"broadcaster_language": ch["broadcaster_language"],
})
time.sleep(0.2)
return channels
def get_full_channel_data(
client: TwitchClient,
user_logins: list[str],
) -> list[dict]:
"""
Combine user info and channel info for a list of logins.
Returns merged records with all available fields.
"""
users = get_user_info(client, logins=user_logins)
user_map = {u["id"]: u for u in users}
broadcaster_ids = list(user_map.keys())
channels = get_channel_info(client, broadcaster_ids)
result = []
for ch in channels:
user = user_map.get(ch["broadcaster_id"], {})
result.append({**user, **ch})
return result
# Look up top streamers
channel_data = get_full_channel_data(
client, ["shroud", "pokimane", "xqc", "hasanabi", "summit1g"]
)
for ch in channel_data:
print(f"{ch['display_name']:20s} {ch['broadcaster_type']:12s} {ch['game_name']}")
VOD and Highlight Metadata
Historical content metadata through the videos endpoint:
def get_vods(
client: TwitchClient,
user_id: str,
video_type: str = "archive",
sort: str = "time",
max_results: int = 50,
) -> list[dict]:
"""
Get VODs for a channel.
video_type: 'archive' (past broadcasts), 'highlight', or 'upload'
sort: 'time' (newest first), 'trending', or 'views'
"""
params = {
"user_id": user_id,
"type": video_type,
"sort": sort,
}
raw_vods = client.paginate("videos", params, max_results)
vods = []
for v in raw_vods:
vods.append({
"id": v["id"],
"user_id": v["user_id"],
"user_name": v["user_name"],
"title": v["title"],
"description": v.get("description", ""),
"created_at": v["created_at"],
"published_at": v["published_at"],
"url": v["url"],
"thumbnail_url": v["thumbnail_url"],
"viewable": v["viewable"],
"view_count": v["view_count"],
"language": v["language"],
"type": v["type"],
"duration": v["duration"],
})
return vods
def parse_duration_seconds(duration_str: str) -> int:
"""
Convert Twitch duration string to total seconds.
Format: '2h30m15s', '45m00s', '1h15s', etc.
"""
import re
total = 0
for value, unit in re.findall(r"(\d+)([hms])", duration_str):
n = int(value)
if unit == "h":
total += n * 3600
elif unit == "m":
total += n * 60
else:
total += n
return total
Games and Category Discovery
Enumerate top games for category-level analysis:
def get_top_games(client: TwitchClient, max_results: int = 100) -> list[dict]:
"""Get currently most-viewed game categories on Twitch."""
raw = client.paginate("games/top", {}, max_results)
return [
{
"id": g["id"],
"name": g["name"],
"box_art_url": g["box_art_url"],
"tags": g.get("tags", []),
}
for g in raw
]
def search_categories(client: TwitchClient, query: str, max_results: int = 20) -> list[dict]:
"""Search for game categories by name."""
params = {"query": query, "first": min(max_results, 100)}
data = client.get("search/categories", params)
return data.get("data", [])
def get_game_by_name(client: TwitchClient, name: str) -> Optional[dict]:
"""Get exact game info by name."""
data = client.get("games", {"name": name})
games = data.get("data", [])
return games[0] if games else None
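One quirk worth knowing: `box_art_url` and stream `thumbnail_url` values come back as templates containing literal `{width}` and `{height}` placeholders that you substitute yourself. A tiny helper:

```python
def render_thumbnail(template: str, width: int = 320, height: int = 180) -> str:
    """Fill Twitch's {width}/{height} placeholders to get a fetchable image URL."""
    return template.replace("{width}", str(width)).replace("{height}", str(height))
```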
SQLite Storage Schema
A schema built for time-series viewership tracking:
import sqlite3
from datetime import datetime, timezone
def init_twitch_db(db_path: str = "twitch_data.db") -> sqlite3.Connection:
"""Initialize Twitch data SQLite database."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
login TEXT UNIQUE NOT NULL,
display_name TEXT,
broadcaster_type TEXT,
description TEXT,
profile_image_url TEXT,
created_at TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS stream_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_time TEXT NOT NULL,
stream_id TEXT NOT NULL,
user_id TEXT NOT NULL,
user_login TEXT NOT NULL,
user_name TEXT,
game_id TEXT,
game_name TEXT,
title TEXT,
viewer_count INTEGER,
language TEXT,
tags TEXT, -- JSON array
UNIQUE (snapshot_time, stream_id)
);
CREATE TABLE IF NOT EXISTS clips (
id TEXT PRIMARY KEY,
broadcaster_id TEXT,
broadcaster_name TEXT,
creator_name TEXT,
title TEXT,
url TEXT,
view_count INTEGER,
duration REAL,
game_id TEXT,
game_name TEXT,
created_at TEXT,
language TEXT,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS vods (
id TEXT PRIMARY KEY,
user_id TEXT,
user_name TEXT,
title TEXT,
view_count INTEGER,
duration_str TEXT,
duration_sec INTEGER,
video_type TEXT,
language TEXT,
created_at TEXT,
url TEXT,
scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS games (
id TEXT PRIMARY KEY,
name TEXT UNIQUE,
box_art_url TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_snapshots_user ON stream_snapshots(user_id, snapshot_time);
CREATE INDEX IF NOT EXISTS idx_snapshots_game ON stream_snapshots(game_id, snapshot_time);
CREATE INDEX IF NOT EXISTS idx_snapshots_time ON stream_snapshots(snapshot_time);
CREATE INDEX IF NOT EXISTS idx_clips_broadcaster ON clips(broadcaster_id);
CREATE INDEX IF NOT EXISTS idx_clips_game ON clips(game_id);
""")
conn.commit()
return conn
def store_stream_snapshot(
conn: sqlite3.Connection,
streams: list[dict],
snapshot_time: str = None,
):
"""Store a batch of stream records as a point-in-time snapshot."""
import json
if not snapshot_time:
snapshot_time = datetime.now(timezone.utc).isoformat()
conn.executemany("""
INSERT OR IGNORE INTO stream_snapshots
(snapshot_time, stream_id, user_id, user_login, user_name,
game_id, game_name, title, viewer_count, language, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [
(
snapshot_time,
s["id"],
s["user_id"],
s["user_login"],
s["user_name"],
s.get("game_id"),
s.get("game_name"),
s.get("title"),
s["viewer_count"],
s.get("language"),
json.dumps(s.get("tags", [])),
)
for s in streams
])
conn.commit()
return len(streams)
def get_viewership_trend(
conn: sqlite3.Connection,
user_id: str,
days: int = 7,
) -> list:
"""Get hourly viewership snapshots for a specific streamer."""
return conn.execute("""
SELECT
snapshot_time,
viewer_count,
game_name,
title
FROM stream_snapshots
WHERE user_id = ?
          -- snapshots are stored as UTC isoformat strings, so compare
          -- against a matching 'YYYY-MM-DDTHH:MM:SS' formatted cutoff
          AND snapshot_time >= strftime('%Y-%m-%dT%H:%M:%S', 'now', ? || ' days')
ORDER BY snapshot_time
""", (user_id, f"-{days}")).fetchall()
def get_game_viewership_history(
conn: sqlite3.Connection,
game_id: str,
days: int = 7,
) -> list:
"""Aggregate total viewership for a game over time."""
return conn.execute("""
SELECT
snapshot_time,
COUNT(*) as stream_count,
SUM(viewer_count) as total_viewers,
AVG(viewer_count) as avg_viewers,
MAX(viewer_count) as peak_single_stream
FROM stream_snapshots
WHERE game_id = ?
          AND snapshot_time >= strftime('%Y-%m-%dT%H:%M:%S', 'now', ? || ' days')
GROUP BY snapshot_time
ORDER BY snapshot_time
""", (game_id, f"-{days}")).fetchall()
Time-Series Viewership Tracking
Poll the API on a schedule to build historical viewership datasets:
import sqlite3
from datetime import datetime, timezone
import json
def poll_and_store(
client: TwitchClient,
db_path: str = "twitch_data.db",
game_id: str = None,
language: str = None,
max_streams: int = 500,
):
"""
Take a snapshot of current live streams and persist to SQLite.
Designed to be called from a cron job every 5-10 minutes.
"""
conn = init_twitch_db(db_path)
streams = get_live_streams(
client,
game_id=game_id,
language=language,
max_results=max_streams,
)
count = store_stream_snapshot(conn, streams)
conn.close()
snapshot_time = datetime.now(timezone.utc).isoformat()
print(f"[{snapshot_time}] Stored {count} stream snapshots")
if streams:
print(f" Peak viewers: {streams[0]['viewer_count']:,} ({streams[0]['user_name']})")
total_viewers = sum(s["viewer_count"] for s in streams)
print(f" Total viewers across top {count} streams: {total_viewers:,}")
# Run once
poll_and_store(client, game_id="516575") # Valorant
# Or set up as a cron job:
# */10 * * * * python3 /path/to/poll_twitch.py
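If cron isn't available (Windows, minimal containers), a single long-running process works just as well. This `run_poller` loop is a minimal illustrative alternative that subtracts each poll's duration so snapshots stay evenly spaced:

```python
import time

def run_poller(poll_fn, interval_sec: int = 600, max_runs: int = None):
    """Call poll_fn every interval_sec seconds until max_runs (None = forever)."""
    runs = 0
    while max_runs is None or runs < max_runs:
        start = time.monotonic()
        try:
            poll_fn()
        except Exception as exc:
            # One failed snapshot shouldn't kill the whole time series
            print(f"Poll failed: {exc}")
        runs += 1
        elapsed = time.monotonic() - start
        time.sleep(max(0, interval_sec - elapsed))
```

Usage would be something like `run_poller(lambda: poll_and_store(client, game_id="516575"), interval_sec=600)`.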
Rate Limits: Complete Reference
The Helix API rate limiting system:
- App Access Tokens: 800 points per rolling 60-second window
- User Access Tokens: a separate 800 points per 60-second window for each user
- Headers: Ratelimit-Limit, Ratelimit-Remaining, and Ratelimit-Reset (a Unix timestamp)
- Points cost: most endpoints cost 1 point; a few, such as clip creation, cost more, and Extensions endpoints vary
- Status 429: returned when the limit is exceeded; the Ratelimit-Reset header tells you when the window reopens
For large-scale monitoring at the 800/minute ceiling — tracking thousands of channels simultaneously or polling every category — register multiple Twitch applications. Each has its own independent rate limit pool.
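One way to spread load across several registered applications is a small pool that routes each request through whichever client currently has the most rate-limit headroom. A sketch that works with any client exposing `get()` and a `rate_remaining` counter, such as the TwitchClient class above (`RotatingClientPool` is an illustrative name):

```python
class RotatingClientPool:
    """Spread Helix requests across several clients, each holding its own
    app token and therefore its own independent rate-limit window."""

    def __init__(self, clients: list):
        if not clients:
            raise ValueError("Need at least one client")
        self.clients = list(clients)

    def pick(self):
        # Route to whichever client currently reports the most headroom
        return max(self.clients, key=lambda c: c.rate_remaining)

    def get(self, endpoint: str, params: dict = None) -> dict:
        return self.pick().get(endpoint, params)
```

Since each client updates its own `rate_remaining` from response headers, the pool naturally drifts toward the least-used credentials.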
Supplementary Web Scraping
Some data points are not in the Helix API at all:
- Channel subscriber counts (requires broadcaster OAuth, not an app token)
- Historical peak viewer records (third-party sites like TwitchTracker)
- Chat emote usage statistics (parsed from IRC)
- Community statistics from third-party sites (SullyGnome, TwitchStats)
For scraping third-party tracker sites, route requests through residential proxies — these sites have their own bot detection. ThorData proxies work well for this use case. The API itself does not require proxies since it's authenticated by token and Twitch does not restrict by IP for API access. But if you're scraping twitchtracker.com, sullygnome.com, or streamscharts.com for historical data, proxy rotation is needed:
import httpx
from bs4 import BeautifulSoup
PROXY_URL = "http://USER:[email protected]:9000"
def scrape_twitchtracker_channel(username: str) -> dict:
"""
Scrape historical stats for a channel from TwitchTracker.
Requires residential proxy to avoid bot detection.
"""
url = f"https://twitchtracker.com/{username}"
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "en-US,en;q=0.9",
"Referer": "https://twitchtracker.com/",
}
with httpx.Client(
transport=httpx.HTTPTransport(proxy=PROXY_URL),
headers=headers,
timeout=20,
follow_redirects=True,
) as client:
resp = client.get(url)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
stats = {}
# TwitchTracker's stat cards have predictable structure
stat_cards = soup.select(".g-x-s-value")
labels = soup.select(".g-x-s-label")
for label_el, value_el in zip(labels, stat_cards):
label = label_el.get_text(strip=True)
value = value_el.get_text(strip=True)
stats[label] = value
return {"username": username, "stats": stats}
Anti-Detection for Web Scraping Components
When scraping Twitch's own web pages (not the API) for data Helix doesn't expose, such as channel panel content:
import asyncio
import random
from playwright.async_api import async_playwright
async def scrape_channel_page_playwright(
username: str,
proxy_config: dict = None,
) -> dict:
"""
Scrape a Twitch channel page with Playwright for data not in the API.
"""
async with async_playwright() as p:
launch_kwargs = {
"headless": True,
"args": [
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
],
}
if proxy_config:
launch_kwargs["proxy"] = proxy_config
browser = await p.chromium.launch(**launch_kwargs)
context = await browser.new_context(
viewport={"width": 1280, "height": 900},
user_agent=(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Safari/537.36"
),
locale="en-US",
)
await context.add_init_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
)
page = await context.new_page()
await page.goto(
f"https://www.twitch.tv/{username}",
wait_until="domcontentloaded",
timeout=30000,
)
await asyncio.sleep(random.uniform(2, 4))
data = await page.evaluate("""
() => {
const bio = document.querySelector('[data-a-target="about-section-bio"]')?.textContent?.trim();
const panels = [...document.querySelectorAll('[data-target="channel-panel"]')]
.map(p => p.textContent?.trim());
return { bio, panels };
}
""")
await browser.close()
return {"username": username, **data}
Practical Applications
Esports and competitive gaming analytics. Track viewership for specific games during tournament periods versus regular weeks. Pull clip data to identify which tournament moments generated the most clips and views — a proxy for peak excitement.
Creator research. Identify rising streamers in a category by tracking channels that recently broke into the top 50 by viewer count. Monitor their VOD publishing frequency and clip view rates to gauge growth trajectory.
Market research for game releases. Watch viewership trends in the weeks before and after a game launch. The Twitch viewership curve for new games has a predictable shape — monitoring it reveals how well a title is retaining players.
Sponsorship and partnership intelligence. Track which brands appear in stream titles (via text pattern matching) and correlate with peak viewership periods. Useful for competitive intelligence on who is investing in Twitch advertising.
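The title-matching idea can be as simple as a dictionary of compiled patterns applied to stream records; the watchlist below is purely illustrative:

```python
import re

# Illustrative watchlist — replace with the brands and disclosures you track
BRAND_PATTERNS = {
    "ad-disclosure": re.compile(r"#(ad|sponsored)\b", re.IGNORECASE),
    "gfuel": re.compile(r"\bg\s?fuel\b", re.IGNORECASE),
}

def detect_sponsored_titles(streams: list[dict]) -> list[dict]:
    """Return streams whose titles match any watched brand pattern."""
    hits = []
    for s in streams:
        matched = [name for name, pat in BRAND_PATTERNS.items()
                   if pat.search(s.get("title", ""))]
        if matched:
            hits.append({
                "user_name": s.get("user_name"),
                "viewer_count": s.get("viewer_count", 0),
                "matches": matched,
            })
    return hits
```

Run it over each stored snapshot and join against `viewer_count` to estimate impressions per sponsored title.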
Final Thoughts
Twitch's Helix API is one of the better-designed data APIs out there — proper rate limit headers, cursor-based pagination, batch endpoints, and accurate documentation. The 800 requests per minute ceiling is generous enough for most projects.
The main gaps are chat data (WebSocket IRC connection, separate implementation), historical viewership (third-party sources only), and subscriber counts (broadcaster-authorized OAuth only). For everything else — streams, clips, channels, VODs, games — the API delivers exactly what you need with minimal friction. Combined with SQLite for time-series storage and a simple cron schedule, you can build a comprehensive Twitch analytics pipeline in an afternoon.