
Scraping Google Trends in 2026: Real-Time Data, Rising Queries, and Geographic Breakdown

I've spent a fair amount of time pulling data from Google Trends — for content strategy pipelines, early trading signals for crypto, and validating niche ideas before committing to them. The data is genuinely useful. Watching a celebrity's name climb before mainstream press picks it up, or seeing "solar panel installation" spike every March, gives you a real edge. The problem is getting the data reliably, because Google has made it progressively harder over the years.

This post covers what I've figured out: using pytrends, hitting the raw API when the library fails, handling rate limits, rotating proxies, building a continuous monitoring loop, and real-world use cases across several domains.


Google's Anti-Scraping Defenses

Before touching any code, it helps to understand what you're up against.

Google Trends sits behind a cookie consent wall in EU regions, which blocks unauthenticated requests before they reach the actual data endpoints. Even outside Europe, Google requires the NID cookie for most API calls. Without a browser session cookie, you'll get a redirect or a 403.

Rate limiting is aggressive. In my testing during early 2026, you get roughly 10–15 requests before Google starts returning 429s. The backoff period is unpredictable — sometimes 30 seconds, sometimes several minutes. Push through the 429s and you hit a CAPTCHA gate that's essentially impossible to automate past without solving it externally.

The main defenses, in order of how often you'll hit them:

1. NID cookie requirement — most endpoints need a valid Google session cookie
2. Per-IP rate limiting — roughly 10–15 requests, then 429s
3. CAPTCHA gates — triggered after repeated rate-limit violations
4. Geographic consent redirects — EU requests hit GDPR consent pages first
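Two of those defenses can be handled up front: attach a real browser NID cookie to your session, and back off with doubling waits when a 429 lands. A minimal sketch — the cookie value is a placeholder you'd copy from DevTools, and the 30-second floor is an assumption from my own testing, not documented behavior:

```python
import requests


def make_trends_session(nid_cookie, user_agent=None):
    """Build a requests session carrying the NID cookie Google expects.

    nid_cookie is a placeholder: copy the real value from a browser
    (DevTools > Application > Cookies > google.com > NID).
    """
    session = requests.Session()
    session.headers["User-Agent"] = user_agent or (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    )
    session.cookies.set("NID", nid_cookie, domain=".google.com")
    return session


def backoff_schedule(base=30.0, retries=4):
    """Delays (seconds) to sleep between successive 429 responses.

    Doubling from a 30s floor matches the unpredictable backoff
    window I've observed; tune to taste.
    """
    return [base * (2 ** i) for i in range(retries)]
```

Loop over `backoff_schedule()` between retries instead of hammering the endpoint; once you see a CAPTCHA page instead of a 429, stop entirely and rotate IPs.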


Installation and Setup

# Install dependencies
# pip install pytrends requests pandas

from pytrends.request import TrendReq
import pandas as pd
import requests
import json
import time
import random
import sqlite3
import csv
from pathlib import Path
from datetime import datetime, timezone, timedelta

Using pytrends

Pytrends is the most practical starting point. It wraps the unofficial Google Trends API and handles session setup, token fetching, and response parsing.

def create_pytrends_client(proxy=None, timeout=(10, 25)):
    """
    Create a pytrends TrendReq client with optional proxy support.

    Args:
        proxy: Proxy URL string (e.g. "http://user:pass@host:port")
        timeout: (connect_timeout, read_timeout) tuple

    Returns:
        Configured TrendReq instance
    """
    kwargs = {
        "hl": "en-US",
        "tz": 360,
        "timeout": timeout,
        "retries": 3,
        "backoff_factor": 0.5,
    }

    if proxy:
        kwargs["proxies"] = {
            "http": proxy,
            "https": proxy,
        }

    return TrendReq(**kwargs)


pytrends = create_pytrends_client()

def get_interest_over_time(keywords, timeframe="today 3-m", geo="", cat=0):
    """
    Get interest-over-time data for a list of keywords.

    Args:
        keywords: List of 1-5 keywords to compare
        timeframe: Time range. Options:
                   "now 1-H" — last hour
                   "now 4-H" — last 4 hours
                   "now 1-d" — last day
                   "now 7-d" — last 7 days
                   "today 1-m" — last 30 days
                   "today 3-m" — last 90 days
                   "today 12-m" — last year
                   "today 5-y" — last 5 years
                   "all" — since 2004
                   "2020-01-01 2020-12-31" — custom range
        geo: Country code (e.g. "US", "GB") or "" for worldwide
        cat: Category number (0 = all categories)

    Returns:
        DataFrame with interest scores (0-100) indexed by time
    """
    if len(keywords) > 5:
        raise ValueError("Google Trends allows maximum 5 keywords per request")

    pytrends.build_payload(
        keywords,
        cat=cat,
        timeframe=timeframe,
        geo=geo,
    )

    df = pytrends.interest_over_time()

    # Drop the 'isPartial' column if present
    if "isPartial" in df.columns:
        df = df.drop(columns=["isPartial"])

    return df


# Example: compare electric vehicle brands
ev_interest = get_interest_over_time(
    ["Tesla", "Rivian", "Lucid Motors", "BYD"],
    timeframe="today 12-m",
    geo="US"
)
print(ev_interest.tail(4))
time.sleep(2)

The related queries endpoint is often the most actionable part of Google Trends — it shows you what people are searching for alongside your keyword, and crucially the "rising" section shows breakout searches gaining momentum.

def get_related_queries(keywords, timeframe="today 3-m", geo="US"):
    """
    Get top and rising related queries for each keyword.

    Args:
        keywords: List of 1-5 keywords
        timeframe: Time range string
        geo: Country code or "" for worldwide

    Returns:
        Dict mapping each keyword to {"top": DataFrame, "rising": DataFrame}
    """
    pytrends.build_payload(keywords, timeframe=timeframe, geo=geo)

    time.sleep(1)
    related = pytrends.related_queries()

    results = {}
    for kw in keywords:
        kw_data = related.get(kw, {})
        results[kw] = {
            "top": kw_data.get("top"),
            "rising": kw_data.get("rising"),
        }

    return results


def get_related_topics(keywords, timeframe="today 3-m", geo="US"):
    """
    Get top and rising related topics (entities, not just keyword strings).
    Topics are more semantically meaningful than raw queries.

    Returns:
        Dict mapping each keyword to {"top": DataFrame, "rising": DataFrame}
    """
    pytrends.build_payload(keywords, timeframe=timeframe, geo=geo)
    time.sleep(1)
    return pytrends.related_topics()


# Example: find rising searches around "electric vehicle"
queries = get_related_queries(["electric vehicle"], timeframe="today 3-m", geo="US")

for kw, data in queries.items():
    print(f"\n=== {kw} ===")

    if data["rising"] is not None and not data["rising"].empty:
        print("RISING queries:")
        print(data["rising"].head(10).to_string(index=False))

    time.sleep(2)

    if data["top"] is not None and not data["top"].empty:
        print("\nTOP queries:")
        print(data["top"].head(10).to_string(index=False))

The "rising" queries show percentage growth (or "Breakout" for >5000% growth). These are the early signals before mainstream coverage picks up a trend.
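Depending on the pytrends version, that growth figure may come back as a plain int or as the literal string "Breakout"; a small normalizer (my own helper, not part of pytrends) makes the values sortable either way:

```python
def parse_rising_value(value):
    """Normalize a rising-query growth value for sorting.

    Returns (numeric_growth, is_breakout). "Breakout" (>5000% growth)
    maps to 9999 so it sorts above every explicit percentage.
    """
    text = str(value).strip().lower().rstrip("%").replace("+", "")
    if text == "breakout":
        return 9999, True
    try:
        return int(text), False
    except ValueError:
        return 0, False
```

Sort opportunities by the numeric component and treat the `is_breakout` flag as your "write about this now" signal.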


For currently trending topics, the trending_searches method gives you what Google considers hot right now:

def get_trending_searches(country="united_states"):
    """
    Get currently trending searches for a country.

    Country names use underscore format: united_states, united_kingdom,
    canada, australia, germany, france, japan, brazil, india, etc.

    Returns:
        DataFrame with trending search terms
    """
    trending = pytrends.trending_searches(pn=country)
    return trending


def get_realtime_trends_direct(geo="US"):
    """
    Get real-time trending stories directly from Google Trends API.
    More detailed than trending_searches — includes news stories and entities.

    Requires a valid NID cookie for reliable results.

    Returns:
        List of story dicts: title, entities, article_count, first_source, image_url
    """
    url = "https://trends.google.com/trends/api/realtimetrends"
    params = {
        "hl": "en-US",
        "tz": "-300",
        "cat": "all",
        "fi": "0",
        "fs": "0",
        "geo": geo,
        "ri": "300",
        "rs": "20",
        "sort": "0",
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://trends.google.com/",
    }

    resp = requests.get(url, params=params, headers=headers, timeout=20)
    if resp.status_code != 200:
        print(f"HTTP {resp.status_code}")
        return []

    # Google prepends ")]}'\n" to JSON responses to prevent hijacking
    try:
        data = json.loads(resp.text[5:])
    except json.JSONDecodeError:
        print("Failed to parse response")
        return []

    stories = data.get("storySummaries", {}).get("trendingStories", [])

    results = []
    for s in stories[:20]:
        title = s.get("title", "")
        entities = s.get("entityNames", [])
        articles = s.get("articles", [])
        image = s.get("image", {}).get("imgUrl", "")

        results.append({
            "title": title,
            "entities": entities,
            "article_count": len(articles),
            "first_source": articles[0].get("source", "") if articles else "",
            "image_url": image,
        })

    return results


# Get trending topics
trending_us = get_trending_searches("united_states")
print("US trending searches:")
print(trending_us.head(20))

time.sleep(3)

realtime = get_realtime_trends_direct("US")
print("\nReal-time trending stories:")
for story in realtime[:5]:
    print(f"  {story['title']} — {', '.join(story['entities'][:3])}")

Direct API Access (When pytrends Fails)

When pytrends starts misbehaving — which happens when Google rotates its internal API structure — going directly to the API endpoints is more reliable because you control exactly what gets sent. The core flow has two steps: fetch a widget token, then use it to get the actual data.

def create_direct_session(proxy=None):
    """Create a requests session configured for Google Trends direct API access."""
    s = requests.Session()
    s.headers.update({
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://trends.google.com/",
    })
    if proxy:
        s.proxies = {"http": proxy, "https": proxy}
    return s


def get_trends_token(session, keyword, geo="US", timeframe="today 12-m"):
    """
    Fetch the widget token needed for data API calls.
    Google Trends requires a two-step process: get token, then get data.

    Returns:
        (token, req_payload) tuple or (None, None) on failure
    """
    explore_url = "https://trends.google.com/trends/api/explore"
    params = {
        "hl": "en-US",
        "tz": "360",
        "req": json.dumps({
            "comparisonItem": [{"keyword": keyword, "geo": geo, "time": timeframe}],
            "category": 0,
            "property": "",
        }),
    }

    try:
        resp = session.get(explore_url, params=params, timeout=20)
        if resp.status_code != 200:
            print(f"Explore returned {resp.status_code}")
            return None, None

        data = json.loads(resp.text[5:])  # Strip XSSI prefix
        widgets = data.get("widgets", [])

        # Find the TIMESERIES widget
        iot_widget = next((w for w in widgets if w.get("id") == "TIMESERIES"), None)
        if not iot_widget:
            return None, None

        return iot_widget["token"], iot_widget["request"]

    except Exception as e:
        print(f"Token fetch failed: {e}")
        return None, None


def get_timeseries_data(session, token, req_payload):
    """
    Fetch interest-over-time data using a widget token.

    Returns:
        List of {"date": ..., "value": ..., "isPartial": ...} dicts
    """
    multiline_url = "https://trends.google.com/trends/api/widgetdata/multiline"
    params = {
        "hl": "en-US",
        "tz": "360",
        "req": json.dumps(req_payload),
        "token": token,
    }

    try:
        resp = session.get(multiline_url, params=params, timeout=20)
        if resp.status_code != 200:
            return []

        data = json.loads(resp.text[5:])
        timeline_data = data.get("default", {}).get("timelineData", [])

        results = []
        for point in timeline_data:
            results.append({
                "date": point.get("formattedTime", ""),
                "timestamp": point.get("formattedAxisTime", ""),
                "value": point.get("value", [0])[0],
                "is_partial": point.get("isPartial", False),
            })

        return results

    except Exception as e:
        print(f"Data fetch failed: {e}")
        return []


def get_geomap_data(session, token_geo, req_payload_geo):
    """
    Fetch geographic interest breakdown using a GEOMAP widget token.

    Returns:
        List of {"geoCode": ..., "geoName": ..., "value": ...} dicts
    """
    geomap_url = "https://trends.google.com/trends/api/widgetdata/comparedgeo"
    params = {
        "hl": "en-US",
        "tz": "360",
        "req": json.dumps(req_payload_geo),
        "token": token_geo,
    }

    try:
        resp = session.get(geomap_url, params=params, timeout=20)
        if resp.status_code != 200:
            return []

        data = json.loads(resp.text[5:])
        geo_data = data.get("default", {}).get("geoMapData", [])

        return [
            {
                "code": item.get("geoCode", ""),
                "name": item.get("geoName", ""),
                "value": item.get("value", [0])[0],
                "max_value": item.get("maxValueIndex", 0),
            }
            for item in geo_data
        ]

    except Exception as e:
        print(f"Geo data fetch failed: {e}")
        return []


# Full direct API example
direct_session = create_direct_session()

token, req_payload = get_trends_token(direct_session, "solar panels", geo="US", timeframe="today 12-m")
if token:
    timeseries = get_timeseries_data(direct_session, token, req_payload)
    print(f"Got {len(timeseries)} data points for 'solar panels'")
    for point in timeseries[-5:]:
        print(f"  {point['date']}: {point['value']}")
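The geomap helper above needs its own widget token. Assuming the explore response carries a GEOMAP widget alongside TIMESERIES (it does in my testing), a variant of the token fetch covers it — `pick_widget` is a small helper I've added to keep the selection logic in one place:

```python
import json
import requests


def pick_widget(widgets, widget_id):
    """Select one widget dict (e.g. "GEOMAP") from an explore response."""
    return next((w for w in widgets if w.get("id") == widget_id), None)


def get_geomap_token(session, keyword, geo="US", timeframe="today 12-m"):
    """Like get_trends_token, but returns the GEOMAP widget's token."""
    params = {
        "hl": "en-US",
        "tz": "360",
        "req": json.dumps({
            "comparisonItem": [{"keyword": keyword, "geo": geo, "time": timeframe}],
            "category": 0,
            "property": "",
        }),
    }
    resp = session.get("https://trends.google.com/trends/api/explore",
                       params=params, timeout=20)
    if resp.status_code != 200:
        return None, None
    widget = pick_widget(json.loads(resp.text[5:]).get("widgets", []), "GEOMAP")
    if not widget:
        return None, None
    return widget["token"], widget["request"]


# Usage with the session and geomap helper defined earlier:
# geo_token, geo_req = get_geomap_token(direct_session, "solar panels")
# if geo_token:
#     for region in get_geomap_data(direct_session, geo_token, geo_req)[:5]:
#         print(f"  {region['name']}: {region['value']}")
```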

Geographic Breakdown

Comparing interest across regions is one of the most valuable features for market research:

def compare_by_country(keyword, countries, timeframe="today 3-m", delay=3.0):
    """
    Compare keyword interest across multiple countries.

    Args:
        keyword: Search term to compare
        countries: List of ISO 2-letter country codes
        timeframe: Time range string
        delay: Seconds to wait between requests (be polite)

    Returns:
        Dict mapping country code to average interest score
    """
    results = {}
    for geo in countries:
        try:
            pytrends.build_payload([keyword], timeframe=timeframe, geo=geo)
            df = pytrends.interest_over_time()

            if not df.empty and keyword in df.columns:
                avg = df[keyword].mean()
                results[geo] = round(avg, 1)
                print(f"  {geo}: {avg:.1f}")
            else:
                results[geo] = 0

        except Exception as e:
            print(f"  {geo}: error ({e})")
            results[geo] = None

        time.sleep(delay)

    return results


def compare_by_us_state(keyword, timeframe="today 12-m"):
    """
    Get interest breakdown by US state (sub-region resolution).

    Returns:
        DataFrame with state-level interest scores
    """
    pytrends.build_payload([keyword], timeframe=timeframe, geo="US")
    time.sleep(1)
    return pytrends.interest_by_region(resolution="REGION", inc_low_vol=True)


def compare_by_city(keyword, country="US", timeframe="today 12-m"):
    """
    Get interest breakdown by city within a country.
    Note: Data quality drops significantly for smaller cities.

    Returns:
        DataFrame with city-level interest scores
    """
    pytrends.build_payload([keyword], timeframe=timeframe, geo=country)
    time.sleep(1)
    return pytrends.interest_by_region(resolution="CITY", inc_low_vol=False)


# Heat pump adoption comparison
countries = ["US", "GB", "DE", "FR", "SE", "AU", "CA", "NL"]
heat_pump_interest = compare_by_country("heat pump", countries, timeframe="today 12-m")

print("\nHeat pump search interest by country:")
for country, score in sorted(heat_pump_interest.items(), key=lambda x: -(x[1] or 0)):
    display = score if score is not None else 0.0  # errored countries show as 0
    bar = "=" * int(display / 2)
    print(f"  {country}: {display:5.1f} {bar}")

time.sleep(3)

# US state breakdown
us_states = compare_by_us_state("solar panels", timeframe="today 12-m")
print("\nTop US states for 'solar panels':")
print(us_states.nlargest(10, "solar panels"))

Scaling with Proxies

Google rate-limits by IP, and even with careful request pacing you'll get blocked running continuous monitoring from a single address. Residential proxies are the practical answer because Google's detection looks at IP reputation, not just request frequency — datacenter proxies get flagged fast.

I've had good results with ThorData's residential proxy network, which rotates through real residential IPs with solid US and EU coverage. For Google Trends specifically, you want residential IPs to avoid triggering the CAPTCHA gate.

THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"

def build_thordata_proxy(country="US", sticky=False, session_id=None):
    """
    Build a ThorData proxy URL.

    For rotating (new IP each request): sticky=False
    For sticky sessions (same IP for minutes): sticky=True

    Sign up at: https://thordata.partnerstack.com/partner/0a0x4nzh
    """
    import uuid
    if sticky:
        sid = session_id or str(uuid.uuid4())[:8]
        user = f"{THORDATA_USER}-session-{sid}-country-{country}"
    else:
        user = f"{THORDATA_USER}-country-{country}"

    return f"http://{user}:{THORDATA_PASS}@gate.thordata.net:7777"


def create_proxied_pytrends(country="US"):
    """Create a pytrends client routed through a residential proxy."""
    proxy_url = build_thordata_proxy(country=country)
    return TrendReq(
        hl="en-US",
        tz=360,
        proxies={
            "http": proxy_url,
            "https": proxy_url,
        },
        timeout=(10, 30),
        retries=2,
        backoff_factor=0.5,
    )


class RotatingTrendsClient:
    """
    pytrends client that rotates through proxies to avoid rate limits.
    Creates a fresh client with a new proxy after each rate limit hit.
    """

    def __init__(self, country="US", max_errors=3):
        self.country = country
        self.max_errors = max_errors
        self.client = create_proxied_pytrends(country)
        self.error_count = 0

    def _refresh_client(self):
        """Get a fresh proxy connection."""
        print("Refreshing proxy connection...")
        time.sleep(random.uniform(2, 5))
        self.client = create_proxied_pytrends(self.country)
        self.error_count = 0

    def get_interest_over_time(self, keywords, timeframe="today 3-m", geo=""):
        """Get interest data with automatic retry on rate limit."""
        for attempt in range(3):
            try:
                self.client.build_payload(keywords, timeframe=timeframe, geo=geo)
                df = self.client.interest_over_time()
                self.error_count = 0
                return df
            except Exception as e:
                self.error_count += 1
                print(f"Error (attempt {attempt+1}): {e}")

                if self.error_count >= self.max_errors:
                    self._refresh_client()

                time.sleep(random.uniform(5, 15))

        return pd.DataFrame()


# Use rotating client for sustained collection
rotating_client = RotatingTrendsClient(country="US")

Building a Trend Monitor

Here is a complete hourly monitor that checks a keyword list and alerts when any term spikes relative to its recent baseline. This is useful for catching viral moments early, monitoring brand mentions, or detecting emerging market opportunities.

import statistics

KEYWORDS_TO_MONITOR = [
    "bitcoin",
    "ethereum",
    "solana",
    "NFT",
    "web3",
]
SPIKE_THRESHOLD = 1.5   # Alert when value is 1.5x the recent baseline
BASELINE_WINDOW = 20    # Data points to use for baseline calculation
CHECK_INTERVAL = 3600   # Seconds between checks (1 hour)


def get_recent_interest(pytrends_client, keyword, timeframe="now 1-d"):
    """Get the most recent interest values for a keyword."""
    try:
        pytrends_client.build_payload([keyword], timeframe=timeframe)
        df = pytrends_client.interest_over_time()
        if df.empty or keyword not in df.columns:
            return None
        return df[keyword].tolist()
    except Exception as e:
        print(f"  {keyword}: error — {e}")
        return None


def check_for_spikes(history, latest, threshold=SPIKE_THRESHOLD):
    """
    Check if the latest value represents a spike vs recent history.

    Args:
        history: List of historical values
        latest: Current value to check
        threshold: Multiple of baseline to trigger alert

    Returns:
        (is_spike, baseline) tuple
    """
    if len(history) < 5:
        return False, 0

    # Use recent history for baseline, excluding obvious outliers
    recent = history[-BASELINE_WINDOW:]
    baseline = statistics.median(recent)

    if baseline == 0:
        return False, 0

    return (latest / baseline) >= threshold, baseline
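To see the threshold logic with toy numbers, here's the same check re-implemented with the window as a parameter, purely for illustration (the median baseline means a single earlier outlier won't skew it):

```python
import statistics


def is_spike(history, latest, threshold=1.5, window=20):
    """Illustrative version of check_for_spikes, window as a parameter."""
    if len(history) < 5:
        return False  # not enough data for a meaningful baseline
    baseline = statistics.median(history[-window:])
    return baseline > 0 and latest / baseline >= threshold


# Baseline is median([18, 20, 22, 19, 21]) = 20:
print(is_spike([18, 20, 22, 19, 21], 60))  # → True  (3.0x baseline)
print(is_spike([18, 20, 22, 19, 21], 25))  # → False (only 1.25x)
```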


def save_spike_alert(keyword, latest, baseline, timestamp, db_path="trend_monitor.db"):
    """Save a spike alert to SQLite for review."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            keyword TEXT,
            value INTEGER,
            baseline REAL,
            ratio REAL,
            detected_at TEXT
        )
    """)
    conn.execute(
        "INSERT INTO alerts (keyword, value, baseline, ratio, detected_at) VALUES (?, ?, ?, ?, ?)",
        (keyword, latest, baseline, latest / baseline if baseline > 0 else 0, timestamp)
    )
    conn.commit()
    conn.close()


def run_trend_monitor(keywords=KEYWORDS_TO_MONITOR, interval=CHECK_INTERVAL, proxy=None):
    """
    Run a continuous trend monitor.

    Args:
        keywords: List of keywords to monitor
        interval: Seconds between checks
        proxy: Optional proxy URL for requests
    """
    pytrends_client = create_pytrends_client(proxy=proxy)
    history = {kw: [] for kw in keywords}

    print(f"Starting trend monitor for: {', '.join(keywords)}")
    print(f"Check interval: {interval}s | Spike threshold: {SPIKE_THRESHOLD}x baseline")
    print("=" * 60)

    while True:
        check_time = datetime.now(timezone.utc)
        timestamp = check_time.isoformat()
        print(f"\n[{check_time.strftime('%Y-%m-%d %H:%M UTC')}] Running checks...")

        for kw in keywords:
            values = get_recent_interest(pytrends_client, kw)

            if values is None:
                print(f"  {kw}: no data (skipping)")
                time.sleep(4)
                continue

            latest = values[-1] if values else 0
            is_spike, baseline = check_for_spikes(history[kw], latest)

            if is_spike:
                ratio = latest / baseline if baseline > 0 else 0
                print(f"  SPIKE DETECTED: '{kw}' = {latest} (baseline {baseline:.1f}, ratio {ratio:.2f}x)")
                save_spike_alert(kw, latest, baseline, timestamp)
            else:
                print(f"  {kw}: {latest} (baseline {baseline:.1f})")

            # Update history. Note: successive "now 1-d" pulls overlap, so
            # this rolling window contains duplicate points — acceptable for
            # a median baseline, but don't treat it as a clean time series.
            history[kw] = (history[kw] + values)[-50:]

            time.sleep(4)  # Polite delay between keyword checks

        print(f"  Next check in {interval}s...")
        time.sleep(interval)


# Start the monitor (run in background or as a service)
# run_trend_monitor(keywords=["bitcoin", "ethereum", "solana"])

Batch Keyword Comparison

Compare large keyword sets by batching into groups of 5 (the API maximum):

def batch_compare(keywords, anchor=None, timeframe="today 3-m", geo="US", delay=3.0):
    """
    Compare interest for more than 5 keywords by using an anchor term.

    When comparing across batches, include a common anchor keyword in each batch.
    This allows you to normalize scores across batches using the anchor's consistent value.

    Args:
        keywords: Any number of keywords to compare
        anchor: Reference keyword included in every batch for normalization
        timeframe: Time range string
        geo: Country code
        delay: Seconds between batch requests

    Returns:
        Dict mapping keyword to average interest score
    """
    if not anchor:
        anchor = keywords[0]

    # Split into batches of 4 (leaving one slot for anchor)
    batch_size = 4
    others = [kw for kw in keywords if kw != anchor]
    batches = [others[i:i+batch_size] for i in range(0, len(others), batch_size)]

    results = {}
    anchor_values = None

    for i, batch in enumerate(batches):
        batch_keywords = [anchor] + batch
        print(f"Batch {i+1}/{len(batches)}: {batch_keywords}")

        try:
            pytrends.build_payload(batch_keywords, timeframe=timeframe, geo=geo)
            df = pytrends.interest_over_time()

            if df.empty:
                print(f"  Empty response for batch {i+1}")
                time.sleep(delay)
                continue

            # Store anchor values from first batch as reference
            if anchor_values is None and anchor in df.columns:
                anchor_values = df[anchor].mean()

            # Normalize each keyword's score relative to anchor
            anchor_in_batch = df[anchor].mean() if anchor in df.columns and anchor_values else 1

            for kw in batch:
                if kw in df.columns:
                    raw_avg = df[kw].mean()
                    # Normalize: scale so the anchor has a consistent value
                    if anchor_in_batch > 0 and anchor_values:
                        normalized = raw_avg * (anchor_values / anchor_in_batch)
                    else:
                        normalized = raw_avg
                    results[kw] = round(normalized, 1)

        except Exception as e:
            print(f"  Batch {i+1} error: {e}")

        time.sleep(delay)

    # Always include anchor
    results[anchor] = round(anchor_values, 1) if anchor_values else 0
    return dict(sorted(results.items(), key=lambda x: -x[1]))


# Compare 15 AI tools across one timeframe
ai_tools = [
    "ChatGPT", "Claude", "Gemini", "Copilot",
    "Midjourney", "Stable Diffusion", "DALL-E",
    "Perplexity", "Cursor", "Windsurf",
    "Llama", "Mistral", "Grok", "Sora", "Suno"
]

comparison = batch_compare(ai_tools, anchor="ChatGPT", timeframe="today 12-m", geo="US")
print("\nAI tool search interest comparison (normalized):")
for kw, score in comparison.items():
    bar = "=" * int(score / 2)
    print(f"  {kw:20s}: {score:5.1f} {bar}")
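Why the anchor works: Google rescales every request so that request's max is 100, so the same anchor term can average 80 in the reference batch but only 40 in another. Multiplying the second batch's scores by 80/40 puts both batches on one scale. In isolation, with hypothetical numbers:

```python
def normalize_to_anchor(raw_avg, anchor_ref_avg, anchor_batch_avg):
    """Rescale a batch-local average onto the reference batch's scale."""
    if anchor_batch_avg <= 0 or not anchor_ref_avg:
        return raw_avg  # no usable anchor reading; return unscaled
    return raw_avg * (anchor_ref_avg / anchor_batch_avg)


# Anchor averaged 80 in the reference batch but 40 here, so a
# local score of 25 becomes 50 on the shared scale:
print(normalize_to_anchor(25, 80, 40))  # → 50.0
```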

SQLite Storage for Trend History

Build a historical database of keyword interest for trend analysis over time:

def init_trends_db(path="trends_history.db"):
    """Initialize SQLite database for trend data storage."""
    conn = sqlite3.connect(path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS interest_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            keyword TEXT NOT NULL,
            geo TEXT NOT NULL DEFAULT '',
            date_label TEXT NOT NULL,
            value INTEGER,
            is_partial INTEGER DEFAULT 0,
            collected_at TEXT,
            UNIQUE(keyword, geo, date_label)
        );

        CREATE TABLE IF NOT EXISTS related_queries (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            keyword TEXT NOT NULL,
            geo TEXT NOT NULL DEFAULT '',
            query_type TEXT NOT NULL,
            query TEXT NOT NULL,
            value TEXT,
            collected_at TEXT,
            UNIQUE(keyword, geo, query_type, query)
        );

        CREATE TABLE IF NOT EXISTS spike_alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            keyword TEXT NOT NULL,
            value INTEGER,
            baseline REAL,
            ratio REAL,
            detected_at TEXT
        );

        CREATE INDEX IF NOT EXISTS idx_interest_keyword ON interest_data(keyword);
        CREATE INDEX IF NOT EXISTS idx_interest_date ON interest_data(date_label);
        CREATE INDEX IF NOT EXISTS idx_related_keyword ON related_queries(keyword);
    """)
    conn.commit()
    return conn


def store_interest_data(conn, keyword, geo, df):
    """Store interest-over-time data to SQLite."""
    if df.empty or keyword not in df.columns:
        return 0

    now = datetime.now(timezone.utc).isoformat()
    rows = []

    for idx, row in df.iterrows():
        date_label = idx.strftime("%Y-%m-%d %H:%M") if hasattr(idx, "strftime") else str(idx)
        rows.append((
            keyword, geo, date_label,
            int(row[keyword]),
            int(row.get("isPartial", 0)),
            now,
        ))

    conn.executemany(
        """
        INSERT OR REPLACE INTO interest_data
            (keyword, geo, date_label, value, is_partial, collected_at)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        rows
    )
    conn.commit()
    return len(rows)


def store_related_queries(conn, keyword, geo, related_data):
    """Store related query data to SQLite."""
    now = datetime.now(timezone.utc).isoformat()
    rows = []

    for query_type in ["top", "rising"]:
        df = related_data.get(query_type)
        if df is None or df.empty:
            continue
        for _, row in df.iterrows():
            query = row.get("query", "")
            value = str(row.get("value", ""))
            if query:
                rows.append((keyword, geo, query_type, query, value, now))

    if rows:
        conn.executemany(
            """
            INSERT OR REPLACE INTO related_queries
                (keyword, geo, query_type, query, value, collected_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            rows
        )
        conn.commit()

    return len(rows)

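Once history accumulates, plain SQL answers most questions. This reporting helper is my own addition; it only assumes the `interest_data` schema above, repeated here so the snippet stands alone:

```python
import sqlite3

# Same table init_trends_db creates, repeated for a standalone demo.
INTEREST_SCHEMA = """
CREATE TABLE IF NOT EXISTS interest_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    keyword TEXT NOT NULL,
    geo TEXT NOT NULL DEFAULT '',
    date_label TEXT NOT NULL,
    value INTEGER,
    is_partial INTEGER DEFAULT 0,
    collected_at TEXT,
    UNIQUE(keyword, geo, date_label)
)
"""


def top_dates(conn, keyword, limit=5):
    """Dates where stored interest peaked for a keyword."""
    return conn.execute(
        "SELECT date_label, value FROM interest_data "
        "WHERE keyword = ? ORDER BY value DESC LIMIT ?",
        (keyword, limit),
    ).fetchall()
```

Run it against the connection returned by `init_trends_db()` to spot seasonality, e.g. `top_dates(conn, "solar panels")` surfacing the March spike year after year.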
Real-World Use Cases

Use Case 1: Content Strategy Pipeline

Identify rising search topics before they become saturated:

def find_content_opportunities(seed_keywords, geo="US", min_rising_score=1000):
    """
    Find content opportunities by analyzing rising related queries.

    Looks for queries that are breaking out (>5000% growth = "Breakout")
    or rising fast (>1000% growth), but haven't yet been written about much.

    Returns:
        List of opportunity dicts sorted by estimated potential
    """
    opportunities = []

    for seed in seed_keywords:
        print(f"Analyzing: {seed}")

        pytrends.build_payload([seed], timeframe="today 3-m", geo=geo)
        time.sleep(1)

        try:
            related = pytrends.related_queries()
            rising_df = related.get(seed, {}).get("rising")

            if rising_df is None or rising_df.empty:
                continue

            for _, row in rising_df.iterrows():
                query = row.get("query", "")
                value = row.get("value", 0)

                if not query:
                    continue

                # "Breakout" means >5000% growth — these are the gold.
                # pytrends usually reports growth as an int, but guard
                # against the literal "Breakout" string and junk values.
                val_str = str(value).strip().lower()
                is_breakout = val_str == "breakout"
                try:
                    numeric_value = 9999 if is_breakout else int(value)
                except (TypeError, ValueError):
                    numeric_value = 0

                if is_breakout or numeric_value >= min_rising_score:
                    opportunities.append({
                        "seed": seed,
                        "query": query,
                        "growth": "Breakout" if is_breakout else f"+{numeric_value}%",
                        "numeric": numeric_value,
                        "geo": geo,
                    })

        except Exception as e:
            print(f"  Error for {seed}: {e}")

        time.sleep(3)

    return sorted(opportunities, key=lambda x: -x["numeric"])


seeds = ["python web scraping", "data extraction", "web automation", "playwright python"]
opps = find_content_opportunities(seeds, geo="US")

print("\nContent opportunities by growth:")
for opp in opps[:15]:
    print(f"  [{opp['growth']:>12s}] {opp['query']} (via '{opp['seed']}')")

Use Case 2: Niche Validation Before Building

Before building a product or writing a series of articles, validate that search demand is growing (not declining):

def validate_niche(keyword, geo="US"):
    """
    Validate whether a niche is growing, stable, or declining.
    Uses 5-year trend data to identify the trajectory.

    Returns:
        Dict with trend assessment and key metrics
    """
    pytrends.build_payload([keyword], timeframe="today 5-y", geo=geo)
    df = pytrends.interest_over_time()

    if df.empty or keyword not in df.columns:
        return {"status": "no data"}

    values = df[keyword].tolist()
    n = len(values)

    if n < 4:
        return {"status": "insufficient data"}

    # Simple linear regression for trend direction
    x_mean = sum(range(n)) / n
    y_mean = sum(values) / n

    numerator = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(values))
    denominator = sum((i - x_mean) ** 2 for i in range(n))
    slope = numerator / denominator if denominator != 0 else 0

    # Assess peak vs current
    peak = max(values)
    current = values[-1]
    peak_idx = values.index(peak)
    peak_fraction = peak_idx / (n - 1)  # 0 = earliest point, 1 = most recent

    assessment = {
        "keyword": keyword,
        "geo": geo,
        "current_score": current,
        "peak_score": peak,
        "current_vs_peak_pct": round(100 * current / peak, 1) if peak > 0 else 0,
        "slope_per_week": round(slope, 2),  # "today 5-y" returns weekly data points
        "trend": "growing" if slope > 0.3 else ("declining" if slope < -0.3 else "stable"),
        "peak_timing": "recent" if peak_fraction > 0.7 else ("middle" if peak_fraction > 0.3 else "old"),
        "data_points": n,
    }

    print(f"\n=== Niche validation: {keyword} ({geo}) ===")
    print(f"  Current score: {current}/100")
    print(f"  Peak: {peak}/100 at {peak_fraction:.0%} through the period")
    print(f"  Trend: {assessment['trend']} (slope: {slope:.2f}/week)")
    print(f"  Assessment: {assessment['trend'].upper()} {'(peak was recent - good!)' if peak_fraction > 0.7 and slope > 0 else ''}")

    return assessment


# Validate several niches before investing content effort
niches_to_check = ["playwright python", "uv python", "datastar framework", "htmx"]
for niche in niches_to_check:
    result = validate_niche(niche, geo="US")
    time.sleep(3)

Use Case 3: Crypto/Trading Signal Detection

Google Trends data has been correlated with crypto price movements in academic literature. Rising interest in a coin name often precedes price increases by 24-48 hours:

def build_crypto_signal(coins, geo="US"):
    """
    Build a weekly interest snapshot for crypto monitoring.
    Combines short-term momentum (1 week) with medium-term trend (3 months).

    Returns:
        List of dicts with momentum scores
    """
    signals = []

    for i in range(0, len(coins), 5):
        batch = coins[i:i+5]

        # Short-term momentum
        pytrends.build_payload(batch, timeframe="now 7-d", geo=geo)
        df_short = pytrends.interest_over_time()
        time.sleep(2)

        # Medium-term baseline
        pytrends.build_payload(batch, timeframe="today 3-m", geo=geo)
        df_medium = pytrends.interest_over_time()
        time.sleep(2)

        for coin in batch:
            if coin not in df_short.columns or coin not in df_medium.columns:
                continue

            recent_avg = df_short[coin].tail(24).mean()  # last 24 hourly points ("now 7-d" is hourly)
            medium_avg = df_medium[coin].mean()

            # Rough heuristic: each request is normalized 0-100 independently,
            # so this ratio compares relative shape, not absolute search volume
            momentum = round(recent_avg / medium_avg, 2) if medium_avg > 0 else 0

            signals.append({
                "coin": coin,
                "recent_interest": round(recent_avg, 1),
                "baseline_interest": round(medium_avg, 1),
                "momentum_ratio": momentum,
                "signal": "bullish" if momentum > 1.3 else ("bearish" if momentum < 0.7 else "neutral"),
            })

        time.sleep(3)

    return sorted(signals, key=lambda x: -x["momentum_ratio"])


crypto_signals = build_crypto_signal(
    ["bitcoin", "ethereum", "solana", "cardano", "polkadot"],
    geo="US"
)
print("\nCrypto interest momentum:")
for s in crypto_signals:
    print(f"  {s['coin']:12s}: {s['momentum_ratio']:.2f}x ({s['signal']})")
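One caveat with the momentum ratio above: Google normalizes every request to 0–100 independently, so scores from different batches (or different timeframes) aren't directly comparable. A common workaround is to include a fixed anchor keyword in every batch and divide by its score within that batch. Here's a minimal rescaling sketch — `rescale_batches` and the anchor approach are illustrative, not part of pytrends:

```python
def rescale_batches(batch_scores, anchor):
    """
    Rescale per-batch Google Trends averages onto one shared scale.

    batch_scores: list of dicts mapping keyword -> mean interest (0-100),
                  where every dict contains the same anchor keyword.
    Scores within a batch are comparable; across batches they are not,
    because each request is normalized independently. Dividing by the
    anchor's score in the same batch expresses everything in anchor units.
    """
    rescaled = {}
    for scores in batch_scores:
        anchor_score = scores.get(anchor)
        if not anchor_score:  # anchor missing or zero: batch is unusable
            continue
        for kw, val in scores.items():
            if kw != anchor:
                rescaled[kw] = round(val / anchor_score, 3)
    return rescaled
```

For example, with batches `{"bitcoin": 80, "ethereum": 40}` and `{"bitcoin": 20, "dogecoin": 10}` anchored on "bitcoin", both ethereum and dogecoin come out at 0.5 bitcoin-units, even though their raw scores differ 4x.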

Exporting Results

def export_trends_csv(df, keyword, output_dir="trends_exports"):
    """Export interest-over-time DataFrame to CSV."""
    Path(output_dir).mkdir(exist_ok=True)
    safe_name = keyword.replace(" ", "_").replace("/", "_")
    output_path = Path(output_dir) / f"{safe_name}_{datetime.now().strftime('%Y%m%d')}.csv"

    df_export = df.copy()
    if "isPartial" in df_export.columns:
        df_export = df_export.drop(columns=["isPartial"])

    df_export.to_csv(output_path)
    print(f"Saved {len(df_export)} rows to {output_path}")
    return output_path


def export_comparison_json(results_dict, output_path="trends_comparison.json"):
    """Export a keyword comparison dict to JSON."""
    with open(output_path, "w") as f:
        json.dump({
            "exported_at": datetime.now(timezone.utc).isoformat(),
            "data": results_dict,
        }, f, indent=2)
    print(f"Saved comparison to {output_path}")

Wrapping Up

Google Trends is still one of the more underused data sources for market research and content work. The data is free and the signal quality is solid. The engineering overhead — the NID cookie requirement, aggressive rate limiting, library instability — is real but manageable once you understand the system.

The practical workflow:

  1. Start with pytrends for quick experiments and simple keyword comparisons
  2. Switch to direct API calls when you need reliability or pytrends breaks
  3. Add residential proxies (ThorData) once you're running anything at scale or hitting rate limits regularly
  4. Build a SQLite history to track trends over time — the slope matters more than the current value
  5. Use the rising queries endpoint for content opportunities — breakout queries are gold
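Step 4 deserves a concrete shape. A minimal SQLite history needs only two helpers — one to append scores, one to compute the slope over the latest rows (table name and function names here are illustrative):

```python
import sqlite3
from datetime import datetime, timezone

def record_score(conn, keyword, score):
    """Append one interest score to the history table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS trend_history "
        "(keyword TEXT, ts TEXT, score INTEGER)"
    )
    conn.execute(
        "INSERT INTO trend_history VALUES (?, ?, ?)",
        (keyword, datetime.now(timezone.utc).isoformat(), score),
    )
    conn.commit()

def recent_slope(conn, keyword, n=8):
    """Least-squares slope (per sample) over the last n stored scores."""
    rows = conn.execute(
        "SELECT score FROM trend_history WHERE keyword = ? "
        "ORDER BY rowid DESC LIMIT ?", (keyword, n)
    ).fetchall()
    values = [r[0] for r in rows][::-1]  # oldest first
    m = len(values)
    if m < 2:
        return 0.0
    x_mean = (m - 1) / 2
    y_mean = sum(values) / m
    num = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(values))
    den = sum((i - x_mean) ** 2 for i in range(m))
    return num / den if den else 0.0
```

Point your hourly loop at `record_score` and alert on `recent_slope` crossing a threshold; an on-disk database file survives restarts, and `:memory:` works for testing.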

The monitoring loop pattern is the most valuable piece here: a lightweight process that checks your keyword list hourly, stores the data, and flags spikes. Run it on a small VPS and you have a 24/7 trend intelligence feed that costs almost nothing.
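The spike flag itself can be tiny. A minimal sketch, assuming you keep a chronological list of scores per keyword — the function name and the 2-sigma threshold are my choices, not fixed parts of the pattern:

```python
def is_spike(history, threshold=2.0):
    """
    Flag a spike when the newest score exceeds the mean of the prior
    scores by `threshold` standard deviations.
    history: chronological list of 0-100 interest scores.
    """
    if len(history) < 5:  # too little baseline to judge
        return False
    baseline, latest = history[:-1], history[-1]
    mean = sum(baseline) / len(baseline)
    var = sum((v - mean) ** 2 for v in baseline) / len(baseline)
    std = var ** 0.5
    return std > 0 and (latest - mean) / std >= threshold
```

A flat history never trips the flag; a jump from a ~10 baseline to 40 does. Tune the threshold per keyword — noisy terms need a higher bar than stable ones.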