Scraping Google Trends in 2026: Real-Time Data, Rising Queries, and Geographic Breakdown
I've spent a fair amount of time pulling data from Google Trends — for content strategy pipelines, early trading signals for crypto, and validating niche ideas before committing to them. The data is genuinely useful. Watching a celebrity's name climb before mainstream press picks it up, or seeing "solar panel installation" spike every March, gives you a real edge. The problem is getting the data reliably, because Google has made it progressively harder over the years.
This post covers what I've figured out: using pytrends, hitting the raw API when the library fails, handling rate limits, rotating proxies, building a continuous monitoring loop, and real-world use cases across several domains.
Google's Anti-Scraping Defenses
Before touching any code, it helps to understand what you're up against.
Google Trends sits behind a cookie consent wall in EU regions, which blocks unauthenticated requests before they reach the actual data endpoints. Even outside Europe, Google requires the NID cookie for most API calls. Without a browser session cookie, you'll get a redirect or a 403.
Rate limiting is aggressive. In my testing during early 2026, you get roughly 10–15 requests before Google starts returning 429s. The backoff period is unpredictable — sometimes 30 seconds, sometimes several minutes. Push through the 429s and you hit a CAPTCHA gate that's essentially impossible to automate past without solving it externally.
The main defenses, in order of frequency:
1. NID cookie requirement — most endpoints need a valid Google session cookie
2. Per-IP rate limiting — roughly 10–15 requests, then 429s
3. CAPTCHA gates — triggered after repeated rate limit violations
4. Geographic consent redirects — EU requests hit GDPR consent pages first
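The first two defenses can be handled with a small session bootstrap and a backoff wrapper. This is a hedged sketch: visiting trends.google.com first to collect session cookies mirrors what pytrends does internally, and the backoff constants (30-second base, 10-minute cap) are my own choices, not documented Google values.

```python
import random
import time

import requests

def backoff_delay(attempt, base=30.0, cap=600.0):
    """Exponential backoff with jitter: ~30s, ~60s, ~120s... capped at 10 min."""
    delay = min(base * (2 ** attempt), cap)
    return delay + random.uniform(0, delay * 0.25)

def bootstrap_session():
    """Visit the Trends homepage first so Google sets its session
    cookies (including NID) before any API endpoint is called."""
    s = requests.Session()
    s.headers.update({
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
    })
    s.get("https://trends.google.com/", timeout=20)  # cookies land on the session
    return s

def get_with_backoff(session, url, params=None, max_attempts=4):
    """GET a Trends endpoint, sleeping through 429 responses."""
    resp = None
    for attempt in range(max_attempts):
        resp = session.get(url, params=params, timeout=20)
        if resp.status_code != 429:
            break
        time.sleep(backoff_delay(attempt))
    return resp
```

The jitter matters: retrying on a fixed schedule makes your traffic easier to fingerprint than randomized delays.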
Installation and Setup
# Install dependencies
# pip install pytrends requests pandas
from pytrends.request import TrendReq
import pandas as pd
import requests
import json
import time
import random
import sqlite3
import csv
from pathlib import Path
from datetime import datetime, timezone, timedelta
Using pytrends
Pytrends is the most practical starting point. It wraps the unofficial Google Trends API and handles session setup, token fetching, and response parsing.
def create_pytrends_client(proxy=None, timeout=(10, 25)):
"""
Create a pytrends TrendReq client with optional proxy support.
Args:
proxy: Proxy URL string (e.g. "http://user:pass@host:port")
timeout: (connect_timeout, read_timeout) tuple
Returns:
Configured TrendReq instance
"""
kwargs = {
"hl": "en-US",
"tz": 360,
"timeout": timeout,
"retries": 3,
"backoff_factor": 0.5,
}
if proxy:
kwargs["proxies"] = {
"http": proxy,
"https": proxy,
}
return TrendReq(**kwargs)
pytrends = create_pytrends_client()
def get_interest_over_time(keywords, timeframe="today 3-m", geo="", cat=0):
"""
Get interest-over-time data for a list of keywords.
Args:
keywords: List of 1-5 keywords to compare
timeframe: Time range. Options:
"now 1-H" — last hour
"now 4-H" — last 4 hours
"now 1-d" — last day
"now 7-d" — last 7 days
"today 1-m" — last 30 days
"today 3-m" — last 90 days
"today 12-m" — last year
"today 5-y" — last 5 years
"all" — since 2004
"2020-01-01 2020-12-31" — custom range
geo: Country code (e.g. "US", "GB") or "" for worldwide
cat: Category number (0 = all categories)
Returns:
DataFrame with interest scores (0-100) indexed by time
"""
if len(keywords) > 5:
raise ValueError("Google Trends allows maximum 5 keywords per request")
pytrends.build_payload(
keywords,
cat=cat,
timeframe=timeframe,
geo=geo,
)
df = pytrends.interest_over_time()
# Drop the 'isPartial' column if present
if "isPartial" in df.columns:
df = df.drop(columns=["isPartial"])
return df
# Example: compare electric vehicle brands
ev_interest = get_interest_over_time(
["Tesla", "Rivian", "Lucid Motors", "BYD"],
timeframe="today 12-m",
geo="US"
)
print(ev_interest.tail(4))
time.sleep(2)
Related Queries and Rising Topics
The related queries endpoint is often the most actionable part of Google Trends — it shows what people search for alongside your keyword, and, crucially, its "rising" section surfaces breakout searches gaining momentum.
def get_related_queries(keywords, timeframe="today 3-m", geo="US"):
"""
Get top and rising related queries for each keyword.
Args:
keywords: List of 1-5 keywords
timeframe: Time range string
geo: Country code or "" for worldwide
Returns:
Dict mapping each keyword to {"top": DataFrame, "rising": DataFrame}
"""
pytrends.build_payload(keywords, timeframe=timeframe, geo=geo)
time.sleep(1)
related = pytrends.related_queries()
results = {}
for kw in keywords:
kw_data = related.get(kw, {})
results[kw] = {
"top": kw_data.get("top"),
"rising": kw_data.get("rising"),
}
return results
def get_related_topics(keywords, timeframe="today 3-m", geo="US"):
"""
Get top and rising related topics (entities, not just keyword strings).
Topics are more semantically meaningful than raw queries.
Returns:
Dict mapping each keyword to {"top": DataFrame, "rising": DataFrame}
"""
pytrends.build_payload(keywords, timeframe=timeframe, geo=geo)
time.sleep(1)
return pytrends.related_topics()
# Example: find rising searches around "electric vehicle"
queries = get_related_queries(["electric vehicle"], timeframe="today 3-m", geo="US")
for kw, data in queries.items():
print(f"\n=== {kw} ===")
if data["rising"] is not None and not data["rising"].empty:
print("RISING queries:")
print(data["rising"].head(10).to_string(index=False))
if data["top"] is not None and not data["top"].empty:
print("\nTOP queries:")
print(data["top"].head(10).to_string(index=False))
The "rising" queries show percentage growth (or "Breakout" for >5000% growth). These are the early signals before mainstream coverage picks up a trend.
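Depending on the pytrends version, the rising value column may come back as a plain integer or as a formatted string like "+250%" or "Breakout", so it is worth normalizing before sorting. A small helper sketch — the 5000 sentinel for Breakout is an arbitrary choice on my part:

```python
def rising_value_to_number(value, breakout_score=5000):
    """Normalize a rising-query growth value to a sortable number.

    Depending on the pytrends version, the rising 'value' column may be
    a plain integer (percent growth) or a formatted string such as
    "+250%" or "Breakout". Breakout means >5000% growth, so it maps to
    a configurable sentinel score.
    """
    if isinstance(value, (int, float)):
        return float(value)
    text = str(value).strip()
    if text.lower() == "breakout":
        return float(breakout_score)
    text = text.lstrip("+").rstrip("%").replace(",", "")
    try:
        return float(text)
    except ValueError:
        return 0.0
```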
Trending Searches (Real-Time)
For currently trending topics, the trending_searches method gives you what Google considers hot right now:
def get_trending_searches(country="united_states"):
"""
Get currently trending searches for a country.
Country names use underscore format: united_states, united_kingdom,
canada, australia, germany, france, japan, brazil, india, etc.
Returns:
DataFrame with trending search terms
"""
trending = pytrends.trending_searches(pn=country)
return trending
def get_realtime_trends_direct(geo="US"):
"""
Get real-time trending stories directly from Google Trends API.
More detailed than trending_searches — includes news stories and entities.
Requires a valid NID cookie for reliable results.
Returns:
List of (title, entities) tuples
"""
url = "https://trends.google.com/trends/api/realtimetrends"
params = {
"hl": "en-US",
"tz": "-300",
"cat": "all",
"fi": "0",
"fs": "0",
"geo": geo,
"ri": "300",
"rs": "20",
"sort": "0",
}
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
"Referer": "https://trends.google.com/",
}
resp = requests.get(url, params=params, headers=headers, timeout=20)
if resp.status_code != 200:
print(f"HTTP {resp.status_code}")
return []
# Google prepends ")]}'\n" to JSON responses to prevent hijacking
try:
data = json.loads(resp.text[5:])
except json.JSONDecodeError:
print("Failed to parse response")
return []
stories = data.get("storySummaries", {}).get("trendingStories", [])
results = []
for s in stories[:20]:
title = s.get("title", "")
entities = s.get("entityNames", [])
articles = s.get("articles", [])
image = s.get("image", {}).get("imgUrl", "")
results.append({
"title": title,
"entities": entities,
"article_count": len(articles),
"first_source": articles[0].get("source", "") if articles else "",
"image_url": image,
})
return results
# Get trending topics
trending_us = get_trending_searches("united_states")
print("US trending searches:")
print(trending_us.head(20))
time.sleep(3)
realtime = get_realtime_trends_direct("US")
print("\nReal-time trending stories:")
for story in realtime[:5]:
print(f" {story['title']} — {', '.join(story['entities'][:3])}")
Direct API Access (When pytrends Fails)
When pytrends starts misbehaving — which happens when Google rotates its internal API structure — going directly to the API endpoints is more reliable because you control exactly what gets sent. The core flow has two steps: fetch a widget token, then use it to get the actual data.
def create_direct_session(proxy=None):
"""Create a requests session configured for Google Trends direct API access."""
s = requests.Session()
s.headers.update({
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
"Referer": "https://trends.google.com/",
})
if proxy:
s.proxies = {"http": proxy, "https": proxy}
return s
def get_trends_token(session, keyword, geo="US", timeframe="today 12-m"):
"""
Fetch the widget token needed for data API calls.
Google Trends requires a two-step process: get token, then get data.
Returns:
(token, req_payload) tuple or (None, None) on failure
"""
explore_url = "https://trends.google.com/trends/api/explore"
params = {
"hl": "en-US",
"tz": "360",
"req": json.dumps({
"comparisonItem": [{"keyword": keyword, "geo": geo, "time": timeframe}],
"category": 0,
"property": "",
}),
}
try:
resp = session.get(explore_url, params=params, timeout=20)
if resp.status_code != 200:
print(f"Explore returned {resp.status_code}")
return None, None
data = json.loads(resp.text[5:]) # Strip XSSI prefix
widgets = data.get("widgets", [])
# Find the TIMESERIES widget
iot_widget = next((w for w in widgets if w["id"] == "TIMESERIES"), None)
if not iot_widget:
return None, None
return iot_widget["token"], iot_widget["request"]
except Exception as e:
print(f"Token fetch failed: {e}")
return None, None
def get_timeseries_data(session, token, req_payload):
"""
Fetch interest-over-time data using a widget token.
Returns:
List of {"date": ..., "value": ..., "isPartial": ...} dicts
"""
multiline_url = "https://trends.google.com/trends/api/widgetdata/multiline"
params = {
"hl": "en-US",
"tz": "360",
"req": json.dumps(req_payload),
"token": token,
}
try:
resp = session.get(multiline_url, params=params, timeout=20)
if resp.status_code != 200:
return []
data = json.loads(resp.text[5:])
timeline_data = data.get("default", {}).get("timelineData", [])
results = []
for point in timeline_data:
results.append({
"date": point.get("formattedTime", ""),
"timestamp": point.get("formattedAxisTime", ""),
"value": point.get("value", [0])[0],
"is_partial": point.get("isPartial", False),
})
return results
except Exception as e:
print(f"Data fetch failed: {e}")
return []
def get_geomap_data(session, token_geo, req_payload_geo):
"""
Fetch geographic interest breakdown using a GEOMAP widget token.
Returns:
List of {"geoCode": ..., "geoName": ..., "value": ...} dicts
"""
geomap_url = "https://trends.google.com/trends/api/widgetdata/comparedgeo"
params = {
"hl": "en-US",
"tz": "360",
"req": json.dumps(req_payload_geo),
"token": token_geo,
}
try:
resp = session.get(geomap_url, params=params, timeout=20)
if resp.status_code != 200:
return []
data = json.loads(resp.text[5:])
geo_data = data.get("default", {}).get("geoMapData", [])
return [
{
"code": item.get("geoCode", ""),
"name": item.get("geoName", ""),
"value": item.get("value", [0])[0],
"max_value": item.get("maxValueIndex", 0),
}
for item in geo_data
]
except Exception as e:
print(f"Geo data fetch failed: {e}")
return []
# Full direct API example
direct_session = create_direct_session()
token, req_payload = get_trends_token(direct_session, "solar panels", geo="US", timeframe="today 12-m")
if token:
timeseries = get_timeseries_data(direct_session, token, req_payload)
print(f"Got {len(timeseries)} data points for 'solar panels'")
for point in timeseries[-5:]:
print(f" {point['date']}: {point['value']}")
Geographic Breakdown
Comparing interest across regions is one of the most valuable features for market research:
def compare_by_country(keyword, countries, timeframe="today 3-m", delay=3.0):
"""
Compare keyword interest across multiple countries.
Args:
keyword: Search term to compare
countries: List of ISO 2-letter country codes
timeframe: Time range string
delay: Seconds to wait between requests (be polite)
Returns:
Dict mapping country code to average interest score
"""
results = {}
for geo in countries:
try:
pytrends.build_payload([keyword], timeframe=timeframe, geo=geo)
df = pytrends.interest_over_time()
if not df.empty and keyword in df.columns:
avg = df[keyword].mean()
results[geo] = round(avg, 1)
print(f" {geo}: {avg:.1f}")
else:
results[geo] = 0
except Exception as e:
print(f" {geo}: error ({e})")
results[geo] = None
time.sleep(delay)
return results
def compare_by_us_state(keyword, timeframe="today 12-m"):
"""
Get interest breakdown by US state (sub-region resolution).
Returns:
DataFrame with state-level interest scores
"""
pytrends.build_payload([keyword], timeframe=timeframe, geo="US")
time.sleep(1)
return pytrends.interest_by_region(resolution="REGION", inc_low_vol=True)
def compare_by_city(keyword, country="US", timeframe="today 12-m"):
"""
Get interest breakdown by city within a country.
Note: Data quality drops significantly for smaller cities.
Returns:
DataFrame with city-level interest scores
"""
pytrends.build_payload([keyword], timeframe=timeframe, geo=country)
time.sleep(1)
return pytrends.interest_by_region(resolution="CITY", inc_low_vol=False)
# Heat pump adoption comparison
countries = ["US", "GB", "DE", "FR", "SE", "AU", "CA", "NL"]
heat_pump_interest = compare_by_country("heat pump", countries, timeframe="today 12-m")
print("\nHeat pump search interest by country:")
for country, score in sorted(heat_pump_interest.items(), key=lambda x: -(x[1] or 0)):
score = score or 0  # countries that errored come back as None
bar = "=" * int(score / 2)
print(f" {country}: {score:5.1f} {bar}")
time.sleep(3)
# US state breakdown
us_states = compare_by_us_state("solar panels", timeframe="today 12-m")
print("\nTop US states for 'solar panels':")
print(us_states.nlargest(10, "solar panels"))
Scaling with Proxies
Google rate-limits by IP, and even with careful request pacing you'll get blocked running continuous monitoring from a single address. Residential proxies are the practical answer because Google's detection looks at IP reputation, not just request frequency — datacenter proxies get flagged fast.
I've had good results with ThorData's residential proxy network, which rotates through real residential IPs with solid US and EU coverage. For Google Trends specifically, you want residential IPs to avoid triggering the CAPTCHA gate.
THORDATA_USER = "your_username"
THORDATA_PASS = "your_password"
def build_thordata_proxy(country="US", sticky=False, session_id=None):
"""
Build a ThorData proxy URL.
For rotating (new IP each request): sticky=False
For sticky sessions (same IP for minutes): sticky=True
Sign up at: https://thordata.partnerstack.com/partner/0a0x4nzh
"""
import uuid
if sticky:
sid = session_id or str(uuid.uuid4())[:8]
user = f"{THORDATA_USER}-session-{sid}-country-{country}"
else:
user = f"{THORDATA_USER}-country-{country}"
return f"http://{user}:{THORDATA_PASS}@gate.thordata.net:7777"
def create_proxied_pytrends(country="US"):
"""Create a pytrends client routed through a residential proxy."""
proxy_url = build_thordata_proxy(country=country)
return TrendReq(
hl="en-US",
tz=360,
proxies={
"http": proxy_url,
"https": proxy_url,
},
timeout=(10, 30),
retries=2,
backoff_factor=0.5,
)
class RotatingTrendsClient:
"""
pytrends client that rotates through proxies to avoid rate limits.
Creates a fresh client with a new proxy after each rate limit hit.
"""
def __init__(self, country="US", max_errors=3):
self.country = country
self.max_errors = max_errors
self.client = create_proxied_pytrends(country)
self.error_count = 0
def _refresh_client(self):
"""Get a fresh proxy connection."""
print("Refreshing proxy connection...")
time.sleep(random.uniform(2, 5))
self.client = create_proxied_pytrends(self.country)
self.error_count = 0
def get_interest_over_time(self, keywords, timeframe="today 3-m", geo=""):
"""Get interest data with automatic retry on rate limit."""
for attempt in range(3):
try:
self.client.build_payload(keywords, timeframe=timeframe, geo=geo)
df = self.client.interest_over_time()
self.error_count = 0
return df
except Exception as e:
self.error_count += 1
print(f"Error (attempt {attempt+1}): {e}")
if self.error_count >= self.max_errors:
self._refresh_client()
time.sleep(random.uniform(5, 15))
return pd.DataFrame()
# Use rotating client for sustained collection
rotating_client = RotatingTrendsClient(country="US")
Building a Trend Monitor
Here is a complete hourly monitor that checks a keyword list and alerts when any term spikes relative to its recent baseline. This is useful for catching viral moments early, monitoring brand mentions, or detecting emerging market opportunities.
import statistics
KEYWORDS_TO_MONITOR = [
"bitcoin",
"ethereum",
"solana",
"NFT",
"web3",
]
SPIKE_THRESHOLD = 1.5 # Alert when value is 1.5x the recent baseline
BASELINE_WINDOW = 20 # Data points to use for baseline calculation
CHECK_INTERVAL = 3600 # Seconds between checks (1 hour)
def get_recent_interest(pytrends_client, keyword, timeframe="now 1-d"):
"""Get the most recent interest values for a keyword."""
try:
pytrends_client.build_payload([keyword], timeframe=timeframe)
df = pytrends_client.interest_over_time()
if df.empty or keyword not in df.columns:
return None
return df[keyword].tolist()
except Exception as e:
print(f" {keyword}: error — {e}")
return None
def check_for_spikes(history, latest, threshold=SPIKE_THRESHOLD):
"""
Check if the latest value represents a spike vs recent history.
Args:
history: List of historical values
latest: Current value to check
threshold: Multiple of baseline to trigger alert
Returns:
(is_spike, baseline) tuple
"""
if len(history) < 5:
return False, 0
# Use recent history for baseline, excluding obvious outliers
recent = history[-BASELINE_WINDOW:]
baseline = statistics.median(recent)
if baseline == 0:
return False, 0
return (latest / baseline) >= threshold, baseline
def save_spike_alert(keyword, latest, baseline, timestamp, db_path="trend_monitor.db"):
"""Save a spike alert to SQLite for review."""
conn = sqlite3.connect(db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT,
value INTEGER,
baseline REAL,
ratio REAL,
detected_at TEXT
)
""")
conn.execute(
"INSERT INTO alerts (keyword, value, baseline, ratio, detected_at) VALUES (?, ?, ?, ?, ?)",
(keyword, latest, baseline, latest / baseline if baseline > 0 else 0, timestamp)
)
conn.commit()
conn.close()
def run_trend_monitor(keywords=KEYWORDS_TO_MONITOR, interval=CHECK_INTERVAL, proxy=None):
"""
Run a continuous trend monitor.
Args:
keywords: List of keywords to monitor
interval: Seconds between checks
proxy: Optional proxy URL for requests
"""
pytrends_client = create_pytrends_client(proxy=proxy)
history = {kw: [] for kw in keywords}
print(f"Starting trend monitor for: {', '.join(keywords)}")
print(f"Check interval: {interval}s | Spike threshold: {SPIKE_THRESHOLD}x baseline")
print("=" * 60)
while True:
check_time = datetime.now(timezone.utc)
timestamp = check_time.isoformat()
print(f"\n[{check_time.strftime('%Y-%m-%d %H:%M UTC')}] Running checks...")
for kw in keywords:
values = get_recent_interest(pytrends_client, kw)
if values is None:
print(f" {kw}: no data (skipping)")
time.sleep(4)
continue
latest = values[-1] if values else 0
is_spike, baseline = check_for_spikes(history[kw], latest)
if is_spike:
ratio = latest / baseline if baseline > 0 else 0
print(f" SPIKE DETECTED: '{kw}' = {latest} (baseline {baseline:.1f}, ratio {ratio:.2f}x)")
save_spike_alert(kw, latest, baseline, timestamp)
else:
print(f" {kw}: {latest} (baseline {baseline:.1f})")
# Update history
history[kw] = (history[kw] + values)[-50:]
time.sleep(4) # Polite delay between keyword checks
print(f" Next check in {interval}s...")
time.sleep(interval)
# Start the monitor (run in background or as a service)
# run_trend_monitor(keywords=["bitcoin", "ethereum", "solana"])
Batch Keyword Comparison
Compare large keyword sets by batching into groups of 5 (the API maximum):
def batch_compare(keywords, anchor=None, timeframe="today 3-m", geo="US", delay=3.0):
"""
Compare interest for more than 5 keywords by using an anchor term.
When comparing across batches, include a common anchor keyword in each batch.
This allows you to normalize scores across batches using the anchor's consistent value.
Args:
keywords: Any number of keywords to compare
anchor: Reference keyword included in every batch for normalization
timeframe: Time range string
geo: Country code
delay: Seconds between batch requests
Returns:
Dict mapping keyword to average interest score
"""
if not anchor:
anchor = keywords[0]
# Split into batches of 4 (leaving one slot for anchor)
batch_size = 4
others = [kw for kw in keywords if kw != anchor]
batches = [others[i:i+batch_size] for i in range(0, len(others), batch_size)]
results = {}
anchor_values = None
for i, batch in enumerate(batches):
batch_keywords = [anchor] + batch
print(f"Batch {i+1}/{len(batches)}: {batch_keywords}")
try:
pytrends.build_payload(batch_keywords, timeframe=timeframe, geo=geo)
df = pytrends.interest_over_time()
if df.empty:
print(f" Empty response for batch {i+1}")
time.sleep(delay)
continue
# Store anchor values from first batch as reference
if anchor_values is None and anchor in df.columns:
anchor_values = df[anchor].mean()
# Normalize each keyword's score relative to anchor
anchor_in_batch = df[anchor].mean() if anchor in df.columns and anchor_values else 1
for kw in batch:
if kw in df.columns:
raw_avg = df[kw].mean()
# Normalize: scale so the anchor has a consistent value
if anchor_in_batch > 0 and anchor_values:
normalized = raw_avg * (anchor_values / anchor_in_batch)
else:
normalized = raw_avg
results[kw] = round(normalized, 1)
except Exception as e:
print(f" Batch {i+1} error: {e}")
time.sleep(delay)
# Always include anchor
results[anchor] = round(anchor_values, 1) if anchor_values else 0
return dict(sorted(results.items(), key=lambda x: -x[1]))
# Compare 15 AI tools across one timeframe
ai_tools = [
"ChatGPT", "Claude", "Gemini", "Copilot",
"Midjourney", "Stable Diffusion", "DALL-E",
"Perplexity", "Cursor", "Windsurf",
"Llama", "Mistral", "Grok", "Sora", "Suno"
]
comparison = batch_compare(ai_tools, anchor="ChatGPT", timeframe="today 12-m", geo="US")
print("\nAI tool search interest comparison (normalized):")
for kw, score in comparison.items():
bar = "=" * int(score / 2)
print(f" {kw:20s}: {score:5.1f} {bar}")
SQLite Storage for Trend History
Build a historical database of keyword interest for trend analysis over time:
def init_trends_db(path="trends_history.db"):
"""Initialize SQLite database for trend data storage."""
conn = sqlite3.connect(path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS interest_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL,
geo TEXT NOT NULL DEFAULT '',
date_label TEXT NOT NULL,
value INTEGER,
is_partial INTEGER DEFAULT 0,
collected_at TEXT,
UNIQUE(keyword, geo, date_label)
);
CREATE TABLE IF NOT EXISTS related_queries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL,
geo TEXT NOT NULL DEFAULT '',
query_type TEXT NOT NULL,
query TEXT NOT NULL,
value TEXT,
collected_at TEXT,
UNIQUE(keyword, geo, query_type, query)
);
CREATE TABLE IF NOT EXISTS spike_alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL,
value INTEGER,
baseline REAL,
ratio REAL,
detected_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_interest_keyword ON interest_data(keyword);
CREATE INDEX IF NOT EXISTS idx_interest_date ON interest_data(date_label);
CREATE INDEX IF NOT EXISTS idx_related_keyword ON related_queries(keyword);
""")
conn.commit()
return conn
def store_interest_data(conn, keyword, geo, df):
"""Store interest-over-time data to SQLite."""
if df.empty or keyword not in df.columns:
return 0
now = datetime.now(timezone.utc).isoformat()
rows = []
for idx, row in df.iterrows():
date_label = idx.strftime("%Y-%m-%d %H:%M") if hasattr(idx, "strftime") else str(idx)
rows.append((
keyword, geo, date_label,
int(row[keyword]),
int(row.get("isPartial", 0)),
now,
))
conn.executemany(
"""
INSERT OR REPLACE INTO interest_data
(keyword, geo, date_label, value, is_partial, collected_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
rows
)
conn.commit()
return len(rows)
def store_related_queries(conn, keyword, geo, related_data):
"""Store related query data to SQLite."""
now = datetime.now(timezone.utc).isoformat()
rows = []
for query_type in ["top", "rising"]:
df = related_data.get(query_type)
if df is None or df.empty:
continue
for _, row in df.iterrows():
query = row.get("query", "")
value = str(row.get("value", ""))
if query:
rows.append((keyword, geo, query_type, query, value, now))
if rows:
conn.executemany(
"""
INSERT OR REPLACE INTO related_queries
(keyword, geo, query_type, query, value, collected_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
rows
)
conn.commit()
return len(rows)
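Here is how the storage functions above fit together end to end. To keep the sketch runnable offline, it uses an in-memory database, a condensed copy of the interest_data schema, and a synthetic DataFrame standing in for a real pytrends result:

```python
import sqlite3
from datetime import datetime, timezone

import pandas as pd

# Synthetic stand-in for a pytrends interest_over_time() result.
df = pd.DataFrame(
    {"solar panels": [40, 55, 62], "isPartial": [False, False, True]},
    index=pd.to_datetime(["2026-01-05", "2026-01-12", "2026-01-19"]),
)

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS interest_data (
        keyword TEXT, geo TEXT, date_label TEXT,
        value INTEGER, is_partial INTEGER, collected_at TEXT,
        UNIQUE(keyword, geo, date_label)
    )
""")

# Same row shape as store_interest_data builds
now = datetime.now(timezone.utc).isoformat()
rows = [
    ("solar panels", "US", idx.strftime("%Y-%m-%d %H:%M"),
     int(row["solar panels"]), int(row["isPartial"]), now)
    for idx, row in df.iterrows()
]
conn.executemany(
    "INSERT OR REPLACE INTO interest_data VALUES (?, ?, ?, ?, ?, ?)", rows
)
conn.commit()

stored = conn.execute(
    "SELECT date_label, value FROM interest_data ORDER BY date_label"
).fetchall()
print(stored)
```

The UNIQUE constraint plus INSERT OR REPLACE means re-running a collection over the same window updates rows instead of duplicating them, which is what lets the monitor re-fetch overlapping timeframes safely.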
Real-World Use Cases
Use Case 1: Content Strategy Pipeline
Identify rising search topics before they become saturated:
def find_content_opportunities(seed_keywords, geo="US", min_rising_score=1000):
"""
Find content opportunities by analyzing rising related queries.
Looks for queries that are breaking out (>5000% growth = "Breakout")
or rising fast (>1000% growth), but haven't yet been written about much.
Returns:
List of opportunity dicts sorted by estimated potential
"""
opportunities = []
for seed in seed_keywords:
print(f"Analyzing: {seed}")
pytrends.build_payload([seed], timeframe="today 3-m", geo=geo)
time.sleep(1)
try:
related = pytrends.related_queries()
rising_df = related.get(seed, {}).get("rising")
if rising_df is None or rising_df.empty:
continue
for _, row in rising_df.iterrows():
query = row.get("query", "")
value = row.get("value", 0)
if not query:
continue
# "Breakout" means >5000% growth — these are the gold
is_breakout = value == 0 or str(value).lower() == "breakout"
numeric_value = 9999 if is_breakout else int(value or 0)
if is_breakout or numeric_value >= min_rising_score:
opportunities.append({
"seed": seed,
"query": query,
"growth": "Breakout" if is_breakout else f"+{numeric_value}%",
"numeric": numeric_value,
"geo": geo,
})
except Exception as e:
print(f" Error for {seed}: {e}")
time.sleep(3)
return sorted(opportunities, key=lambda x: -x["numeric"])
seeds = ["python web scraping", "data extraction", "web automation", "playwright python"]
opps = find_content_opportunities(seeds, geo="US")
print("\nContent opportunities by growth:")
for opp in opps[:15]:
print(f" [{opp['growth']:>12s}] {opp['query']} (via '{opp['seed']}')")
Use Case 2: Niche Validation Before Building
Before building a product or writing a series of articles, validate that search demand is growing (not declining):
def validate_niche(keyword, geo="US"):
"""
Validate whether a niche is growing, stable, or declining.
Uses 5-year trend data to identify the trajectory.
Returns:
Dict with trend assessment and key metrics
"""
pytrends.build_payload([keyword], timeframe="today 5-y", geo=geo)
df = pytrends.interest_over_time()
if df.empty or keyword not in df.columns:
return {"status": "no data"}
values = df[keyword].tolist()
n = len(values)
if n < 4:
return {"status": "insufficient data"}
# Simple linear regression for trend direction
x_mean = sum(range(n)) / n
y_mean = sum(values) / n
numerator = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(values))
denominator = sum((i - x_mean) ** 2 for i in range(n))
slope = numerator / denominator if denominator != 0 else 0
# Assess peak vs current
peak = max(values)
current = values[-1]
peak_idx = values.index(peak)
peak_fraction = peak_idx / n # 0 = early, 1 = recent
assessment = {
"keyword": keyword,
"geo": geo,
"current_score": current,
"peak_score": peak,
"current_vs_peak_pct": round(100 * current / peak, 1) if peak > 0 else 0,
"slope_per_week": round(slope, 2),  # "today 5-y" returns weekly data points
"trend": "growing" if slope > 0.3 else ("declining" if slope < -0.3 else "stable"),
"peak_timing": "recent" if peak_fraction > 0.7 else ("middle" if peak_fraction > 0.3 else "old"),
"data_points": n,
}
print(f"\n=== Niche validation: {keyword} ({geo}) ===")
print(f" Current score: {current}/100")
print(f" Peak: {peak}/100 at {peak_fraction:.0%} through the period")
print(f" Trend: {assessment['trend']} (slope: {slope:.2f}/week)")
print(f" Assessment: {assessment['trend'].upper()} {'(peak was recent - good!)' if peak_fraction > 0.7 and slope > 0 else ''}")
return assessment
# Validate several niches before investing content effort
niches_to_check = ["playwright python", "uv python", "datastar framework", "htmx"]
for niche in niches_to_check:
result = validate_niche(niche, geo="US")
time.sleep(3)
Use Case 3: Crypto/Trading Signal Detection
Google Trends data has been correlated with crypto price movements in academic literature. Rising interest in a coin name often precedes price increases by 24-48 hours:
def build_crypto_signal(coins, geo="US"):
"""
Build a weekly interest snapshot for crypto monitoring.
Combines short-term momentum (1 week) with medium-term trend (3 months).
Returns:
List of dicts with momentum scores
"""
signals = []
for i in range(0, len(coins), 5):
batch = coins[i:i+5]
# Short-term momentum
pytrends.build_payload(batch, timeframe="now 7-d", geo=geo)
df_short = pytrends.interest_over_time()
time.sleep(2)
# Medium-term baseline
pytrends.build_payload(batch, timeframe="today 3-m", geo=geo)
df_medium = pytrends.interest_over_time()
time.sleep(2)
for coin in batch:
if coin not in df_short.columns or coin not in df_medium.columns:
continue
recent_avg = df_short[coin].tail(24).mean() # Last 24 hours
medium_avg = df_medium[coin].mean()
momentum = round(recent_avg / medium_avg, 2) if medium_avg > 0 else 0
signals.append({
"coin": coin,
"recent_interest": round(recent_avg, 1),
"baseline_interest": round(medium_avg, 1),
"momentum_ratio": momentum,
"signal": "bullish" if momentum > 1.3 else ("bearish" if momentum < 0.7 else "neutral"),
})
time.sleep(3)
return sorted(signals, key=lambda x: -x["momentum_ratio"])
crypto_signals = build_crypto_signal(
["bitcoin", "ethereum", "solana", "cardano", "polkadot"],
geo="US"
)
print("\nCrypto interest momentum:")
for s in crypto_signals:
print(f" {s['coin']:12s}: {s['momentum_ratio']:.2f}x ({s['signal']})")
Exporting Results
def export_trends_csv(df, keyword, output_dir="trends_exports"):
"""Export interest-over-time DataFrame to CSV."""
Path(output_dir).mkdir(exist_ok=True)
safe_name = keyword.replace(" ", "_").replace("/", "_")
output_path = Path(output_dir) / f"{safe_name}_{datetime.now().strftime('%Y%m%d')}.csv"
df_export = df.copy()
if "isPartial" in df_export.columns:
df_export = df_export.drop(columns=["isPartial"])
df_export.to_csv(output_path)
print(f"Saved {len(df_export)} rows to {output_path}")
return output_path
def export_comparison_json(results_dict, output_path="trends_comparison.json"):
"""Export a keyword comparison dict to JSON."""
with open(output_path, "w") as f:
json.dump({
"exported_at": datetime.now(timezone.utc).isoformat(),
"data": results_dict,
}, f, indent=2)
print(f"Saved comparison to {output_path}")
Wrapping Up
Google Trends is still one of the more underused data sources for market research and content work. The data is free and the signal quality is solid. The engineering overhead — the NID cookie requirement, aggressive rate limiting, library instability — is real but manageable once you understand the system.
The practical workflow:
- Start with pytrends for quick experiments and simple keyword comparisons
- Switch to direct API calls when you need reliability or pytrends breaks
- Add residential proxies (ThorData) once you're running anything at scale or hitting rate limits regularly
- Build a SQLite history to track trends over time — the slope matters more than the current value
- Use the rising queries endpoint for content opportunities — breakout queries are gold
The monitoring loop pattern is the most valuable piece here: a lightweight process that checks your keyword list hourly, stores the data, and flags spikes. Run it on a small VPS and you have a 24/7 trend intelligence feed that costs almost nothing.
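To make the "slope matters more than the current value" point concrete: once history is in SQLite, a single query separates growing keywords from fading ones. A sketch against a condensed interest_data table, with synthetic rows and an arbitrary date cutoff so it runs offline:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE interest_data (
        keyword TEXT, geo TEXT, date_label TEXT, value INTEGER
    )
""")
# Synthetic history: 'htmx' rising, 'NFT' fading
rows = [("htmx", "US", f"2026-01-{d:02d}", v)
        for d, v in zip(range(1, 9), [20, 22, 25, 30, 34, 40, 46, 55])]
rows += [("NFT", "US", f"2026-01-{d:02d}", v)
         for d, v in zip(range(1, 9), [80, 72, 66, 60, 52, 45, 40, 33])]
conn.executemany("INSERT INTO interest_data VALUES (?, ?, ?, ?)", rows)

# Recent average vs earlier average per keyword: a ratio above 1 means
# interest is growing, regardless of the absolute 0-100 score.
query = """
    SELECT keyword,
           AVG(CASE WHEN date_label >= '2026-01-05' THEN value END) * 1.0 /
           AVG(CASE WHEN date_label <  '2026-01-05' THEN value END) AS growth
    FROM interest_data
    GROUP BY keyword
    ORDER BY growth DESC
"""
for keyword, growth in conn.execute(query):
    print(f"{keyword}: {growth:.2f}x")
```

A keyword sitting at 30/100 but growing 1.8x quarter over quarter is usually a better target than one at 80/100 and shrinking.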