Scraping YouTube: Channels, Playlists, Comments, and Video Metadata (2026)
Every tutorial about YouTube scraping shows you how to get view counts and like counts for a single video. That's the easy part. The interesting stuff -- channel analysis, full playlist enumeration, comment thread extraction, recommended video mapping, transcript harvesting -- that's what most guides skip.
Here's how to actually do it in 2026.
Table of Contents
- The Official Route: YouTube Data API v3
- Getting All Videos From a Channel
- Extracting Comment Threads With Replies
- Playlist Enumeration
- Quota-Aware Fetching and Persistent Tracking
- Batch Requests and Performance Optimization
- The InnerTube API: Beyond the Data API
- Scraping Transcripts and Captions
- Video Search and Autocomplete Data
- Proxy Strategy for InnerTube
- Storing YouTube Data: SQLite Schema
- Data Quality: Handling Deleted Videos and Edge Cases
- Channel Analytics and Growth Tracking
- Real Use Cases
- Common Errors and Fixes
- Practical Tips Summary
1. The Official Route: YouTube Data API v3 {#data-api}
Google gives you 10,000 quota units per day for free. That sounds generous until you see the per-endpoint costs:
| Endpoint | Cost per Call | What It Returns |
|---|---|---|
| search.list | 100 units | Search results |
| videos.list | 1 unit | Video metadata (up to 50 per call) |
| channels.list | 1 unit | Channel info |
| commentThreads.list | 1 unit | Comment threads |
| comments.list | 1 unit | Replies to a thread |
| playlistItems.list | 1 unit | Videos in a playlist |
| playlists.list | 1 unit | Channel playlists |
| captions.list | 50 units | Available caption tracks |
| captions.download | 200 units | Actual caption content |
So you can make 10,000 video detail requests per day, or 100 searches. Big difference. Plan accordingly.
Setup
Get your API key from the Google Cloud Console. Enable the YouTube Data API v3. Takes about two minutes.
You don't need OAuth for public data. A simple API key works for channel info, public video metadata, playlists, comments on public videos, search results, and live chat messages on active streams. OAuth is only required for private data -- watch history, private playlists, or managing your own channel.
import requests
import time
import json
from datetime import date
API_KEY = "YOUR_API_KEY"
BASE = "https://www.googleapis.com/youtube/v3"
class YouTubeClient:
"""YouTube Data API client with quota tracking and retry logic."""
def __init__(self, api_key: str, quota_file: str = "quota.json"):
self.api_key = api_key
self.quota_file = quota_file
self.quota_used = self._load_quota()
def _load_quota(self) -> int:
try:
with open(self.quota_file) as f:
data = json.load(f)
if data.get("date") == str(date.today()):
return data.get("used", 0)
except FileNotFoundError:
pass
return 0
def _save_quota(self):
with open(self.quota_file, "w") as f:
json.dump({"date": str(date.today()), "used": self.quota_used}, f)
def get(self, endpoint: str, params: dict, cost: int = 1) -> dict:
"""Make API request with quota tracking and retry."""
if self.quota_used + cost > 9800:
raise Exception(f"Quota limit approaching: {self.quota_used} used today")
params["key"] = self.api_key
for attempt in range(3):
try:
resp = requests.get(f"{BASE}/{endpoint}", params=params, timeout=15)
if resp.status_code == 403:
data = resp.json()
reason = data.get("error", {}).get("errors", [{}])[0].get("reason", "")
if reason == "quotaExceeded":
raise Exception("Daily quota exceeded")
resp.raise_for_status()
self.quota_used += cost
self._save_quota()
return resp.json()
except requests.exceptions.RequestException as e:
if attempt < 2:
time.sleep(2 ** attempt)
else:
raise
yt = YouTubeClient(API_KEY)
2. Getting All Videos From a Channel {#channel-videos}
The trick: you can't directly list all videos from a channel. You need the channel's "uploads" playlist ID first, then paginate through that playlist.
def get_channel_uploads_playlist(channel_id: str) -> str:
"""Get the uploads playlist ID for a channel."""
data = yt.get("channels", {
"part": "contentDetails",
"id": channel_id,
})
items = data.get("items", [])
if not items:
raise ValueError(f"Channel not found: {channel_id}")
return items[0]["contentDetails"]["relatedPlaylists"]["uploads"]
def get_channel_by_handle(handle: str) -> str:
"""Get channel ID from a @handle."""
data = yt.get("channels", {
"part": "id",
"forHandle": handle,
})
items = data.get("items", [])
if not items:
raise ValueError(f"Handle not found: {handle}")
return items[0]["id"]
def get_all_playlist_videos(playlist_id: str,
max_videos: int = None) -> list[dict]:
"""Paginate through an entire playlist."""
videos = []
page_token = None
while True:
params = {
"part": "snippet,contentDetails",
"playlistId": playlist_id,
"maxResults": 50,
}
if page_token:
params["pageToken"] = page_token
data = yt.get("playlistItems", params)
for item in data.get("items", []):
video_id = item["contentDetails"]["videoId"]
snippet = item["snippet"]
videos.append({
"video_id": video_id,
"title": snippet["title"],
"published": snippet["publishedAt"],
"description": snippet.get("description", ""),
"thumbnail": (snippet.get("thumbnails", {})
.get("high", {})
.get("url", "")),
"playlist_position": snippet.get("position", 0),
})
if max_videos and len(videos) >= max_videos:
break
page_token = data.get("nextPageToken")
if not page_token:
break
return videos[:max_videos] if max_videos else videos
def get_channel_videos_with_stats(channel_id: str) -> list[dict]:
"""Get all videos with full statistics -- combines playlist + videos endpoints."""
uploads_id = get_channel_uploads_playlist(channel_id)
videos = get_all_playlist_videos(uploads_id)
# Enrich with statistics in batches of 50
for i in range(0, len(videos), 50):
batch = videos[i:i+50]
video_ids = [v["video_id"] for v in batch]
stats_data = yt.get("videos", {
"part": "statistics,contentDetails",
"id": ",".join(video_ids),
})
stats_map = {}
for item in stats_data.get("items", []):
stats_map[item["id"]] = {
"views": int(item["statistics"].get("viewCount", 0)),
"likes": int(item["statistics"].get("likeCount", 0)),
"comments": int(item["statistics"].get("commentCount", 0)),
"duration": item["contentDetails"]["duration"],
}
for v in batch:
v.update(stats_map.get(v["video_id"], {}))
return videos
# Example: get all videos from a channel
channel_id = get_channel_by_handle("@GoogleDevelopers")
all_videos = get_channel_videos_with_stats(channel_id)
print(f"Found {len(all_videos)} videos, used {yt.quota_used} quota units")
A channel with 500 videos costs about 21 quota units to fully enumerate with stats: 1 channels.list call, 10 playlistItems.list pages, and 10 batched videos.list calls. A 2,000-video channel costs about 81 units. Very efficient.
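As a sanity check on quota budgeting, here's a small back-of-envelope helper (the function name and constants are mine, derived from the cost table in section 1) that estimates the cost of fully enumerating a channel: one channels.list call, plus one playlistItems.list page and one batched videos.list call per 50 videos.

```python
import math

# Per-call unit costs, taken from the quota table in section 1
COSTS = {"channels.list": 1, "playlistItems.list": 1, "videos.list": 1}

def enumeration_cost(video_count: int) -> int:
    """Estimate quota units to fully enumerate a channel with stats."""
    pages = math.ceil(video_count / 50)  # 50 items per page / per batch
    return (COSTS["channels.list"]
            + pages * COSTS["playlistItems.list"]
            + pages * COSTS["videos.list"])

print(enumeration_cost(500))   # 21
print(enumeration_cost(2000))  # 81
```

Useful for deciding up front whether a crawl of N channels fits inside the 10,000-unit daily budget.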
3. Extracting Comment Threads With Replies {#comments}
Comments are structured as threads -- a top-level comment with nested replies. The commentThreads endpoint gives top-level comments; replies require a second call per thread.
def get_all_comments(video_id: str, order: str = "relevance",
max_pages: int = 50) -> list[dict]:
"""Get all comment threads and their replies for a video."""
threads = []
page_token = None
for page_num in range(max_pages):
params = {
"part": "snippet,replies",
"videoId": video_id,
"maxResults": 100,
"order": order, # "relevance" or "time"
"textFormat": "plainText",
}
if page_token:
params["pageToken"] = page_token
        # commentsDisabled surfaces as an HTTP 403, which yt.get raises
        # via raise_for_status() -- catch the exception instead of
        # inspecting the response body for an "error" key
        try:
            data = yt.get("commentThreads", params)
        except requests.exceptions.HTTPError as e:
            reason = (e.response.json().get("error", {})
                      .get("errors", [{}])[0].get("reason", ""))
            if reason == "commentsDisabled":
                print(f"Comments disabled on {video_id}")
                break
            raise
for item in data.get("items", []):
top_snippet = item["snippet"]["topLevelComment"]["snippet"]
thread = {
"id": item["id"],
"author": top_snippet["authorDisplayName"],
"author_id": top_snippet.get("authorChannelId", {}).get("value"),
"text": top_snippet["textDisplay"],
"likes": top_snippet["likeCount"],
"published": top_snippet["publishedAt"],
"updated": top_snippet.get("updatedAt"),
"reply_count": item["snippet"]["totalReplyCount"],
"replies": [],
}
# Replies bundled with thread (max 5)
if item.get("replies"):
for reply in item["replies"]["comments"]:
rs = reply["snippet"]
thread["replies"].append({
"id": reply["id"],
"author": rs["authorDisplayName"],
"text": rs["textDisplay"],
"likes": rs["likeCount"],
"published": rs["publishedAt"],
"parent_id": rs.get("parentId"),
})
# Fetch remaining replies if there are more than 5
if item["snippet"]["totalReplyCount"] > 5:
thread["replies"] = get_thread_replies(item["id"])
threads.append(thread)
page_token = data.get("nextPageToken")
if not page_token:
break
return threads
def get_thread_replies(thread_id: str) -> list[dict]:
"""Fetch all replies for a comment thread."""
replies = []
page_token = None
while True:
params = {
"part": "snippet",
"parentId": thread_id,
"maxResults": 100,
"textFormat": "plainText",
}
if page_token:
params["pageToken"] = page_token
data = yt.get("comments", params)
for item in data.get("items", []):
s = item["snippet"]
replies.append({
"id": item["id"],
"author": s["authorDisplayName"],
"text": s["textDisplay"],
"likes": s["likeCount"],
"published": s["publishedAt"],
"parent_id": s.get("parentId"),
})
page_token = data.get("nextPageToken")
if not page_token:
break
return replies
def flatten_comment_tree(threads: list[dict]) -> list[dict]:
"""Flatten threaded comments into a flat list with parent references."""
flat = []
for thread in threads:
flat.append({**thread, "is_reply": False, "replies": None})
for reply in thread.get("replies", []):
flat.append({**reply, "is_reply": True, "thread_id": thread["id"]})
return flat
The Comment Count Problem
Here's something that trips people up. The commentCount in video statistics often doesn't match what you can actually retrieve. YouTube's API truncates results -- you might see commentCount: 4521 but only get back 2000 through pagination. This isn't a bug. YouTube stops serving older comments after a certain depth. There's no workaround through the official API.
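If you want to quantify the gap, a small helper (this function is my own illustration, not part of the API) can compare the reported commentCount against what pagination actually returned:

```python
def comment_coverage(reported_count: int, threads: list[dict]) -> dict:
    """Compare a video's statistics.commentCount against what was
    actually retrieved: each thread counts as one top-level comment
    plus however many replies were fetched."""
    retrieved = sum(1 + len(t.get("replies") or []) for t in threads)
    coverage = retrieved / reported_count if reported_count else 1.0
    return {"reported": reported_count,
            "retrieved": retrieved,
            "coverage": round(coverage, 3)}

threads = [{"id": "a", "replies": [{}, {}]}, {"id": "b", "replies": []}]
print(comment_coverage(10, threads))
# {'reported': 10, 'retrieved': 4, 'coverage': 0.4}
```

Logging this ratio per video makes it obvious which videos hit the truncation and which were fully collected.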
4. Playlist Enumeration {#playlists}
def get_channel_playlists(channel_id: str) -> list[dict]:
"""Get all public playlists for a channel."""
playlists = []
page_token = None
while True:
params = {
"part": "snippet,contentDetails",
"channelId": channel_id,
"maxResults": 50,
}
if page_token:
params["pageToken"] = page_token
data = yt.get("playlists", params)
for item in data.get("items", []):
snippet = item["snippet"]
playlists.append({
"id": item["id"],
"title": snippet["title"],
"description": snippet.get("description", ""),
"published": snippet["publishedAt"],
"video_count": item["contentDetails"]["itemCount"],
"thumbnail": (snippet.get("thumbnails", {})
.get("high", {}).get("url", "")),
"is_auto_generated": snippet.get("title", "").startswith("#"),
})
page_token = data.get("nextPageToken")
if not page_token:
break
return playlists
def get_playlist_with_all_videos(playlist_id: str) -> dict:
"""Get playlist metadata plus all its videos."""
# Get playlist info
pl_data = yt.get("playlists", {
"part": "snippet,contentDetails",
"id": playlist_id,
})
if not pl_data.get("items"):
return {}
playlist = pl_data["items"][0]
videos = get_all_playlist_videos(playlist_id)
return {
"id": playlist_id,
"title": playlist["snippet"]["title"],
"video_count": playlist["contentDetails"]["itemCount"],
"videos": videos,
}
5. Quota-Aware Fetching and Persistent Tracking {#quota}
Build quota tracking into your code from day one. Here's a complete pattern that persists across script restarts:
import sqlite3
from datetime import datetime
class QuotaTracker:
"""Persistent quota tracking with SQLite."""
def __init__(self, db_path: str = "youtube_quota.db"):
self.conn = sqlite3.connect(db_path)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS quota_log (
date TEXT,
endpoint TEXT,
cost INTEGER,
timestamp REAL
)
""")
self.conn.commit()
def used_today(self) -> int:
today = str(date.today())
row = self.conn.execute(
"SELECT COALESCE(SUM(cost), 0) FROM quota_log WHERE date=?",
(today,)
).fetchone()
return row[0]
def log(self, endpoint: str, cost: int):
self.conn.execute(
"INSERT INTO quota_log VALUES (?,?,?,?)",
(str(date.today()), endpoint, cost, time.time())
)
self.conn.commit()
def can_spend(self, cost: int, limit: int = 9800) -> bool:
return self.used_today() + cost <= limit
def daily_breakdown(self) -> dict:
today = str(date.today())
rows = self.conn.execute(
"SELECT endpoint, SUM(cost) FROM quota_log WHERE date=? GROUP BY endpoint",
(today,)
).fetchall()
return dict(rows)
tracker = QuotaTracker()
print(f"Quota used today: {tracker.used_today()}")
print("Breakdown:", tracker.daily_breakdown())
6. Batch Requests and Performance Optimization {#batch}
The videos.list endpoint accepts up to 50 comma-separated video IDs. One call for 50 videos costs 1 unit -- same as one video. Always batch:
def batch_get_video_details(video_ids: list[str],
parts: list[str] = None) -> dict[str, dict]:
"""Get details for multiple videos efficiently using batch requests."""
if parts is None:
parts = ["snippet", "statistics", "contentDetails"]
result = {}
for i in range(0, len(video_ids), 50):
batch = video_ids[i:i+50]
data = yt.get("videos", {
"part": ",".join(parts),
"id": ",".join(batch),
})
for item in data.get("items", []):
vid_id = item["id"]
result[vid_id] = {
"title": item["snippet"]["title"],
"channel": item["snippet"]["channelTitle"],
"channel_id": item["snippet"]["channelId"],
"published": item["snippet"]["publishedAt"],
"description": item["snippet"]["description"],
"tags": item["snippet"].get("tags", []),
"category_id": item["snippet"]["categoryId"],
"views": int(item.get("statistics", {}).get("viewCount", 0)),
"likes": int(item.get("statistics", {}).get("likeCount", 0)),
"comments": int(item.get("statistics", {}).get("commentCount", 0)),
"duration": item.get("contentDetails", {}).get("duration", ""),
"definition": item.get("contentDetails", {}).get("definition", ""),
"licensed_content": item.get("contentDetails", {}).get("licensedContent"),
"live_broadcast_content": item["snippet"].get("liveBroadcastContent"),
}
return result
# Use the fields parameter to reduce response payload
def get_minimal_video_data(video_ids: list[str]) -> dict:
"""Get only the fields you need to reduce API response size."""
result = {}
for i in range(0, len(video_ids), 50):
batch = video_ids[i:i+50]
data = yt.get("videos", {
"part": "snippet,statistics",
"id": ",".join(batch),
"fields": "items(id,snippet/title,snippet/publishedAt,"
"statistics/viewCount,statistics/likeCount)",
})
for item in data.get("items", []):
result[item["id"]] = {
"title": item["snippet"]["title"],
"published": item["snippet"]["publishedAt"],
"views": int(item.get("statistics", {}).get("viewCount", 0)),
"likes": int(item.get("statistics", {}).get("likeCount", 0)),
}
return result
7. The InnerTube API: Beyond the Data API {#innertube}
The Data API is great for structured data about known videos. But what about recommended videos, trending content, or content surfaced through YouTube's algorithm? That's where InnerTube comes in.
InnerTube is the internal API that YouTube's web app, mobile app, and TV app all use. It's not documented or officially supported -- Google can change it whenever they want. But it exposes things the Data API doesn't: recommendations, trending pages, "up next" suggestions, channel sections, community posts, and more.
import requests
import json
INNERTUBE_CONTEXT = {
"client": {
"clientName": "WEB",
"clientVersion": "2.20260320.00.00",
"hl": "en",
"gl": "US",
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36",
"timeZone": "America/New_York",
"browserName": "Chrome",
"browserVersion": "131.0.0.0",
"osName": "Windows",
"osVersion": "10.0",
"platform": "DESKTOP",
}
}
INNERTUBE_HEADERS = {
"Content-Type": "application/json",
"X-YouTube-Client-Name": "1",
"X-YouTube-Client-Version": "2.20260320.00.00",
"Origin": "https://www.youtube.com",
"Referer": "https://www.youtube.com/",
"User-Agent": INNERTUBE_CONTEXT["client"]["userAgent"],
}
def innertube_request(endpoint: str, body: dict,
proxy: str = None) -> dict:
"""Make an InnerTube API request."""
url = f"https://www.youtube.com/youtubei/v1/{endpoint}"
full_body = {"context": INNERTUBE_CONTEXT, **body}
kwargs = {
"headers": INNERTUBE_HEADERS,
"json": full_body,
"timeout": 15,
}
if proxy:
kwargs["proxies"] = {"http": proxy, "https": proxy}
resp = requests.post(url, **kwargs)
resp.raise_for_status()
return resp.json()
def get_trending_videos(category: str = "trending") -> list[dict]:
"""Get trending videos via InnerTube."""
browse_ids = {
"trending": "FEtrending",
"music": "FEmusic",
"gaming": "FEgaming",
"movies": "FEmovies",
}
data = innertube_request("browse", {
"browseId": browse_ids.get(category, "FEtrending")
})
return _extract_videos_from_renderer(data)
def get_video_recommendations(video_id: str) -> list[dict]:
"""Get 'up next' recommendations for a video."""
data = innertube_request("next", {"videoId": video_id})
return _extract_videos_from_renderer(data)
def search_innertube(query: str, continuation_token: str = None) -> dict:
"""Search YouTube via InnerTube."""
if continuation_token:
data = innertube_request("search", {"continuation": continuation_token})
else:
data = innertube_request("search", {"query": query})
return data
def _extract_videos_from_renderer(data: dict) -> list[dict]:
"""Extract video data from InnerTube's nested renderer structure."""
videos = []
# InnerTube responses are deeply nested with "Renderer" objects
# This needs recursive traversal
def traverse(obj):
if isinstance(obj, dict):
if "videoRenderer" in obj:
vr = obj["videoRenderer"]
videos.append({
"video_id": vr.get("videoId", ""),
"title": (vr.get("title", {})
.get("runs", [{}])[0]
.get("text", "")),
"channel": (vr.get("ownerText", {})
.get("runs", [{}])[0]
.get("text", "")),
"views": vr.get("viewCountText", {}).get("simpleText", ""),
"duration": vr.get("lengthText", {}).get("simpleText", ""),
"published": vr.get("publishedTimeText", {}).get("simpleText", ""),
})
for v in obj.values():
traverse(v)
elif isinstance(obj, list):
for item in obj:
traverse(item)
traverse(data)
return videos
InnerTube Continuation Tokens
InnerTube uses continuation tokens for pagination, not page numbers:
def get_channel_all_videos_innertube(channel_id: str,
max_pages: int = 10) -> list[dict]:
"""Get all videos from a channel using InnerTube browse endpoint."""
all_videos = []
continuation = None
# Initial request
data = innertube_request("browse", {
"browseId": channel_id,
"params": "EgZ2aWRlb3M%3D", # "videos" tab parameter
})
for page in range(max_pages):
videos = _extract_videos_from_renderer(data)
all_videos.extend(videos)
# Find continuation token
continuation = _find_continuation_token(data)
if not continuation:
break
data = innertube_request("browse", {"continuation": continuation})
time.sleep(1.5)
return all_videos
def _find_continuation_token(data: dict) -> str | None:
"""Find continuation token in InnerTube response."""
text = json.dumps(data)
import re
m = re.search(r'"continuationCommand":\{"token":"([^"]+)"', text)
return m.group(1) if m else None
8. Scraping Transcripts and Captions {#transcripts}
YouTube captions are expensive via the official API (200 units per download). The unofficial route is much cheaper:
import xml.etree.ElementTree as ET
import urllib.parse
def get_video_transcript(video_id: str, language: str = "en") -> list[dict]:
"""Get video transcript without API quota via InnerTube."""
# Get video page to extract caption track URLs
data = innertube_request("player", {"videoId": video_id})
captions = (data.get("captions", {})
.get("playerCaptionsTracklistRenderer", {})
.get("captionTracks", []))
if not captions:
return []
# Find the target language
target_track = None
for track in captions:
lang_code = track.get("languageCode", "")
if lang_code == language or lang_code.startswith(language):
target_track = track
break
if not target_track and captions:
target_track = captions[0] # fallback to first available
if not target_track:
return []
# Download the XML caption file
caption_url = target_track["baseUrl"]
resp = requests.get(caption_url, timeout=15)
resp.raise_for_status()
# Parse the timed text XML
root = ET.fromstring(resp.text)
transcript = []
for text_elem in root.findall(".//text"):
start = float(text_elem.get("start", 0))
duration = float(text_elem.get("dur", 0))
text = text_elem.text or ""
# Decode HTML entities
import html
text = html.unescape(text).replace("\n", " ").strip()
if text:
transcript.append({
"start": start,
"duration": duration,
"end": start + duration,
"text": text,
})
return transcript
def format_transcript_as_text(transcript: list[dict]) -> str:
"""Convert timestamped transcript to plain text."""
return " ".join(item["text"] for item in transcript)
def format_transcript_as_srt(transcript: list[dict]) -> str:
"""Convert transcript to SRT subtitle format."""
lines = []
for i, item in enumerate(transcript, 1):
start_h = int(item["start"] // 3600)
start_m = int((item["start"] % 3600) // 60)
start_s = int(item["start"] % 60)
start_ms = int((item["start"] % 1) * 1000)
end_h = int(item["end"] // 3600)
end_m = int((item["end"] % 3600) // 60)
end_s = int(item["end"] % 60)
end_ms = int((item["end"] % 1) * 1000)
lines.append(f"{i}")
lines.append(f"{start_h:02d}:{start_m:02d}:{start_s:02d},{start_ms:03d} --> "
f"{end_h:02d}:{end_m:02d}:{end_s:02d},{end_ms:03d}")
lines.append(item["text"])
lines.append("")
return "\n".join(lines)
9. Video Search and Autocomplete Data {#search}
def search_youtube(query: str, max_results: int = 50,
published_after: str = None,
video_type: str = None) -> list[dict]:
"""Search YouTube via Data API. Cost: 100 units per call."""
results = []
page_token = None
fetched = 0
while fetched < max_results:
batch_size = min(50, max_results - fetched)
params = {
"part": "snippet",
"q": query,
"type": "video",
"maxResults": batch_size,
"order": "relevance",
}
if page_token:
params["pageToken"] = page_token
if published_after:
params["publishedAfter"] = published_after # RFC 3339 format
if video_type:
params["videoDuration"] = video_type # short/medium/long
data = yt.get("search", params, cost=100) # expensive!
for item in data.get("items", []):
sn = item["snippet"]
results.append({
"video_id": item["id"]["videoId"],
"title": sn["title"],
"channel": sn["channelTitle"],
"channel_id": sn["channelId"],
"published": sn["publishedAt"],
"description": sn.get("description", ""),
"thumbnail": sn.get("thumbnails", {}).get("high", {}).get("url", ""),
})
fetched += batch_size
page_token = data.get("nextPageToken")
if not page_token or fetched >= max_results:
break
return results
def get_search_autocomplete(query: str) -> list[str]:
"""Get YouTube search autocomplete suggestions (no API key needed)."""
url = "https://suggestqueries.google.com/complete/search"
params = {
"client": "youtube",
"ds": "yt",
"q": query,
"hl": "en",
}
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
# Response is JSONP: window.google.ac.h([...])
text = resp.text
start = text.index("[")
end = text.rindex("]") + 1
data = json.loads(text[start:end])
suggestions = [item[0] for item in data[1] if isinstance(item, list) and item]
return suggestions
# Get related search terms
suggestions = get_search_autocomplete("python web scraping")
print(suggestions)
# ['python web scraping tutorial', 'python web scraping 2026', ...]
10. Proxy Strategy for InnerTube {#proxies}
YouTube's Data API doesn't need proxies -- it's key-authenticated and rate limits per key, not per IP. InnerTube is different: it uses session-based detection and Google's own bot detection infrastructure.
For InnerTube scraping at volume, rotating residential proxies distribute requests across different IP addresses and avoid triggering rate limits:
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000
def get_residential_proxy(country: str = "US") -> str:
user = f"{THORDATA_USER}-country-{country.lower()}"
return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"
def innertube_request_proxied(endpoint: str, body: dict,
country: str = "US") -> dict:
"""Make InnerTube request through rotating residential proxy."""
proxy = get_residential_proxy(country)
return innertube_request(endpoint, body, proxy=proxy)
ThorData offers rotating residential IPs with geographic targeting -- useful for retrieving region-locked content or ensuring your InnerTube requests appear to originate from a specific country.
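If you're spreading a crawl across regions, a round-robin rotation over countries is a simple pattern. This sketch reuses the "-country-" username convention from get_residential_proxy above; the hostname, port, and suffix format are assumptions about the provider, so adjust to your proxy service's actual scheme:

```python
from itertools import cycle
from typing import Iterator

def proxy_rotation(countries: list[str], user: str, password: str,
                   host: str = "proxy.thordata.com",
                   port: int = 9000) -> Iterator[str]:
    """Yield proxy URLs that round-robin through the given countries.
    Username suffix format mirrors get_residential_proxy() above."""
    for country in cycle(countries):
        yield f"http://{user}-country-{country.lower()}:{password}@{host}:{port}"

rotation = proxy_rotation(["US", "DE", "JP"], "your_username", "your_password")
proxy = next(rotation)  # first request goes out via a US exit
```

Pair it with innertube_request by passing `proxy=next(rotation)` on each call.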
11. Storing YouTube Data: SQLite Schema {#storage}
import sqlite3
import time
def init_youtube_db(db_path: str = "youtube_data.db") -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS channels (
id TEXT PRIMARY KEY,
title TEXT,
handle TEXT,
description TEXT,
subscriber_count INTEGER,
video_count INTEGER,
view_count INTEGER,
country TEXT,
uploads_playlist_id TEXT,
scraped_at REAL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS videos (
id TEXT PRIMARY KEY,
channel_id TEXT,
title TEXT,
description TEXT,
published_at TEXT,
duration TEXT,
view_count INTEGER,
like_count INTEGER,
comment_count INTEGER,
tags TEXT,
category_id TEXT,
thumbnail_url TEXT,
definition TEXT,
is_live INTEGER,
scraped_at REAL,
FOREIGN KEY (channel_id) REFERENCES channels(id)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS comments (
id TEXT PRIMARY KEY,
video_id TEXT,
author TEXT,
author_id TEXT,
text TEXT,
likes INTEGER,
published_at TEXT,
is_reply INTEGER,
parent_id TEXT,
reply_count INTEGER,
scraped_at REAL,
FOREIGN KEY (video_id) REFERENCES videos(id)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS transcripts (
video_id TEXT,
language TEXT,
start_time REAL,
duration REAL,
text TEXT,
scraped_at REAL,
PRIMARY KEY (video_id, language, start_time)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_videos_channel ON videos(channel_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_videos_published ON videos(published_at)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_comments_video ON comments(video_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_transcripts_video ON transcripts(video_id)")
conn.commit()
return conn
def save_videos_bulk(conn: sqlite3.Connection, videos: list[dict]):
import json
now = time.time()
conn.executemany("""
INSERT OR REPLACE INTO videos VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", [
(v.get("video_id") or v.get("id"), v.get("channel_id"),
v.get("title"), v.get("description"),
v.get("published") or v.get("published_at"),
v.get("duration"), v.get("views", 0), v.get("likes", 0),
v.get("comments", 0), json.dumps(v.get("tags", [])),
v.get("category_id"), v.get("thumbnail"),
v.get("definition"), int(v.get("is_live", False)), now)
for v in videos
])
conn.commit()
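Once the data is in SQLite, analysis is plain SQL. As a self-contained illustration (using an in-memory database and a trimmed copy of the videos schema above), a window function finds the top video per channel in one query:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE videos (
    id TEXT PRIMARY KEY, channel_id TEXT, title TEXT,
    view_count INTEGER, published_at TEXT)""")
conn.executemany("INSERT INTO videos VALUES (?,?,?,?,?)", [
    ("v1", "c1", "Intro",     5000,  "2026-01-01T00:00:00Z"),
    ("v2", "c1", "Deep dive", 12000, "2026-02-01T00:00:00Z"),
    ("v3", "c2", "Other",     300,   "2026-01-15T00:00:00Z"),
])

# Top video per channel -- ROW_NUMBER() keeps it to a single query
# (requires SQLite >= 3.25, which ships with modern Python builds)
rows = conn.execute("""
    SELECT channel_id, title, view_count FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY channel_id ORDER BY view_count DESC) AS rn
        FROM videos
    ) WHERE rn = 1
    ORDER BY channel_id
""").fetchall()
print(rows)  # [('c1', 'Deep dive', 12000), ('c2', 'Other', 300)]
```

The same pattern works against the full schema for queries like "fastest-growing video per channel" or "most-commented upload per month".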
12. Data Quality: Handling Deleted Videos and Edge Cases {#data-quality}
def is_video_available(video_id: str) -> dict:
"""Check if a video is available and why if not."""
data = yt.get("videos", {
"part": "status,snippet",
"id": video_id,
})
if not data.get("items"):
return {"available": False, "reason": "deleted_or_private"}
item = data["items"][0]
status = item.get("status", {})
    return {
        "available": True,
        "upload_status": status.get("uploadStatus"),
        "privacy": status.get("privacyStatus"),
        "embeddable": status.get("embeddable"),
        "made_for_kids": status.get("madeForKids"),
        # Note: the API doesn't report whether comments are disabled --
        # you only find out when commentThreads.list returns commentsDisabled
    }
def parse_duration_seconds(duration: str) -> int:
    """Convert ISO 8601 duration (PT4M13S; P1DT2H for 24h+ streams) to seconds."""
    import re
    m = re.match(r"P(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", duration or "")
    if not m:
        return 0
    days, hours, minutes, seconds = (int(g or 0) for g in m.groups())
    return days * 86400 + hours * 3600 + minutes * 60 + seconds
def format_view_count(count: int) -> str:
"""Format view count for human display."""
if count >= 1_000_000:
return f"{count/1_000_000:.1f}M"
if count >= 1_000:
return f"{count/1_000:.0f}K"
return str(count)
13. Channel Analytics and Growth Tracking {#analytics}
import sqlite3
from datetime import datetime, timedelta
def track_channel_growth(channel_id: str, db: sqlite3.Connection):
"""Record channel stats snapshot for growth tracking."""
data = yt.get("channels", {
"part": "statistics",
"id": channel_id,
})
if not data.get("items"):
return
stats = data["items"][0]["statistics"]
db.execute("""
CREATE TABLE IF NOT EXISTS channel_snapshots (
channel_id TEXT,
snapshot_date TEXT,
subscribers INTEGER,
video_count INTEGER,
total_views INTEGER,
PRIMARY KEY (channel_id, snapshot_date)
)
""")
db.execute("""
INSERT OR REPLACE INTO channel_snapshots VALUES (?,?,?,?,?)
""", (
channel_id, str(date.today()),
int(stats.get("subscriberCount", 0)),
int(stats.get("videoCount", 0)),
int(stats.get("viewCount", 0)),
))
db.commit()
def compute_growth_rate(channel_id: str, db: sqlite3.Connection,
days: int = 30) -> dict:
"""Compute subscriber/view growth over a period."""
rows = db.execute("""
SELECT snapshot_date, subscribers, total_views
FROM channel_snapshots
WHERE channel_id = ?
ORDER BY snapshot_date DESC
LIMIT ?
""", (channel_id, days)).fetchall()
if len(rows) < 2:
return {"error": "insufficient data"}
newest = rows[0]
oldest = rows[-1]
days_diff = (datetime.fromisoformat(newest[0]) -
datetime.fromisoformat(oldest[0])).days or 1
return {
"subscriber_growth": newest[1] - oldest[1],
"view_growth": newest[2] - oldest[2],
"days": days_diff,
"daily_subscriber_rate": (newest[1] - oldest[1]) / days_diff,
"daily_view_rate": (newest[2] - oldest[2]) / days_diff,
}
14. Real Use Cases {#use-cases}
Content Gap Analysis
def find_content_gaps(competitor_channel_id: str,
your_topics: list[str]) -> list[dict]:
"""Find topics competitors cover that you don't."""
# Get competitor's video titles
uploads = get_channel_uploads_playlist(competitor_channel_id)
videos = get_all_playlist_videos(uploads, max_videos=500)
# Find videos about topics you haven't covered
gaps = []
for video in videos:
title_lower = video["title"].lower()
if not any(topic.lower() in title_lower for topic in your_topics):
gaps.append(video)
    # playlistItems data carries no view counts, so this sort is a no-op
    # until you enrich gaps with batch_get_video_details() first
    return sorted(gaps, key=lambda v: v.get("views", 0), reverse=True)
Viral Video Early Detection
def find_viral_candidates(channel_ids: list[str],
hours_old: int = 48) -> list[dict]:
"""Find recently published videos with rapidly growing engagement."""
from datetime import datetime, timezone
cutoff = datetime.now(timezone.utc).timestamp() - (hours_old * 3600)
candidates = []
for channel_id in channel_ids:
uploads = get_channel_uploads_playlist(channel_id)
recent = get_all_playlist_videos(uploads, max_videos=10)
video_ids = [v["video_id"] for v in recent]
details = batch_get_video_details(video_ids)
for video_id, data in details.items():
pub_ts = datetime.fromisoformat(
data["published"].replace("Z", "+00:00")
).timestamp()
if pub_ts > cutoff:
age_hours = (time.time() - pub_ts) / 3600
views_per_hour = data["views"] / max(age_hours, 1)
if views_per_hour > 10000: # Adjust threshold
candidates.append({
**data,
"video_id": video_id,
"age_hours": round(age_hours, 1),
"views_per_hour": round(views_per_hour),
})
return sorted(candidates, key=lambda x: -x["views_per_hour"])
15. Common Errors and Fixes {#errors}
| Error | Cause | Fix |
|---|---|---|
| quotaExceeded | Hit 10,000 daily units | Wait until midnight Pacific; add a second project |
| Empty items on playlist | Playlist is private or deleted | Check with playlists.list first |
| commentsDisabled | Channel disabled comments | Skip comment collection for this video |
| videoNotFound | Video deleted or private | Remove from your tracking list |
| 403 on InnerTube | IP blocked by Google bot detection | Use residential proxies |
| Stale nextPageToken | Token expired (they last ~1 hour) | Restart pagination from the first page |
| processingFailure | Video still processing | Retry after 5 minutes |
| Empty subscriberCount | Channel hid subscriber count | Normal -- some channels hide this |
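The fixes above can be encoded as a small dispatch table so crawl loops don't grow ad-hoc if/elif chains. The function and policy values here are my own illustration; tune the waits to your workload:

```python
# Map Data API error reasons to a retry decision: (should_retry, wait_seconds).
# Reasons match the error table above; policy values are illustrative.
ERROR_POLICY = {
    "quotaExceeded":     (False, 0),    # wait for the Pacific-midnight reset
    "commentsDisabled":  (False, 0),    # skip this video's comments
    "videoNotFound":     (False, 0),    # drop from your tracking list
    "processingFailure": (True, 300),   # retry after 5 minutes
}

def handle_api_error(reason: str) -> tuple[bool, int]:
    """Return (should_retry, wait_seconds) for an API error reason.
    Unknown reasons get a conservative retry with a short backoff."""
    return ERROR_POLICY.get(reason, (True, 30))

print(handle_api_error("processingFailure"))  # (True, 300)
```

The calling loop just unpacks the tuple: skip, sleep-and-retry, or give up.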
16. Practical Tips Summary {#summary}
Quota management:
- Batch videos.list at 50 IDs per call -- 1 unit for 50 videos vs 50 units for 50 individual calls
- Use fields parameter to cut response size by 50-80%
- Cache channel uploads playlist IDs -- they don't change
- Search is 100x more expensive than direct lookups -- avoid when possible
InnerTube:
- Keep clientVersion strings current -- stale versions get rejected within weeks
- Use residential proxies from ThorData for sustained InnerTube access
- The renderer structure changes with YouTube UI updates -- build flexible parsers that can traverse unknown nesting
Comments:
- There's no single call to expand all replies -- you page through comments.list per thread, and every extra page adds quota cost
- Comment count in statistics doesn't match what's retrievable -- stop when nextPageToken is absent
- Handle commentsDisabled gracefully -- it's common on news channels and controversial videos
Transcripts:
- Use InnerTube's /player endpoint for transcript URLs -- free and no quota cost
- Auto-generated captions exist for most English-language videos but have accuracy issues, especially with names and technical jargon
- Cache transcripts locally -- they rarely change after initial upload
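That caching advice takes about ten lines to implement. A sketch (the function and cache layout are mine): store each transcript as a JSON file keyed by video ID and language, and only hit the network on a miss.

```python
import json
from pathlib import Path

CACHE_DIR = Path("transcript_cache")

def cached_transcript(video_id: str, language: str, fetch) -> list[dict]:
    """Return a cached transcript if present; otherwise call
    fetch(video_id, language) -- e.g. get_video_transcript above --
    store the result on disk, and return it."""
    CACHE_DIR.mkdir(exist_ok=True)
    path = CACHE_DIR / f"{video_id}.{language}.json"
    if path.exists():
        return json.loads(path.read_text())
    transcript = fetch(video_id, language)
    path.write_text(json.dumps(transcript))
    return transcript
```

Second and later calls for the same video cost nothing, which matters when you're re-running analysis over thousands of transcripts.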
YouTube scraping in 2026 isn't fundamentally different from previous years. The Data API is stable and generous for direct lookups. InnerTube endpoints change more frequently but expose data the official API doesn't. Build your code to handle both, track your quota carefully, and cache aggressively. The data is there if you're systematic about extracting it.