Web Scraping Rate Limiting: How to Stay Under the Radar in 2026

Introduction: Why Rate Limiting Is The #1 Reason Scrapers Get Blocked

The majority of web scraper failures aren't due to parsing logic breaking or layout changes—they're due to rate limiting. You'll build a perfect parser, deploy it to production, and watch it die within minutes because you didn't properly throttle your requests. This happens because modern anti-bot systems don't just count requests; they analyze temporal patterns, behavioral consistency, and statistical anomalies in how you're accessing their servers.

In 2026, anti-bot detection has evolved far beyond simple request counting. Cloudflare, Akamai, and DataDome track inter-arrival times between your requests, analyze the variance in response times, monitor for suspicious patterns in page navigation, and compare your timing distribution against millions of real user sessions. They've built statistical models of human behavior. You need to understand these models if you want to scrape at scale.

When you send requests every 100 milliseconds without variation, it's immediately obvious you're a machine. When you never click on random links, never spend more than 2 seconds on a page, and never simulate reading time, the behavioral analysis catches you. When your requests come from a single IP address in a datacenter with 50 concurrent connections, they'll block you regardless of how good your rate limiting is.

This guide teaches you how to think about rate limiting from first principles. Not just "add random delays": that's the most obvious solution and the first thing anti-bot systems learned to defeat. Instead, we'll work through the statistics of human browsing, the core rate limiting algorithms (delays and jitter, token bucket, leaky bucket, sliding windows, adaptive limiting), and the supporting tactics: detecting rate limit responses, backing off, respecting robots.txt, simulating sessions, and rotating proxies.

By the end of this guide, you'll understand both how anti-bot systems work and how to build scrapers that operate within their constraints while still moving fast. This isn't about ethics—it's about effectiveness. The sites you're scraping are already blocking you. Learn to work within the bounds they've set, and you'll actually succeed.

The Science of Human Browsing Patterns

Anti-bot systems work by comparing your request patterns to statistical models of real human behavior. If you want to avoid detection, you need to understand these patterns deeply. This isn't intuition—it's math.

Real Human Request Timing

When humans browse the web, requests don't arrive at regular intervals. They cluster. You click something, the page loads in 2-4 seconds, you read for 8-20 seconds, then click another link. The distribution of inter-arrival times (time between consecutive requests) isn't uniform—it's right-skewed with a heavy tail. Most requests cluster within 1-3 seconds of the previous one, but occasionally you get long delays (5-30+ seconds) when someone's reading something carefully.

Empirical data from real user sessions shows:

- Median inter-arrival time: 4.2 seconds
- Mean inter-arrival time: 8.7 seconds (pulled up by reading time)
- 95th percentile: 45 seconds (people spend time reading)
- Standard deviation: high (~12 seconds)

A machine that sends requests every 1 second fails immediately. A machine that sends requests every 4.2 seconds (the median) gets caught because it's too consistent. Real humans vary wildly.

Session Length and Navigation Patterns

Real sessions don't last 8 hours. They typically run 5-45 minutes with most clustering around 12-18 minutes. Within a session, you'll visit 3-8 pages with occasional backtracking. You won't jump randomly through a site—you'll follow logical paths (click category → view products → read reviews → click product).

Anti-bot systems track:

- Session coherence: do your navigation patterns make logical sense?
- Referrer chains: do your Referer headers match your navigation?
- Page visit distribution: do you revisit pages in realistic proportions?
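These session-level statistics translate directly into scraper limits: give each session a budget and stop when it's spent. A minimal sketch (the `plan_session` helper and the sampled distributions are my own, chosen to mirror the ranges above):

```python
import random

def plan_session():
    """Sample a per-session budget matching typical human sessions."""
    # Sessions run 5-45 minutes, clustering near the 12-18 minute band
    duration_min = random.triangular(5, 45, 15)
    # Within a session, humans visit roughly 3-8 pages
    pages = random.randint(3, 8)
    return duration_min, pages

random.seed(1)
duration, pages = plan_session()
print(f"Session budget: {pages} pages over {duration:.1f} minutes")
```

When the budget runs out, end the session (drop cookies, go idle, optionally rotate identity) rather than browsing indefinitely.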

Page Dwell Time

Dwell time (how long you spend on a page before requesting the next) is critical. Real users spend 5-60 seconds on product pages, 10-120 seconds on article pages, and 2-10 seconds on navigation pages. The distribution is heavily right-skewed—most visits are short, but some are quite long.

Scrapers that request the next page after exactly 1 second get caught immediately. Scrapers that request the next page within 2-5 seconds get caught after a few dozen pages. You need realistic variation.
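A dwell-time sampler for, say, product pages can be sketched with a clipped lognormal. The parameters here are illustrative, picked to land inside the 5-60 second product-page range above:

```python
import numpy as np

rng = np.random.default_rng(7)

def sample_dwell(n, median_s=8.0, sigma=0.9, lo=5.0, hi=60.0):
    """Right-skewed dwell times: mostly short visits, occasional long reads."""
    return np.clip(rng.lognormal(np.log(median_s), sigma, size=n), lo, hi)

dwell = sample_dwell(10_000)
print(f"median={np.median(dwell):.1f}s  p95={np.percentile(dwell, 95):.1f}s")
```

The median sits near 8 seconds while the 95th percentile stretches past 30 seconds, which is the heavy right tail detection systems expect to see.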

Request Variance Analysis

Modern anti-bot systems calculate the coefficient of variation (CV = standard_deviation / mean) for inter-arrival times. Real users have CV around 1.2-1.8. Machines tend to have CV close to 0 (very consistent timing). A scraper sending requests every 5±0.1 seconds has CV of 0.02. This is instantly flagged.
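You can verify that claim in a couple of lines (a quick standalone sketch):

```python
import statistics

# A scraper that sleeps 5s +/- 0.1s between requests
machine = [5.0 + 0.1 * ((-1) ** i) for i in range(100)]  # 5.1, 4.9, 5.1, ...

cv = statistics.pstdev(machine) / statistics.mean(machine)
print(f"CV: {cv:.3f}")  # 0.020 -- far below the human 1.2-1.8 band
```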

Code: Analyzing Real Browsing Patterns

import numpy as np
from scipy import stats

# Simulated real user inter-arrival times (seconds)
real_user_delays = np.array([
    2.1, 3.4, 1.8, 45.2, 5.3, 3.1, 2.8, 8.7, 4.2, 3.9,
    2.4, 51.3, 3.7, 2.9, 4.1, 8.2, 5.6, 2.3, 3.8, 12.4,
    6.7, 3.2, 2.5, 34.1, 4.8, 3.3, 2.6, 7.1, 5.2, 3.0
])

# Machine-generated delays (suspicious)
machine_delays = np.array([5.0] * 30)

def analyze_pattern(delays, name):
    print(f"\n{name}:")
    print(f"  Mean: {np.mean(delays):.2f}s")
    print(f"  Median: {np.median(delays):.2f}s")
    print(f"  Std Dev: {np.std(delays):.2f}s")
    print(f"  CV (coeff of variation): {np.std(delays) / np.mean(delays):.3f}")
    print(f"  Min: {np.min(delays):.2f}s, Max: {np.max(delays):.2f}s")

    # Autocorrelation at lag 1 (detects regular patterns); a constant
    # series has zero variance, so guard against division by zero
    std = np.std(delays)
    if std > 0:
        centered = delays - np.mean(delays)
        acf = np.correlate(centered, centered,
                           mode='full') / (std**2 * len(delays))
        lag1 = acf[len(acf) // 2 + 1]
    else:
        lag1 = 1.0  # perfectly regular timing is perfectly predictable
    print(f"  Autocorrelation at lag-1: {lag1:.3f}")

    # Shannon entropy of the delay histogram (detects randomness)
    hist, _ = np.histogram(delays, bins=10)
    probs = hist[hist > 0] / len(delays)
    entropy = -np.sum(probs * np.log(probs)) + 0.0  # +0.0 normalizes -0.0
    print(f"  Entropy: {entropy:.3f}")

analyze_pattern(real_user_delays, "Real User")
analyze_pattern(machine_delays, "Machine (Detected!)")

# Real users follow an approximately lognormal distribution
fit_shape, fit_loc, fit_scale = stats.lognorm.fit(real_user_delays)
print(f"\nReal users fit lognormal: shape={fit_shape:.3f}, scale={fit_scale:.2f}s")

Output:

Real User:
  Mean: 8.32s
  Median: 3.85s
  Std Dev: 12.16s
  CV (coeff of variation): 1.461
  Min: 1.80s, Max: 51.30s
  Autocorrelation at lag-1: -0.162
  Entropy: 0.887

Machine (Detected!):
  Mean: 5.00s
  Median: 5.00s
  Std Dev: 0.00s
  CV (coeff of variation): 0.000
  Min: 5.00s, Max: 5.00s
  Autocorrelation at lag-1: 1.000
  Entropy: 0.000

The machine delays have zero variance, zero entropy, and perfect lag-1 autocorrelation, while the real-user CV sits inside the human 1.2-1.8 band. The machine is caught instantly.

Basic Delays and Jitter

The foundation of any rate limiter is adding delays between requests. Start here before moving to more sophisticated algorithms.

Simple time.sleep() with Random Jitter

import time
import random
import asyncio

async def fetch_with_delay(url):
    """Fetch URL with random delay between requests."""
    delay = random.uniform(2, 8)  # 2-8 seconds
    await asyncio.sleep(delay)
    # return await client.get(url)

This is a starting point but insufficient. Uniform distribution (equal probability across the range) isn't how humans behave. You need distributions that match real browsing.

Gaussian (Normal) Distribution

import random

def gaussian_delay(mean=5.0, std_dev=2.0):
    """Generate delay from Gaussian distribution."""
    delay = random.gauss(mean, std_dev)
    return max(delay, 0.5)  # Ensure positive

# Usage
for i in range(5):
    delay = gaussian_delay(mean=5.0, std_dev=1.5)
    print(f"Request {i}: delay {delay:.2f}s")

Better, but still not realistic. Real behavior has more heavy tails (occasional very long delays).

Exponential Distribution

More realistic for inter-arrival times. The exponential distribution is memoryless: the time until the next request doesn't depend on how long you've already waited, which loosely matches event-driven browsing:

def exponential_delay(lambda_param=0.15):
    """Generate delay from exponential distribution.
    lambda_param = 1/mean, so 0.15 ≈ 6.7 second mean"""
    return random.expovariate(lambda_param)

# Verification
delays = [exponential_delay() for _ in range(100)]
print(f"Mean: {sum(delays)/len(delays):.2f}s")  # Should be ~6.7s

Lognormal Distribution

This best matches real human browsing behavior. It's right-skewed with a long tail:

import numpy as np

def lognormal_delay(mu=1.0, sigma=0.8):
    """Lognormal distribution (best for human-like behavior).
    mu and sigma are parameters of underlying normal distribution."""
    return np.random.lognormal(mu, sigma)

# Produces: many short delays (1-5s), some medium (5-15s), rare long (15-60s+)
delays = [lognormal_delay() for _ in range(1000)]
print(f"Mean: {np.mean(delays):.2f}s")
print(f"Median: {np.median(delays):.2f}s")
print(f"99th percentile: {np.percentile(delays, 99):.2f}s")

Poisson Processes for Event-Based Limiting

For controlling request rate over longer time windows:

import time
import random

class PoissonRateLimiter:
    """Generate request times according to Poisson process.
    Useful for: 'max 60 requests per minute' style limits."""

    def __init__(self, rate_per_second=0.2):  # ~12 req/minute
        self.rate = rate_per_second
        self.next_time = time.time()

    def acquire(self):
        """Block until next request is allowed."""
        now = time.time()
        if now < self.next_time:
            time.sleep(self.next_time - now)

        # Next request arrives exponentially distributed
        self.next_time = time.time() + random.expovariate(self.rate)
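To sanity-check the limiter, inspect the gaps it would schedule instead of actually sleeping (a quick sketch using the same `random.expovariate` call):

```python
import random

random.seed(42)
rate = 0.2  # ~12 requests/minute

# The inter-arrival gaps the Poisson limiter would schedule
gaps = [random.expovariate(rate) for _ in range(10_000)]

mean_gap = sum(gaps) / len(gaps)
var = sum((g - mean_gap) ** 2 for g in gaps) / len(gaps)
cv = var ** 0.5 / mean_gap

print(f"mean gap: {mean_gap:.1f}s (target {1 / rate:.1f}s), CV: {cv:.2f}")
```

An exponential inter-arrival process has CV close to 1, comfortably inside the human band, unlike fixed sleeps.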

Concurrency Control with asyncio.Semaphore

Most real scrapers need to handle multiple targets simultaneously. Without proper concurrency control, you'll hammer servers with parallel requests and get blocked immediately.

import asyncio
import aiohttp
import time
import random

class ConcurrentRateLimiter:
    """Limit concurrent requests and add delays."""

    def __init__(self, max_concurrent=3, min_delay=1.0, max_delay=5.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request_time = 0
        self.lock = asyncio.Lock()

    async def _wait_turn(self):
        """Enforce the minimum delay plus jitter between ALL requests."""
        async with self.lock:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_delay:
                await asyncio.sleep(self.min_delay - elapsed)

            # Add random jitter
            await asyncio.sleep(random.uniform(0, self.max_delay - self.min_delay))

            self.last_request_time = time.time()

    async def fetch(self, session, url):
        """Fetch with rate limiting. The semaphore is held for the whole
        request, so it actually caps concurrent connections."""
        async with self.semaphore:
            await self._wait_turn()
            try:
                timeout = aiohttp.ClientTimeout(total=10)
                async with session.get(url, timeout=timeout) as resp:
                    return await resp.text()
            except asyncio.TimeoutError:
                print(f"Timeout: {url}")
                return None
            except aiohttp.ClientError as e:
                print(f"Error fetching {url}: {e}")
                return None

# Usage
async def main():
    limiter = ConcurrentRateLimiter(max_concurrent=3, min_delay=2.0, max_delay=5.0)

    urls = [f"https://example.com/page{i}" for i in range(20)]

    async with aiohttp.ClientSession() as session:
        tasks = [limiter.fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    print(f"Fetched {len([r for r in results if r])}/{len(urls)} pages")

# asyncio.run(main())

Key points:

- The semaphore caps concurrent connections (default 3; tune per target)
- The lock ensures delays are respected (without it, concurrent tasks skip the delay)
- Per-request jitter adds natural variation
- Error handling prevents crashes

Token Bucket Algorithm

The token bucket is the most common rate limiting algorithm. You have a bucket that starts full with N tokens. Each request costs 1 token. Tokens refill at a constant rate. If the bucket is empty, wait until tokens refill.

import time
import asyncio
import random

class TokenBucket:
    """Token bucket rate limiter.

    Args:
        tokens_per_second: Refill rate (e.g., 1 token/sec = 1 req/sec max)
        bucket_size: Maximum burst size (default = tokens_per_second)
    """

    def __init__(self, tokens_per_second, bucket_size=None):
        self.tokens_per_second = tokens_per_second
        self.bucket_size = bucket_size or tokens_per_second
        self.tokens = float(self.bucket_size)
        self.last_update = time.time()
        self.lock = asyncio.Lock()

    async def acquire(self, tokens=1):
        """Acquire tokens, blocking if necessary."""
        async with self.lock:
            while True:
                now = time.time()
                elapsed = now - self.last_update

                # Refill tokens
                self.tokens = min(
                    self.bucket_size,
                    self.tokens + elapsed * self.tokens_per_second
                )
                self.last_update = now

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return

                # Wait for refill
                wait_time = (tokens - self.tokens) / self.tokens_per_second
                await asyncio.sleep(wait_time)

# Example: 2 requests per second, burst up to 5
async def main():
    bucket = TokenBucket(tokens_per_second=2, bucket_size=5)

    start = time.time()
    for i in range(10):
        await bucket.acquire()
        print(f"Request {i+1} at {time.time() - start:.2f}s")

# Output shows ~2 req/sec with initial burst of 5

When to use token bucket:

- Simple per-second rate limits
- Bursty traffic (allow spikes up to bucket size)
- The most common choice for scrapers

Leaky Bucket Algorithm

Similar to token bucket but works in reverse. Requests enter a queue and are processed at a constant rate.

import asyncio
import time

class LeakyBucket:
    """Leaky bucket rate limiter.

    Requests are queued and processed at a constant rate.
    """

    def __init__(self, rate_per_second):
        self.interval = 1.0 / rate_per_second
        self.next_slot = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Reserve the next slot in the constant-rate schedule and wait for it.
        (A naive sleep(1/rate) fails here: concurrent tasks would all wake
        at once instead of being spaced out.)"""
        async with self.lock:
            now = time.monotonic()
            slot = max(self.next_slot, now)  # this request's scheduled time
            self.next_slot = slot + self.interval
        await asyncio.sleep(slot - now)

    async def process(self, coro):
        """Run a coroutine at the controlled rate."""
        await self.acquire()
        return await coro

# Usage
async def main():
    bucket = LeakyBucket(rate_per_second=2)

    async def fetch(url):
        # Simulate fetch
        await asyncio.sleep(0.1)
        return f"Fetched {url}"

    tasks = [bucket.process(fetch(f"url{i}")) for i in range(10)]
    results = await asyncio.gather(*tasks)

Token bucket vs leaky bucket:

- Token bucket: better for bursts (can send multiple requests quickly while the bucket has tokens)
- Leaky bucket: better for strict rate limiting (uniform spacing, no bursts)

Sliding Window Rate Limiter

Tracks actual requests in time windows and enforces limits based on real activity.

import time
from collections import deque

class SlidingWindowRateLimiter:
    """Track requests in sliding time window.

    Example: max 30 requests per 60 second window.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()  # (timestamp, count)

    def is_allowed(self):
        """Check if request is allowed."""
        now = time.time()

        # Remove requests outside window
        while self.requests and self.requests[0][0] < now - self.window_seconds:
            self.requests.popleft()

        # Count requests in window
        count = sum(c for _, c in self.requests)

        if count < self.max_requests:
            self.requests.append((now, 1))
            return True

        return False

    def wait_until_allowed(self):
        """Block until request is allowed."""
        while not self.is_allowed():
            if self.requests:
                oldest = self.requests[0][0]
                wait_time = oldest + self.window_seconds - time.time()
                if wait_time > 0:
                    time.sleep(min(wait_time, 0.1))
            else:
                time.sleep(0.01)

# Per-domain tracking
class DomainRateLimiter:
    """Track rate limits per domain independently."""

    def __init__(self, max_requests=30, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.limiters = {}  # domain -> SlidingWindowRateLimiter

    def get_limiter(self, domain):
        """Get or create limiter for domain."""
        if domain not in self.limiters:
            self.limiters[domain] = SlidingWindowRateLimiter(
                self.max_requests, self.window_seconds
            )
        return self.limiters[domain]

    def wait_for_domain(self, domain):
        """Wait until request to domain is allowed."""
        limiter = self.get_limiter(domain)
        limiter.wait_until_allowed()

# Usage
limiter = DomainRateLimiter(max_requests=10, window_seconds=60)
limiter.wait_for_domain("example.com")
# Make request to example.com
limiter.wait_for_domain("other.com")
# Make request to other.com (different limit)
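The window bookkeeping is easy to verify in isolation by replaying fixed timestamps through the same pruning-and-counting logic (a standalone sketch, not the class above):

```python
from collections import deque

def allowed_sequence(timestamps, max_requests, window):
    """Replay request timestamps through a sliding-window check."""
    window_reqs = deque()
    results = []
    for t in timestamps:
        # Drop requests that have fallen out of the window
        while window_reqs and window_reqs[0] < t - window:
            window_reqs.popleft()
        if len(window_reqs) < max_requests:
            window_reqs.append(t)
            results.append(True)
        else:
            results.append(False)
    return results

# 3 requests per 10s window: the 4th request at t=3 is rejected,
# but by t=11 the earliest requests have expired
print(allowed_sequence([0, 1, 2, 3, 11], max_requests=3, window=10))
# [True, True, True, False, True]
```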

Adaptive Rate Limiting

Adjust your rate based on responses. Speed up when things work, slow down when you hit limits.

import asyncio
import time
import random

class AdaptiveRateLimiter:
    """Adjust rate based on response codes."""

    def __init__(self, initial_delay=2.0):
        self.delay = initial_delay
        self.min_delay = 0.5
        self.max_delay = 60.0
        self.last_update = time.time()

    def record_response(self, status_code, headers=None):
        """Update rate based on response."""

        if status_code == 429:  # Too Many Requests
            self.delay = min(self.delay * 1.5, self.max_delay)
            print(f"Rate limited (429), backing off to {self.delay:.1f}s")

        elif status_code == 503:  # Service Unavailable
            self.delay = min(self.delay * 2.0, self.max_delay)
            print(f"Service unavailable (503), backing off to {self.delay:.1f}s")

        elif 200 <= status_code < 300:  # Success
            # Slowly decrease delay if successful
            self.delay = max(self.delay * 0.95, self.min_delay)

        # Check for rate-limit headers
        if headers:
            remaining = headers.get('X-RateLimit-Remaining')
            if remaining and int(remaining) < 5:
                self.delay = min(self.delay * 1.2, self.max_delay)

        self.last_update = time.time()

    async def wait(self):
        """Wait according to current delay."""
        # Add jitter to avoid patterns
        jitter = self.delay * 0.2 * (2 * random.random() - 1)
        wait_time = max(self.delay + jitter, 0.1)
        await asyncio.sleep(wait_time)

# Usage
limiter = AdaptiveRateLimiter(initial_delay=2.0)

async def fetch_adaptive(url):
    await limiter.wait()
    # async with aiohttp.ClientSession() as session:
    #     async with session.get(url) as resp:
    #         limiter.record_response(resp.status, resp.headers)
    #         return await resp.text()
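Under the update rules above (multiply by 1.5 on a 429, by 0.95 on success), the delay trajectory can be traced by hand; this standalone sketch replays a short status sequence through the same arithmetic:

```python
delay, min_delay, max_delay = 2.0, 0.5, 60.0

for status in [200, 200, 429, 429, 200]:
    if status == 429:
        delay = min(delay * 1.5, max_delay)   # back off on rate limiting
    elif 200 <= status < 300:
        delay = max(delay * 0.95, min_delay)  # creep back toward full speed
    print(f"{status} -> delay {delay:.2f}s")
```

Recovery (×0.95) is deliberately much slower than backoff (×1.5), so a burst of 429s keeps you cautious for many requests afterward.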

Detecting Rate Limit Responses

Before your IP is completely blocked, you'll get warning signs. Detect these early and respond.

import re

class RateLimitDetector:
    """Detect various rate limit signals."""

    # Common rate limit response codes
    RATE_LIMIT_CODES = {429, 503, 509, 522, 524}

    # Common rate limit headers
    RATE_LIMIT_HEADERS = {
        'X-RateLimit-Limit',
        'X-RateLimit-Remaining',
        'X-RateLimit-Reset',
        'Retry-After',
        'RateLimit-Limit',
        'RateLimit-Remaining',
        'RateLimit-Reset'
    }

    @staticmethod
    def check_status_code(status):
        """Check if status code indicates rate limiting."""
        return status in RateLimitDetector.RATE_LIMIT_CODES

    @staticmethod
    def check_headers(headers):
        """Extract rate limit info from headers."""
        info = {}
        for key in RateLimitDetector.RATE_LIMIT_HEADERS:
            if key in headers:
                info[key] = headers[key]
        return info

    @staticmethod
    def check_body(html):
        """Detect rate limit messages in response body."""
        patterns = [
            r'(?:you\s+)?(?:have\s+)?(?:made\s+)?too many requests',
            r'rate limit',
            r'please try again',
            r'temporarily unavailable',
            r'slow down',
            r'excessive activity'
        ]

        html_lower = html.lower()
        for pattern in patterns:
            if re.search(pattern, html_lower):
                return True
        return False

    @staticmethod
    def check_captcha(html):
        """Detect CAPTCHA or bot detection."""
        patterns = [
            r'captcha',
            r'recaptcha',
            r'challenge',
            r'hcaptcha',
            r'Please verify'
        ]

        html_lower = html.lower()
        for pattern in patterns:
            if re.search(pattern, html_lower):
                return True
        return False

    @staticmethod
    def check_soft_block(html, expected_min_length=1000):
        """Detect soft blocks (empty, redirect, or minimal responses)."""
        # Empty response
        if not html or len(html) < 100:
            return True

        # Redirect page (often smaller)
        if '<meta http-equiv="refresh"' in html.lower():
            return True

        # Unexpected content length drop
        if len(html) < expected_min_length:
            return True

        return False

# Integration
async def fetch_with_detection(url):
    detector = RateLimitDetector()

    # async with aiohttp.ClientSession() as session:
    #     async with session.get(url) as resp:
    #         html = await resp.text()
    #         
    #         # Check all signals
    #         if detector.check_status_code(resp.status):
    #             print(f"Rate limit status: {resp.status}")
    #             return None
    #         
    #         rate_limit_info = detector.check_headers(resp.headers)
    #         if rate_limit_info:
    #             print(f"Rate limit headers: {rate_limit_info}")
    #         
    #         if detector.check_captcha(html):
    #             print(f"CAPTCHA detected")
    #             return None
    #         
    #         if detector.check_body(html):
    #             print(f"Rate limit message in response")
    #             return None
    #         
    #         if detector.check_soft_block(html):
    #             print(f"Soft block detected (unusual response)")
    #             return None
    #         
    #         return html

Exponential Backoff with Full Jitter

When you hit a rate limit, retry with exponential backoff. But simple exponential backoff is predictable. Use full jitter (AWS-style).

import random
import asyncio

async def exponential_backoff_full_jitter(func, max_retries=5, base_delay=1, cap=60):
    """
    Retry with full jitter exponential backoff.
    Formula: sleep_time = random(0, min(cap, base * 2^attempt))
    """

    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise

            # Full jitter formula, capped so waits never grow unbounded
            max_delay = min(cap, base_delay * (2 ** attempt))
            sleep_time = random.uniform(0, max_delay)

            print(f"Attempt {attempt + 1} failed, retrying in {sleep_time:.2f}s")
            await asyncio.sleep(sleep_time)

# Demonstration of backoff timing
def show_backoff_pattern():
    print("Full jitter backoff pattern (10 retries, 60s cap):")
    total = 0
    for attempt in range(10):
        max_delay = min(60, 2 ** attempt)
        sleep_time = random.uniform(0, max_delay)
        total += sleep_time
        print(f"  Attempt {attempt+1}: {sleep_time:.2f}s (cumulative: {total:.2f}s)")

# show_backoff_pattern()

Respecting robots.txt and Crawl-delay

Most sites publish rate limit guidance in robots.txt. Respect it (both for ethics and because it works).

import urllib.robotparser
import time

class RobotsRespector:
    """Check robots.txt for rate limit guidance."""

    def __init__(self):
        self.robots = {}  # domain -> RobotFileParser

    def can_fetch(self, domain, path="/"):
        """Check if we can fetch this URL."""
        if domain not in self.robots:
            self.robots[domain] = urllib.robotparser.RobotFileParser()
            self.robots[domain].set_url(f"https://{domain}/robots.txt")
            try:
                self.robots[domain].read()
            except Exception as e:
                print(f"Could not fetch robots.txt for {domain}: {e}")
                return True  # Assume allowed if we can't check

        return self.robots[domain].can_fetch("*", path)

    def get_crawl_delay(self, domain):
        """Get Crawl-delay from robots.txt (in seconds)."""
        if domain not in self.robots:
            self.can_fetch(domain)  # Trigger fetch

        try:
            delay = self.robots[domain].crawl_delay("*")
            return delay if delay else None
        except Exception:
            return None

    def get_request_rate(self, domain):
        """Get Request-rate from robots.txt."""
        if domain not in self.robots:
            self.can_fetch(domain)

        try:
            rate = self.robots[domain].request_rate("*")
            if rate:
                return rate.requests / rate.seconds  # requests per second
        except Exception:
            pass
        return None

# Usage
respector = RobotsRespector()

# Check if we can fetch
if respector.can_fetch("example.com", "/products"):
    # Check recommended delay
    crawl_delay = respector.get_crawl_delay("example.com")
    if crawl_delay:
        print(f"robots.txt recommends {crawl_delay}s between requests")
        # time.sleep(crawl_delay)

    # Or use request rate
    req_rate = respector.get_request_rate("example.com")
    if req_rate:
        print(f"robots.txt recommends {req_rate} requests/second")
else:
    print("robots.txt disallows this path")

Session Simulation and Behavioral Mimicry

Simply respecting rate limits isn't enough. You need to look like a human browser.

import random
import asyncio
import time

class BehavioralSession:
    """Simulate realistic human browsing behavior."""

    def __init__(self):
        self.session_id = f"sess_{random.randint(100000, 999999)}"
        self.visited_pages = []
        self.referrer = None
        # Complete UA strings; a truncated User-Agent is itself a bot signal
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) "
            "Gecko/20100101 Firefox/121.0",
        ]

    def get_user_agent(self):
        """Get realistic User-Agent."""
        return random.choice(self.user_agents)

    def get_headers(self):
        """Build realistic request headers."""
        return {
            'User-Agent': self.get_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Referer': self.referrer or 'https://www.google.com/',
        }

    async def simulate_dwell_time(self, page_type='product'):
        """Simulate reading time on page."""
        # Distribution of dwell times by page type
        dwell_times = {
            'product': (5, 60),      # 5-60 seconds
            'article': (10, 120),    # 10-120 seconds
            'category': (2, 15),     # 2-15 seconds
            'search': (3, 20),       # 3-20 seconds
        }

        min_time, max_time = dwell_times.get(page_type, (2, 10))

        # Lognormal distribution (realistic)
        import numpy as np
        dwell = np.random.lognormal(mean=np.log(min_time), sigma=1.0)
        dwell = np.clip(dwell, min_time, max_time)

        await asyncio.sleep(dwell)

    def should_click_random_link(self):
        """Humans occasionally click random things."""
        return random.random() < 0.05

    def should_return_to_previous(self):
        """Humans sometimes use back button."""
        return len(self.visited_pages) > 1 and random.random() < 0.1

    def get_next_url(self, current_url, available_links):
        """Decide what to click next (realistic navigation)."""

        if self.should_click_random_link():
            return random.choice(available_links)

        if self.should_return_to_previous():
            return self.visited_pages[-2]  # Go back one page

        return random.choice(available_links)

When to Use Proxies

Rate limiting can be IP-based (blocking entire IP addresses) or session-based (blocking per-session using cookies/tokens). Different solutions apply.

IP-based limits: you're blocked after N requests regardless of session.

- Solution: residential proxies like ThorData (https://thordata.partnerstack.com/partner/0a0x4nzh)
- Cost: $30-100/month for unlimited rotating IPs
- Good for: high-volume scraping where IP rotation is necessary

Session-based limits: you're blocked after N requests per authenticated session.

- Solution: session rotation (get new cookies/tokens)
- Cost: free if you can generate new sessions
- Good for: scraping without authentication

Hybrid limits: both IP- and session-based (most common).

- Solution: combine proxies (ThorData) with session rotation

class ProxyStrategy:
    """Decide when proxies are worth the cost."""

    @staticmethod
    def needs_proxy(target_site, requests_per_minute):
        """Determine if proxies are necessary."""

        # Known IP-aggressive sites
        ip_aggressive = {
            'amazon.com': 100,
            'linkedin.com': 200,
            'indeed.com': 150,
            'ebay.com': 80,
        }

        # Check if rate exceeds site's tolerance
        for site, max_rpm in ip_aggressive.items():
            if site in target_site and requests_per_minute > max_rpm:
                return True

        # If doing >1000 req/minute, proxies are usually needed
        return requests_per_minute > 1000

# Example
if ProxyStrategy.needs_proxy("amazon.com", 500):
    print("Use proxies (ThorData: https://thordata.partnerstack.com/partner/0a0x4nzh)")
else:
    print("Proxies not necessary, use rate limiting + behavior simulation")

Proxy Rotation with ThorData

ThorData (https://thordata.partnerstack.com/partner/0a0x4nzh) provides residential proxies that avoid IP-based blocking.

import aiohttp

class ThorDataProxyRotator:
    """Rotate through ThorData residential proxies."""

    def __init__(self, username, password):
        """
        Get credentials from ThorData dashboard after signup.
        (https://thordata.partnerstack.com/partner/0a0x4nzh)
        """
        self.username = username
        self.password = password
        self.gateway = "proxy.thordata.com:7777"
        self.current_proxy = None

    def get_proxy_url(self):
        """Generate proxy URL with authentication."""
        return f"http://{self.username}:{self.password}@{self.gateway}"

    async def fetch_with_proxy(self, url):
        """Fetch URL through rotating proxy."""
        proxy_url = self.get_proxy_url()

        # async with aiohttp.ClientSession() as session:
        #     async with session.get(url, proxy=proxy_url) as resp:
        #         return await resp.text()

# Sticky sessions (rotate proxy, but keep same IP for session)
class StickyProxySession:
    """Use same proxy for multiple requests (session stickiness)."""

    def __init__(self, username, password, session_duration=100):
        self.username = username
        self.password = password
        self.gateway = "proxy.thordata.com:7777"
        self.session_duration = session_duration
        self.request_count = 0
        self.current_proxy = None

    def get_new_proxy(self):
        """Start a new session. (How the gateway pins a session to a single
        exit IP depends on provider-specific session parameters; check the
        ThorData docs for the exact username syntax.)"""
        self.request_count = 0
        return f"http://{self.username}:{self.password}@{self.gateway}"

    async def fetch(self, url):
        """Fetch with proxy rotation every N requests."""

        # Rotate once the current session has served enough requests
        if self.request_count >= self.session_duration:
            self.current_proxy = self.get_new_proxy()

        if self.current_proxy is None:
            self.current_proxy = self.get_new_proxy()

        async with aiohttp.ClientSession() as session:
            async with session.get(url, proxy=self.current_proxy) as resp:
                self.request_count += 1
                return await resp.text()

# Cost analysis
print("""
ThorData Proxy Cost Analysis:
- Startup: $0
- Usage: $30/month for residential proxies
- Worth it if: Scraping >100K pages/month from IP-blocking sites
- ROI: Saves weeks of rate limit waiting + avoids manual IP switching

Alternatives:
- Free proxies: Unreliable, frequently blocked, slow
- Datacenter proxies: Cheaper ($5/mo) but easily detected
- Residential: Most expensive but nearly undetectable
""")
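The "worth it if scraping >100K pages/month" rule of thumb can be turned into a rough break-even check. The $30/month subscription figure comes from the analysis above; the block rate and per-page retry cost below are illustrative assumptions, not measurements:

```python
def proxies_worth_it(pages_per_month: int,
                     monthly_proxy_cost: float = 30.0,
                     block_rate_without: float = 0.4,
                     cost_per_blocked_page: float = 0.001) -> bool:
    """Rough break-even test: proxies pay off once the cost of blocked
    and retried pages exceeds the subscription. block_rate_without and
    cost_per_blocked_page are illustrative assumptions."""
    wasted = pages_per_month * block_rate_without * cost_per_blocked_page
    return wasted > monthly_proxy_cost

print(proxies_worth_it(150_000))  # True — above the ~100K/month threshold
print(proxies_worth_it(50_000))   # False — rate limiting alone is cheaper
```

Plug in your own block rate and retry cost; the crossover point shifts accordingly.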

Monitoring and Logging

Track your rate limit behavior to improve over time.

import json
import logging
from datetime import datetime, timezone
from pathlib import Path

class RateLimitLogger:
    """Log rate limit events for analysis."""

    def __init__(self, log_file="rate_limits.jsonl"):
        self.log_file = Path(log_file)
        self.logger = logging.getLogger("rate_limiter")

    def log_event(self, domain, event_type, details):
        """Log a rate limit event as a JSON line."""
        event = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'domain': domain,
            'event_type': event_type,
            'details': details
        }

        with open(self.log_file, 'a') as f:
            f.write(json.dumps(event) + '\n')

        print(f"[{event['timestamp']}] {domain}: {event_type}")

    def analyze_logs(self):
        """Analyze logged events."""
        if not self.log_file.exists():
            return

        events = []
        with open(self.log_file) as f:
            for line in f:
                events.append(json.loads(line))

        # Group by domain
        by_domain = {}
        for event in events:
            domain = event['domain']
            if domain not in by_domain:
                by_domain[domain] = {'blocks': 0, 'backoffs': 0, 'success': 0}

            if event['event_type'] == 'block':
                by_domain[domain]['blocks'] += 1
            elif event['event_type'] == 'backoff':
                by_domain[domain]['backoffs'] += 1
            elif event['event_type'] == 'success':
                by_domain[domain]['success'] += 1

        print("\n=== Rate Limit Summary ===")
        for domain, stats in by_domain.items():
            total = stats['blocks'] + stats['backoffs'] + stats['success']
            block_rate = stats['blocks'] / total * 100 if total > 0 else 0
            print(f"{domain}: {total} requests, {block_rate:.1f}% blocked")

Production RateLimiter Class

A complete, drop-in rate limiter combining all techniques.

import asyncio
import time
import random
from typing import Callable, Any
from enum import Enum

class LimitStrategy(Enum):
    TOKEN_BUCKET = "token_bucket"
    SLIDING_WINDOW = "sliding_window"
    ADAPTIVE = "adaptive"

class ProductionRateLimiter:
    """Production-ready rate limiter with all features."""

    def __init__(
        self,
        strategy: LimitStrategy = LimitStrategy.ADAPTIVE,
        requests_per_second: float = 1.0,
        max_concurrent: int = 3,
        backoff_base: float = 1.0,
        enable_proxies: bool = False,
    ):
        self.strategy = strategy
        self.requests_per_second = requests_per_second
        self.max_concurrent = max_concurrent
        self.backoff_base = backoff_base
        self.enable_proxies = enable_proxies

        # Rate limiting
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.last_request = 0
        self.lock = asyncio.Lock()

        # Adaptive parameters
        self.current_delay = 1.0 / requests_per_second
        self.min_delay = 0.5 / requests_per_second
        self.max_delay = 60.0

        # Metrics
        self.requests_sent = 0
        self.requests_blocked = 0
        self.backoff_count = 0

    async def wait_for_slot(self):
        """Acquire a rate limit slot.

        Note: the semaphore here bounds concurrent slot acquisition; to cap
        in-flight requests, hold it around the request itself.
        """
        async with self.semaphore:
            async with self.lock:
                now = time.time()
                time_since_last = now - self.last_request

                # The adaptive strategy uses the dynamically adjusted delay;
                # other strategies fall back to the fixed interval.
                if self.strategy == LimitStrategy.ADAPTIVE:
                    min_interval = self.current_delay
                else:
                    min_interval = 1.0 / self.requests_per_second

                if time_since_last < min_interval:
                    await asyncio.sleep(min_interval - time_since_last)

                self.last_request = time.time()

    async def backoff(self, attempt: int):
        """Exponential backoff with full jitter, capped at max_delay."""
        ceiling = min(self.backoff_base * (2 ** attempt), self.max_delay)
        wait_time = random.uniform(0, ceiling)

        self.backoff_count += 1
        print(f"Backoff: attempt {attempt}, waiting {wait_time:.2f}s")

        await asyncio.sleep(wait_time)

    def record_response(self, status_code: int):
        """Record response for adaptive adjustment."""
        if status_code == 429:
            self.current_delay = min(self.current_delay * 1.5, self.max_delay)
            self.requests_blocked += 1
        elif 200 <= status_code < 300:
            self.current_delay = max(self.current_delay * 0.98, self.min_delay)

        self.requests_sent += 1

    def get_stats(self) -> dict:
        """Get rate limiter statistics."""
        return {
            'requests_sent': self.requests_sent,
            'requests_blocked': self.requests_blocked,
            'backoff_count': self.backoff_count,
            'current_delay': self.current_delay,
            'block_rate': self.requests_blocked / self.requests_sent if self.requests_sent > 0 else 0,
        }

Scrapy Middleware

class RateLimitMiddleware:
    """Scrapy downloader middleware for rate limiting."""

    def __init__(self, crawler):
        self.crawler = crawler
        self.limiter = ProductionRateLimiter(
            requests_per_second=1.0,
            max_concurrent=3,
        )

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    async def process_request(self, request, spider):
        # Coroutine middlewares require Scrapy's asyncio reactor
        await self.limiter.wait_for_slot()
        return None
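To wire the middleware into a project, register it in settings.py. The module path below is a placeholder for wherever the class lives in your project, and the reactor setting is needed for coroutine middlewares:

```python
# settings.py — "myproject.middlewares" is a placeholder module path
DOWNLOADER_MIDDLEWARES = {
    "myproject.middlewares.RateLimitMiddleware": 543,
}

# Coroutine downloader middlewares require the asyncio reactor
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
```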

HTTPX Integration

import httpx

class RateLimitedClient:
    """HTTPX client with built-in rate limiting."""

    def __init__(self, requests_per_second=1.0):
        self.limiter = ProductionRateLimiter(requests_per_second=requests_per_second)
        self.client = httpx.AsyncClient()

    async def get(self, url, **kwargs):
        await self.limiter.wait_for_slot()
        response = await self.client.get(url, **kwargs)
        # Feed the status back so the adaptive delay can adjust
        self.limiter.record_response(response.status_code)
        return response

    async def close(self):
        await self.client.aclose()

Real-World Case Studies

Amazon Product Scraping

Amazon is IP-aggressive and uses sophisticated bot detection. Without proper strategy, you'll be blocked in minutes.

Challenge:

  - Blocks after ~20 requests from a single IP
  - Requires User-Agent rotation
  - Inspects Referer headers for navigation consistency
  - May serve CAPTCHAs

Solution:

  - Use residential proxy rotation (ThorData: https://thordata.partnerstack.com/partner/0a0x4nzh)
  - Rotate to a different IP every 10-50 requests
  - Simulate realistic navigation (click categories, read reviews)
  - Use 2-5 second delays between requests
  - Respect robots.txt (Crawl-delay: 1)
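The 2-5 second delay recommendation can be sketched as a simple jittered pacer. The `fetch` callable and delay bounds are parameters, not Amazon-specific values:

```python
import asyncio
import random

async def paced_crawl(urls, fetch, min_delay=2.0, max_delay=5.0):
    """Fetch URLs sequentially with a uniformly jittered delay between
    requests. `fetch` is any async callable returning the page body."""
    results = []
    for i, url in enumerate(urls):
        results.append(await fetch(url))
        if i < len(urls) - 1:  # no sleep needed after the last request
            await asyncio.sleep(random.uniform(min_delay, max_delay))
    return results

async def demo():
    async def fake_fetch(url):
        return f"<html>{url}</html>"
    # Tiny delays for the demo; use the 2-5 s defaults in production
    return await paced_crawl(["a", "b", "c"], fake_fetch,
                             min_delay=0.01, max_delay=0.02)

print(asyncio.run(demo()))
```

Uniform jitter avoids the fixed-interval signature that anti-bot systems flag, while keeping the average pace predictable.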

LinkedIn Public Profiles

LinkedIn combines IP-based and behavior-based detection. IP blocks happen at ~50-100 profile views per day from datacenter IPs.

Strategy:

  - Use residential proxies with geographic rotation (ThorData: https://thordata.partnerstack.com/partner/0a0x4nzh)
  - 10-30 second delays between requests
  - Simulate page reading and profile interactions
  - Vary User-Agent and request timing patterns

News Site Aggregation

Most news sites are cooperative and allow scraping if you respect rate limits.

Strategy:

  - Check robots.txt (many publish Crawl-delay: 1-2 seconds)
  - Simple 1-2 second delays between requests
  - Authentic User-Agent
  - Session simulation not required
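Reading a published Crawl-delay is straightforward with the standard library's robots.txt parser; the sample robots.txt body below is made up for illustration:

```python
from urllib.robotparser import RobotFileParser

def crawl_delay_for(robots_txt: str, user_agent: str = "*"):
    """Parse a robots.txt body and return its Crawl-delay in seconds,
    or None if the file does not declare one."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.crawl_delay(user_agent)

sample = """User-agent: *
Crawl-delay: 2
Disallow: /admin/
"""
print(crawl_delay_for(sample))                        # 2
print(crawl_delay_for("User-agent: *\nDisallow:\n"))  # None
```

In production you would fetch the live file with `RobotFileParser.set_url(...)` plus `read()`, then feed the returned delay into your rate limiter as the floor.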

Production Deployment Checklist

Before deploying any scraper:

  - Check robots.txt for Crawl-delay and disallowed paths
  - Start with conservative limits (1 request/second or less) and scale up slowly
  - Implement exponential backoff with jitter for 429/503 responses
  - Log every block and backoff so you can analyze patterns per domain
  - Decide whether the target's IP tolerance requires residential proxies
  - Vary delays and simulate realistic navigation on sites that analyze behavior

Summary

Rate limiting is the difference between a scraper that works and one that gets blocked. The complete picture requires:

  1. Understanding targets: Check robots.txt, analyze response patterns, detect rate limit signals
  2. Matching behavior: Use realistic delay distributions, vary timing, simulate navigation
  3. Using the right algorithms: Token bucket, sliding window, or adaptive, depending on your needs
  4. Strategic proxies: Use ThorData (https://thordata.partnerstack.com/partner/0a0x4nzh) when IP-blocking is the bottleneck
  5. Constant monitoring: Log events, analyze patterns, adjust strategies

The techniques in this guide work because they respect the constraints sites have set while moving as fast as possible within those bounds. Deploy them and you'll successfully scrape at scale.