Web Scraping Without Getting Blocked: A 2026 Practical Guide
There is a war being fought on the internet that most people never see. On one side: websites spending real money on sophisticated anti-bot infrastructure designed to distinguish automated traffic from human browsing. On the other: scrapers trying to collect data that is, in most cases, publicly visible to any human with a browser.
The anti-bot industry has grown dramatically in the last few years. Cloudflare, DataDome, PerimeterX, Akamai, Kasada — these services collectively protect billions of page views per day, and their detection methods have gotten substantially more sophisticated. Modern bot detection doesn't just look at your IP address and User-Agent string. It analyzes TLS fingerprints, JavaScript execution environments, mouse movement entropy, typing cadence, scroll patterns, WebGL renderer capabilities, installed fonts, and hundreds of other signals that together form a behavioral fingerprint.
That's the bad news. The good news is that most websites don't deploy the sophisticated stuff.
The economics matter here: enterprise anti-bot systems cost thousands of dollars per month. Cloudflare Bot Management, DataDome, and similar services are priced for large enterprises, not mid-sized e-commerce sites or content publishers. The vast majority of sites that scraper developers actually want to scrape are protected by much simpler measures — basic rate limiting, User-Agent checks, and maybe a simple JavaScript challenge.
This means the question isn't "how do I defeat state-of-the-art bot detection?" It's "how do I understand which layer is blocking me and apply the minimum countermeasure needed?"
This guide explains each layer of anti-bot protection in detail, with complete Python code for each countermeasure. It covers the full stack from basic header manipulation up through full browser automation and behavioral simulation, proxy rotation with residential IP services, CAPTCHA handling strategies, and retry logic that distinguishes between recoverable and fatal blocks. You'll also get a decision framework for picking the right tool for each job — because using Playwright on a site that only checks headers wastes time and resources, and using requests on a site with JavaScript fingerprinting guarantees failure.
By the end, you'll be able to look at any block and diagnose exactly which layer triggered it, what the fix is, and how to verify it worked.
The Four Layers of Anti-Bot Defense
Think of anti-bot protection as a stack. Each layer catches a different type of scraper, and each requires a different countermeasure. The key insight: you only need to pass the layers that are actually present. Most sites don't deploy all four.
Layer 1: IP Rate Limiting
What it does: Counts requests per IP address per time window. Too many requests too fast triggers a block — either a 429 response with a Retry-After header, a temporary IP ban, or a silent redirect to an error page.
Who it catches: Scripts that blast requests in tight loops with no throttling.
Detection signals: HTTP 429 responses, sudden drop from 200 to 403 after N requests, Retry-After headers.
The fix: Throttle requests and rotate source IPs.
import time
import random
import requests
from dataclasses import dataclass

@dataclass
class ThrottleConfig:
    min_delay: float = 1.0
    max_delay: float = 3.0
    burst_size: int = 10  # Requests before mandatory longer pause
    burst_pause_min: float = 5.0
    burst_pause_max: float = 15.0

def throttled_fetch(
    urls: list[str],
    session: requests.Session,
    config: ThrottleConfig = ThrottleConfig(),
) -> list[tuple[str, str | None]]:
    results = []
    for i, url in enumerate(urls):
        # Mandatory longer pause after each burst
        if i > 0 and i % config.burst_size == 0:
            pause = random.uniform(config.burst_pause_min, config.burst_pause_max)
            print(f"Burst pause: {pause:.1f}s after {i} requests")
            time.sleep(pause)
        elif i > 0:
            time.sleep(random.uniform(config.min_delay, config.max_delay))
        try:
            resp = session.get(url, timeout=30)
            if resp.status_code == 429:
                retry_after = int(resp.headers.get("Retry-After", 60))
                print(f"Rate limited — waiting {retry_after}s")
                time.sleep(retry_after + random.uniform(1, 5))
                resp = session.get(url, timeout=30)  # One retry
            results.append((url, resp.text if resp.status_code == 200 else None))
        except requests.RequestException:
            results.append((url, None))
    return results
For IP rotation, residential proxy services handle the heavy lifting. With ThorData, each request automatically exits through a different residential IP:
import requests

def make_thordata_session(username: str, password: str, country: str = "US") -> requests.Session:
    """Create a session that routes through ThorData residential proxies."""
    session = requests.Session()
    # ThorData rotates IPs automatically — each request gets a fresh IP.
    # Geo-targeting options like `country` are encoded in the proxy username;
    # check your provider dashboard for the exact format.
    proxy_url = f"http://{username}:{password}@gate.thordata.com:7777"
    # For sticky sessions (same IP for multiple requests), use a session token:
    # proxy_url = f"http://{username}-session-{random.randint(1, 10000)}:{password}@gate.thordata.com:7777"
    session.proxies = {
        "http": proxy_url,
        "https": proxy_url,
    }
    return session

# Basic usage
session = make_thordata_session("your_username", "your_password", country="US")
resp = session.get("https://example.com/products")
Layer 2: Header and User-Agent Checks
What it does: Inspects HTTP request headers. Python's requests library sends python-requests/2.31.0 as the User-Agent by default. Every anti-bot system in existence knows that string means a script.
Who it catches: Scripts that don't set browser-like headers.
Detection signals: Immediate 403 on first request, blocks correlating with requests from the same User-Agent.
The fix: Send a complete, consistent, realistic browser header set.
The important word is complete. Setting only User-Agent while omitting Accept-Language, Accept-Encoding, and Sec-Fetch-* headers is still a strong bot signal — real browsers send all of these on every request. Inconsistent headers (a Chrome User-Agent with Firefox's Accept header) are worse than honest bot headers.
import random
from typing import NamedTuple

class BrowserProfile(NamedTuple):
    user_agent: str
    accept: str
    accept_language: str
    sec_ch_ua: str
    sec_ch_ua_mobile: str
    sec_ch_ua_platform: str

# Curated profiles reflecting 2026 browser market share
BROWSER_PROFILES = [
    BrowserProfile(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        accept="text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
        accept_language="en-US,en;q=0.9",
        sec_ch_ua='"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
        sec_ch_ua_mobile="?0",
        sec_ch_ua_platform='"Windows"',
    ),
    BrowserProfile(
        user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        accept="text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
        accept_language="en-US,en;q=0.9",
        sec_ch_ua='"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
        sec_ch_ua_mobile="?0",
        sec_ch_ua_platform='"macOS"',
    ),
    # Safari and Firefox do not send Sec-CH-UA client hints at all,
    # so their profiles leave those fields empty
    BrowserProfile(
        user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 14_7_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1 Safari/605.1.15",
        accept="text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        accept_language="en-US,en;q=0.9",
        sec_ch_ua="",
        sec_ch_ua_mobile="",
        sec_ch_ua_platform="",
    ),
    BrowserProfile(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0",
        accept="text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        accept_language="en-US,en;q=0.5",
        sec_ch_ua="",
        sec_ch_ua_mobile="",
        sec_ch_ua_platform="",
    ),
]

def get_browser_headers() -> dict[str, str]:
    profile = random.choice(BROWSER_PROFILES)
    headers = {
        "User-Agent": profile.user_agent,
        "Accept": profile.accept,
        "Accept-Language": profile.accept_language,
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
    }
    if profile.sec_ch_ua:
        headers["Sec-Ch-Ua"] = profile.sec_ch_ua
        headers["Sec-Ch-Ua-Mobile"] = profile.sec_ch_ua_mobile
        headers["Sec-Ch-Ua-Platform"] = profile.sec_ch_ua_platform
    return headers
Layer 3: TLS and Browser Fingerprinting
What it does: Advanced systems analyze signals that are hard to fake with plain HTTP clients:
- TLS fingerprint (JA3/JA4): The exact cipher suites and extensions in your TLS handshake form a fingerprint. Python's requests (using urllib3 and OpenSSL) has a different TLS fingerprint than Chrome. Cloudflare Bot Management checks this.
- JavaScript environment probing: Challenges run JavaScript to check for browser APIs, canvas rendering, WebGL, AudioContext, installed fonts, navigator properties, and timing behavior. A headless Chrome instance is missing some APIs and has subtle differences in others.
- HTTP/2 fingerprinting: Browsers make HTTP/2 requests with specific header ordering and settings frames. Python's requests uses HTTP/1.1 by default, which is an immediate bot signal on sites that expect HTTP/2.
Who it catches: Scrapers using plain requests, even with perfect headers.
The fix for TLS: Use httpx with http2=True, or use curl_cffi to impersonate a real browser's TLS fingerprint.
# Option A: httpx with HTTP/2 (fixes the HTTP/2 fingerprint)
import httpx

async def fetch_with_http2(url: str) -> str:
    async with httpx.AsyncClient(
        http2=True,
        headers=get_browser_headers(),
        follow_redirects=True,
        timeout=30.0,
    ) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        return resp.text

# Option B: curl_cffi — impersonates Chrome's exact TLS + HTTP/2 fingerprint
# pip install curl-cffi
from curl_cffi import requests as curl_requests

def fetch_with_chrome_fingerprint(url: str) -> str:
    resp = curl_requests.get(
        url,
        impersonate="chrome124",  # Exact Chrome 124 TLS + HTTP/2 fingerprint
        timeout=30,
    )
    resp.raise_for_status()
    return resp.text
The fix for JavaScript challenges: Use a real browser via Playwright with stealth settings.
from playwright.async_api import async_playwright

async def create_stealth_browser_context(playwright) -> tuple:
    """Create a browser context with anti-detection settings."""
    browser = await playwright.chromium.launch(
        headless=True,
        args=[
            "--disable-blink-features=AutomationControlled",
            "--disable-infobars",
            "--window-size=1920,1080",
            "--disable-dev-shm-usage",
            "--no-sandbox",
            "--disable-setuid-sandbox",
        ],
    )
    context = await browser.new_context(
        viewport={"width": 1920, "height": 1080},
        locale="en-US",
        timezone_id="America/New_York",
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
        extra_http_headers={
            "Accept-Language": "en-US,en;q=0.9",
        },
    )
    # Patch navigator.webdriver (the most commonly checked automation flag)
    await context.add_init_script("""
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined,
            configurable: true
        });
        // Spoof plugins (headless Chrome has no plugins)
        Object.defineProperty(navigator, 'plugins', {
            get: () => [1, 2, 3, 4, 5],
            configurable: true
        });
        // Spoof languages
        Object.defineProperty(navigator, 'languages', {
            get: () => ['en-US', 'en'],
            configurable: true
        });
        // Remove Chrome automation flags
        window.chrome = { runtime: {} };
        // Fix permission query behavior
        const originalQuery = window.navigator.permissions.query;
        window.navigator.permissions.query = (parameters) => (
            parameters.name === 'notifications' ?
                Promise.resolve({ state: Notification.permission }) :
                originalQuery(parameters)
        );
    """)
    return browser, context

async def scrape_with_playwright(url: str) -> str:
    async with async_playwright() as p:
        browser, context = await create_stealth_browser_context(p)
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle", timeout=30000)
            return await page.content()
        finally:
            await browser.close()
Layer 4: Behavioral Analysis
What it does: Tracks interaction patterns across a session. Real users scroll, pause, move the mouse non-linearly, click at slightly off-center positions, spend variable time reading content, and navigate from page to page with human-scale timing gaps. Bots navigate in perfectly mechanical patterns — constant delays, no mouse movement, instant navigation.
Advanced systems build a behavioral fingerprint of each visitor over their session and score it against baseline human behavior.
Who it catches: Automated browsers that load pages and extract data without any simulation.
The fix: Add realistic human-like behavioral simulation.
import asyncio
import random

async def human_scroll(page, total_distance: int | None = None):
    """Simulate realistic human scrolling behavior."""
    viewport = await page.evaluate(
        "() => ({ height: window.innerHeight, scrollHeight: document.body.scrollHeight })"
    )
    max_scroll = viewport["scrollHeight"] - viewport["height"]
    if total_distance is None:
        # Scroll between 30% and 90% of the page
        total_distance = int(max_scroll * random.uniform(0.3, 0.9))
    scrolled = 0
    while scrolled < total_distance:
        # Variable scroll chunks (humans scroll in uneven amounts)
        chunk = random.randint(80, 350)
        await page.evaluate(f"window.scrollBy(0, {chunk})")
        scrolled += chunk
        # Random pauses — humans stop to read
        if random.random() < 0.3:
            await asyncio.sleep(random.uniform(0.8, 3.0))
        else:
            await asyncio.sleep(random.uniform(0.05, 0.2))
        # Occasionally scroll back up a bit (reading behavior)
        if random.random() < 0.1:
            back = random.randint(30, 120)
            await page.evaluate(f"window.scrollBy(0, -{back})")
            await asyncio.sleep(random.uniform(0.2, 0.8))

async def human_mouse_move(page, target_x: int, target_y: int):
    """Move the mouse along a curved bezier path (not a straight line)."""
    # Get the current position (default to somewhere in the viewport)
    current = await page.evaluate("() => ({ x: window.lastMouseX || 400, y: window.lastMouseY || 300 })")
    # Pick one random control point so the whole path follows a single smooth curve
    ctrl_x = (current["x"] + target_x) / 2 + random.randint(-50, 50)
    ctrl_y = (current["y"] + target_y) / 2 + random.randint(-30, 30)
    steps = random.randint(15, 35)
    for i in range(steps + 1):  # inclusive, so the cursor lands exactly on the target
        t = i / steps
        # Quadratic bezier interpolation
        x = (1 - t) ** 2 * current["x"] + 2 * (1 - t) * t * ctrl_x + t ** 2 * target_x
        y = (1 - t) ** 2 * current["y"] + 2 * (1 - t) * t * ctrl_y + t ** 2 * target_y
        await page.mouse.move(int(x), int(y))
        await asyncio.sleep(random.uniform(0.01, 0.04))

async def realistic_page_visit(page, url: str) -> str:
    """Visit a page the way a human would."""
    await page.goto(url, wait_until="domcontentloaded")
    # Brief pause after the page loads (reading the title, getting oriented)
    await asyncio.sleep(random.uniform(0.5, 2.0))
    # Move the mouse to a random position
    await human_mouse_move(page, random.randint(200, 1200), random.randint(100, 500))
    # Scroll through the page
    await human_scroll(page)
    # Wait before extracting (don't extract immediately after loading)
    await asyncio.sleep(random.uniform(1.0, 3.0))
    return await page.content()
Anti-Detection: Complete Integration
Here's a complete scraper that combines all four countermeasures into a single class:
import asyncio
import random
import time
import logging
import requests
from playwright.async_api import async_playwright

logger = logging.getLogger(__name__)

class StealthScraper:
    """
    Multi-layer anti-detection scraper.
    Call fetch() for Layer 1-2 sites, fetch_browser() for Layer 3-4 sites.
    """

    def __init__(
        self,
        proxy_url: str | None = None,
        min_delay: float = 1.5,
        max_delay: float = 4.0,
    ):
        self.proxy_url = proxy_url
        self.min_delay = min_delay
        self.max_delay = max_delay
        self._last_request_time = 0.0

    def _wait(self):
        """Enforce a minimum delay between requests."""
        elapsed = time.time() - self._last_request_time
        min_wait = random.uniform(self.min_delay, self.max_delay)
        if elapsed < min_wait:
            time.sleep(min_wait - elapsed)
        self._last_request_time = time.time()

    def fetch(self, url: str) -> str | None:
        """Fetch a URL using plain requests (best for Layer 1-2 sites)."""
        self._wait()
        session = requests.Session()
        session.headers.update(get_browser_headers())
        if self.proxy_url:
            session.proxies = {"http": self.proxy_url, "https": self.proxy_url}
        for attempt in range(3):
            try:
                resp = session.get(url, timeout=30)
                if resp.status_code == 429:
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    logger.warning(f"Rate limited on {url}, waiting {retry_after}s")
                    time.sleep(retry_after)
                    continue
                if resp.status_code == 403:
                    logger.warning(f"Blocked on {url} (HTTP 403)")
                    return None
                resp.raise_for_status()
                if "captcha" in resp.text.lower():
                    logger.warning(f"CAPTCHA on {url}")
                    return None
                return resp.text
            except requests.RequestException as e:
                logger.error(f"Request error on {url}: {e}")
                if attempt < 2:
                    time.sleep(random.uniform(3, 10))
        return None

    async def fetch_browser(self, url: str) -> str | None:
        """Fetch using Playwright with full stealth (for Layer 3-4 sites)."""
        async with async_playwright() as p:
            # Pass the proxy via Playwright's proxy option — Chromium ignores
            # credentials embedded in --proxy-server. If your proxy URL embeds
            # credentials, split them into the username/password keys.
            browser = await p.chromium.launch(
                headless=True,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--no-sandbox",
                ],
                proxy={"server": self.proxy_url} if self.proxy_url else None,
            )
            context = await browser.new_context(
                viewport={"width": 1920, "height": 1080},
                locale="en-US",
                timezone_id="America/New_York",
                user_agent=get_browser_headers()["User-Agent"],
            )
            await context.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
                window.chrome = { runtime: {} };
            """)
            page = await context.new_page()
            try:
                await page.goto(url, wait_until="networkidle", timeout=45000)
                await asyncio.sleep(random.uniform(1, 3))
                await human_scroll(page)
                return await page.content()
            except Exception as e:
                logger.error(f"Browser fetch error on {url}: {e}")
                return None
            finally:
                await browser.close()
CAPTCHA Handling
CAPTCHAs are the last line of defense. When all other layers fail, the site falls back to requiring human interaction. You have four options:
1. Avoid them entirely — if CAPTCHAs only appear after aggressive scraping, slow down. Most sites only CAPTCHA-gate users who have triggered rate limits or behavioral flags.
2. Third-party CAPTCHA solving — services like 2captcha, Anti-Captcha, and CapSolver use human workers or ML models to solve CAPTCHAs for a small per-solve fee (typically $1-3 per thousand solves).
import requests
import time

def solve_recaptcha_v2(site_key: str, page_url: str, api_key: str) -> str | None:
    """Solve reCAPTCHA v2 using the 2captcha API."""
    # Submit the CAPTCHA
    submit_resp = requests.post("http://2captcha.com/in.php", data={
        "key": api_key,
        "method": "userrecaptcha",
        "googlekey": site_key,
        "pageurl": page_url,
        "json": 1,
    })
    result = submit_resp.json()
    if result["status"] != 1:
        raise RuntimeError(f"CAPTCHA submission failed: {result}")
    task_id = result["request"]
    # Poll for the solution (usually takes 20-40 seconds)
    for _ in range(20):
        time.sleep(5)
        check_resp = requests.get(
            f"http://2captcha.com/res.php?key={api_key}&action=get&id={task_id}&json=1"
        )
        check_result = check_resp.json()
        if check_result["status"] == 1:
            return check_result["request"]  # The g-recaptcha-response token
        if check_result["request"] != "CAPCHA_NOT_READY":
            raise RuntimeError(f"CAPTCHA solving failed: {check_result}")
    return None  # Timed out

# Usage with Playwright
async def submit_form_with_captcha(page, form_url: str, captcha_api_key: str):
    await page.goto(form_url)
    # Find the site key on the page
    site_key = await page.evaluate("""
        () => document.querySelector('[data-sitekey]')?.getAttribute('data-sitekey')
    """)
    if site_key:
        token = solve_recaptcha_v2(site_key, form_url, captcha_api_key)
        if token:
            # Inject the solved token. The internal callback path below
            # (clients[0].aa.l) varies per site — inspect ___grecaptcha_cfg
            # in DevTools to find the right one for your target.
            await page.evaluate(f"""
                document.getElementById('g-recaptcha-response').value = '{token}';
                ___grecaptcha_cfg.clients[0].aa.l.callback('{token}');
            """)
3. Cloudflare bypass libraries — cloudscraper handles many Cloudflare JS challenges automatically, though it needs regular updates as Cloudflare evolves.
4. Accept the limitation — if a page is CAPTCHA-gated on every request, scraping it at scale isn't feasible without paying for solve services.
Proxy Rotation at Scale
For large scraping operations, the proxy setup matters as much as the anti-detection logic. Here's a production-grade proxy rotation implementation:
import requests
import random
import time
import threading
from dataclasses import dataclass
from typing import Callable

@dataclass
class ProxyStats:
    successes: int = 0
    failures: int = 0
    last_used: float = 0.0
    blocked_until: float = 0.0

    @property
    def success_rate(self) -> float:
        total = self.successes + self.failures
        return self.successes / total if total > 0 else 1.0

    @property
    def is_blocked(self) -> bool:
        return time.time() < self.blocked_until

class ProxyRotator:
    """
    Manages a pool of proxies with health tracking and automatic rotation.
    Works with ThorData and other proxy services.
    """

    def __init__(
        self,
        proxy_urls: list[str],
        strategy: str = "round_robin",  # "round_robin", "random", "least_used"
        min_success_rate: float = 0.5,
        block_duration: float = 300.0,  # seconds to block a failing proxy
    ):
        self.proxies = proxy_urls
        self.strategy = strategy
        self.min_success_rate = min_success_rate
        self.block_duration = block_duration
        self.stats: dict[str, ProxyStats] = {p: ProxyStats() for p in proxy_urls}
        self._index = 0
        self._lock = threading.Lock()

    def get_proxy(self) -> str | None:
        """Get the next proxy according to the rotation strategy."""
        with self._lock:
            available = [
                p for p in self.proxies
                if not self.stats[p].is_blocked
                and self.stats[p].success_rate >= self.min_success_rate
            ]
            if not available:
                # All proxies blocked/failed — reset the worst ones
                self._reset_worst_proxies()
                available = self.proxies
            if self.strategy == "round_robin":
                proxy = available[self._index % len(available)]
                self._index += 1
            elif self.strategy == "random":
                proxy = random.choice(available)
            elif self.strategy == "least_used":
                proxy = min(available, key=lambda p: self.stats[p].last_used)
            else:
                proxy = available[0]
            self.stats[proxy].last_used = time.time()
            return proxy

    def report_success(self, proxy_url: str):
        self.stats[proxy_url].successes += 1

    def report_failure(self, proxy_url: str, is_blocked: bool = False):
        self.stats[proxy_url].failures += 1
        if is_blocked:
            self.stats[proxy_url].blocked_until = time.time() + self.block_duration

    def _reset_worst_proxies(self):
        """Unblock the 20% with the shortest block duration remaining."""
        sorted_proxies = sorted(
            self.proxies,
            key=lambda p: self.stats[p].blocked_until,
        )
        reset_count = max(1, len(self.proxies) // 5)
        for proxy in sorted_proxies[:reset_count]:
            self.stats[proxy].blocked_until = 0

    def fetch(self, url: str, session_factory: Callable | None = None) -> requests.Response | None:
        proxy = self.get_proxy()
        session = session_factory() if session_factory else requests.Session()
        session.headers.update(get_browser_headers())
        if proxy:
            session.proxies = {"http": proxy, "https": proxy}
        try:
            resp = session.get(url, timeout=30)
            if resp.status_code in (403, 429):
                self.report_failure(proxy, is_blocked=True)
                return None
            self.report_success(proxy)
            return resp
        except requests.RequestException:
            self.report_failure(proxy)
            return None

# ThorData one-liner integration
def thordata_rotator(username: str, password: str) -> ProxyRotator:
    """
    ThorData automatically handles rotation on their end —
    each request via the same endpoint gets a fresh IP.
    Use multiple session tokens for parallel workers.
    """
    # One proxy URL per parallel worker (each with a unique session token)
    proxy_urls = [
        f"http://{username}-session-{i}:{password}@gate.thordata.com:7777"
        for i in range(10)
    ]
    return ProxyRotator(proxy_urls, strategy="round_robin")
Rate Limiting and Delay Strategies
Beyond basic sleeps, sophisticated rate limiting adapts to server responses:
import time
import random
from collections import deque

class AdaptiveRateLimiter:
    """
    Adaptive rate limiter that backs off on errors and speeds up on success.
    """

    def __init__(
        self,
        initial_rps: float = 0.5,  # requests per second
        min_rps: float = 0.1,
        max_rps: float = 2.0,
        backoff_factor: float = 0.5,  # multiply RPS by this on failure
        recovery_factor: float = 1.1,  # multiply RPS by this on success
    ):
        self.current_rps = initial_rps
        self.min_rps = min_rps
        self.max_rps = max_rps
        self.backoff_factor = backoff_factor
        self.recovery_factor = recovery_factor
        self._last_request_time = 0.0
        self._recent_results = deque(maxlen=20)

    def wait(self):
        """Wait the appropriate amount of time before the next request."""
        delay = 1.0 / self.current_rps
        # Add ±20% jitter to avoid rhythmic patterns
        delay *= random.uniform(0.8, 1.2)
        elapsed = time.time() - self._last_request_time
        if elapsed < delay:
            time.sleep(delay - elapsed)
        self._last_request_time = time.time()

    def report_success(self):
        self._recent_results.append(True)
        # Gradually increase the rate on sustained success
        if len(self._recent_results) >= 10 and all(self._recent_results):
            self.current_rps = min(self.max_rps, self.current_rps * self.recovery_factor)

    def report_block(self):
        self._recent_results.append(False)
        # Immediately back off on a block
        self.current_rps = max(self.min_rps, self.current_rps * self.backoff_factor)
        # Add extra delay after a block
        time.sleep(random.uniform(5, 15))

    def report_rate_limit(self, retry_after: int = 60):
        self._recent_results.append(False)
        self.current_rps = max(self.min_rps, self.current_rps * self.backoff_factor)
        time.sleep(retry_after + random.uniform(1, 5))

# Usage
limiter = AdaptiveRateLimiter(initial_rps=0.5)
for url in urls:
    limiter.wait()
    resp = session.get(url)
    if resp.status_code == 200:
        limiter.report_success()
    elif resp.status_code == 429:
        limiter.report_rate_limit(int(resp.headers.get("Retry-After", 60)))
    elif resp.status_code == 403:
        limiter.report_block()
Real-World Use Cases
1. E-commerce Price Intelligence
import re
import sqlite3
from datetime import datetime
from urllib.parse import urlparse
from bs4 import BeautifulSoup

def build_price_tracker(product_urls: list[str], db_path: str = "prices.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_history (
            url TEXT, price REAL, currency TEXT, in_stock INTEGER,
            scraped_at TEXT, source_domain TEXT
        )
    """)
    scraper = StealthScraper(proxy_url="http://user:[email protected]:7777")
    limiter = AdaptiveRateLimiter(initial_rps=0.3)
    for url in product_urls:
        limiter.wait()
        html = scraper.fetch(url)
        if not html:
            continue
        soup = BeautifulSoup(html, "lxml")
        price_el = soup.select_one("[itemprop='price'], span.price, .price-box")
        if price_el:
            price_str = price_el.get("content") or price_el.get_text(strip=True)
            price_num = float(re.sub(r"[^\d.]", "", price_str) or 0)
            conn.execute(
                "INSERT INTO price_history VALUES (?,?,?,?,?,?)",
                (url, price_num, "USD", 1, datetime.now().isoformat(), urlparse(url).netloc),
            )
            conn.commit()
            limiter.report_success()
2. News Article Aggregator
import asyncio
import json
import random
import httpx
from bs4 import BeautifulSoup

async def aggregate_news(rss_urls: list[str]) -> list[dict]:
    """Fetch full article text for article URLs collected from RSS feeds."""
    async with httpx.AsyncClient(
        http2=True,
        headers=get_browser_headers(),
        follow_redirects=True,
        timeout=20.0,
    ) as client:

        async def fetch_article(url: str) -> dict | None:
            await asyncio.sleep(random.uniform(0.5, 2.0))
            try:
                resp = await client.get(url)
                resp.raise_for_status()
                soup = BeautifulSoup(resp.text, "lxml")
                # Try structured data first
                for script in soup.find_all("script", type="application/ld+json"):
                    try:
                        data = json.loads(script.string or "")
                        if data.get("@type") == "NewsArticle":
                            return {
                                "url": url,
                                "headline": data.get("headline"),
                                "author": data.get("author", {}).get("name"),
                                "published": data.get("datePublished"),
                                "body": data.get("articleBody", "")[:2000],
                            }
                    except (json.JSONDecodeError, AttributeError):
                        pass
                # Fallback: extract the main content
                for sel in ["article", "main", ".article-content", "#article-body"]:
                    content_el = soup.select_one(sel)
                    if content_el:
                        h1 = soup.find("h1")
                        return {
                            "url": url,
                            "headline": h1.get_text(strip=True) if h1 else None,
                            "body": content_el.get_text(separator="\n", strip=True)[:2000],
                        }
                return None
            except httpx.HTTPError:
                return None

        tasks = [fetch_article(url) for url in rss_urls]
        results = await asyncio.gather(*tasks)
    return [r for r in results if r]
3. Social Media Public Data
# Public profile scraping (no authentication, public data only)
import asyncio
import random
from playwright.async_api import async_playwright

async def scrape_public_profile(username: str) -> dict | None:
    async with async_playwright() as p:
        browser, context = await create_stealth_browser_context(p)
        page = await context.new_page()
        try:
            # Navigate naturally through the site
            await page.goto("https://example.com", wait_until="networkidle")
            await asyncio.sleep(random.uniform(1, 3))
            # Search for the user
            await human_mouse_move(page, 400, 100)
            await page.fill("[name='q']", username)
            await asyncio.sleep(random.uniform(0.5, 1.5))
            await page.press("[name='q']", "Enter")
            await page.wait_for_load_state("networkidle")
            # Navigate to the profile
            profile_link = page.locator(f"a[href*='/{username}']").first
            box = await profile_link.bounding_box()
            if box:
                # Move the cursor to the center of the link before clicking
                await human_mouse_move(
                    page,
                    int(box["x"] + box["width"] / 2),
                    int(box["y"] + box["height"] / 2),
                )
            await profile_link.click()
            await page.wait_for_load_state("networkidle")
            await human_scroll(page)
            content = await page.content()
            return {"username": username, "html": content}
        finally:
            await browser.close()
Decision Framework
Before building your scraper, use this framework to pick the minimum required approach:
| Signal | Layer | Best Tool |
|---|---|---|
| 429 Too Many Requests | IP rate limiting | requests + delays + ThorData proxy rotation |
| Immediate 403 Forbidden | Header/UA check | requests with proper headers |
| JS challenge / CAPTCHA | Browser fingerprinting | curl_cffi or Playwright |
| Blocks after browsing normally | Behavioral analysis | Playwright + behavioral simulation |
| Empty HTML / no content | JS-rendered page | Playwright (not anti-bot, different problem) |
| Cookie-based blocks | Session/cookie tracking | requests.Session() with cookies |
| Geographic blocks | IP geolocation | ThorData geo-targeted proxies |
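The table above can be codified into a quick triage helper. The thresholds and marker strings here are illustrative heuristics, not anything standardized, but they make the first diagnosis step mechanical:

```python
def diagnose_block(status_code: int, body: str, headers: dict[str, str]) -> str:
    """Map a blocked response to the most likely defense layer (heuristic)."""
    lower = body.lower()
    # 429 (or an explicit Retry-After) is the clearest rate-limit signal
    if status_code == 429 or "Retry-After" in headers:
        return "layer-1: IP rate limiting — add delays and rotate proxies"
    # Challenge pages mention the CAPTCHA/challenge in the body
    if any(marker in lower for marker in ("captcha", "challenge", "cf-chl")):
        return "layer-3: JS challenge or fingerprinting — try curl_cffi or Playwright"
    # A bare 403 on the first request usually means header/UA filtering
    if status_code == 403:
        return "layer-2: header/User-Agent check — send a full browser header set"
    # A 200 with almost no content is usually a JS-rendered page, not a block
    if status_code == 200 and len(body) < 2000:
        return "js-rendered page — needs a browser; not an anti-bot block"
    return "unknown — inspect the raw response manually"
```

Note the ordering: the body markers are checked before the bare-403 branch, since challenge pages are often served with a 403 status.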
Start simple. Upgrade only when blocked.
Most scraping jobs never need to go past Layer 2. Before reaching for Playwright, verify the site actually uses JavaScript fingerprinting — load the page in a browser with JavaScript disabled (F12 → Settings → Disable JavaScript). If the content loads, you don't need a browser.
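The same check can be approximated in code: fetch the raw HTML with a plain HTTP client and look for a near-empty single-page-app shell. The SPA markers and word-count threshold below are illustrative guesses, not a standard:

```python
import re

def looks_js_rendered(html: str) -> bool:
    """Heuristic: does this raw HTML look like an empty JS-app shell?"""
    # Strip scripts/styles, then measure the remaining visible text
    stripped = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.S | re.I)
    visible_words = len(re.sub(r"<[^>]+>", " ", stripped).split())
    # Common single-page-app mount points and state blobs
    spa_markers = ('id="root"', 'id="app"', "__NEXT_DATA__", "__INITIAL_STATE__")
    return visible_words < 50 and any(m in html for m in spa_markers)
```

If this returns True for a page, reach for Playwright; if False, plain requests (plus the right headers) should be enough.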
The Ethical Framework
None of these techniques change the ethical and legal context. Rate limits exist because unlimited scraping can cause real harm to small sites — bandwidth costs money, server load affects real users. robots.txt is a convention, not a technical barrier, but ignoring it for sites that explicitly prohibit scraping creates legal exposure in many jurisdictions.
Practically speaking: respect robots.txt, stay under 1 request per second on sites without explicit guidance, don't scrape personal data without a legal basis, and check the site's terms of service before building anything at scale.
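The robots.txt part is easy to automate with the standard library. A minimal sketch using `urllib.robotparser`, parsing already-fetched robots.txt text so it works offline — the `MyScraper/1.0` user agent is a placeholder:

```python
from urllib.robotparser import RobotFileParser

def is_allowed(url: str, robots_txt: str, user_agent: str = "MyScraper/1.0") -> bool:
    """Check a URL against robots.txt content fetched from the target site."""
    rp = RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp.can_fetch(user_agent, url)
```

Call this once per domain before queueing its URLs, and cache the parsed result rather than re-fetching robots.txt on every request.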
Smart scraping is sustainable scraping. The goal is to collect the data you need without breaking the things you depend on.