Web Scraping Rate Limiting: How to Stay Under the Radar in 2026
Introduction: Why Rate Limiting Is The #1 Reason Scrapers Get Blocked
The majority of web scraper failures aren't due to parsing logic breaking or layout changes—they're due to rate limiting. You'll build a perfect parser, deploy it to production, and watch it die within minutes because you didn't properly throttle your requests. This happens because modern anti-bot systems don't just count requests; they analyze temporal patterns, behavioral consistency, and statistical anomalies in how you're accessing their servers.
In 2026, anti-bot detection has evolved far beyond simple request counting. Cloudflare, Akamai, and DataDome track inter-arrival times between your requests, analyze the variance in response times, monitor for suspicious patterns in page navigation, and compare your timing distribution against millions of real user sessions. They've built statistical models of human behavior. You need to understand these models if you want to scrape at scale.
When you send requests every 100 milliseconds without variation, it's immediately obvious you're a machine. When you never click on random links, never spend more than 2 seconds on a page, and never simulate reading time, the behavioral analysis catches you. When your requests come from a single IP address in a datacenter with 50 concurrent connections, they'll block you regardless of how good your rate limiting is.
This guide teaches you how to actually think about rate limiting from first principles. Not just "add random delays"—that's the most obvious solution and the first thing anti-bot systems learned to circumvent. Instead, we'll cover:
- The mathematics of human behavior: Real distributions of inter-arrival times, dwell times, and navigation patterns
- Rate limiting algorithms: Token buckets, leaky buckets, sliding windows, adaptive systems—each with complete implementations
- Behavioral mimicry: How to generate browsing patterns that look statistically indistinguishable from real users
- Distributed scraping: Coordinating multiple workers without creating detectable patterns
- Proxy strategies: When and how to use residential proxies (like ThorData) to solve IP-based rate limits
- Detection and recovery: Identifying rate limit responses before they block you, and responding intelligently
By the end of this guide, you'll understand both how anti-bot systems work and how to build scrapers that operate within their constraints while still moving fast. This isn't about ethics—it's about effectiveness. The sites you're scraping are already blocking you. Learn to work within the bounds they've set, and you'll actually succeed.
The Science of Human Browsing Patterns
Anti-bot systems work by comparing your request patterns to statistical models of real human behavior. If you want to avoid detection, you need to understand these patterns deeply. This isn't intuition—it's math.
Real Human Request Timing
When humans browse the web, requests don't arrive at regular intervals. They cluster. You click something, the page loads in 2-4 seconds, you read for 8-20 seconds, then click another link. The distribution of inter-arrival times (time between consecutive requests) isn't uniform—it's right-skewed with a heavy tail. Most requests cluster within 1-3 seconds of the previous one, but occasionally you get long delays (5-30+ seconds) when someone's reading something carefully.
Empirical data from real user sessions shows:
- Median inter-arrival time: 4.2 seconds
- Mean inter-arrival time: 8.7 seconds (longer due to reading)
- 95th percentile: 45 seconds (people spend time reading)
- Standard deviation: high (~12 seconds)
A machine that sends requests every 1 second fails immediately. A machine that sends requests every 4.2 seconds (the median) gets caught because it's too consistent. Real humans vary wildly.
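Those statistics can be matched with a lognormal sampler. This is a minimal sketch; the `MU` and `SIGMA` parameters below are derived algebraically from the median and mean quoted above (median = e^mu, mean = e^(mu + sigma^2/2)), not fitted to any real dataset:

```python
import math
import random

# Solve the lognormal parameters from the quoted median (4.2s) and mean (8.7s):
# median = e^mu            ->  mu = ln(4.2)
# mean = e^(mu + s^2 / 2)  ->  sigma = sqrt(2 * ln(mean / median))
MU = math.log(4.2)                          # ~1.435
SIGMA = math.sqrt(2 * math.log(8.7 / 4.2))  # ~1.207

def human_like_delay():
    """Sample an inter-arrival delay (seconds) with a realistic heavy tail."""
    return random.lognormvariate(MU, SIGMA)

delays = sorted(human_like_delay() for _ in range(10_000))
# With this many samples the median and mean land near the quoted 4.2s / 8.7s
print(f"median ~{delays[len(delays) // 2]:.1f}s, mean ~{sum(delays) / len(delays):.1f}s")
```

The same two-equation trick works for any published median/mean pair, which makes it easy to retarget the sampler per site.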
Session Length and Navigation Patterns
Real sessions don't last 8 hours. They typically run 5-45 minutes with most clustering around 12-18 minutes. Within a session, you'll visit 3-8 pages with occasional backtracking. You won't jump randomly through a site—you'll follow logical paths (click category → view products → read reviews → click product).
Anti-bot systems track:
- Session coherence: Do your navigation patterns make logical sense?
- Referrer chains: Do your Referer headers match your navigation?
- Page visit distribution: Do you revisit pages in realistic proportions?
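A session planner built from the ranges above can be sketched as follows. The triangular distribution and the 10% backtrack probability are illustrative assumptions chosen to match the "cluster around 12-18 minutes" and "occasional backtracking" descriptions:

```python
import random

def plan_session():
    """Plan one browsing session from the ranges described above."""
    # Session length: 5-45 minutes, peaking in the 12-18 minute cluster
    minutes = random.triangular(5, 45, 15)
    # 3-8 page views per session
    pages = random.randint(3, 8)
    path = []
    for i in range(pages):
        if path and random.random() < 0.1:
            path.append(random.choice(path))   # occasional backtrack
        else:
            path.append(f"page_{i}")           # follow a forward link
    return minutes, path

minutes, path = plan_session()
print(f"{minutes:.1f} min session over {len(path)} page views: {path}")
```

Driving a scraper from a plan like this (rather than from a flat URL list) is what keeps the session-coherence and referrer-chain checks satisfied.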
Page Dwell Time
Dwell time (how long you spend on a page before requesting the next) is critical. Real users spend 5-60 seconds on product pages, 10-120 seconds on article pages, and 2-10 seconds on navigation pages. The distribution is heavily right-skewed—most visits are short, but some are quite long.
Scrapers that request the next page after exactly 1 second get caught immediately. Scrapers that request the next page within 2-5 seconds get caught after a few dozen pages. You need realistic variation.
Request Variance Analysis
Modern anti-bot systems calculate the coefficient of variation (CV = standard_deviation / mean) for inter-arrival times. Real users have CV around 1.2-1.8. Machines tend to have CV close to 0 (very consistent timing). A scraper sending requests every 5±0.1 seconds has CV of 0.02. This is instantly flagged.
Code: Analyzing Real Browsing Patterns
import numpy as np
from scipy import stats

# Simulated real user inter-arrival times (seconds)
real_user_delays = np.array([
    2.1, 3.4, 1.8, 45.2, 5.3, 3.1, 2.8, 8.7, 4.2, 3.9,
    2.4, 51.3, 3.7, 2.9, 4.1, 8.2, 5.6, 2.3, 3.8, 12.4,
    6.7, 3.2, 2.5, 34.1, 4.8, 3.3, 2.6, 7.1, 5.2, 3.0
])

# Machine-generated delays (suspicious)
machine_delays = np.array([5.0] * 30)

def analyze_pattern(delays, name):
    mean, std = np.mean(delays), np.std(delays)
    print(f"\n{name}:")
    print(f"  Mean: {mean:.2f}s")
    print(f"  Median: {np.median(delays):.2f}s")
    print(f"  Std Dev: {std:.2f}s")
    print(f"  CV (coeff of variation): {std / mean:.3f}")
    print(f"  Min: {np.min(delays):.2f}s, Max: {np.max(delays):.2f}s")
    # Autocorrelation at lag 1 (detects periodic patterns);
    # undefined when the variance is zero
    if std > 0:
        centered = delays - mean
        acf = np.correlate(centered, centered, mode='full') / (std**2 * len(delays))
        print(f"  Autocorrelation at lag-1: {acf[len(acf) // 2 + 1]:.2f}")
    else:
        print("  Autocorrelation at lag-1: undefined (constant delays)")
    # Shannon entropy of the delay histogram (low entropy = predictable)
    hist, _ = np.histogram(delays, bins=10)
    p = hist[hist > 0] / len(delays)
    entropy = -np.sum(p * np.log(p)) + 0.0  # +0.0 normalizes -0.0
    print(f"  Entropy: {entropy:.2f}")

analyze_pattern(real_user_delays, "Real User")
analyze_pattern(machine_delays, "Machine (Detected!)")

# Real users follow an approximately lognormal distribution
fit_shape, fit_loc, fit_scale = stats.lognorm.fit(real_user_delays)
print(f"\nReal users fit lognormal: shape={fit_shape:.3f}, scale={fit_scale:.2f}s")
Output:
Real User:
  Mean: 8.32s
  Median: 3.85s
  Std Dev: 12.16s
  CV (coeff of variation): 1.461
  Min: 1.80s, Max: 51.30s
  Autocorrelation at lag-1: -0.16
  Entropy: 0.89
Machine (Detected!):
  Mean: 5.00s
  Median: 5.00s
  Std Dev: 0.00s
  CV (coeff of variation): 0.000
  Min: 5.00s, Max: 5.00s
  Autocorrelation at lag-1: undefined (constant delays)
  Entropy: 0.00
The machine delays have zero variance and zero entropy, and their autocorrelation is undefined because every value is identical. They're caught instantly.
Basic Delays and Jitter
The foundation of any rate limiter is adding delays between requests. Start here before moving to more sophisticated algorithms.
Simple time.sleep() with Random Jitter
import random
import asyncio

async def fetch_with_delay(url):
    """Fetch URL with random delay between requests."""
    delay = random.uniform(2, 8)  # 2-8 seconds
    await asyncio.sleep(delay)
    # return await client.get(url)
This is a starting point but insufficient. Uniform distribution (equal probability across the range) isn't how humans behave. You need distributions that match real browsing.
Gaussian (Normal) Distribution
import random

def gaussian_delay(mean=5.0, std_dev=2.0):
    """Generate delay from Gaussian distribution."""
    delay = random.gauss(mean, std_dev)
    return max(delay, 0.5)  # Ensure positive

# Usage
for i in range(5):
    delay = gaussian_delay(mean=5.0, std_dev=1.5)
    print(f"Request {i}: delay {delay:.2f}s")
Better, but still not realistic. Real behavior has heavier tails (occasional very long delays).
Exponential Distribution
More realistic for inter-arrival times, with a "memory-less" property that matches real browsing:
import random

def exponential_delay(lambda_param=0.15):
    """Generate delay from exponential distribution.
    lambda_param = 1/mean, so 0.15 ≈ 6.7 second mean"""
    return random.expovariate(lambda_param)

# Verification
delays = [exponential_delay() for _ in range(100)]
print(f"Mean: {sum(delays) / len(delays):.2f}s")  # Should be ~6.7s
Lognormal Distribution
This best matches real human browsing behavior. It's right-skewed with a long tail:
import numpy as np

def lognormal_delay(mu=1.0, sigma=0.8):
    """Lognormal distribution (best for human-like behavior).
    mu and sigma are parameters of the underlying normal distribution."""
    return np.random.lognormal(mu, sigma)

# Produces: many short delays (1-5s), some medium (5-15s), rare long (15-60s+)
delays = [lognormal_delay() for _ in range(1000)]
print(f"Mean: {np.mean(delays):.2f}s")
print(f"Median: {np.median(delays):.2f}s")
print(f"99th percentile: {np.percentile(delays, 99):.2f}s")
Poisson Processes for Event-Based Limiting
For controlling request rate over longer time windows:
import time
import random

class PoissonRateLimiter:
    """Generate request times according to a Poisson process.
    Useful for 'max 60 requests per minute' style limits."""

    def __init__(self, rate_per_second=0.2):  # ~12 req/minute
        self.rate = rate_per_second
        self.next_time = time.time()

    def acquire(self):
        """Block until the next request is allowed."""
        now = time.time()
        if now < self.next_time:
            time.sleep(self.next_time - now)
        # Next request arrives after an exponentially distributed gap
        self.next_time = time.time() + random.expovariate(self.rate)
Concurrency Control with asyncio.Semaphore
Most real scrapers need to handle multiple targets simultaneously. Without proper concurrency control, you'll hammer servers with parallel requests and get blocked immediately.
import asyncio
import aiohttp
import time
import random

class ConcurrentRateLimiter:
    """Limit concurrent requests and add delays."""

    def __init__(self, max_concurrent=3, min_delay=1.0, max_delay=5.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request_time = 0
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Enforce minimum spacing plus jitter between ALL requests."""
        async with self.lock:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.min_delay:
                await asyncio.sleep(self.min_delay - elapsed)
            # Add random jitter
            jitter = random.uniform(0, self.max_delay - self.min_delay)
            await asyncio.sleep(jitter)
            self.last_request_time = time.time()

    async def fetch(self, session, url):
        """Fetch with rate limiting. The semaphore is held for the whole
        request, so max_concurrent actually bounds in-flight connections."""
        async with self.semaphore:
            await self.acquire()
            try:
                timeout = aiohttp.ClientTimeout(total=10)
                async with session.get(url, timeout=timeout) as resp:
                    return await resp.text()
            except asyncio.TimeoutError:
                print(f"Timeout: {url}")
                return None
            except aiohttp.ClientError as e:
                print(f"Error fetching {url}: {e}")
                return None

# Usage
async def main():
    limiter = ConcurrentRateLimiter(max_concurrent=3, min_delay=2.0, max_delay=5.0)
    urls = [f"https://example.com/page{i}" for i in range(20)]
    async with aiohttp.ClientSession() as session:
        tasks = [limiter.fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        print(f"Fetched {len([r for r in results if isinstance(r, str)])}/{len(urls)} pages")

# asyncio.run(main())
Key points:
- The semaphore bounds concurrent connections (default 3; adjust per target)
- The lock ensures the minimum spacing is respected (without it, concurrent tasks skip the delay)
- Per-request jitter adds natural variation
- Error handling prevents crashes
Token Bucket Algorithm
The token bucket is the most common rate limiting algorithm. You have a bucket that starts full with N tokens. Each request costs 1 token. Tokens refill at a constant rate. If the bucket is empty, wait until tokens refill.
import time
import asyncio

class TokenBucket:
    """Token bucket rate limiter.

    Args:
        tokens_per_second: Refill rate (e.g., 1 token/sec = 1 req/sec max)
        bucket_size: Maximum burst size (default = tokens_per_second)
    """

    def __init__(self, tokens_per_second, bucket_size=None):
        self.tokens_per_second = tokens_per_second
        self.bucket_size = bucket_size or tokens_per_second
        self.tokens = float(self.bucket_size)
        self.last_update = time.time()
        self.lock = asyncio.Lock()

    async def acquire(self, tokens=1):
        """Acquire tokens, blocking if necessary."""
        async with self.lock:
            while True:
                now = time.time()
                elapsed = now - self.last_update
                # Refill tokens
                self.tokens = min(
                    self.bucket_size,
                    self.tokens + elapsed * self.tokens_per_second
                )
                self.last_update = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                # Wait for refill
                wait_time = (tokens - self.tokens) / self.tokens_per_second
                await asyncio.sleep(wait_time)

# Example: 2 requests per second, burst up to 5
async def main():
    bucket = TokenBucket(tokens_per_second=2, bucket_size=5)
    start = time.time()
    for i in range(10):
        await bucket.acquire()
        print(f"Request {i+1} at {time.time() - start:.2f}s")

# Output shows ~2 req/sec after an initial burst of 5
When to use a token bucket:
- Simple per-second rate limits
- Bursty traffic (allow spikes up to bucket size)
- The most common use case for scrapers
Leaky Bucket Algorithm
Similar to token bucket but works in reverse. Requests enter a queue and are processed at a constant rate.
import asyncio
import time

class LeakyBucket:
    """Leaky bucket rate limiter.
    Requests are released at a constant rate with uniform spacing.
    """

    def __init__(self, rate_per_second):
        self.interval = 1.0 / rate_per_second
        self.next_slot = 0.0
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Reserve the next available slot, then wait for it.
        The lock serializes slot assignment so concurrent callers
        are spaced exactly one interval apart."""
        async with self.lock:
            now = time.monotonic()
            slot = max(self.next_slot, now)
            self.next_slot = slot + self.interval
        await asyncio.sleep(slot - now)

    async def process(self, coro):
        """Run a coroutine at the controlled rate."""
        await self.acquire()
        return await coro

# Usage
async def main():
    bucket = LeakyBucket(rate_per_second=2)

    async def fetch(url):
        # Simulate fetch
        await asyncio.sleep(0.1)
        return f"Fetched {url}"

    tasks = [bucket.process(fetch(f"url{i}")) for i in range(10)]
    results = await asyncio.gather(*tasks)
Token bucket vs. leaky bucket:
- Token bucket: Better for bursts (can send multiple requests quickly while the bucket has tokens)
- Leaky bucket: Better for strict rate limiting (uniform spacing, no bursts)
Sliding Window Rate Limiter
Tracks actual requests in time windows and enforces limits based on real activity.
import time
from collections import deque

class SlidingWindowRateLimiter:
    """Track requests in a sliding time window.

    Example: max 30 requests per 60 second window.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()  # (timestamp, count)

    def is_allowed(self):
        """Check if a request is allowed."""
        now = time.time()
        # Remove requests outside the window
        while self.requests and self.requests[0][0] < now - self.window_seconds:
            self.requests.popleft()
        # Count requests in the window
        count = sum(c for _, c in self.requests)
        if count < self.max_requests:
            self.requests.append((now, 1))
            return True
        return False

    def wait_until_allowed(self):
        """Block until a request is allowed."""
        while not self.is_allowed():
            if self.requests:
                oldest = self.requests[0][0]
                wait_time = oldest + self.window_seconds - time.time()
                if wait_time > 0:
                    time.sleep(min(wait_time, 0.1))
            else:
                time.sleep(0.01)

# Per-domain tracking
class DomainRateLimiter:
    """Track rate limits per domain independently."""

    def __init__(self, max_requests=30, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.limiters = {}  # domain -> SlidingWindowRateLimiter

    def get_limiter(self, domain):
        """Get or create the limiter for a domain."""
        if domain not in self.limiters:
            self.limiters[domain] = SlidingWindowRateLimiter(
                self.max_requests, self.window_seconds
            )
        return self.limiters[domain]

    def wait_for_domain(self, domain):
        """Wait until a request to this domain is allowed."""
        limiter = self.get_limiter(domain)
        limiter.wait_until_allowed()

# Usage
limiter = DomainRateLimiter(max_requests=10, window_seconds=60)
limiter.wait_for_domain("example.com")
# Make request to example.com
limiter.wait_for_domain("other.com")
# Make request to other.com (independent limit)
Adaptive Rate Limiting
Adjust your rate based on responses. Speed up when things work, slow down when you hit limits.
import asyncio
import time
import random

class AdaptiveRateLimiter:
    """Adjust rate based on response codes."""

    def __init__(self, initial_delay=2.0):
        self.delay = initial_delay
        self.min_delay = 0.5
        self.max_delay = 60.0
        self.last_update = time.time()

    def record_response(self, status_code, headers=None):
        """Update the rate based on a response."""
        if status_code == 429:  # Too Many Requests
            self.delay = min(self.delay * 1.5, self.max_delay)
            print(f"Rate limited (429), backing off to {self.delay:.1f}s")
        elif status_code == 503:  # Service Unavailable
            self.delay = min(self.delay * 2.0, self.max_delay)
            print(f"Service unavailable (503), backing off to {self.delay:.1f}s")
        elif 200 <= status_code < 300:  # Success
            # Slowly decrease the delay on success
            self.delay = max(self.delay * 0.95, self.min_delay)
        # Check for rate-limit headers
        if headers:
            remaining = headers.get('X-RateLimit-Remaining')
            if remaining and int(remaining) < 5:
                self.delay = min(self.delay * 1.2, self.max_delay)
        self.last_update = time.time()

    async def wait(self):
        """Wait according to the current delay."""
        # Add jitter to avoid patterns
        jitter = self.delay * 0.2 * (2 * random.random() - 1)
        wait_time = max(self.delay + jitter, 0.1)
        await asyncio.sleep(wait_time)

# Usage
limiter = AdaptiveRateLimiter(initial_delay=2.0)

async def fetch_adaptive(url):
    await limiter.wait()
    # async with aiohttp.ClientSession() as session:
    #     async with session.get(url) as resp:
    #         limiter.record_response(resp.status, resp.headers)
    #         return await resp.text()
Detecting Rate Limit Responses
Before your IP is completely blocked, you'll get warning signs. Detect these early and respond.
import re

class RateLimitDetector:
    """Detect various rate limit signals."""

    # Common rate limit response codes
    RATE_LIMIT_CODES = {429, 503, 509, 522, 524}

    # Common rate limit headers
    RATE_LIMIT_HEADERS = {
        'X-RateLimit-Limit',
        'X-RateLimit-Remaining',
        'X-RateLimit-Reset',
        'Retry-After',
        'RateLimit-Limit',
        'RateLimit-Remaining',
        'RateLimit-Reset',
    }

    @staticmethod
    def check_status_code(status):
        """Check if a status code indicates rate limiting."""
        return status in RateLimitDetector.RATE_LIMIT_CODES

    @staticmethod
    def check_headers(headers):
        """Extract rate limit info from headers."""
        info = {}
        for key in RateLimitDetector.RATE_LIMIT_HEADERS:
            if key in headers:
                info[key] = headers[key]
        return info

    @staticmethod
    def check_body(html):
        """Detect rate limit messages in the response body."""
        patterns = [
            r'(?:you\s+)?(?:have\s+)?(?:made\s+)?too many requests',
            r'rate limit',
            r'please try again',
            r'temporarily unavailable',
            r'slow down',
            r'excessive activity',
        ]
        html_lower = html.lower()
        return any(re.search(p, html_lower) for p in patterns)

    @staticmethod
    def check_captcha(html):
        """Detect CAPTCHA or bot detection."""
        # Patterns are lowercase because they run against html.lower()
        patterns = [
            r'captcha',
            r'recaptcha',
            r'challenge',
            r'hcaptcha',
            r'please verify',
        ]
        html_lower = html.lower()
        return any(re.search(p, html_lower) for p in patterns)

    @staticmethod
    def check_soft_block(html, expected_min_length=1000):
        """Detect soft blocks (empty, redirect, or minimal responses)."""
        # Empty response
        if not html or len(html) < 100:
            return True
        # Redirect page (often smaller)
        if '<meta http-equiv="refresh"' in html.lower():
            return True
        # Unexpected content length drop
        if len(html) < expected_min_length:
            return True
        return False

# Integration
async def fetch_with_detection(url):
    detector = RateLimitDetector()
    # async with aiohttp.ClientSession() as session:
    #     async with session.get(url) as resp:
    #         html = await resp.text()
    #
    #         # Check all signals
    #         if detector.check_status_code(resp.status):
    #             print(f"Rate limit status: {resp.status}")
    #             return None
    #
    #         rate_limit_info = detector.check_headers(resp.headers)
    #         if rate_limit_info:
    #             print(f"Rate limit headers: {rate_limit_info}")
    #
    #         if detector.check_captcha(html):
    #             print("CAPTCHA detected")
    #             return None
    #
    #         if detector.check_body(html):
    #             print("Rate limit message in response")
    #             return None
    #
    #         if detector.check_soft_block(html):
    #             print("Soft block detected (unusual response)")
    #             return None
    #
    #         return html
Exponential Backoff with Full Jitter
When you hit a rate limit, retry with exponential backoff. But simple exponential backoff is predictable. Use full jitter (AWS-style).
import random
import asyncio

async def exponential_backoff_full_jitter(func, max_retries=5, base_delay=1, cap=60):
    """
    Retry with full jitter exponential backoff.

    Formula: sleep_time = random(0, min(CAP, base * 2^attempt))
    """
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Full jitter formula
            max_delay = min(cap, base_delay * (2 ** attempt))
            sleep_time = random.uniform(0, max_delay)
            print(f"Attempt {attempt + 1} failed, retrying in {sleep_time:.2f}s")
            await asyncio.sleep(sleep_time)

# Demonstration of backoff timing
def show_backoff_pattern():
    print("Full jitter backoff pattern (10 retries):")
    total = 0
    for attempt in range(10):
        max_delay = min(60, 1 * (2 ** attempt))
        sleep_time = random.uniform(0, max_delay)
        total += sleep_time
        print(f"  Attempt {attempt+1}: {sleep_time:.2f}s (cumulative: {total:.2f}s)")

# show_backoff_pattern()
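Full jitter should still yield to the server's own guidance: when a 429 or 503 carries a Retry-After header, prefer it over the computed delay. A minimal sketch, handling both forms the header can take (delta-seconds and HTTP-date):

```python
import random
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def backoff_delay(attempt, retry_after=None, base_delay=1.0, cap=60.0):
    """Pick a sleep time: the server's Retry-After if present, else full jitter."""
    if retry_after:
        try:
            return float(retry_after)                  # delta-seconds form, e.g. "30"
        except ValueError:
            when = parsedate_to_datetime(retry_after)  # HTTP-date form
            return max((when - datetime.now(timezone.utc)).total_seconds(), 0.0)
    return random.uniform(0, min(cap, base_delay * (2 ** attempt)))

print(backoff_delay(3, retry_after="30"))  # → 30.0
```

Honoring Retry-After matters: servers that send it often treat ignoring it as another bot signal.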
Respecting robots.txt and Crawl-delay
Most sites publish rate limit guidance in robots.txt. Respect it (both for ethics and because it works).
import urllib.robotparser
import time

class RobotsRespector:
    """Check robots.txt for rate limit guidance."""

    def __init__(self):
        self.robots = {}  # domain -> RobotFileParser

    def can_fetch(self, domain, path="/"):
        """Check if we can fetch this URL."""
        if domain not in self.robots:
            self.robots[domain] = urllib.robotparser.RobotFileParser()
            self.robots[domain].set_url(f"https://{domain}/robots.txt")
            try:
                self.robots[domain].read()
            except Exception as e:
                print(f"Could not fetch robots.txt for {domain}: {e}")
                return True  # Assume allowed if we can't check
        return self.robots[domain].can_fetch("*", path)

    def get_crawl_delay(self, domain):
        """Get Crawl-delay from robots.txt (in seconds)."""
        if domain not in self.robots:
            self.can_fetch(domain)  # Trigger fetch
        try:
            return self.robots[domain].crawl_delay("*")
        except Exception:
            return None

    def get_request_rate(self, domain):
        """Get Request-rate from robots.txt (requests per second)."""
        if domain not in self.robots:
            self.can_fetch(domain)
        try:
            rate = self.robots[domain].request_rate("*")
            if rate:
                return rate.requests / rate.seconds
        except Exception:
            pass
        return None

# Usage
respector = RobotsRespector()

# Check if we can fetch
if respector.can_fetch("example.com", "/products"):
    # Check the recommended delay
    crawl_delay = respector.get_crawl_delay("example.com")
    if crawl_delay:
        print(f"robots.txt recommends {crawl_delay}s between requests")
        # time.sleep(crawl_delay)
    # Or use the request rate
    req_rate = respector.get_request_rate("example.com")
    if req_rate:
        print(f"robots.txt recommends {req_rate} requests/second")
else:
    print("robots.txt disallows this path")
Session Simulation and Behavioral Mimicry
Simply respecting rate limits isn't enough. You need to look like a human browser.
import random
import asyncio
import numpy as np

class BehavioralSession:
    """Simulate realistic human browsing behavior."""

    def __init__(self):
        self.session_id = f"sess_{random.randint(100000, 999999)}"
        self.visited_pages = []
        self.referrer = None
        # Illustrative User-Agent strings; keep these current in production
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
        ]

    def get_user_agent(self):
        """Get a realistic User-Agent."""
        return random.choice(self.user_agents)

    def get_headers(self):
        """Build realistic request headers."""
        return {
            'User-Agent': self.get_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Referer': self.referrer or 'https://www.google.com/',
        }

    async def simulate_dwell_time(self, page_type='product'):
        """Simulate reading time on a page."""
        # Dwell time ranges by page type (seconds)
        dwell_times = {
            'product': (5, 60),
            'article': (10, 120),
            'category': (2, 15),
            'search': (3, 20),
        }
        min_time, max_time = dwell_times.get(page_type, (2, 10))
        # Lognormal distribution (realistic right skew)
        dwell = np.random.lognormal(mean=np.log(min_time), sigma=1.0)
        dwell = np.clip(dwell, min_time, max_time)
        await asyncio.sleep(dwell)

    def should_click_random_link(self):
        """Humans occasionally click random things."""
        return random.random() < 0.05

    def should_return_to_previous(self):
        """Humans sometimes use the back button."""
        return len(self.visited_pages) > 1 and random.random() < 0.1

    def get_next_url(self, current_url, available_links):
        """Decide what to click next (realistic navigation)."""
        if self.should_click_random_link():
            return random.choice(available_links)
        if self.should_return_to_previous():
            return self.visited_pages[-2]  # Go back one page
        return random.choice(available_links)
When to Use Proxies
Rate limiting can be IP-based (blocking entire IP addresses) or session-based (blocking per-session using cookies/tokens). Different solutions apply.
IP-based limits: You're blocked after N requests regardless of session.
- Solution: Residential proxies like ThorData (https://thordata.partnerstack.com/partner/0a0x4nzh)
- Cost: $30-100/month for unlimited rotating IPs
- Good for: High-volume scraping where IP rotation is necessary

Session-based limits: You're blocked after N requests per session.
- Solution: Session rotation (get new cookies/tokens)
- Cost: Free if you can generate new sessions
- Good for: Sites that track you by cookie or token rather than by IP

Hybrid limits: Both IP- and session-based (most common).
- Solution: Combine proxies (ThorData) with session rotation
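The session-rotation case can be sketched with the standard library alone: a fresh cookie jar makes the site see a brand-new visitor. The 50-request threshold below is an illustrative assumption, not a known limit for any particular site:

```python
import http.cookiejar
import urllib.request

class SessionRotator:
    """Rotate to a fresh cookie jar (a new 'session') every N requests."""

    def __init__(self, requests_per_session=50):
        self.requests_per_session = requests_per_session
        self.count = 0
        self.opener = None

    def get_opener(self):
        # A new CookieJar discards every cookie the site has set so far
        if self.opener is None or self.count >= self.requests_per_session:
            jar = http.cookiejar.CookieJar()
            self.opener = urllib.request.build_opener(
                urllib.request.HTTPCookieProcessor(jar)
            )
            self.count = 0
        self.count += 1
        return self.opener

rotator = SessionRotator(requests_per_session=2)
first = rotator.get_opener()
same = rotator.get_opener()   # second request: same session
fresh = rotator.get_opener()  # third request: rotated to a new session
print(first is same, first is fresh)  # → True False
```

The same pattern transfers to aiohttp or requests by swapping the opener for a `ClientSession` or `Session` object; for hybrid limits, rotate the session and the proxy together.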
class ProxyStrategy:
    """Decide when proxies are worth the cost."""

    @staticmethod
    def needs_proxy(target_site, requests_per_minute):
        """Determine if proxies are necessary."""
        # Known IP-aggressive sites (illustrative tolerance thresholds, req/min)
        ip_aggressive = {
            'amazon.com': 100,
            'linkedin.com': 200,
            'indeed.com': 150,
            'ebay.com': 80,
        }
        # Check if our rate exceeds the site's tolerance
        for site, max_rpm in ip_aggressive.items():
            if site in target_site and requests_per_minute > max_rpm:
                return True
        # Above ~1000 req/minute, proxies are usually needed regardless
        return requests_per_minute > 1000

# Example
if ProxyStrategy.needs_proxy("amazon.com", 500):
    print("Use proxies (ThorData: https://thordata.partnerstack.com/partner/0a0x4nzh)")
else:
    print("Proxies not necessary, use rate limiting + behavior simulation")
Proxy Rotation with ThorData
ThorData (https://thordata.partnerstack.com/partner/0a0x4nzh) provides residential proxies that avoid IP-based blocking.
import aiohttp

class ThorDataProxyRotator:
    """Rotate through ThorData residential proxies."""

    def __init__(self, username, password):
        """
        Get credentials from the ThorData dashboard after signup.
        (https://thordata.partnerstack.com/partner/0a0x4nzh)
        """
        self.username = username
        self.password = password
        self.gateway = "proxy.thordata.com:7777"
        self.current_proxy = None

    def get_proxy_url(self):
        """Generate a proxy URL with authentication."""
        return f"http://{self.username}:{self.password}@{self.gateway}"

    async def fetch_with_proxy(self, url):
        """Fetch a URL through the rotating proxy."""
        proxy_url = self.get_proxy_url()
        # async with aiohttp.ClientSession() as session:
        #     async with session.get(url, proxy=proxy_url) as resp:
        #         return await resp.text()

# Sticky sessions (keep the same exit IP for a batch of requests)
class StickyProxySession:
    """Use the same proxy session for multiple requests (session stickiness)."""

    def __init__(self, username, password, session_duration=100):
        self.username = username
        self.password = password
        self.gateway = "proxy.thordata.com:7777"
        self.session_duration = session_duration
        self.request_count = 0
        self.current_proxy = None

    def get_new_proxy(self):
        """Force a new proxy session."""
        self.request_count = 0
        return f"http://{self.username}:{self.password}@{self.gateway}"

    async def fetch(self, url):
        """Fetch with proxy rotation every N requests."""
        if self.current_proxy is None or self.request_count >= self.session_duration:
            self.current_proxy = self.get_new_proxy()
        proxy_url = self.current_proxy
        # async with aiohttp.ClientSession() as session:
        #     async with session.get(url, proxy=proxy_url) as resp:
        #         self.request_count += 1
        #         return await resp.text()

# Cost analysis
print("""
ThorData Proxy Cost Analysis:
- Startup: $0
- Usage: $30/month for residential proxies
- Worth it if: Scraping >100K pages/month from IP-blocking sites
- ROI: Saves weeks of rate limit waiting + avoids manual IP switching

Alternatives:
- Free proxies: Unreliable, frequently blocked, slow
- Datacenter proxies: Cheaper ($5/mo) but easily detected
- Residential: Most expensive but nearly undetectable
""")
Monitoring and Logging
Track your rate limit behavior to improve over time.
import json
import logging
from datetime import datetime, timezone
from pathlib import Path

class RateLimitLogger:
    """Log rate limit events for analysis."""

    def __init__(self, log_file="rate_limits.jsonl"):
        self.log_file = Path(log_file)
        self.logger = logging.getLogger("rate_limiter")

    def log_event(self, domain, event_type, details):
        """Log a rate limit event."""
        event = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'domain': domain,
            'event_type': event_type,
            'details': details,
        }
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(event) + '\n')
        print(f"[{event['timestamp']}] {domain}: {event_type}")

    def analyze_logs(self):
        """Analyze logged events."""
        if not self.log_file.exists():
            return
        events = []
        with open(self.log_file) as f:
            for line in f:
                events.append(json.loads(line))
        # Group by domain
        by_domain = {}
        for event in events:
            domain = event['domain']
            if domain not in by_domain:
                by_domain[domain] = {'blocks': 0, 'backoffs': 0, 'success': 0}
            if event['event_type'] == 'block':
                by_domain[domain]['blocks'] += 1
            elif event['event_type'] == 'backoff':
                by_domain[domain]['backoffs'] += 1
            elif event['event_type'] == 'success':
                by_domain[domain]['success'] += 1
        print("\n=== Rate Limit Summary ===")
        for domain, stats in by_domain.items():
            total = stats['blocks'] + stats['backoffs'] + stats['success']
            block_rate = stats['blocks'] / total * 100 if total > 0 else 0
            print(f"{domain}: {total} requests, {block_rate:.1f}% blocked")
Production RateLimiter Class
A complete, drop-in rate limiter combining all techniques.
```python
import asyncio
import time
import random
from enum import Enum

class LimitStrategy(Enum):
    TOKEN_BUCKET = "token_bucket"
    SLIDING_WINDOW = "sliding_window"
    ADAPTIVE = "adaptive"

class ProductionRateLimiter:
    """Production-ready rate limiter with all features."""

    def __init__(
        self,
        strategy: LimitStrategy = LimitStrategy.ADAPTIVE,
        requests_per_second: float = 1.0,
        max_concurrent: int = 3,
        backoff_base: float = 1.0,
        enable_proxies: bool = False,
    ):
        self.strategy = strategy
        self.requests_per_second = requests_per_second
        self.max_concurrent = max_concurrent
        self.backoff_base = backoff_base
        self.enable_proxies = enable_proxies
        # Rate limiting
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.last_request = 0.0
        self.lock = asyncio.Lock()
        # Adaptive parameters
        self.current_delay = 1.0 / requests_per_second
        self.min_delay = 0.5 / requests_per_second
        self.max_delay = 60.0
        # Metrics
        self.requests_sent = 0
        self.requests_blocked = 0
        self.backoff_count = 0

    async def wait_for_slot(self):
        """Acquire rate limit slot."""
        async with self.semaphore:
            async with self.lock:
                now = time.time()
                time_since_last = now - self.last_request
                # Adaptive mode uses the dynamically adjusted delay;
                # fixed modes use the configured rate.
                if self.strategy is LimitStrategy.ADAPTIVE:
                    min_interval = self.current_delay
                else:
                    min_interval = 1.0 / self.requests_per_second
                if time_since_last < min_interval:
                    await asyncio.sleep(min_interval - time_since_last)
                self.last_request = time.time()

    async def backoff(self, attempt: int):
        """Exponential backoff with full jitter."""
        max_delay = self.backoff_base * (2 ** attempt)
        wait_time = random.uniform(0, max_delay)
        self.backoff_count += 1
        print(f"Backoff: attempt {attempt}, waiting {wait_time:.2f}s")
        await asyncio.sleep(wait_time)

    def record_response(self, status_code: int):
        """Record response for adaptive adjustment."""
        if status_code == 429:
            self.current_delay = min(self.current_delay * 1.5, self.max_delay)
            self.requests_blocked += 1
        elif 200 <= status_code < 300:
            self.current_delay = max(self.current_delay * 0.98, self.min_delay)
            self.requests_sent += 1

    def get_stats(self) -> dict:
        """Get rate limiter statistics."""
        return {
            'requests_sent': self.requests_sent,
            'requests_blocked': self.requests_blocked,
            'backoff_count': self.backoff_count,
            'current_delay': self.current_delay,
            'block_rate': self.requests_blocked / self.requests_sent if self.requests_sent > 0 else 0,
        }
```
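The adaptive feedback in `record_response` is worth examining on its own: a single 429 multiplies the delay by 1.5, while each success shrinks it by only 2%, so recovery takes roughly twenty successful responses (0.98^20 ≈ 0.67). Here is a standalone sketch of just that feedback rule, with the class's multipliers pulled out as plain parameters:

```python
def adjust_delay(delay, status_code, min_delay=0.5, max_delay=60.0):
    """Adaptive feedback: back off sharply on 429, recover slowly on success."""
    if status_code == 429:
        return min(delay * 1.5, max_delay)
    if 200 <= status_code < 300:
        return max(delay * 0.98, min_delay)
    return delay  # other statuses leave the delay unchanged

delay = 1.0
for status in [200, 200, 429, 429, 200]:
    delay = adjust_delay(delay, status)
print(f"delay after the sequence: {delay:.2f}s")  # two 429s push 1.0s up to ~2.12s
```

The asymmetry is deliberate: a block is strong evidence you're over the limit, while one success is weak evidence you're under it.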
Integration with Popular Libraries
Scrapy Middleware
```python
from scrapy import signals

class RateLimitMiddleware:
    """Scrapy downloader middleware for rate limiting."""

    def __init__(self, crawler):
        self.crawler = crawler
        self.limiter = ProductionRateLimiter(
            requests_per_second=1.0,
            max_concurrent=3,
        )
        # Optionally hook spider lifecycle events:
        # crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        # crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    async def process_request(self, request, spider):
        # Modern Scrapy (2.x with the asyncio reactor) allows coroutine
        # middleware methods; block here until the limiter grants a slot.
        await self.limiter.wait_for_slot()
        return None
```
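A middleware only runs once it is registered in the project settings. The module path and the 543 priority below are placeholders for your own project, and coroutine middleware methods require the asyncio reactor in Scrapy 2.x:

```python
# settings.py (sketch -- "myproject" path and 543 priority are placeholders)
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
DOWNLOADER_MIDDLEWARES = {
    "myproject.middlewares.RateLimitMiddleware": 543,
}
```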
HTTPX Integration
```python
import httpx

class RateLimitedClient:
    """HTTPX client with built-in rate limiting."""

    def __init__(self, requests_per_second=1.0):
        self.limiter = ProductionRateLimiter(requests_per_second=requests_per_second)
        self.client = httpx.AsyncClient()

    async def get(self, url, **kwargs):
        await self.limiter.wait_for_slot()
        response = await self.client.get(url, **kwargs)
        # Feed the status code back so the adaptive delay can adjust.
        self.limiter.record_response(response.status_code)
        return response

    async def close(self):
        await self.client.aclose()
```
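Because `wait_for_slot` is essentially a lock plus a timestamp, its pacing guarantee is easy to check in isolation. The sketch below is a self-contained reimplementation of that idea (the class and function names are illustrative) that measures the spacing between two consecutive slots:

```python
import asyncio
import time

class MinIntervalPacer:
    """Enforce a minimum interval between consecutive requests."""

    def __init__(self, requests_per_second: float):
        self.min_interval = 1.0 / requests_per_second
        self.last_request = 0.0
        self.lock = asyncio.Lock()

    async def wait_for_slot(self):
        async with self.lock:
            elapsed = time.monotonic() - self.last_request
            if elapsed < self.min_interval:
                await asyncio.sleep(self.min_interval - elapsed)
            self.last_request = time.monotonic()

async def demo():
    pacer = MinIntervalPacer(requests_per_second=20.0)  # 50 ms spacing
    await pacer.wait_for_slot()                         # first slot is free
    start = time.monotonic()
    await pacer.wait_for_slot()                         # second slot must wait
    return time.monotonic() - start

gap = asyncio.run(demo())
print(f"gap between slots: {gap * 1000:.0f} ms")
```

Using `time.monotonic()` here avoids pacing glitches when the system clock is adjusted mid-run.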
Real-World Case Studies
Amazon Product Scraping
Amazon is IP-aggressive and uses sophisticated bot detection. Without a proper strategy, you'll be blocked within minutes.
Challenge:
- Blocks after ~20 requests from a single IP
- Requires User-Agent rotation
- Inspects Referer headers for navigation consistency
- May serve CAPTCHAs
Solution:
- Use residential proxy rotation (ThorData: https://thordata.partnerstack.com/partner/0a0x4nzh)
- Rotate to a different IP every 10-50 requests
- Simulate realistic navigation (click categories, read reviews)
- Use 2-5 second delays between requests
- Respect robots.txt (Crawl-delay: 1)
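The rotation and delay advice above can be combined into a small helper: keep a proxy pool, switch identity every N requests, and draw each delay from the 2-5 second band. The proxy endpoints and the 25-request rotation interval below are placeholder assumptions, not real provider values:

```python
import random

PROXIES = [
    "http://user:pass@proxy-1.example:8000",  # placeholder endpoints
    "http://user:pass@proxy-2.example:8000",
    "http://user:pass@proxy-3.example:8000",
]

class RotatingIdentity:
    """Rotate to a fresh proxy every `rotate_every` requests."""

    def __init__(self, proxies, rotate_every=25):
        self.proxies = proxies
        self.rotate_every = rotate_every
        self.request_count = 0
        self.current = random.choice(proxies)

    def next_request(self):
        """Return (proxy, delay) to use for the next request."""
        self.request_count += 1
        if self.request_count % self.rotate_every == 0:
            self.current = random.choice(self.proxies)
        delay = random.uniform(2.0, 5.0)  # the 2-5 s band recommended above
        return self.current, delay
```

In practice you would also retire a proxy from the pool as soon as it starts drawing 429s or CAPTCHAs.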
LinkedIn Public Profiles
LinkedIn combines IP-based and behavior-based detection. IP blocks happen at ~50-100 profile views per day from datacenter IPs.
Strategy:
- Use residential proxies with geographic rotation (ThorData: https://thordata.partnerstack.com/partner/0a0x4nzh)
- 10-30 second delays between requests
- Simulate page reading and profile interactions
- Vary User-Agent and request timing patterns
News Site Aggregation
Most news sites are cooperative and allow scraping if you respect rate limits.
Strategy:
- Check robots.txt (many publish a Crawl-delay of 1-2 seconds)
- Simple 1-2 second delays between requests
- Authentic User-Agent
- Session simulation not required
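The robots.txt check can be automated with the standard library's `urllib.robotparser`. Here the file content is parsed from an inline string for illustration; a real crawler would load it from the site's `/robots.txt` via `set_url()` and `read()`:

```python
from urllib import robotparser

# Example robots.txt content; real sites serve this at /robots.txt.
robots_txt = """\
User-agent: *
Crawl-delay: 2
Disallow: /private/
"""

rp = robotparser.RobotFileParser()
rp.parse(robots_txt.splitlines())

delay = rp.crawl_delay("*")    # published delay in seconds, or None if unset
allowed = rp.can_fetch("*", "https://news.example/articles/1")
print(delay, allowed)
```

If `crawl_delay` returns a value, use it as the floor for your own delay; if it returns None, fall back to the 1-2 second default above.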
Production Deployment Checklist
Before deploying any scraper:
- [ ] Rate limiter configured for target site
- [ ] robots.txt checked for Crawl-delay directives
- [ ] Proxies set up if doing >500 req/day (consider ThorData: https://thordata.partnerstack.com/partner/0a0x4nzh)
- [ ] User-Agent rotation enabled
- [ ] Referer headers match navigation paths
- [ ] Error handling configured
- [ ] Logging for rate limit events
- [ ] Session simulation verified
- [ ] Proxy rotation strategy tested
- [ ] Backoff logic verified
- [ ] Data quality spot-checked
- [ ] Resource limits configured
Summary
Rate limiting is the difference between a scraper that works and one that gets blocked. The complete picture requires:
- Understanding targets: Check robots.txt, analyze response patterns, detect rate limit signals
- Matching behavior: Use realistic delay distributions, vary timing, simulate navigation
- Using the right algorithms: Token bucket, sliding window, or adaptive, depending on your needs
- Strategic proxies: Use ThorData (https://thordata.partnerstack.com/partner/0a0x4nzh) when IP-blocking is the bottleneck
- Constant monitoring: Log events, analyze patterns, adjust strategies
The techniques in this guide work because they respect the constraints sites have set while moving as fast as possible within those bounds. Deploy them and you'll successfully scrape at scale.