Scraping YouTube: Channels, Playlists, Comments, and Video Metadata (2026)

Every tutorial about YouTube scraping shows you how to get view counts and like counts for a single video. That's the easy part. The interesting stuff -- channel analysis, full playlist enumeration, comment thread extraction, recommended video mapping, transcript harvesting -- that's what most guides skip.

Here's how to actually do it in 2026.


Table of Contents

  1. The Official Route: YouTube Data API v3
  2. Getting All Videos From a Channel
  3. Extracting Comment Threads With Replies
  4. Playlist Enumeration
  5. Quota-Aware Fetching and Persistent Tracking
  6. Batch Requests and Performance Optimization
  7. The InnerTube API: Beyond the Data API
  8. Scraping Transcripts and Captions
  9. Video Search and Autocomplete Data
  10. Proxy Strategy for InnerTube
  11. Storing YouTube Data: SQLite Schema
  12. Data Quality: Handling Deleted Videos and Edge Cases
  13. Channel Analytics and Growth Tracking
  14. Real Use Cases
  15. Common Errors and Fixes
  16. Practical Tips Summary

1. The Official Route: YouTube Data API v3 {#data-api}

Google gives you 10,000 quota units per day for free. That sounds generous until you see the per-endpoint costs:

| Endpoint | Cost per Call | What It Returns |
| --- | --- | --- |
| search.list | 100 units | Search results |
| videos.list | 1 unit | Video metadata (up to 50 per call) |
| channels.list | 1 unit | Channel info |
| commentThreads.list | 1 unit | Comment threads |
| comments.list | 1 unit | Replies to a thread |
| playlistItems.list | 1 unit | Videos in a playlist |
| playlists.list | 1 unit | Channel playlists |
| captions.list | 50 units | Available caption tracks |
| captions.download | 200 units | Actual caption content |

So the same daily budget buys 10,000 videos.list calls (up to 500,000 videos when batched) -- or just 100 searches. Big difference. Plan accordingly.

Setup

Get your API key from the Google Cloud Console. Enable the YouTube Data API v3. Takes about two minutes.

You don't need OAuth for public data. A simple API key works for channel info, public video metadata, playlists, comments on public videos, search results, and even live chat replays. OAuth is only required for private data -- watch history, private playlists, or managing your own channel.

import requests
import time
import json
from datetime import date

API_KEY = "YOUR_API_KEY"
BASE = "https://www.googleapis.com/youtube/v3"

class YouTubeClient:
    """YouTube Data API client with quota tracking and retry logic."""

    def __init__(self, api_key: str, quota_file: str = "quota.json"):
        self.api_key = api_key
        self.quota_file = quota_file
        self.quota_used = self._load_quota()

    def _load_quota(self) -> int:
        try:
            with open(self.quota_file) as f:
                data = json.load(f)
                if data.get("date") == str(date.today()):
                    return data.get("used", 0)
        except FileNotFoundError:
            pass
        return 0

    def _save_quota(self):
        with open(self.quota_file, "w") as f:
            json.dump({"date": str(date.today()), "used": self.quota_used}, f)

    def get(self, endpoint: str, params: dict, cost: int = 1) -> dict:
        """Make API request with quota tracking and retry."""
        if self.quota_used + cost > 9800:
            raise Exception(f"Quota limit approaching: {self.quota_used} used today")

        params["key"] = self.api_key
        for attempt in range(3):
            try:
                resp = requests.get(f"{BASE}/{endpoint}", params=params, timeout=15)
                if resp.status_code == 403:
                    data = resp.json()
                    reason = data.get("error", {}).get("errors", [{}])[0].get("reason", "")
                    if reason == "quotaExceeded":
                        raise Exception("Daily quota exceeded")
                resp.raise_for_status()
                self.quota_used += cost
                self._save_quota()
                return resp.json()
            except requests.exceptions.RequestException as e:
                if attempt < 2:
                    time.sleep(2 ** attempt)
                else:
                    raise

yt = YouTubeClient(API_KEY)

2. Getting All Videos From a Channel {#channel-videos}

The trick: you can't directly list all videos from a channel. You need the channel's "uploads" playlist ID first, then paginate through that playlist.
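
As an aside, there's a widely observed shortcut: a channel's uploads playlist ID is just the channel ID with its `UC` prefix swapped for `UU`. This is a convention, not a documented guarantee, so the API call below remains the reliable way to get it -- but a sketch of the shortcut can save a quota unit per channel:

```python
def uploads_playlist_from_channel_id(channel_id: str) -> str:
    """Derive the uploads playlist ID from a channel ID.

    Channel IDs start with "UC"; the corresponding uploads playlist
    conventionally starts with "UU" and shares the same suffix.
    Observed convention, not a documented guarantee -- fall back to
    channels.list if it ever stops matching.
    """
    if not channel_id.startswith("UC"):
        raise ValueError(f"Not a channel ID: {channel_id}")
    return "UU" + channel_id[2:]
```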

def get_channel_uploads_playlist(channel_id: str) -> str:
    """Get the uploads playlist ID for a channel."""
    data = yt.get("channels", {
        "part": "contentDetails",
        "id": channel_id,
    })
    items = data.get("items", [])
    if not items:
        raise ValueError(f"Channel not found: {channel_id}")
    return items[0]["contentDetails"]["relatedPlaylists"]["uploads"]

def get_channel_by_handle(handle: str) -> str:
    """Get channel ID from a @handle."""
    data = yt.get("channels", {
        "part": "id",
        "forHandle": handle,
    })
    items = data.get("items", [])
    if not items:
        raise ValueError(f"Handle not found: {handle}")
    return items[0]["id"]

def get_all_playlist_videos(playlist_id: str,
                            max_videos: int | None = None) -> list[dict]:
    """Paginate through an entire playlist."""
    videos = []
    page_token = None

    while True:
        params = {
            "part": "snippet,contentDetails",
            "playlistId": playlist_id,
            "maxResults": 50,
        }
        if page_token:
            params["pageToken"] = page_token

        data = yt.get("playlistItems", params)

        for item in data.get("items", []):
            video_id = item["contentDetails"]["videoId"]
            snippet = item["snippet"]
            videos.append({
                "video_id": video_id,
                "title": snippet["title"],
                "published": snippet["publishedAt"],
                "description": snippet.get("description", ""),
                "thumbnail": (snippet.get("thumbnails", {})
                             .get("high", {})
                             .get("url", "")),
                "playlist_position": snippet.get("position", 0),
            })

        if max_videos and len(videos) >= max_videos:
            break

        page_token = data.get("nextPageToken")
        if not page_token:
            break

    return videos[:max_videos] if max_videos else videos

def get_channel_videos_with_stats(channel_id: str) -> list[dict]:
    """Get all videos with full statistics -- combines playlist + videos endpoints."""
    uploads_id = get_channel_uploads_playlist(channel_id)
    videos = get_all_playlist_videos(uploads_id)

    # Enrich with statistics in batches of 50
    for i in range(0, len(videos), 50):
        batch = videos[i:i+50]
        video_ids = [v["video_id"] for v in batch]
        stats_data = yt.get("videos", {
            "part": "statistics,contentDetails",
            "id": ",".join(video_ids),
        })

        stats_map = {}
        for item in stats_data.get("items", []):
            stats_map[item["id"]] = {
                "views": int(item["statistics"].get("viewCount", 0)),
                "likes": int(item["statistics"].get("likeCount", 0)),
                "comments": int(item["statistics"].get("commentCount", 0)),
                "duration": item["contentDetails"]["duration"],
            }

        for v in batch:
            v.update(stats_map.get(v["video_id"], {}))

    return videos

# Example: get all videos from a channel
channel_id = get_channel_by_handle("@GoogleDevelopers")
all_videos = get_channel_videos_with_stats(channel_id)
print(f"Found {len(all_videos)} videos, used {yt.quota_used} quota units")

A channel with 500 videos costs about 21 quota units to fully enumerate with stats (1 for the channel lookup, 10 playlistItems pages, 10 videos.list batches). A channel with 2,000 videos costs about 81 units. Very efficient.
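
To budget a crawl in advance, the per-channel cost can be computed directly: one channels.list lookup, plus one playlistItems.list page and one videos.list stats batch per 50 videos. A small estimator (the function name is mine):

```python
import math

def channel_enumeration_cost(video_count: int, batch_size: int = 50) -> int:
    """Estimate quota units to fully enumerate a channel with stats.

    1 unit for channels.list (uploads playlist lookup), plus
    1 unit per playlistItems.list page, plus
    1 unit per videos.list stats batch.
    """
    pages = math.ceil(video_count / batch_size)
    return 1 + pages + pages

print(channel_enumeration_cost(500))   # 21
print(channel_enumeration_cost(2000))  # 81
```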


3. Extracting Comment Threads With Replies {#comments}

Comments are structured as threads -- a top-level comment with nested replies. The commentThreads endpoint gives top-level comments; replies require a second call per thread.

def get_all_comments(video_id: str, order: str = "relevance",
                     max_pages: int = 50) -> list[dict]:
    """Get all comment threads and their replies for a video."""
    threads = []
    page_token = None

    for page_num in range(max_pages):
        params = {
            "part": "snippet,replies",
            "videoId": video_id,
            "maxResults": 100,
            "order": order,  # "relevance" or "time"
            "textFormat": "plainText",
        }
        if page_token:
            params["pageToken"] = page_token

        # yt.get() raises on HTTP errors, so API failures surface as
        # exceptions rather than an "error" key in the response body.
        # commentsDisabled comes back as a 403.
        try:
            data = yt.get("commentThreads", params)
        except requests.exceptions.HTTPError as e:
            reason = (e.response.json().get("error", {})
                      .get("errors", [{}])[0].get("reason", ""))
            if reason == "commentsDisabled":
                print(f"Comments disabled on {video_id}")
                break
            raise

        for item in data.get("items", []):
            top_snippet = item["snippet"]["topLevelComment"]["snippet"]
            thread = {
                "id": item["id"],
                "author": top_snippet["authorDisplayName"],
                "author_id": top_snippet.get("authorChannelId", {}).get("value"),
                "text": top_snippet["textDisplay"],
                "likes": top_snippet["likeCount"],
                "published": top_snippet["publishedAt"],
                "updated": top_snippet.get("updatedAt"),
                "reply_count": item["snippet"]["totalReplyCount"],
                "replies": [],
            }

            # Replies bundled with thread (max 5)
            if item.get("replies"):
                for reply in item["replies"]["comments"]:
                    rs = reply["snippet"]
                    thread["replies"].append({
                        "id": reply["id"],
                        "author": rs["authorDisplayName"],
                        "text": rs["textDisplay"],
                        "likes": rs["likeCount"],
                        "published": rs["publishedAt"],
                        "parent_id": rs.get("parentId"),
                    })

            # Fetch remaining replies if there are more than 5
            if item["snippet"]["totalReplyCount"] > 5:
                thread["replies"] = get_thread_replies(item["id"])

            threads.append(thread)

        page_token = data.get("nextPageToken")
        if not page_token:
            break

    return threads

def get_thread_replies(thread_id: str) -> list[dict]:
    """Fetch all replies for a comment thread."""
    replies = []
    page_token = None

    while True:
        params = {
            "part": "snippet",
            "parentId": thread_id,
            "maxResults": 100,
            "textFormat": "plainText",
        }
        if page_token:
            params["pageToken"] = page_token

        data = yt.get("comments", params)

        for item in data.get("items", []):
            s = item["snippet"]
            replies.append({
                "id": item["id"],
                "author": s["authorDisplayName"],
                "text": s["textDisplay"],
                "likes": s["likeCount"],
                "published": s["publishedAt"],
                "parent_id": s.get("parentId"),
            })

        page_token = data.get("nextPageToken")
        if not page_token:
            break

    return replies

def flatten_comment_tree(threads: list[dict]) -> list[dict]:
    """Flatten threaded comments into a flat list with parent references."""
    flat = []
    for thread in threads:
        flat.append({**thread, "is_reply": False, "replies": None})
        for reply in thread.get("replies", []):
            flat.append({**reply, "is_reply": True, "thread_id": thread["id"]})
    return flat

The Comment Count Problem

Here's something that trips people up. The commentCount in video statistics often doesn't match what you can actually retrieve. YouTube's API truncates results -- you might see commentCount: 4521 but only get back 2000 through pagination. This isn't a bug. YouTube stops serving older comments after a certain depth. There's no workaround through the official API.
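
What you can do is measure the gap. Comparing the flattened scrape against the reported `commentCount` tells you how complete your dataset is -- useful metadata to store alongside the comments. A small helper (the function name and return shape are mine):

```python
def comment_coverage(threads: list[dict], reported_count: int) -> dict:
    """Compare retrieved comments (threads + replies) to the API's commentCount.

    `threads` is the output of a comment scrape: a list of dicts,
    each with a "replies" list. Returns retrieved/reported counts
    and a coverage ratio.
    """
    retrieved = sum(1 + len(t.get("replies", [])) for t in threads)
    return {
        "retrieved": retrieved,
        "reported": reported_count,
        "coverage": retrieved / reported_count if reported_count else 1.0,
    }
```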


4. Playlist Enumeration {#playlists}

def get_channel_playlists(channel_id: str) -> list[dict]:
    """Get all public playlists for a channel."""
    playlists = []
    page_token = None

    while True:
        params = {
            "part": "snippet,contentDetails",
            "channelId": channel_id,
            "maxResults": 50,
        }
        if page_token:
            params["pageToken"] = page_token

        data = yt.get("playlists", params)

        for item in data.get("items", []):
            snippet = item["snippet"]
            playlists.append({
                "id": item["id"],
                "title": snippet["title"],
                "description": snippet.get("description", ""),
                "published": snippet["publishedAt"],
                "video_count": item["contentDetails"]["itemCount"],
                "thumbnail": (snippet.get("thumbnails", {})
                             .get("high", {}).get("url", "")),
                "is_auto_generated": snippet.get("title", "").startswith("#"),
            })

        page_token = data.get("nextPageToken")
        if not page_token:
            break

    return playlists

def get_playlist_with_all_videos(playlist_id: str) -> dict:
    """Get playlist metadata plus all its videos."""
    # Get playlist info
    pl_data = yt.get("playlists", {
        "part": "snippet,contentDetails",
        "id": playlist_id,
    })
    if not pl_data.get("items"):
        return {}

    playlist = pl_data["items"][0]
    videos = get_all_playlist_videos(playlist_id)

    return {
        "id": playlist_id,
        "title": playlist["snippet"]["title"],
        "video_count": playlist["contentDetails"]["itemCount"],
        "videos": videos,
    }

5. Quota-Aware Fetching and Persistent Tracking {#quota}

Build quota tracking into your code from day one. Here's a complete pattern that persists across script restarts:

import sqlite3
from datetime import datetime

class QuotaTracker:
    """Persistent quota tracking with SQLite."""

    def __init__(self, db_path: str = "youtube_quota.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS quota_log (
                date TEXT,
                endpoint TEXT,
                cost INTEGER,
                timestamp REAL
            )
        """)
        self.conn.commit()

    def used_today(self) -> int:
        today = str(date.today())
        row = self.conn.execute(
            "SELECT COALESCE(SUM(cost), 0) FROM quota_log WHERE date=?",
            (today,)
        ).fetchone()
        return row[0]

    def log(self, endpoint: str, cost: int):
        self.conn.execute(
            "INSERT INTO quota_log VALUES (?,?,?,?)",
            (str(date.today()), endpoint, cost, time.time())
        )
        self.conn.commit()

    def can_spend(self, cost: int, limit: int = 9800) -> bool:
        return self.used_today() + cost <= limit

    def daily_breakdown(self) -> dict:
        today = str(date.today())
        rows = self.conn.execute(
            "SELECT endpoint, SUM(cost) FROM quota_log WHERE date=? GROUP BY endpoint",
            (today,)
        ).fetchall()
        return dict(rows)

tracker = QuotaTracker()
print(f"Quota used today: {tracker.used_today()}")
print("Breakdown:", tracker.daily_breakdown())

6. Batch Requests and Performance Optimization {#batch}

The videos.list endpoint accepts up to 50 comma-separated video IDs. One call for 50 videos costs 1 unit -- same as one video. Always batch:

def batch_get_video_details(video_ids: list[str],
                            parts: list[str] | None = None) -> dict[str, dict]:
    """Get details for multiple videos efficiently using batch requests."""
    if parts is None:
        parts = ["snippet", "statistics", "contentDetails"]

    result = {}
    for i in range(0, len(video_ids), 50):
        batch = video_ids[i:i+50]
        data = yt.get("videos", {
            "part": ",".join(parts),
            "id": ",".join(batch),
        })

        for item in data.get("items", []):
            vid_id = item["id"]
            result[vid_id] = {
                "title": item["snippet"]["title"],
                "channel": item["snippet"]["channelTitle"],
                "channel_id": item["snippet"]["channelId"],
                "published": item["snippet"]["publishedAt"],
                "description": item["snippet"]["description"],
                "tags": item["snippet"].get("tags", []),
                "category_id": item["snippet"]["categoryId"],
                "views": int(item.get("statistics", {}).get("viewCount", 0)),
                "likes": int(item.get("statistics", {}).get("likeCount", 0)),
                "comments": int(item.get("statistics", {}).get("commentCount", 0)),
                "duration": item.get("contentDetails", {}).get("duration", ""),
                "definition": item.get("contentDetails", {}).get("definition", ""),
                "licensed_content": item.get("contentDetails", {}).get("licensedContent"),
                "live_broadcast_content": item["snippet"].get("liveBroadcastContent"),
            }

    return result

# Use the fields parameter to reduce response payload
def get_minimal_video_data(video_ids: list[str]) -> dict:
    """Get only the fields you need to reduce API response size."""
    result = {}
    for i in range(0, len(video_ids), 50):
        batch = video_ids[i:i+50]
        data = yt.get("videos", {
            "part": "snippet,statistics",
            "id": ",".join(batch),
            "fields": "items(id,snippet/title,snippet/publishedAt,"
                     "statistics/viewCount,statistics/likeCount)",
        })
        for item in data.get("items", []):
            result[item["id"]] = {
                "title": item["snippet"]["title"],
                "published": item["snippet"]["publishedAt"],
                "views": int(item.get("statistics", {}).get("viewCount", 0)),
                "likes": int(item.get("statistics", {}).get("likeCount", 0)),
            }
    return result
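
One wrinkle with the batched results: the `duration` field is an ISO 8601 string like `PT1H2M30S`, which you can't sort or aggregate directly. A small regex-based parser for the shapes YouTube emits (a sketch; full ISO 8601 durations also allow weeks, which YouTube doesn't use):

```python
import re

def parse_iso8601_duration(duration: str) -> int:
    """Convert an ISO 8601 duration like 'PT1H2M30S' to total seconds.

    Handles the P#DT#H#M#S shapes YouTube emits; returns 0 for
    unparseable input rather than raising.
    """
    m = re.fullmatch(
        r"P(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?",
        duration or "",
    )
    if not m:
        return 0
    days, hours, minutes, seconds = (int(g) if g else 0 for g in m.groups())
    return days * 86400 + hours * 3600 + minutes * 60 + seconds

print(parse_iso8601_duration("PT1H2M30S"))  # 3750
print(parse_iso8601_duration("PT45S"))      # 45
```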

7. The InnerTube API: Beyond the Data API {#innertube}

The Data API is great for structured data about known videos. But what about recommended videos, trending content, or content surfaced through YouTube's algorithm? That's where InnerTube comes in.

InnerTube is the internal API that YouTube's web app, mobile app, and TV app all use. It's not documented or officially supported -- Google can change it whenever they want. But it exposes things the Data API doesn't: recommendations, trending pages, "up next" suggestions, channel sections, community posts, and more.

import requests
import json

INNERTUBE_CONTEXT = {
    "client": {
        "clientName": "WEB",
        "clientVersion": "2.20260320.00.00",
        "hl": "en",
        "gl": "US",
        "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                     "AppleWebKit/537.36 (KHTML, like Gecko) "
                     "Chrome/131.0.0.0 Safari/537.36",
        "timeZone": "America/New_York",
        "browserName": "Chrome",
        "browserVersion": "131.0.0.0",
        "osName": "Windows",
        "osVersion": "10.0",
        "platform": "DESKTOP",
    }
}

INNERTUBE_HEADERS = {
    "Content-Type": "application/json",
    "X-YouTube-Client-Name": "1",
    "X-YouTube-Client-Version": "2.20260320.00.00",
    "Origin": "https://www.youtube.com",
    "Referer": "https://www.youtube.com/",
    "User-Agent": INNERTUBE_CONTEXT["client"]["userAgent"],
}

def innertube_request(endpoint: str, body: dict,
                      proxy: str | None = None) -> dict:
    """Make an InnerTube API request."""
    url = f"https://www.youtube.com/youtubei/v1/{endpoint}"
    full_body = {"context": INNERTUBE_CONTEXT, **body}
    kwargs = {
        "headers": INNERTUBE_HEADERS,
        "json": full_body,
        "timeout": 15,
    }
    if proxy:
        kwargs["proxies"] = {"http": proxy, "https": proxy}

    resp = requests.post(url, **kwargs)
    resp.raise_for_status()
    return resp.json()

def get_trending_videos(category: str = "trending") -> list[dict]:
    """Get trending videos via InnerTube."""
    browse_ids = {
        "trending": "FEtrending",
        "music": "FEmusic",
        "gaming": "FEgaming",
        "movies": "FEmovies",
    }
    data = innertube_request("browse", {
        "browseId": browse_ids.get(category, "FEtrending")
    })
    return _extract_videos_from_renderer(data)

def get_video_recommendations(video_id: str) -> list[dict]:
    """Get 'up next' recommendations for a video."""
    data = innertube_request("next", {"videoId": video_id})
    return _extract_videos_from_renderer(data)

def search_innertube(query: str, continuation_token: str | None = None) -> dict:
    """Search YouTube via InnerTube."""
    if continuation_token:
        data = innertube_request("search", {"continuation": continuation_token})
    else:
        data = innertube_request("search", {"query": query})
    return data

def _extract_videos_from_renderer(data: dict) -> list[dict]:
    """Extract video data from InnerTube's nested renderer structure."""
    videos = []
    # InnerTube responses are deeply nested with "Renderer" objects
    # This needs recursive traversal
    def traverse(obj):
        if isinstance(obj, dict):
            if "videoRenderer" in obj:
                vr = obj["videoRenderer"]
                videos.append({
                    "video_id": vr.get("videoId", ""),
                    "title": (vr.get("title", {})
                             .get("runs", [{}])[0]
                             .get("text", "")),
                    "channel": (vr.get("ownerText", {})
                               .get("runs", [{}])[0]
                               .get("text", "")),
                    "views": vr.get("viewCountText", {}).get("simpleText", ""),
                    "duration": vr.get("lengthText", {}).get("simpleText", ""),
                    "published": vr.get("publishedTimeText", {}).get("simpleText", ""),
                })
            for v in obj.values():
                traverse(v)
        elif isinstance(obj, list):
            for item in obj:
                traverse(item)
    traverse(data)
    return videos

InnerTube Continuation Tokens

InnerTube uses continuation tokens for pagination, not page numbers:

def get_channel_all_videos_innertube(channel_id: str,
                                      max_pages: int = 10) -> list[dict]:
    """Get all videos from a channel using InnerTube browse endpoint."""
    all_videos = []
    continuation = None

    # Initial request
    data = innertube_request("browse", {
        "browseId": channel_id,
        "params": "EgZ2aWRlb3M%3D",  # "videos" tab parameter
    })

    for page in range(max_pages):
        videos = _extract_videos_from_renderer(data)
        all_videos.extend(videos)

        # Find continuation token
        continuation = _find_continuation_token(data)
        if not continuation:
            break

        data = innertube_request("browse", {"continuation": continuation})
        time.sleep(1.5)

    return all_videos

def _find_continuation_token(data: dict) -> str | None:
    """Find continuation token in InnerTube response."""
    import re
    text = json.dumps(data)
    m = re.search(r'"continuationCommand":\{"token":"([^"]+)"', text)
    return m.group(1) if m else None

8. Scraping Transcripts and Captions {#transcripts}

YouTube captions are expensive via the official API (200 units per download). The unofficial route is much cheaper:

import xml.etree.ElementTree as ET

def get_video_transcript(video_id: str, language: str = "en") -> list[dict]:
    """Get video transcript without API quota via InnerTube."""
    # Get video page to extract caption track URLs
    data = innertube_request("player", {"videoId": video_id})

    captions = (data.get("captions", {})
                   .get("playerCaptionsTracklistRenderer", {})
                   .get("captionTracks", []))

    if not captions:
        return []

    # Find the target language
    target_track = None
    for track in captions:
        lang_code = track.get("languageCode", "")
        if lang_code == language or lang_code.startswith(language):
            target_track = track
            break

    if not target_track and captions:
        target_track = captions[0]  # fallback to first available

    if not target_track:
        return []

    # Download the XML caption file
    caption_url = target_track["baseUrl"]
    resp = requests.get(caption_url, timeout=15)
    resp.raise_for_status()

    # Parse the timed text XML
    import html  # for decoding HTML entities in caption text
    root = ET.fromstring(resp.text)
    transcript = []
    for text_elem in root.findall(".//text"):
        start = float(text_elem.get("start", 0))
        duration = float(text_elem.get("dur", 0))
        # Decode HTML entities and normalize whitespace
        text = html.unescape(text_elem.text or "").replace("\n", " ").strip()
        if text:
            transcript.append({
                "start": start,
                "duration": duration,
                "end": start + duration,
                "text": text,
            })

    return transcript

def format_transcript_as_text(transcript: list[dict]) -> str:
    """Convert timestamped transcript to plain text."""
    return " ".join(item["text"] for item in transcript)

def format_transcript_as_srt(transcript: list[dict]) -> str:
    """Convert transcript to SRT subtitle format."""
    lines = []
    for i, item in enumerate(transcript, 1):
        start_h = int(item["start"] // 3600)
        start_m = int((item["start"] % 3600) // 60)
        start_s = int(item["start"] % 60)
        start_ms = int((item["start"] % 1) * 1000)
        end_h = int(item["end"] // 3600)
        end_m = int((item["end"] % 3600) // 60)
        end_s = int(item["end"] % 60)
        end_ms = int((item["end"] % 1) * 1000)

        lines.append(f"{i}")
        lines.append(f"{start_h:02d}:{start_m:02d}:{start_s:02d},{start_ms:03d} --> "
                    f"{end_h:02d}:{end_m:02d}:{end_s:02d},{end_ms:03d}")
        lines.append(item["text"])
        lines.append("")

    return "\n".join(lines)
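
With timestamps in hand, the transcript becomes searchable -- handy for finding the exact moment a topic is mentioned. A small case-insensitive keyword search over the transcript structure built above (the helper name is mine):

```python
def search_transcript(transcript: list[dict], term: str) -> list[dict]:
    """Find transcript segments containing a term (case-insensitive).

    Expects the list-of-dicts shape produced above:
    [{"start": 12.3, "duration": 4.0, "end": 16.3, "text": "..."}, ...]
    """
    needle = term.lower()
    return [
        {"start": seg["start"], "text": seg["text"]}
        for seg in transcript
        if needle in seg["text"].lower()
    ]
```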

9. Video Search and Autocomplete Data {#search}

def search_youtube(query: str, max_results: int = 50,
                   published_after: str | None = None,
                   video_type: str | None = None) -> list[dict]:
    """Search YouTube via Data API. Cost: 100 units per call."""
    results = []
    page_token = None
    fetched = 0

    while fetched < max_results:
        batch_size = min(50, max_results - fetched)
        params = {
            "part": "snippet",
            "q": query,
            "type": "video",
            "maxResults": batch_size,
            "order": "relevance",
        }
        if page_token:
            params["pageToken"] = page_token
        if published_after:
            params["publishedAfter"] = published_after  # RFC 3339 format
        if video_type:
            params["videoDuration"] = video_type  # short/medium/long

        data = yt.get("search", params, cost=100)  # expensive!
        for item in data.get("items", []):
            sn = item["snippet"]
            results.append({
                "video_id": item["id"]["videoId"],
                "title": sn["title"],
                "channel": sn["channelTitle"],
                "channel_id": sn["channelId"],
                "published": sn["publishedAt"],
                "description": sn.get("description", ""),
                "thumbnail": sn.get("thumbnails", {}).get("high", {}).get("url", ""),
            })

        fetched += batch_size
        page_token = data.get("nextPageToken")
        if not page_token or fetched >= max_results:
            break

    return results

def get_search_autocomplete(query: str) -> list[str]:
    """Get YouTube search autocomplete suggestions (no API key needed)."""
    url = "https://suggestqueries.google.com/complete/search"
    params = {
        "client": "youtube",
        "ds": "yt",
        "q": query,
        "hl": "en",
    }
    resp = requests.get(url, params=params, timeout=10)
    resp.raise_for_status()
    # Response is JSONP: window.google.ac.h([...])
    text = resp.text
    start = text.index("[")
    end = text.rindex("]") + 1
    data = json.loads(text[start:end])
    suggestions = [item[0] for item in data[1] if isinstance(item, list) and item]
    return suggestions

# Get related search terms
suggestions = get_search_autocomplete("python web scraping")
print(suggestions)
# ['python web scraping tutorial', 'python web scraping 2026', ...]

10. Proxy Strategy for InnerTube {#proxies}

YouTube's Data API doesn't need proxies -- it's key-authenticated and rate limits per key, not per IP. InnerTube is different: it uses session-based detection and Google's own bot detection infrastructure.

For InnerTube scraping at volume, rotating residential proxies distribute requests across different IP addresses and avoid triggering rate limits:

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
THORDATA_HOST = "proxy.thordata.com"
THORDATA_PORT = 9000

def get_residential_proxy(country: str = "US") -> str:
    user = f"{THORDATA_USER}-country-{country.lower()}"
    return f"http://{user}:{THORDATA_PASS}@{THORDATA_HOST}:{THORDATA_PORT}"

def innertube_request_proxied(endpoint: str, body: dict,
                               country: str = "US") -> dict:
    """Make InnerTube request through rotating residential proxy."""
    proxy = get_residential_proxy(country)
    return innertube_request(endpoint, body, proxy=proxy)

ThorData offers rotating residential IPs with geographic targeting -- useful for retrieving region-locked content or ensuring your InnerTube requests appear to originate from a specific country.
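
For crawls that shouldn't concentrate on one exit geography, a simple round-robin over countries works. A sketch (the username format mirrors the example above and is illustrative, not an exact provider spec):

```python
from itertools import cycle
from typing import Iterator

def proxy_rotation(countries: list[str], user: str, password: str,
                   host: str, port: int) -> Iterator[str]:
    """Yield proxy URLs cycling endlessly through the given countries."""
    for country in cycle(countries):
        yield f"http://{user}-country-{country.lower()}:{password}@{host}:{port}"

rotation = proxy_rotation(["US", "DE", "JP"], "user", "pass",
                          "proxy.example.com", 9000)
print(next(rotation))  # http://user-country-us:pass@proxy.example.com:9000
```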


11. Storing YouTube Data: SQLite Schema {#storage}

import sqlite3
import time

def init_youtube_db(db_path: str = "youtube_data.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")

    conn.execute("""
        CREATE TABLE IF NOT EXISTS channels (
            id TEXT PRIMARY KEY,
            title TEXT,
            handle TEXT,
            description TEXT,
            subscriber_count INTEGER,
            video_count INTEGER,
            view_count INTEGER,
            country TEXT,
            uploads_playlist_id TEXT,
            scraped_at REAL
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS videos (
            id TEXT PRIMARY KEY,
            channel_id TEXT,
            title TEXT,
            description TEXT,
            published_at TEXT,
            duration TEXT,
            view_count INTEGER,
            like_count INTEGER,
            comment_count INTEGER,
            tags TEXT,
            category_id TEXT,
            thumbnail_url TEXT,
            definition TEXT,
            is_live INTEGER,
            scraped_at REAL,
            FOREIGN KEY (channel_id) REFERENCES channels(id)
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS comments (
            id TEXT PRIMARY KEY,
            video_id TEXT,
            author TEXT,
            author_id TEXT,
            text TEXT,
            likes INTEGER,
            published_at TEXT,
            is_reply INTEGER,
            parent_id TEXT,
            reply_count INTEGER,
            scraped_at REAL,
            FOREIGN KEY (video_id) REFERENCES videos(id)
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS transcripts (
            video_id TEXT,
            language TEXT,
            start_time REAL,
            duration REAL,
            text TEXT,
            scraped_at REAL,
            PRIMARY KEY (video_id, language, start_time)
        )
    """)

    conn.execute("CREATE INDEX IF NOT EXISTS idx_videos_channel ON videos(channel_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_videos_published ON videos(published_at)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_comments_video ON comments(video_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_transcripts_video ON transcripts(video_id)")

    conn.commit()
    return conn

def save_videos_bulk(conn: sqlite3.Connection, videos: list[dict]):
    import json
    now = time.time()
    conn.executemany("""
        INSERT OR REPLACE INTO videos VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, [
        (v.get("video_id") or v.get("id"), v.get("channel_id"),
         v.get("title"), v.get("description"),
         v.get("published") or v.get("published_at"),
         v.get("duration"), v.get("views", 0), v.get("likes", 0),
         v.get("comments", 0), json.dumps(v.get("tags", [])),
         v.get("category_id"), v.get("thumbnail"),
         v.get("definition"), int(v.get("is_live", False)), now)
        for v in videos
    ])
    conn.commit()
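Once data lands in this schema, a lot of analysis is plain SQL. A sketch of a per-channel leaderboard with like ratio, assuming the `videos` table above (the `CASE` guard avoids division by zero on videos with hidden view counts):

```python
import sqlite3

TOP_VIDEOS_SQL = """
    SELECT title,
           view_count,
           like_count,
           CASE WHEN view_count > 0
                THEN ROUND(1.0 * like_count / view_count, 4)
                ELSE 0 END AS like_ratio
    FROM videos
    WHERE channel_id = ?
    ORDER BY view_count DESC
    LIMIT ?
"""

def top_videos(conn: sqlite3.Connection, channel_id: str, n: int = 10) -> list[tuple]:
    """Return the channel's n most-viewed videos with their like ratio."""
    return conn.execute(TOP_VIDEOS_SQL, (channel_id, n)).fetchall()
```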

12. Data Quality: Handling Deleted Videos and Edge Cases {#data-quality}

def is_video_available(video_id: str) -> dict:
    """Check if a video is available and why if not."""
    data = yt.get("videos", {
        "part": "status,snippet",
        "id": video_id,
    })
    if not data.get("items"):
        return {"available": False, "reason": "deleted_or_private"}

    item = data["items"][0]
    status = item.get("status", {})
    return {
        "available": True,
        "upload_status": status.get("uploadStatus"),
        "privacy": status.get("privacyStatus"),
        "embeddable": status.get("embeddable"),
        "made_for_kids": status.get("madeForKids"),
        # The status part does not say whether comments are enabled; you only
        # find out via a commentsDisabled error on commentThreads.list.
        "live_broadcast": item["snippet"].get("liveBroadcastContent", "none"),
    }

def parse_duration_seconds(duration: str) -> int:
    """Convert an ISO 8601 duration (PT4M13S, or P1DT2H for long streams) to seconds."""
    import re
    m = re.match(r"P(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", duration or "")
    if not m:
        return 0
    days, hours, minutes, seconds = (int(g or 0) for g in m.groups())
    return days * 86400 + hours * 3600 + minutes * 60 + seconds

def format_view_count(count: int) -> str:
    """Format a view count for human display (1.2B, 3.4M, 12K)."""
    if count >= 1_000_000_000:
        return f"{count/1_000_000_000:.1f}B"
    if count >= 1_000_000:
        return f"{count/1_000_000:.1f}M"
    if count >= 1_000:
        return f"{count/1_000:.0f}K"
    return str(count)

13. Channel Analytics and Growth Tracking {#analytics}

import sqlite3
from datetime import date, datetime

def track_channel_growth(channel_id: str, db: sqlite3.Connection):
    """Record channel stats snapshot for growth tracking."""
    data = yt.get("channels", {
        "part": "statistics",
        "id": channel_id,
    })
    if not data.get("items"):
        return

    stats = data["items"][0]["statistics"]
    db.execute("""
        CREATE TABLE IF NOT EXISTS channel_snapshots (
            channel_id TEXT,
            snapshot_date TEXT,
            subscribers INTEGER,
            video_count INTEGER,
            total_views INTEGER,
            PRIMARY KEY (channel_id, snapshot_date)
        )
    """)
    db.execute("""
        INSERT OR REPLACE INTO channel_snapshots VALUES (?,?,?,?,?)
    """, (
        channel_id, str(date.today()),
        int(stats.get("subscriberCount", 0)),
        int(stats.get("videoCount", 0)),
        int(stats.get("viewCount", 0)),
    ))
    db.commit()

def compute_growth_rate(channel_id: str, db: sqlite3.Connection,
                         days: int = 30) -> dict:
    """Compute subscriber/view growth over a period."""
    rows = db.execute("""
        SELECT snapshot_date, subscribers, total_views
        FROM channel_snapshots
        WHERE channel_id = ?
        ORDER BY snapshot_date DESC
        LIMIT ?
    """, (channel_id, days)).fetchall()

    if len(rows) < 2:
        return {"error": "insufficient data"}

    newest = rows[0]
    oldest = rows[-1]
    days_diff = (datetime.fromisoformat(newest[0]) -
                 datetime.fromisoformat(oldest[0])).days or 1

    return {
        "subscriber_growth": newest[1] - oldest[1],
        "view_growth": newest[2] - oldest[2],
        "days": days_diff,
        "daily_subscriber_rate": (newest[1] - oldest[1]) / days_diff,
        "daily_view_rate": (newest[2] - oldest[2]) / days_diff,
    }

14. Real Use Cases {#use-cases}

Content Gap Analysis

def find_content_gaps(competitor_channel_id: str,
                       your_topics: list[str]) -> list[dict]:
    """Find topics competitors cover that you don't."""
    # Get competitor's video titles
    uploads = get_channel_uploads_playlist(competitor_channel_id)
    videos = get_all_playlist_videos(uploads, max_videos=500)

    # Find videos about topics you haven't covered
    gaps = []
    for video in videos:
        title_lower = video["title"].lower()
        if not any(topic.lower() in title_lower for topic in your_topics):
            gaps.append(video)

    return sorted(gaps, key=lambda v: v.get("views", 0), reverse=True)

Viral Video Early Detection

def find_viral_candidates(channel_ids: list[str],
                           hours_old: int = 48) -> list[dict]:
    """Find recently published videos with rapidly growing engagement."""
    import time
    from datetime import datetime, timezone

    cutoff = datetime.now(timezone.utc).timestamp() - (hours_old * 3600)

    candidates = []
    for channel_id in channel_ids:
        uploads = get_channel_uploads_playlist(channel_id)
        recent = get_all_playlist_videos(uploads, max_videos=10)

        video_ids = [v["video_id"] for v in recent]
        details = batch_get_video_details(video_ids)

        for video_id, data in details.items():
            pub_ts = datetime.fromisoformat(
                data["published"].replace("Z", "+00:00")
            ).timestamp()
            if pub_ts > cutoff:
                age_hours = (time.time() - pub_ts) / 3600
                views_per_hour = data["views"] / max(age_hours, 1)
                if views_per_hour > 10000:  # Adjust threshold
                    candidates.append({
                        **data,
                        "video_id": video_id,
                        "age_hours": round(age_hours, 1),
                        "views_per_hour": round(views_per_hour),
                    })

    return sorted(candidates, key=lambda x: -x["views_per_hour"])

15. Common Errors and Fixes {#errors}

| Error | Cause | Fix |
|---|---|---|
| quotaExceeded | Hit 10,000 daily units | Wait until midnight Pacific; add a second project |
| Empty items on playlist | Playlist is private or deleted | Check with playlists.list first |
| commentsDisabled | Channel disabled comments | Skip comment collection for this video |
| videoNotFound | Video deleted or private | Remove from your tracking list |
| 403 on InnerTube | IP blocked by Google bot detection | Use residential proxies |
| Stale nextPageToken | Token expired (they last ~1 hour) | Restart pagination from the first page |
| processingFailure | Video still processing | Retry after 5 minutes |
| Empty subscriberCount | Channel hid subscribers | Normal -- some channels hide this |
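Most of these surface in the Data API's JSON error body as error.errors[0].reason, so you can dispatch on them centrally instead of scattering try/except blocks. A sketch -- the action names and the exact grouping of reasons are my choices, not an official taxonomy:

```python
# Transient reasons worth retrying vs. items to skip permanently.
RETRYABLE = {"processingFailure", "backendError"}
SKIPPABLE = {"videoNotFound", "commentsDisabled", "playlistNotFound", "forbidden"}

def classify_api_error(payload: dict) -> str:
    """Map a Data API error body to an action: 'stop', 'skip', or 'retry'."""
    try:
        reason = payload["error"]["errors"][0]["reason"]
    except (KeyError, IndexError, TypeError):
        return "retry"  # malformed body -- treat as transient
    if reason in ("quotaExceeded", "dailyLimitExceeded"):
        return "stop"   # no point hammering until the quota resets
    if reason in SKIPPABLE:
        return "skip"
    if reason in RETRYABLE:
        return "retry"
    return "skip"       # unknown reason: log it and move on
```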

16. Practical Tips Summary {#summary}

Quota management:

- Batch videos.list at 50 IDs per call -- 1 unit for 50 videos vs 50 units for 50 individual calls
- Use the fields parameter to cut response size by 50-80%
- Cache channel uploads playlist IDs -- they don't change
- Search is 100x more expensive than direct lookups -- avoid it when possible
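The first two quota tips combine naturally: chunk IDs into groups of 50 and attach a fields filter to every request. A sketch -- the fields string shown is one valid example, trim it to whatever you actually read:

```python
def build_video_requests(video_ids: list[str],
                         chunk_size: int = 50) -> list[dict]:
    """Split IDs into <=50-ID batches and build quota-lean videos.list params."""
    fields = ("items(id,snippet(title,publishedAt,channelId),"
              "statistics(viewCount,likeCount,commentCount))")
    return [
        {
            "part": "snippet,statistics",
            "id": ",".join(video_ids[i:i + chunk_size]),  # comma-joined batch
            "fields": fields,
        }
        for i in range(0, len(video_ids), chunk_size)
    ]
```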

InnerTube:

- Keep clientVersion strings current -- stale versions get rejected within weeks
- Use residential proxies from ThorData for sustained InnerTube access
- The renderer structure changes with YouTube UI updates -- build flexible parsers that can traverse unknown nesting

Comments:

- Fetching every reply page of a thread is the API's equivalent of PRAW's replace_more -- budget the extra quota for it
- The comment count in statistics rarely matches what's actually retrievable -- stop when nextPageToken is absent
- Handle commentsDisabled gracefully -- it's common on news channels and controversial videos

Transcripts:

- Use InnerTube's /player endpoint for transcript URLs -- free, with no quota cost
- Auto-generated captions cover roughly 90% of English videos but have accuracy issues
- Cache transcripts locally -- they rarely change after the initial upload

YouTube scraping in 2026 isn't fundamentally different from previous years. The Data API is stable and generous for direct lookups. InnerTube endpoints change more frequently but expose data the official API doesn't. Build your code to handle both, track your quota carefully, and cache aggressively. The data is there if you're systematic about extracting it.