← Back to blog

How to Scrape Instagram Hashtags in 2026: Posts, Trends & Engagement Data

How to Scrape Instagram Hashtags in 2026: Posts, Trends & Engagement Data

Instagram hashtag data is valuable for trend tracking, competitive analysis, influencer research, and content strategy. Knowing which hashtags drive the most engagement, what content is performing in a niche, and how hashtag popularity shifts over time requires programmatic access.

Instagram does not offer a public hashtag API. The official Graph API's hashtag endpoints are buried behind App Review gates. What actually works in 2026 is a combination of the private mobile API (with session authentication) and some careful scraping of the public web interface.

What Hashtag Data Is Available

When you query a hashtag on Instagram, you can access the top-posts feed, recent posts, per-post engagement metrics (likes, comments, video view counts), media URLs and metadata, the hashtag's total post count, and related hashtag suggestions.

What you cannot get without authentication: any hashtag data at all. Instagram blocks unauthenticated requests to hashtag endpoints entirely.

The Mobile API Approach

Instagram's mobile app uses a private REST API. The hashtag feed endpoints from this API are the most reliable source of hashtag data in 2026.

You need a valid sessionid cookie from a logged-in Instagram account. Get it by logging into Instagram in a browser, opening DevTools, and finding the sessionid cookie under Application > Cookies.

Step 1: Look Up the Hashtag ID

Instagram's hashtag API uses numeric IDs, not names. First resolve the name to an ID:

import requests
import json
import time
import random

SESSION_ID = 'your-session-id-here'  # sessionid cookie copied from a logged-in browser session
APP_ID = '936619743392459'  # public Instagram web app ID

# Headers that imitate the official Android app for i.instagram.com endpoints.
MOBILE_HEADERS = {
    'User-Agent': 'Instagram 317.0.0.34.109 Android (30/11; 420dpi; 1080x2220; samsung; SM-G991B; o1s; exynos2100)',
    'X-IG-App-ID': APP_ID,
    'Accept': 'application/json',
    'Accept-Language': 'en-US,en;q=0.9',
    'Accept-Encoding': 'gzip, deflate',
}

# Browser-style headers for the www.instagram.com web API endpoints.
WEB_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36',
    'X-IG-App-ID': APP_ID,
    'Accept': '*/*',
    'X-Requested-With': 'XMLHttpRequest',
}

# Cookie jar sent with every request — authentication hinges on sessionid.
COOKIES = {'sessionid': SESSION_ID}

def get_hashtag_id(tag_name: str) -> str | None:
    """Resolve a hashtag name to Instagram's numeric hashtag ID.

    Logs the status code and returns None when the lookup fails.
    """
    endpoint = 'https://i.instagram.com/api/v1/tags/web_info/'
    resp = requests.get(
        endpoint,
        params={'tag_name': tag_name},
        headers=MOBILE_HEADERS,
        cookies=COOKIES,
        timeout=15,
    )
    if resp.status_code != 200:
        print(f'  Failed to get hashtag ID: {resp.status_code}')
        return None
    payload = resp.json()
    return payload.get('data', {}).get('hashtag', {}).get('id')

# Example
# Resolve the name once up front — the numeric ID is what the feed endpoints need.
tag_id = get_hashtag_id('python')
print(f'Hashtag ID for #python: {tag_id}')

Step 2: Fetch Top Posts for a Hashtag

def fetch_hashtag_top_posts(tag_id: str, max_id: str = None) -> dict:
    """Fetch one page of the ranked top-posts feed for a hashtag ID.

    max_id is the pagination cursor (next_max_id) from the previous page.
    Raises requests.HTTPError on a non-2xx response.
    """
    query = {'rank_token': 'top', 'ranked_content': 'true'}
    if max_id:
        query['max_id'] = max_id
    resp = requests.get(
        f'https://i.instagram.com/api/v1/feed/tag/{tag_id}/',
        params=query,
        headers=MOBILE_HEADERS,
        cookies=COOKIES,
        timeout=15,
    )
    resp.raise_for_status()
    return resp.json()

def extract_post_data(item: dict) -> dict:
    """Flatten one raw mobile-API feed item into a compact post record.

    Tolerates partially-populated items: missing fields become None/0/''.
    """
    kind_names = {1: 'photo', 2: 'video', 8: 'carousel'}
    kind = item.get('media_type', 0)
    caption = (item.get('caption') or {}).get('text', '')
    author = item.get('user', {})
    shortcode = item.get('code')

    # First candidate is the highest-resolution rendition.
    candidates = (item.get('image_versions2') or {}).get('candidates', [])
    image_url = candidates[0].get('url') if candidates else None

    # Videos carry their own rendition list.
    video_url = None
    if kind == 2:
        renditions = item.get('video_versions', [])
        if renditions:
            video_url = renditions[0].get('url')

    # Location block is only present when the post was geo-tagged.
    loc = item.get('location')
    location = None
    if loc:
        location = {
            'name': loc.get('name'),
            'lat': loc.get('lat'),
            'lng': loc.get('lng'),
        }

    return {
        'id': item.get('pk'),
        'shortcode': shortcode,
        'media_type': kind_names.get(kind, 'unknown'),
        'like_count': item.get('like_count', 0),
        'comment_count': item.get('comment_count', 0),
        'view_count': item.get('view_count'),  # only for videos
        'taken_at': item.get('taken_at'),
        'caption': caption,
        'author_id': author.get('pk'),
        'author_username': author.get('username'),
        'image_url': image_url,
        'video_url': video_url,
        'location': location,
        'post_url': f"https://www.instagram.com/p/{shortcode}/" if shortcode else None,
    }

def scrape_hashtag_posts(tag_name: str, max_pages: int = 5) -> list:
    """Collect top posts for a hashtag, following pagination cursors.

    A 429 response burns one page slot, sleeps 60s, and retries the same
    cursor; any other HTTP error, an empty page, or more_available=False
    ends the crawl.
    """
    tag_id = get_hashtag_id(tag_name)
    if not tag_id:
        print(f'Could not resolve hashtag: {tag_name}')
        return []

    collected = []
    cursor = None

    for page in range(max_pages):
        try:
            payload = fetch_hashtag_top_posts(tag_id, cursor)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                print('  Rate limited — waiting 60s')
                time.sleep(60)
                continue
            print(f'  HTTP error: {e}')
            break

        batch = payload.get('items', [])
        if not batch:
            break

        collected.extend(extract_post_data(entry) for entry in batch)
        print(f'  Page {page+1}: {len(batch)} posts (total: {len(collected)})')

        if not payload.get('more_available'):
            break
        cursor = payload.get('next_max_id')
        time.sleep(random.uniform(2, 5))

    return collected

Step 3: Fetch Recent Posts Feed

def fetch_hashtag_recent_posts(tag_name: str, max_pages: int = 3) -> list:
    """Fetch recent posts for a hashtag via the web info endpoint.

    Args:
        tag_name: hashtag name without the leading '#'.
        max_pages: currently unused — this endpoint returns a single page;
            the parameter is kept for interface compatibility and future
            pagination support.

    Returns:
        A list of flattened post dicts (possibly empty).

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    # Recent posts use a GraphQL-style endpoint on the web API
    base_url = 'https://www.instagram.com/api/v1/tags/logged_out_web_info/'
    params = {'tag_name': tag_name}
    resp = requests.get(base_url, params=params, headers=WEB_HEADERS, cookies=COOKIES, timeout=15)
    resp.raise_for_status()
    data = resp.json()
    tag_data = data.get('data', {}).get('hashtag', {})

    recent_media = tag_data.get('edge_hashtag_to_media', {})
    posts = []

    for edge in recent_media.get('edges', []):
        node = edge.get('node', {})
        # Bug fix: the old chained expression defaulted 'edges' to [{}] but
        # raised IndexError when the API returned an *empty* edges list
        # (caption-less posts). Guard explicitly instead.
        caption_edges = node.get('edge_media_to_caption', {}).get('edges', [])
        caption = caption_edges[0].get('node', {}).get('text', '') if caption_edges else ''
        posts.append({
            'id': node.get('id'),
            'shortcode': node.get('shortcode'),
            'media_type': 'video' if node.get('is_video') else 'photo',
            'like_count': node.get('edge_liked_by', {}).get('count', 0),
            'comment_count': node.get('edge_media_to_comment', {}).get('count', 0),
            'caption': caption,
            'thumbnail_url': node.get('thumbnail_src'),
            'taken_at_timestamp': node.get('taken_at_timestamp'),
            'post_url': f"https://www.instagram.com/p/{node.get('shortcode')}/",
        })

    return posts

Instagram surfaces related hashtags in the explore interface. You can extract these to build hashtag networks or discover niche tags:

def get_related_hashtags(tag_name: str) -> list[str]:
    """Return hashtag names Instagram suggests for the given query string."""
    resp = requests.get(
        'https://www.instagram.com/api/v1/tags/suggested/',
        params={'query': tag_name},
        headers=WEB_HEADERS,
        cookies=COOKIES,
        timeout=15,
    )
    if resp.status_code != 200:
        return []
    names = []
    for entry in resp.json():
        name = entry.get('name')
        if name:
            names.append(name)
    return names

def build_hashtag_network(seed_tags: list[str], depth: int = 2) -> dict:
    """Breadth-first expand related hashtags starting from the seed tags.

    Maps every visited tag to its full related-tag list; only the first
    five related tags of each node seed the next level.
    """
    network = {}
    visited = set()
    frontier = list(seed_tags)

    for _level in range(depth):
        if not frontier:
            break
        upcoming = []
        for tag in frontier:
            if tag in visited:
                continue
            visited.add(tag)
            related = get_related_hashtags(tag)
            network[tag] = related
            upcoming.extend(related[:5])  # limit expansion
            print(f'  #{tag} -> {len(related)} related tags')
            time.sleep(random.uniform(1.5, 3))
        frontier = upcoming

    return network

# Build a hashtag network starting from a few seed tags
# depth=1 expands each seed once; print a preview of the first few nodes.
network = build_hashtag_network(['python', 'machinelearning', 'datascience'], depth=1)
for tag, related in list(network.items())[:3]:
    print(f'#{tag}: {related[:5]}')

Tracking Hashtag Post Counts Over Time

To track hashtag growth, poll the post count periodically:

import json
from pathlib import Path
from datetime import datetime

TRACKER_FILE = Path('hashtag_tracker.json')

def get_hashtag_info(tag_name: str) -> dict:
    """Fetch hashtag metadata (id, media_count) plus a capture timestamp.

    Returns an empty dict on any non-200 response.
    """
    resp = requests.get(
        'https://i.instagram.com/api/v1/tags/web_info/',
        params={'tag_name': tag_name},
        headers=MOBILE_HEADERS,
        cookies=COOKIES,
        timeout=15,
    )
    if resp.status_code != 200:
        return {}
    hashtag = resp.json().get('data', {}).get('hashtag', {})
    return {
        'name': tag_name,
        'id': hashtag.get('id'),
        'media_count': hashtag.get('media_count', 0),
        'timestamp': datetime.now().isoformat(),
    }

def track_hashtags(tags: list[str]) -> None:
    """Append one (media_count, timestamp) sample per tag to the tracker file."""
    tracker_data = json.loads(TRACKER_FILE.read_text()) if TRACKER_FILE.exists() else {}

    for tag in tags:
        info = get_hashtag_info(tag)
        if not info:
            continue
        samples = tracker_data.setdefault(tag, [])
        samples.append({
            'media_count': info['media_count'],
            'timestamp': info['timestamp'],
        })
        count = info['media_count']
        print(f'  #{tag}: {count:,} posts')
        time.sleep(random.uniform(1, 2))

    TRACKER_FILE.write_text(json.dumps(tracker_data, indent=2))
    print(f'Tracker updated: {TRACKER_FILE}')

# Record one growth sample for each tracked hashtag (appends to hashtag_tracker.json).
track_hashtags(['python', 'webdev', 'reactjs', 'nodejs', 'golang'])

Engagement Analysis

Once you have post data, you can analyze engagement patterns:

import statistics

def analyze_hashtag_engagement(posts: list[dict]) -> dict:
    """Summarize like/comment/view statistics for a list of post dicts.

    Returns {} for an empty input. View statistics only cover posts that
    report a truthy view_count (photos and zero-view videos are excluded).
    """
    if not posts:
        return {}

    likes = [p.get('like_count', 0) for p in posts]
    comments = [p.get('comment_count', 0) for p in posts]
    views = [p.get('view_count', 0) for p in posts if p.get('view_count')]

    # Group posts by media type for the per-type breakdown.
    grouped = {}
    for post in posts:
        grouped.setdefault(post.get('media_type', 'unknown'), []).append(post)

    summary = {
        'total_posts': len(posts),
        'avg_likes': statistics.mean(likes) if likes else 0,
        'median_likes': statistics.median(likes) if likes else 0,
        'max_likes': max(likes) if likes else 0,
        'avg_comments': statistics.mean(comments) if comments else 0,
        'avg_views': statistics.mean(views) if views else 0,
        'by_media_type': {
            kind: {
                'count': len(group),
                'avg_likes': statistics.mean([p['like_count'] for p in group]) if group else 0,
            }
            for kind, group in grouped.items()
        },
    }

    # Top posts by like count.
    summary['top_3_posts'] = sorted(posts, key=lambda p: p.get('like_count', 0), reverse=True)[:3]

    return summary

# Analyze posts for a hashtag.
posts = scrape_hashtag_posts('python', max_pages=3)
stats = analyze_hashtag_engagement(posts)
if stats:
    print(f'Total posts: {stats["total_posts"]}')
    print(f'Avg likes: {stats["avg_likes"]:.0f}')
    print(f'Median likes: {stats["median_likes"]:.0f}')
    print(f'Avg comments: {stats["avg_comments"]:.0f}')
    print(f'Media type breakdown: {stats["by_media_type"]}')
else:
    # Bug fix: analyze_hashtag_engagement returns {} when no posts were
    # collected, so indexing stats directly would raise KeyError.
    print('No posts collected — nothing to analyze')

Anti-Detection and Rate Limits

Instagram's rate limiting on the private API is strict. From a single session and IP, expect roughly 100-200 API requests per day before you start seeing HTTP 429 responses, throttling, or temporary action blocks on the account.

Mitigation Strategies

Randomize delays: Always sleep between requests with jitter. A fixed 2-second interval is a bot signal. Use random.uniform(1.5, 4.0) for natural patterns.

Rotate session cookies: Maintain a pool of session IDs from multiple accounts. Distribute requests across sessions to stay under per-account limits.

Use residential proxies: Instagram fingerprints IP characteristics. Datacenter IPs are flagged instantly. ThorData's residential proxy pool provides real residential IPs that match the traffic patterns Instagram's systems expect.

# Proxy-aware session factory: pre-configures the mobile-app headers, the
# session cookie, and (optionally) a residential proxy for every request.
PROXY_URL = 'http://USER:[email protected]:9000'

def make_session(session_id: str, proxy_url: str = None) -> requests.Session:
    """Build a requests.Session carrying mobile headers and the sessionid cookie."""
    sess = requests.Session()
    sess.headers.update(MOBILE_HEADERS)
    sess.cookies.set('sessionid', session_id, domain='.instagram.com')
    if proxy_url:
        sess.proxies = {'http': proxy_url, 'https': proxy_url}
    return sess

# Rotate across multiple sessions harvested from separate logged-in accounts.
SESSION_POOL = [
    'session_id_account_1',
    'session_id_account_2',
    'session_id_account_3',
]

def get_rotating_session() -> requests.Session:
    """Pick a random session id from the pool and wrap it in a proxied session."""
    return make_session(random.choice(SESSION_POOL), PROXY_URL)

CDN URL Expiry Warning

Instagram's CDN image and video URLs expire, typically within 24-48 hours. The URLs contain a timestamp parameter (oe=HEXVALUE) that specifies the expiry.

If you need to store media, download it immediately at collection time. Do not store the CDN URL and expect it to work later.

import httpx
from pathlib import Path

def download_post_media(post: dict, output_dir: str = 'instagram_media') -> None:
    """Download a post's image and/or video before the CDN URLs expire.

    Files are written as <post id>.jpg / <post id>.mp4 under output_dir.
    Failures are logged and skipped so a batch download keeps going.
    """
    out = Path(output_dir)
    # Fix: parents=True so nested output paths (e.g. 'data/media') work.
    out.mkdir(parents=True, exist_ok=True)

    post_id = post.get('id', 'unknown')
    img_url = post.get('image_url')
    video_url = post.get('video_url')

    if img_url:
        try:
            # Fix: httpx does NOT follow redirects by default (unlike
            # requests), and CDN links can 30x to another edge node.
            resp = httpx.get(img_url, timeout=30, follow_redirects=True)
            resp.raise_for_status()
            img_file = out / f'{post_id}.jpg'
            img_file.write_bytes(resp.content)
        except Exception as e:
            print(f'  Failed to download image for {post_id}: {e}')

    if video_url:
        try:
            resp = httpx.get(video_url, timeout=60, follow_redirects=True)
            resp.raise_for_status()
            vid_file = out / f'{post_id}.mp4'
            vid_file.write_bytes(resp.content)
        except Exception as e:
            print(f'  Failed to download video for {post_id}: {e}')

Saving Hashtag Data

import json
from pathlib import Path
from datetime import datetime

def save_hashtag_data(tag_name: str, posts: list, stats: dict) -> None:
    """Write one timestamped JSON snapshot per hashtag under instagram_hashtags/."""
    out_dir = Path('instagram_hashtags')
    out_dir.mkdir(exist_ok=True)
    stamp = datetime.now().strftime('%Y%m%d_%H%M')

    payload = {
        'hashtag': tag_name,
        'scraped_at': datetime.now().isoformat(),
        'stats': stats,
        'posts': posts,
    }

    target = out_dir / f'{tag_name}_{stamp}.json'
    target.write_text(json.dumps(payload, indent=2, ensure_ascii=False))
    print(f'Saved {len(posts)} posts for #{tag_name} to {target}')

Using the Instagram private API violates Meta's Terms of Service. Meta has pursued legal action against companies scraping at commercial scale. For individual research, competitive analysis, or personal projects, enforcement risk is low — but account bans are common if you push volume limits.

Public hashtag data sits in a legally safer zone since courts have generally held that scraping publicly accessible data is not a CFAA violation. The risk increases significantly when you are accessing behind login walls, which the private API approach requires.

Keep volumes low (under 100 posts per hashtag per day per account), rotate sessions, use residential proxies, and randomize timing. This combination keeps detection risk minimal while still enabling useful data collection.

Complete Workflow

#!/usr/bin/env python3
"""
Instagram hashtag data collector.
Collects top posts, engagement stats, and related hashtags.
"""

import json, time, random, requests
from pathlib import Path
from datetime import datetime

# Configuration
SESSION_ID = 'your-session-id'  # sessionid cookie from a logged-in browser
OUTPUT_DIR = Path('instagram_data')
OUTPUT_DIR.mkdir(exist_ok=True)  # created at import time so collectors can write immediately

def run_hashtag_collection(tag_name: str, pages: int = 3) -> None:
    """End-to-end collection for one hashtag.

    Resolves the tag ID, scrapes up to `pages` pages of top posts, computes
    engagement stats, fetches related tags, and saves one JSON file per tag
    under OUTPUT_DIR.
    """
    print(f'Collecting #{tag_name}...')

    # Step 1: Get hashtag ID
    tag_id = get_hashtag_id(tag_name)
    if not tag_id:
        print(f'  Could not resolve #{tag_name}')
        return
    print(f'  ID: {tag_id}')

    # Step 2: Collect posts
    posts = scrape_hashtag_posts(tag_name, max_pages=pages)
    print(f'  Collected {len(posts)} posts')

    # Step 3: Analyze.  Bug fix: analyze_hashtag_engagement returns {} for an
    # empty post list, so use .get() instead of direct indexing to avoid a
    # KeyError crash when nothing was collected.
    stats = analyze_hashtag_engagement(posts)
    print(f'  Avg likes: {stats.get("avg_likes", 0):.0f}')
    print(f'  Avg comments: {stats.get("avg_comments", 0):.0f}')

    # Step 4: Get related hashtags
    related = get_related_hashtags(tag_name)
    print(f'  Related tags: {related[:5]}')

    # Step 5: Save
    output = {
        'hashtag': tag_name,
        'id': tag_id,
        'scraped_at': datetime.now().isoformat(),
        'related_hashtags': related,
        'engagement_stats': stats,
        'posts': posts,
    }
    out_file = OUTPUT_DIR / f'{tag_name}.json'
    out_file.write_text(json.dumps(output, indent=2, ensure_ascii=False))
    print(f'  Saved to {out_file}')

# Collect each tag with a randomized pause to avoid burst-like traffic.
tags_to_collect = ['python', 'webdev', 'machinelearning']
for tag in tags_to_collect:
    run_hashtag_collection(tag)
    time.sleep(random.uniform(5, 10))

Summary

Instagram hashtag scraping in 2026 requires session authentication via the private mobile API. The core flow is: resolve the hashtag name to a numeric ID, then fetch the top-posts feed with pagination. Engagement data — likes, comments, view counts, and media metadata — comes back as clean JSON.

Rate limits are strict: 100-200 requests per day per session. Rotate session cookies across multiple accounts and use residential proxy rotation from ThorData to distribute the load. Download media immediately since CDN URLs expire within 24-48 hours. Keep volumes moderate and delays randomized to avoid session bans.

Competitor Hashtag Analysis

One of the most valuable applications is mapping a competitor's hashtag strategy — which tags they use, which drive the most engagement, and what content themes resonate with their audience:

import re
from collections import Counter

def extract_hashtags_from_caption(caption: str) -> list[str]:
    """Return every #hashtag in a caption, lowercased, without the '#'."""
    if not caption:
        return []
    found = re.findall(r'#(\w+)', caption)
    return [name.lower() for name in found]


def analyze_competitor_hashtags(
    competitor_posts: list[dict],
    top_n: int = 30,
) -> dict:
    """Rank a competitor's hashtags by usage frequency and by the average
    engagement (likes + comments) of the posts that carry them.
    """
    all_hashtags = []
    per_tag = {}

    for post in competitor_posts:
        tags = extract_hashtags_from_caption(post.get("caption", ""))
        engagement = post.get("like_count", 0) + post.get("comment_count", 0)

        for tag in tags:
            all_hashtags.append(tag)
            bucket = per_tag.setdefault(tag, {"total_engagement": 0, "post_count": 0})
            bucket["total_engagement"] += engagement
            bucket["post_count"] += 1

    # Frequency-based ranking.
    frequency = Counter(all_hashtags)

    # Engagement-based ranking: average engagement per post that used the tag.
    for bucket in per_tag.values():
        n_posts = bucket["post_count"]
        bucket["avg_engagement"] = bucket["total_engagement"] / n_posts if n_posts > 0 else 0

    by_freq = frequency.most_common(top_n)
    by_engagement = sorted(
        per_tag.items(),
        key=lambda kv: kv[1]["avg_engagement"],
        reverse=True,
    )[:top_n]

    post_total = len(competitor_posts)
    return {
        "total_posts_analyzed": post_total,
        "unique_hashtags": len(frequency),
        "avg_hashtags_per_post": len(all_hashtags) / post_total if post_total else 0,
        "top_by_frequency": [{"tag": t, "count": c} for t, c in by_freq],
        "top_by_engagement": [
            {
                "tag": t,
                "avg_engagement": d["avg_engagement"],
                "post_count": d["post_count"],
            }
            for t, d in by_engagement
        ],
    }

Hashtag Post Trend Detection

Track how hashtag post volumes change over time to detect emerging trends before they peak:

import json
import time
import random
import requests
from pathlib import Path
from datetime import datetime, timedelta
from collections import defaultdict

TREND_DB = Path("hashtag_trends.json")


def load_trend_data() -> dict:
    """Load the trend snapshot store, or an empty dict if none exists yet."""
    if not TREND_DB.exists():
        return {}
    return json.loads(TREND_DB.read_text())


def save_trend_data(data: dict) -> None:
    """Persist the trend snapshot store as pretty-printed JSON."""
    TREND_DB.write_text(json.dumps(data, indent=2))


def record_hashtag_snapshot(
    tag_names: list[str],
    session_id: str,
) -> None:
    """Append one media_count snapshot per tag to the trend store.

    All tags in one run share the same timestamp; failed fetches are
    logged and skipped after a short pause.
    """
    trend_data = load_trend_data()
    now = datetime.now().isoformat()

    headers = {
        "User-Agent": "Instagram 317.0.0.34.109 Android (30/11; 420dpi; 1080x2220; samsung; SM-G991B; o1s; exynos2100)",
        "X-IG-App-ID": "936619743392459",
    }
    cookies = {"sessionid": session_id}

    for tag_name in tag_names:
        resp = requests.get(
            "https://i.instagram.com/api/v1/tags/web_info/",
            params={"tag_name": tag_name},
            headers=headers,
            cookies=cookies,
            timeout=15,
        )
        if resp.status_code != 200:
            print(f"  Failed to fetch #{tag_name}: {resp.status_code}")
            time.sleep(2)
            continue

        hashtag = resp.json().get("data", {}).get("hashtag", {})
        media_count = hashtag.get("media_count", 0)

        trend_data.setdefault(tag_name, []).append({
            "timestamp": now,
            "media_count": media_count,
        })

        print(f"  #{tag_name}: {media_count:,} posts")
        time.sleep(random.uniform(1, 2))

    save_trend_data(trend_data)
    print(f"Snapshot recorded for {len(tag_names)} hashtags")


def calculate_growth_rates(tag_name: str, days: int = 7) -> dict | None:
    """Compute absolute and percentage post-count growth over a time window.

    Uses the first and last snapshots within the last `days` days, falling
    back to the two most recent snapshots when the window is too sparse.
    Returns None when fewer than two snapshots exist at all.
    """
    snapshots = load_trend_data().get(tag_name, [])
    if len(snapshots) < 2:
        return None

    # ISO-8601 strings order correctly under plain string comparison.
    ordered = sorted(snapshots, key=lambda s: s["timestamp"])

    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    window = [s for s in ordered if s["timestamp"] >= cutoff]
    if len(window) < 2:
        window = ordered[-2:]

    first, last = window[0], window[-1]

    posts_added = last["media_count"] - first["media_count"]
    span_days = (
        datetime.fromisoformat(last["timestamp"]) - datetime.fromisoformat(first["timestamp"])
    ).total_seconds() / 86400

    daily_growth = posts_added / span_days if span_days > 0 else 0
    growth_pct = (posts_added / first["media_count"] * 100) if first["media_count"] > 0 else 0

    return {
        "tag": tag_name,
        "current_count": last["media_count"],
        "posts_added": posts_added,
        "time_days": round(span_days, 1),
        "daily_growth": round(daily_growth, 0),
        "growth_rate_pct": round(growth_pct, 3),
    }

Multi-Account Session Management

For sustained hashtag monitoring across many tags, rotate session cookies to stay under rate limits:

import random
import time
import requests
from typing import Optional

# Session pool — collect these from browser cookies
SESSION_POOL = [
    "session_id_account_1",
    "session_id_account_2",
    "session_id_account_3",
]

# Request counter per session to track usage
session_request_counts = {s: 0 for s in SESSION_POOL}
MAX_REQUESTS_PER_SESSION = 80  # stay well under daily limit


def get_least_used_session() -> str:
    return min(session_request_counts, key=session_request_counts.get)


class SessionRotator:
    """Least-used session manager for the private Instagram API.

    Hands out whichever unblocked session has the fewest recorded requests,
    retires sessions that hit the rate limit (429), go invalid (401/403),
    or exceed their per-session request budget.
    """

    def __init__(self, sessions: list[str], proxy_url: str = None,
                 max_requests: int = 80):
        """Generalization: the per-session budget is now a parameter
        (`max_requests`, default 80 — same value as the old module-level
        MAX_REQUESTS_PER_SESSION constant) so the class no longer depends
        on external module state.
        """
        self.sessions = sessions
        self.proxy_url = proxy_url
        self.max_requests = max_requests
        self.counts = {s: 0 for s in sessions}
        self.blocked = set()

    def get_session(self) -> Optional[str]:
        """Return the least-used available session, or None when exhausted."""
        available = [
            s for s in self.sessions
            if s not in self.blocked and self.counts.get(s, 0) < self.max_requests
        ]
        if not available:
            return None
        return min(available, key=lambda s: self.counts.get(s, 0))

    def mark_used(self, session_id: str) -> None:
        """Record one request against the given session."""
        self.counts[session_id] = self.counts.get(session_id, 0) + 1

    def mark_blocked(self, session_id: str) -> None:
        """Permanently retire a session for the lifetime of this rotator."""
        self.blocked.add(session_id)
        print(f"  Session marked blocked — {len(self.blocked)}/{len(self.sessions)} unavailable")

    def make_request(self, url: str, params: dict = None) -> Optional[dict]:
        """GET `url` with a rotated session; return parsed JSON or None.

        429 and 401/403 responses block the current session; other failures
        are logged. The caller is expected to retry for transient errors.
        """
        session_id = self.get_session()
        if not session_id:
            print("  All sessions exhausted or blocked")
            return None

        proxies = None
        if self.proxy_url:
            proxies = {"http": self.proxy_url, "https": self.proxy_url}

        headers = {
            "User-Agent": "Instagram 317.0.0.34.109 Android (30/11; 420dpi; 1080x2220; samsung; SM-G991B; o1s; exynos2100)",
            "X-IG-App-ID": "936619743392459",
        }
        cookies = {"sessionid": session_id}

        try:
            resp = requests.get(url, params=params, headers=headers, cookies=cookies,
                                proxies=proxies, timeout=15)
            if resp.status_code == 429:
                print("  Rate limit on session — rotating")
                self.mark_blocked(session_id)
                return None
            if resp.status_code in (401, 403):
                print("  Session invalid — removing")
                self.mark_blocked(session_id)
                return None
            resp.raise_for_status()
            self.mark_used(session_id)
            return resp.json()
        except Exception as e:
            print(f"  Request failed: {e}")
            return None


# Usage
# rotator = SessionRotator(SESSION_POOL, proxy_url="http://USER:[email protected]:9000")
# data = rotator.make_request("https://i.instagram.com/api/v1/tags/web_info/", params={"tag_name": "python"})

Batch Hashtag Data Collection

Collect data across many hashtags efficiently with rate-aware batching:

import json
import time
import random
from pathlib import Path
from datetime import datetime


def collect_hashtag_batch(
    tag_names: list[str],
    output_dir: str = "hashtag_data",
    session_id: str = None,
    proxy_url: str = None,
    delay_range: tuple = (2.0, 5.0),
) -> list[dict]:
    """Collect posts, engagement stats, and related tags for each hashtag.

    Args:
        tag_names: hashtags to process (no leading '#').
        output_dir: directory for per-tag JSON files plus a run summary.
        session_id: reserved — authentication currently comes from the
            module-level COOKIES used by the helper functions (unused here).
        proxy_url: reserved for future proxy support (currently unused).
        delay_range: (min, max) seconds slept between tags.

    Returns:
        List of per-tag result dicts; unresolved/empty tags are recorded in
        the summary's "failed" list instead.
    """
    # Fix: removed a redundant function-local `from pathlib import Path` —
    # Path is already imported at module level.
    out = Path(output_dir)
    out.mkdir(exist_ok=True)

    collected = []
    failed = []

    for i, tag_name in enumerate(tag_names):
        print(f"Processing #{tag_name} ({i+1}/{len(tag_names)})")

        # Get hashtag info
        tag_id = get_hashtag_id(tag_name)
        if not tag_id:
            failed.append(tag_name)
            continue

        # Get posts
        posts = scrape_hashtag_posts(tag_name, max_pages=2)
        if not posts:
            print(f"  No posts found for #{tag_name}")
            failed.append(tag_name)
            continue

        # Get related tags
        related = get_related_hashtags(tag_name)

        # Calculate stats
        stats = analyze_hashtag_engagement(posts)

        result = {
            "tag": tag_name,
            "id": tag_id,
            "scraped_at": datetime.now().isoformat(),
            "post_count": len(posts),
            "related_tags": related[:10],
            "engagement": {
                "avg_likes": stats.get("avg_likes", 0),
                "median_likes": stats.get("median_likes", 0),
                "avg_comments": stats.get("avg_comments", 0),
            },
            "posts": posts,
        }

        collected.append(result)

        # Save individual file
        tag_file = out / f"{tag_name}.json"
        tag_file.write_text(json.dumps(result, indent=2, ensure_ascii=False))

        delay = random.uniform(*delay_range)
        time.sleep(delay)

    # Save summary
    summary_file = out / f"summary_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
    summary_file.write_text(json.dumps({
        "collected": len(collected),
        "failed": failed,
        "tags": [{
            "tag": r["tag"],
            "post_count_scraped": r["post_count"],
            "avg_likes": r["engagement"]["avg_likes"],
            "avg_comments": r["engagement"]["avg_comments"],
            "related_count": len(r["related_tags"]),
        } for r in collected],
    }, indent=2))

    print(f"\nBatch complete: {len(collected)} collected, {len(failed)} failed")
    if failed:
        print(f"Failed tags: {failed}")

    return collected

Hashtag Research Workflow: From Niche to Network

The most powerful use of Instagram hashtag scraping is building a complete picture of a niche's hashtag ecosystem:

def research_niche(
    seed_tag: str,
    session_id: str,
    depth: int = 2,
    max_tags_per_level: int = 10,
) -> dict:
    """
    Start from a seed hashtag and discover the full network
    of related tags, their engagement, and trending content.

    Args:
        seed_tag: hashtag to start the crawl from (no leading '#').
        session_id: NOTE(review) — currently unused; authentication comes
            from the module-level COOKIES used by the helper functions.
        depth: number of expansion levels beyond the seed (inclusive bound,
            so the loop runs up to depth + 1 levels).
        max_tags_per_level: cap on tags processed per level; extra tags in
            the queue are silently dropped.

    Returns:
        Mapping of tag -> {id, level, posts_sampled, avg_likes,
        related_tags, connection_count}.

    Requires get_hashtag_id, scrape_hashtag_posts,
    analyze_hashtag_engagement, get_related_hashtags, and
    collections.Counter to be in scope.
    """
    network = {}
    queue = [seed_tag]
    visited = set()
    level = 0

    while queue and level <= depth:
        next_level = []
        for tag in queue[:max_tags_per_level]:
            if tag in visited:
                continue
            visited.add(tag)

            print(f"Level {level}: #{tag}")

            # Get basic info
            tag_id = get_hashtag_id(tag)
            if not tag_id:
                continue

            # Get a small sample of posts (one page keeps request volume low)
            posts = scrape_hashtag_posts(tag, max_pages=1)
            stats = analyze_hashtag_engagement(posts) if posts else {}

            # Get related tags (these form the next level)
            related = get_related_hashtags(tag)
            next_level.extend(related[:5])

            network[tag] = {
                "id": tag_id,
                "level": level,
                "posts_sampled": len(posts),
                "avg_likes": stats.get("avg_likes", 0),
                "related_tags": related[:10],
            }
            time.sleep(random.uniform(1.5, 3))

        queue = [t for t in next_level if t not in visited]
        level += 1

    # Identify hub tags (most connected): count how many *visited* tags
    # list each tag among their related tags.
    connection_counts = Counter()
    for tag, data in network.items():
        for related in data.get("related_tags", []):
            if related in network:
                connection_counts[related] += 1

    # Add connection count to network data
    for tag in network:
        network[tag]["connection_count"] = connection_counts.get(tag, 0)

    # Rank tags by engagement within the network
    ranked_by_engagement = sorted(
        network.items(),
        key=lambda x: x[1].get("avg_likes", 0),
        reverse=True,
    )

    print(f"\nNiche network analysis for #{seed_tag}:")
    print(f"  Total tags discovered: {len(network)}")
    print(f"\nTop tags by engagement:")
    for tag, data in ranked_by_engagement[:10]:
        print(f"  #{tag}: {data['avg_likes']:.0f} avg likes (level {data['level']})")

    return network

When to Use ThorData Residential Proxies

Instagram's detection specifically targets requests from IP ranges associated with data centers and cloud providers. The platform cross-references IP addresses against known hosting provider CIDR blocks and applies stricter rate limits to them.

For hashtag research at scale — monitoring dozens of tags, collecting thousands of posts, tracking trends over time — residential proxy rotation is necessary. ThorData provides residential IPs from real ISPs in the United States, Europe, and Asia-Pacific. Each request goes through a different household IP, making the traffic pattern indistinguishable from thousands of real users browsing Instagram.

The key advantage for Instagram specifically: residential IPs have natural usage patterns embedded in their history. Unlike fresh datacenter IPs, they have legitimate browsing history that Instagram's reputation scoring systems consider trustworthy.

# Configure ThorData with country targeting for Instagram
# https://thordata.partnerstack.com/partner/0a0x4nzb (or [Oxylabs](https://oxylabs.go2cloud.org/aff_c?offer_id=7&aff_id=2066&url_id=174))
PROXY_URL = "http://USER:[email protected]:9000"  # US residential IPs

def instagram_request_with_proxy(url: str, params: dict | None = None) -> dict | None:
    """Fetch an Instagram API endpoint through the residential proxy.

    Retries up to 3 times: exponential backoff on HTTP 429, a short fixed
    delay on any other failure.

    Args:
        url: Full API URL to request.
        params: Optional query-string parameters.

    Returns:
        Parsed JSON response, or None if all attempts failed.
    """
    proxies = {"http": PROXY_URL, "https": PROXY_URL}
    headers = {
        "User-Agent": "Instagram 317.0.0.34.109 Android (30/11; 420dpi; 1080x2220; samsung; SM-G991B; o1s; exynos2100)",
        "X-IG-App-ID": "936619743392459",
    }
    cookies = {"sessionid": SESSION_ID}

    for attempt in range(3):
        try:
            resp = requests.get(url, params=params, headers=headers, cookies=cookies,
                                proxies=proxies, timeout=15)
            if resp.status_code == 429:
                # Exponential backoff: 30s, 60s, 120s.
                wait = 30 * (2 ** attempt)
                print(f"  Rate limited — waiting {wait}s")
                time.sleep(wait)
                continue
            resp.raise_for_status()
            return resp.json()
        except Exception as e:  # network errors, HTTP errors, malformed JSON
            print(f"  Attempt {attempt+1} failed: {e}")
            time.sleep(5)
    return None

Hashtag Performance Benchmarking by Category

Different hashtag categories have very different engagement norms. A 500-like average might be exceptional in a B2B niche but below-average in fitness. Build category benchmarks to contextualize data:

import json
import statistics
from pathlib import Path
from collections import defaultdict

# Engagement norms per category: expected likes per 1K followers, and the
# like count above which a post counts as a "high performer".
CATEGORY_BENCHMARKS = {
    "fitness": {"avg_likes_per_1k_followers": 35, "top_threshold": 100},
    "tech": {"avg_likes_per_1k_followers": 15, "top_threshold": 50},
    "food": {"avg_likes_per_1k_followers": 45, "top_threshold": 150},
    "travel": {"avg_likes_per_1k_followers": 40, "top_threshold": 120},
    "fashion": {"avg_likes_per_1k_followers": 30, "top_threshold": 100},
    "business": {"avg_likes_per_1k_followers": 10, "top_threshold": 35},
}


def calculate_hashtag_performance_score(
    posts: list[dict],
    category: str = "tech",
) -> dict:
    """Summarize engagement for a sample of hashtag posts against a category benchmark.

    Args:
        posts: Post dicts with optional "like_count" and "comment_count" keys.
        category: Benchmark key in CATEGORY_BENCHMARKS; unknown categories
            fall back to a generic benchmark.

    Returns:
        Aggregate stats dict, or {} when no posts were supplied.
    """
    if not posts:
        return {}

    generic = {"avg_likes_per_1k_followers": 25, "top_threshold": 80}
    benchmark = CATEGORY_BENCHMARKS.get(category, generic)
    threshold = benchmark["top_threshold"]

    # Interactions = likes + comments per post.
    interactions = [p.get("like_count", 0) + p.get("comment_count", 0) for p in posts]
    mean_inter = statistics.mean(interactions)

    hot_count = sum(1 for p in posts if p.get("like_count", 0) >= threshold)
    by_likes = sorted(posts, key=lambda p: p.get("like_count", 0), reverse=True)

    return {
        "avg_interactions": round(mean_inter, 1),
        "median_interactions": round(statistics.median(interactions), 1),
        "high_performer_rate": round(hot_count / len(posts) * 100, 1),
        "total_posts_sampled": len(posts),
        "category_benchmark": benchmark,
        "performance_vs_benchmark": round(mean_inter / benchmark["avg_likes_per_1k_followers"], 2),
        "top_posts": by_likes[:3],
    }


def compare_hashtags_in_category(
    tag_names: list[str],
    category: str,
    session_id: str,
) -> list[dict]:
    """Score several hashtags against the same category benchmark and print a ranking.

    Args:
        tag_names: Hashtag names (without '#') to analyze.
        category: Benchmark category key (see CATEGORY_BENCHMARKS).
        session_id: Instagram session ID. NOTE(review): currently unused here —
            scrape_hashtag_posts appears to read auth from module state; confirm.

    Returns:
        Per-hashtag score dicts, sorted by avg_interactions descending.
    """
    # Hoisted out of the loop body, where they were re-executed per iteration.
    import random
    import time

    results = []
    for tag in tag_names:
        print(f"Analyzing #{tag}...")
        posts = scrape_hashtag_posts(tag, max_pages=2)
        if not posts:
            continue
        score = calculate_hashtag_performance_score(posts, category)
        results.append({
            "tag": tag,
            "category": category,
            **score,
        })
        # Randomized pause between hashtags to stay under rate limits.
        time.sleep(random.uniform(3, 6))

    results.sort(key=lambda x: x.get("avg_interactions", 0), reverse=True)

    print(f"\nHashtag comparison for {category} category:")
    print(f"{'Tag':<25} {'Avg Inter':>10} {'High%':>7} {'vs Benchmark':>13}")
    print("-" * 58)
    for r in results:
        print(
            f"#{r['tag']:<24} {r['avg_interactions']:>10.0f} "
            f"{r['high_performer_rate']:>6.1f}% "
            f"{r['performance_vs_benchmark']:>13.2f}x"
        )

    return results

Building a Hashtag Content Calendar

Use scraped top-performing post data to identify optimal posting times and content types:

from datetime import datetime
from collections import defaultdict
import statistics


def analyze_posting_patterns(posts: list[dict]) -> dict:
    """Aggregate like counts by posting hour, weekday, and media type.

    Args:
        posts: Post dicts with optional "taken_at" (unix timestamp),
            "like_count", and "media_type" keys.

    Returns:
        Dict with the top posting hours/days ranked by mean likes and
        per-media-type performance.
    """
    hour_likes = defaultdict(list)
    weekday_likes = defaultdict(list)
    media_likes = defaultdict(list)

    for item in posts:
        likes = item.get("like_count", 0)
        ts = item.get("taken_at")
        if ts:
            # NOTE: fromtimestamp uses the local timezone — hours/days are local.
            when = datetime.fromtimestamp(ts)
            hour_likes[when.hour].append(likes)
            weekday_likes[when.weekday()].append(likes)
        media_likes[item.get("media_type", "photo")].append(likes)

    day_labels = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

    def ranked(bucket):
        # (key, mean likes) pairs, best first; skip empty buckets.
        pairs = [(key, statistics.mean(vals)) for key, vals in bucket.items() if vals]
        pairs.sort(key=lambda kv: kv[1], reverse=True)
        return pairs

    top_hours = ranked(hour_likes)
    top_days = ranked(weekday_likes)

    media_stats = {}
    for mt, vals in media_likes.items():
        if vals:
            media_stats[mt] = {"avg_likes": statistics.mean(vals), "count": len(vals)}

    return {
        "best_posting_hours": [{"hour": h, "avg_likes": round(m, 0)} for h, m in top_hours[:5]],
        "best_posting_days": [{"day": day_labels[d], "avg_likes": round(m, 0)} for d, m in top_days[:3]],
        "media_type_performance": media_stats,
    }


def generate_posting_recommendations(analysis: dict) -> None:
    """Print human-readable scheduling advice from analyze_posting_patterns output."""
    hours = analysis.get("best_posting_hours", [])
    days = analysis.get("best_posting_days", [])
    media_perf = analysis.get("media_type_performance", {})

    print("\nContent Calendar Recommendations:")
    print()

    if hours:
        slots = ", ".join(f"{entry['hour']:02d}:00" for entry in hours[:3])
        print(f"Best posting times: {slots}")

    if days:
        names = ", ".join(entry["day"] for entry in days[:3])
        print(f"Best days: {names}")

    if media_perf:
        # Highest mean likes wins.
        top_type, top_stats = max(media_perf.items(), key=lambda kv: kv[1]["avg_likes"])
        print(f"Best performing media type: {top_type} ({top_stats['avg_likes']:.0f} avg likes)")

    print()
    print("Optimal content calendar:")
    if days and hours:
        # Pair the top three days with the single best hour.
        slot = f"{hours[0]['hour']:02d}:00"
        for entry in days[:3]:
            print(f"  {entry['day']} at {slot}")

Export and Integration with Marketing Tools

Export collected hashtag data in formats compatible with social media management tools:

import json
import csv
from pathlib import Path
from datetime import datetime


def export_hashtag_report(
    tag_name: str,
    posts: list[dict],
    stats: dict,
    patterns: dict,
    output_dir: str = "hashtag_reports",
) -> dict:
    """Write a JSON summary and a per-post CSV for one hashtag.

    Args:
        tag_name: Hashtag name (without '#'); used in the output filenames.
        posts: Post dicts (id, media_type, like_count, comment_count, ...).
        stats: Aggregate stats to embed in the JSON report.
        patterns: Posting-pattern analysis to embed in the JSON report.
        output_dir: Directory for the report files; created if missing.

    Returns:
        {"json": <path>, "csv": <path>} for the two files written.
    """
    out = Path(output_dir)
    # parents=True so nested report directories work (e.g. "reports/2026/01").
    out.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M")

    # JSON - full data, top 20 posts by likes
    json_file = out / f"{tag_name}_{timestamp}.json"
    json_file.write_text(json.dumps({
        "hashtag": tag_name,
        "generated_at": datetime.now().isoformat(),
        "stats": stats,
        "posting_patterns": patterns,
        "top_posts": sorted(posts, key=lambda p: p.get("like_count", 0), reverse=True)[:20],
    }, indent=2, ensure_ascii=False))

    # CSV - posts for spreadsheet analysis
    csv_file = out / f"{tag_name}_posts_{timestamp}.csv"
    fieldnames = ["id", "media_type", "like_count", "comment_count", "taken_at",
                  "author_username", "caption_preview", "post_url"]
    with open(csv_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for post in posts:
            caption = post.get("caption", "")
            writer.writerow({
                "id": post.get("id"),
                "media_type": post.get("media_type"),
                "like_count": post.get("like_count", 0),
                "comment_count": post.get("comment_count", 0),
                "taken_at": post.get("taken_at"),
                "author_username": post.get("author_username"),
                # Truncated so one long caption can't blow up spreadsheet cells.
                "caption_preview": caption[:100] if caption else "",
                "post_url": post.get("post_url"),
            })

    print(f"Reports saved for #{tag_name}:")
    print(f"  JSON: {json_file}")
    print(f"  CSV:  {csv_file}")
    return {"json": str(json_file), "csv": str(csv_file)}

Building a Hashtag Health Score

Not all hashtags are equally useful for content discovery or marketing. A "hashtag health score" combines post volume, engagement rate, and posting velocity into a single actionable metric:

from dataclasses import dataclass
from typing import Optional
import math

@dataclass
class HashtagMetrics:
    # Inputs for the health score; collected from the hashtag info and feed endpoints.
    name: str             # hashtag without '#'
    post_count: int       # lifetime post count reported for the tag
    avg_likes: float      # mean likes across the sampled posts
    avg_comments: float   # mean comments across the sampled posts
    posts_per_day: float  # recent posting velocity
    top_post_likes: int   # likes on the best-performing sampled post

def compute_health_score(m: HashtagMetrics) -> dict:
    """
    Combine volume, engagement, and velocity into a 0-100 health score.

    Components:
    - Volume (0-30): log-scaled, sweet spot 100K-5M posts; oversaturation penalized.
    - Engagement (0-40): community average (comments weighted 3x) vs the top post.
    - Velocity (0-30): sqrt-scaled posts per day.
    """
    count = m.post_count
    if count < 1000:
        # Tiny tags scale linearly up to 10 points.
        volume = count / 1000 * 10
    elif count <= 5_000_000:
        volume = 10 + (math.log10(count) - 3) / (math.log10(5e6) - 3) * 20
    else:
        # Beyond 5M posts, each extra order of magnitude costs 15 points.
        volume = max(0, 30 - (math.log10(count) - math.log10(5e6)) * 15)

    engagement = 0
    if m.top_post_likes > 0:
        engagement = min(40, (m.avg_likes + m.avg_comments * 3) / m.top_post_likes * 40)

    velocity = min(30, math.sqrt(m.posts_per_day) * 3)

    total = volume + engagement + velocity
    if 50 <= total <= 80:
        verdict = "ideal"
    elif total > 80:
        verdict = "oversaturated"
    elif 30 <= total < 50:
        verdict = "niche"
    else:
        verdict = "too_small"

    return {
        "hashtag": m.name,
        "total_score": round(total, 1),
        "volume_score": round(volume, 1),
        "engagement_score": round(engagement, 1),
        "velocity_score": round(velocity, 1),
        "recommendation": verdict,
    }

# Example usage
metrics = HashtagMetrics(
    name="webdevelopment",
    post_count=8_200_000,
    avg_likes=420,
    avg_comments=18,
    posts_per_day=1800,
    top_post_likes=45000,
)
result = compute_health_score(metrics)
print(result)
# {'hashtag': 'webdevelopment', 'total_score': 57.2, ..., 'recommendation': 'ideal'}
# (volume ~26.8 after the oversaturation penalty, engagement ~0.4, velocity capped at 30)

Run this against 50-100 candidate hashtags and sort by total_score to identify the best tags for a given niche.

Automating Competitor Hashtag Monitoring Reports

Combine the scraper with automated weekly reports comparing your hashtag performance vs competitors:

import json
from datetime import datetime
from pathlib import Path

def generate_hashtag_report(
    your_hashtags: list[str],
    competitor_hashtags: dict[str, list[str]],
    data: dict[str, dict],
    output_path: str = "hashtag_report.json"
):
    """Generate a competitive hashtag intelligence report.

    Args:
        your_hashtags: Hashtags you currently use.
        competitor_hashtags: Mapping of competitor name -> their hashtags.
        data: Per-hashtag metrics keyed by hashtag name; values may carry
            "post_count", "avg_likes", "avg_comments".
        output_path: Where to write the JSON report.

    Returns:
        The report dict (also written to output_path). "opportunities" lists
        hashtags competitors use that you don't, best engagement first.
    """
    report = {
        # NOTE(review): utcnow() is naive and deprecated since Python 3.12;
        # kept for output compatibility — consider datetime.now(timezone.utc).
        "generated_at": datetime.utcnow().isoformat(),
        "your_performance": {},
        "competitor_performance": {},
        "overlap_analysis": {},
        "opportunities": [],
    }

    your_set = set(your_hashtags)
    competitor_sets = {name: set(tags) for name, tags in competitor_hashtags.items()}
    # set().union(...) is safe with zero competitors; set.union(*[]) raises TypeError.
    all_competitor_tags = set().union(*competitor_sets.values())

    # Tags used by competitors but not you
    gaps = all_competitor_tags - your_set
    for tag in gaps:
        if tag in data:
            d = data[tag]
            report["opportunities"].append({
                "hashtag": tag,
                "post_count": d.get("post_count"),
                "used_by": [name for name, tags in competitor_sets.items() if tag in tags],
                # Comments weighted 3x, matching the engagement scoring used elsewhere.
                "avg_engagement": d.get("avg_likes", 0) + d.get("avg_comments", 0) * 3,
            })

    # Sort by engagement
    report["opportunities"].sort(key=lambda x: x["avg_engagement"], reverse=True)

    Path(output_path).write_text(json.dumps(report, indent=2))
    print(f"Report saved: {output_path} ({len(report['opportunities'])} gap opportunities)")
    return report

Schedule this with cron or a simple asyncio loop to get weekly competitive intelligence automatically. Pair with ThorData residential proxies to keep sessions clean across multiple competitor account checks.