Python httpx for Web Scraping: Async, Retries, Proxies, and Anti-Detection (2026 Guide)
If you're still using requests for scraping in 2026, you're leaving both performance and reliability on the table. The requests library was designed for synchronous single-threaded HTTP — a model that made sense when web applications were simpler and scraping meant fetching a handful of pages. Modern scraping workloads are fundamentally different: you're often fetching thousands of URLs concurrently against sites running sophisticated anti-bot systems, and the tool you choose for your HTTP layer matters enormously.
httpx is the Python HTTP library that matches the current reality. It provides native async/await support without requiring a second library, HTTP/2 multiplexing that dramatically reduces connection overhead, fine-grained timeout control that lets you distinguish between slow connection establishment and slow data transfer, and an API surface close enough to requests that migration is mostly mechanical. Beyond the performance angle, httpx also gives you cleaner code: the AsyncClient context manager handles connection pooling and cleanup automatically, retry logic composes naturally with tenacity, and proxy configuration is first-class rather than an afterthought.
This guide goes beyond the basics. We cover the full production stack: browser header spoofing to avoid trivial blocks, TLS fingerprint awareness, proxy rotation with ThorData's residential network, rate limiting that mimics human behavior, CAPTCHA detection and handling, comprehensive retry logic with circuit breakers, session management for authenticated scraping, and seven complete use cases with working code you can drop into a real project.
The goal is not to teach you the httpx API — the official docs handle that well. The goal is to show you how to build an httpx-based scraper that actually works against sites that don't want to be scraped.
Why httpx Over requests in 2026
The performance case for httpx is straightforward. Consider a task that requires fetching 500 product pages. With requests in a single thread, each request blocks until complete — if each takes an average of 1.5 seconds, you're looking at 12+ minutes. With httpx.AsyncClient and a semaphore limiting you to 20 concurrent requests, that same workload completes in roughly 45-60 seconds. For anything beyond toy scripts, the difference is an order of magnitude.
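The semaphore pattern is simple enough to sketch in a few lines. In this minimal demo a stand-in coroutine replaces the real HTTP call, so the concurrency cap is the only thing being demonstrated; the full scraper version appears later in this guide.

```python
import asyncio

async def fetch_all(urls: list[str], limit: int = 20) -> list[str]:
    """Run at most `limit` fetches at once; a sleep stands in for the HTTP round trip."""
    sem = asyncio.Semaphore(limit)

    async def fetch_one(url: str) -> str:
        async with sem:
            await asyncio.sleep(0.01)  # stand-in for client.get(url)
            return url

    # gather preserves input order even though completion order varies
    return await asyncio.gather(*(fetch_one(u) for u in urls))

pages = asyncio.run(fetch_all([f"https://example.com/p/{i}" for i in range(100)]))
print(len(pages))  # 100
```

Swap the sleep for `await client.get(url)` against a shared `httpx.AsyncClient` and you have the core of a concurrent scraper.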
Beyond raw throughput, httpx gives you:
HTTP/2 multiplexing — Real browsers use HTTP/2 and multiplex multiple requests over a single TCP connection. Some anti-bot systems fingerprint the HTTP version; sending HTTP/1.1 requests when the site serves HTTP/2 can be a detection signal.
Granular timeout control — requests has a single timeout parameter. httpx separates connect, read, write, and pool timeouts. This matters: a 30-second read timeout that also applies to connection establishment means a dead host blocks your thread for 30 seconds before failing. With httpx, you set a 5-second connect timeout and a 30-second read timeout, so dead hosts fail fast.
Better proxy support — httpx handles SOCKS5 proxies natively (with httpx[socks]), lets you configure a proxy per client via the proxy parameter or route traffic per-host via mounts, and integrates cleanly with residential proxy services.
Type annotations throughout — httpx is fully typed, which means IDE autocomplete works properly and type checkers catch configuration mistakes at development time rather than runtime.
Install the full package:
pip install httpx[http2,socks] tenacity
Browser Header Spoofing: The Baseline Defense
The first line of defense against being detected as a bot is sending HTTP headers that match what a real browser sends. A bare httpx/0.28 User-Agent in the request headers is immediately identifiable as a scraper. So is an outdated Chrome UA, or a UA paired with headers that don't match (e.g., claiming to be Firefox but sending sec-ch-ua headers that only Chrome sends).
A complete browser header set in 2026 looks like this:
import httpx
# Chrome 134 on Windows — the most common browser/OS combination
CHROME_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Cache-Control": "max-age=0",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"sec-ch-ua": '"Chromium";v="134", "Google Chrome";v="134", "Not:A-Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"DNT": "1",
"Connection": "keep-alive",
}
# For AJAX/API requests (XHR), headers differ slightly
CHROME_XHR_HEADERS = {
"User-Agent": CHROME_HEADERS["User-Agent"],
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"sec-ch-ua": CHROME_HEADERS["sec-ch-ua"],
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"X-Requested-With": "XMLHttpRequest",
}
# macOS Safari headers for diversity
SAFARI_HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3.1 Safari/605.1.15",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Connection": "keep-alive",
}
BROWSER_PROFILES = [CHROME_HEADERS, CHROME_HEADERS, CHROME_HEADERS, SAFARI_HEADERS] # Weight toward Chrome
import random

def get_browser_headers() -> dict:
    """Return a random but internally consistent browser header set."""
    return dict(random.choice(BROWSER_PROFILES))
A few important rules about headers:
Consistency matters more than randomness. Real browsers send the same User-Agent for the entire session. Anti-bot systems flag clients that change their UA between requests to the same domain. Pick a profile and keep it for the session.
Header ordering matters. Some anti-bot systems (notably Akamai and Cloudflare) check the order of HTTP headers, not just their values. httpx preserves the insertion order of the headers you pass, whether a plain dict (insertion-ordered in modern Python) or an httpx.Headers object. Chrome sends headers in a specific order — if your order differs, it's a fingerprint signal.
Missing headers are also a signal. A real Chrome browser always sends sec-ch-ua headers on secure requests. If those headers are absent, it's a signal that the client is not Chrome, regardless of the User-Agent.
TLS Fingerprinting and curl_cffi
The dirty secret of httpx-based scraping against heavily protected sites is that httpx cannot fully bypass TLS fingerprinting. Tools like Akamai Bot Manager, Cloudflare, and PerimeterX analyze the TLS ClientHello message — the cipher suite ordering, TLS extension ordering, and supported groups — to fingerprint the HTTP client independent of any headers you set. Python's ssl library (which httpx uses under the hood) has a distinct TLS fingerprint that differs from Chrome's.
For most sites, httpx with good headers is sufficient. For heavily protected sites (major e-commerce, financial data, large social platforms), you may need curl_cffi instead — a Python binding to curl-impersonate that produces Chrome-identical TLS and HTTP/2 fingerprints.
# Install: pip install curl_cffi
from curl_cffi import requests as curl_requests
import httpx
from typing import Optional

def create_client(target_site: str, proxy_url: Optional[str] = None) -> tuple:
"""
Return the appropriate client based on target site protection level.
Prefer httpx for speed; fall back to curl_cffi for heavily protected targets.
"""
heavily_protected = [
"aliexpress.com", "amazon.com", "redfin.com",
"linkedin.com", "instagram.com", "ticketmaster.com",
]
needs_impersonation = any(domain in target_site for domain in heavily_protected)
if needs_impersonation:
        session = curl_requests.Session(impersonate="chrome")  # "chrome" tracks the latest supported Chrome fingerprint
if proxy_url:
session.proxies = {"http": proxy_url, "https": proxy_url}
return session, "curl_cffi"
    else:
        client_kwargs = dict(
            headers=get_browser_headers(),
            http2=True,
            follow_redirects=True,
            timeout=httpx.Timeout(connect=8.0, read=30.0, write=10.0, pool=10.0),
        )
        if proxy_url:
            client_kwargs["proxy"] = proxy_url
        # Build the client once — avoids creating (and leaking) a proxyless client first
        return httpx.Client(**client_kwargs), "httpx"
Retry Logic with Exponential Backoff
httpx has no built-in retry mechanism — by design, since retry logic is highly application-specific. The tenacity library provides the cleanest composition:
import asyncio
import httpx
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
retry_if_result,
before_sleep_log,
)
import logging
logger = logging.getLogger(__name__)
def is_retryable_status(response) -> bool:
"""Retry on 429 (rate limited), 503 (service unavailable), and 502/504 (gateway errors)."""
return hasattr(response, "status_code") and response.status_code in {429, 500, 502, 503, 504}
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=3, max=60),
retry=(
retry_if_exception_type((httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError))
| retry_if_result(is_retryable_status)
),
before_sleep=before_sleep_log(logger, logging.WARNING),
reraise=True,
)
def fetch_with_retry(client: httpx.Client, url: str, **kwargs) -> httpx.Response:
"""
Fetch a URL with automatic retry for network errors and server-side failures.
    Backoff: exponential, starting around 3s and capped at 60s.
"""
response = client.get(url, **kwargs)
# Check for soft blocks disguised as 200s
if response.status_code == 200:
content_type = response.headers.get("content-type", "")
if "text/html" in content_type:
body_preview = response.text[:2000].lower()
soft_block_signals = [
"access denied",
"enable javascript",
"please verify",
"unusual traffic",
"captcha",
]
if any(signal in body_preview for signal in soft_block_signals):
# Raise to trigger retry
raise httpx.HTTPStatusError(
f"Soft block detected",
request=response.request,
response=response,
)
response.raise_for_status()
return response
async def async_fetch_with_retry(
client: httpx.AsyncClient,
url: str,
max_retries: int = 4,
**kwargs,
) -> httpx.Response:
"""
Async version of fetch_with_retry with manual backoff loop.
Use this with AsyncClient for concurrent scraping.
"""
last_exception = None
for attempt in range(max_retries):
try:
response = await client.get(url, **kwargs)
if response.status_code in {429, 503}:
                # Retry-After may be absent or an HTTP date; fall back to exponential backoff
                header = response.headers.get("Retry-After", "")
                retry_after = int(header) if header.isdigit() else 0
                wait_time = max(retry_after, 2 ** attempt * 3)
logger.warning(f"Rate limited on {url}, waiting {wait_time}s")
await asyncio.sleep(wait_time)
continue
response.raise_for_status()
return response
except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError) as e:
last_exception = e
wait_time = 2 ** attempt * 3
logger.warning(f"Network error on {url} (attempt {attempt + 1}/{max_retries}): {e}. Retry in {wait_time}s")
await asyncio.sleep(wait_time)
except httpx.HTTPStatusError as e:
if e.response.status_code in {400, 401, 403, 404, 410}:
raise # Don't retry client errors
last_exception = e
wait_time = 2 ** attempt * 3
await asyncio.sleep(wait_time)
raise last_exception or httpx.TimeoutException(f"Max retries reached for {url}")
Timeout Configuration: Every Parameter Matters
httpx's granular timeout control is one of its most practically valuable features. Setting a single timeout value (as requests forces you to do) creates a tradeoff: too short and you miss slow but legitimate responses, too long and dead hosts block your workers for minutes.
import httpx
# Conservative settings for scraping unknown sites
CONSERVATIVE_TIMEOUT = httpx.Timeout(
connect=8.0, # DNS + TCP handshake. If this fails, the host is unreachable.
read=45.0, # Time to receive response. Some slow sites need 30-40s.
write=15.0, # Time to send request body (relevant for POSTs with large data).
pool=15.0, # Time to wait for a connection from the pool (relevant under load).
)
# Aggressive settings for fast internal APIs or well-known sites
FAST_TIMEOUT = httpx.Timeout(connect=3.0, read=15.0, write=5.0, pool=5.0)
# For streaming responses (large file downloads, event streams)
STREAM_TIMEOUT = httpx.Timeout(connect=8.0, read=None, write=15.0, pool=15.0)
def get_timeout_for_url(url: str) -> httpx.Timeout:
"""Choose timeout profile based on target URL characteristics."""
if "api." in url or "/api/" in url or ".json" in url:
return FAST_TIMEOUT
if "download" in url or "export" in url or ".csv" in url:
return STREAM_TIMEOUT
return CONSERVATIVE_TIMEOUT
# Usage
with httpx.Client(timeout=CONSERVATIVE_TIMEOUT, headers=get_browser_headers()) as client:
response = client.get("https://example.com")
print(f"Status: {response.status_code}, Size: {len(response.content)} bytes")
Proxy Rotation with ThorData Residential Proxies
Datacenter IPs (from AWS, GCP, DigitalOcean, Hetzner) are blocked outright by most major scraping targets. They appear in public IP reputation databases, and anti-bot systems like Akamai, Cloudflare, and DataDome have them pre-blocked. For these targets, residential proxies — IPs originating from real ISP-assigned home connections — are required.
ThorData provides rotating residential proxies with country and city-level targeting. Their rotating gateway assigns a fresh IP from the residential pool on each connection (or per session if you use sticky sessions), which means you don't have to manage IP lists yourself.
import httpx
import random
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class ThorDataProxy:
"""ThorData residential proxy configuration."""
username: str # Your ThorData username
password: str # Your ThorData password
gateway: str = "rotating.thordata.net"
port: int = 9080
country: str = "US"
city: Optional[str] = None
session_id: Optional[str] = None # For sticky sessions
def to_url(self) -> str:
"""Build proxy URL with targeting parameters encoded in username."""
parts = [self.username]
if self.country:
parts.append(f"country-{self.country}")
if self.city:
parts.append(f"city-{self.city.lower().replace(' ', '_')}")
if self.session_id:
parts.append(f"session-{self.session_id}")
encoded_user = "-".join(parts)
return f"http://{encoded_user}:{self.password}@{self.gateway}:{self.port}"
class ResidentialProxyPool:
"""
Manages a ThorData proxy pool with country rotation,
failure tracking, and session management.
"""
COUNTRIES = ["US", "GB", "CA", "AU", "DE", "FR", "NL", "SE"]
def __init__(self, username: str, password: str):
self.username = username
self.password = password
self._request_count = 0
self._session_start = time.time()
self._current_country_idx = 0
self._failures: dict[str, int] = {}
self._last_used: dict[str, float] = {}
def get_proxy(self, preferred_country: Optional[str] = None) -> ThorDataProxy:
"""Get a proxy, rotating country after every 50 requests."""
self._request_count += 1
if preferred_country:
country = preferred_country
else:
# Rotate country every 50 requests
if self._request_count % 50 == 0:
self._current_country_idx = (self._current_country_idx + 1) % len(self.COUNTRIES)
country = self.COUNTRIES[self._current_country_idx]
return ThorDataProxy(
username=self.username,
password=self.password,
country=country,
)
def get_sticky_proxy(self, session_key: str, country: str = "US") -> ThorDataProxy:
"""
Get a proxy that maintains the same IP for a given session key.
Use this for scraping sites that require session continuity (login flows, etc.)
"""
return ThorDataProxy(
username=self.username,
password=self.password,
country=country,
session_id=session_key,
)
def create_client(self, proxy: ThorDataProxy, **client_kwargs) -> httpx.Client:
"""Create an httpx client configured with the given proxy."""
return httpx.Client(
proxy=proxy.to_url(),
headers=get_browser_headers(),
http2=True,
follow_redirects=True,
timeout=CONSERVATIVE_TIMEOUT,
**client_kwargs,
)
    def create_async_client(self, proxy: ThorDataProxy, **client_kwargs) -> httpx.AsyncClient:
        """Create an async httpx client for the given proxy (construction itself needs no await)."""
return httpx.AsyncClient(
proxy=proxy.to_url(),
headers=get_browser_headers(),
http2=True,
follow_redirects=True,
timeout=CONSERVATIVE_TIMEOUT,
**client_kwargs,
)
# Initialize the pool (replace with real credentials from ThorData dashboard)
# Sign up at https://thordata.partnerstack.com/partner/0a0x4nzh
proxy_pool = ResidentialProxyPool(
username="your_thordata_username",
password="your_thordata_password",
)
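The username encoding is worth sanity-checking offline before burning proxy bandwidth. A standalone sketch (placeholder credentials) that mirrors the logic of ThorDataProxy.to_url above:

```python
from typing import Optional

# Standalone sketch of the username-encoding scheme used by ThorDataProxy.to_url.
# Credentials, city, and session id are placeholders.
def build_proxy_url(
    username: str,
    password: str,
    country: str = "US",
    city: Optional[str] = None,
    session_id: Optional[str] = None,
    gateway: str = "rotating.thordata.net",
    port: int = 9080,
) -> str:
    parts = [username, f"country-{country}"]
    if city:
        parts.append(f"city-{city.lower().replace(' ', '_')}")
    if session_id:
        parts.append(f"session-{session_id}")
    return f"http://{'-'.join(parts)}:{password}@{gateway}:{port}"

# A rotating request and a sticky-session request differ only in the username:
print(build_proxy_url("user123", "secret"))
# http://user123-country-US:secret@rotating.thordata.net:9080
print(build_proxy_url("user123", "secret", session_id="job42", city="New York"))
# http://user123-country-US-city-new_york-session-job42:secret@rotating.thordata.net:9080
```

The gateway parses the targeting parameters out of the username, so rotating country, city, or session never requires a different hostname or port.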
Async Concurrency with Rate Limiting
The real power of httpx comes from AsyncClient. Here is a production-grade concurrent scraper with rate limiting, proxy rotation, and error handling:
import asyncio
import httpx
import time
import json
import random
from typing import Optional
class RateLimiter:
"""
Token bucket rate limiter for controlling request frequency.
Supports per-domain limits for scrapers targeting multiple sites.
"""
def __init__(self, requests_per_second: float = 2.0):
self.requests_per_second = requests_per_second
self.min_interval = 1.0 / requests_per_second
self._last_request: dict[str, float] = {}
self._lock = asyncio.Lock()
async def acquire(self, domain: str = "default"):
"""Wait until it's safe to make the next request to this domain."""
async with self._lock:
now = time.monotonic()
last = self._last_request.get(domain, 0)
elapsed = now - last
base_wait = max(0, self.min_interval - elapsed)
# Add ±25% jitter to avoid regular patterns
jitter = base_wait * 0.25 * (random.random() * 2 - 1)
wait = max(0, base_wait + jitter)
if wait > 0:
await asyncio.sleep(wait)
self._last_request[domain] = time.monotonic()
async def scrape_urls_concurrent(
urls: list[str],
max_concurrent: int = 5,
requests_per_second: float = 3.0,
proxy_username: Optional[str] = None,
proxy_password: Optional[str] = None,
) -> list[dict]:
"""
Scrape a list of URLs concurrently with rate limiting and proxy rotation.
Args:
urls: List of URLs to scrape
max_concurrent: Maximum simultaneous in-flight requests
requests_per_second: Target request rate (with jitter applied)
proxy_username: ThorData username (None for no proxy)
proxy_password: ThorData password
Returns:
List of result dicts with 'url', 'status', 'content', 'error' keys
"""
semaphore = asyncio.Semaphore(max_concurrent)
rate_limiter = RateLimiter(requests_per_second=requests_per_second)
    pool = None
if proxy_username and proxy_password:
pool = ResidentialProxyPool(proxy_username, proxy_password)
async def fetch_one(url: str) -> dict:
async with semaphore:
# Extract domain for per-domain rate limiting
from urllib.parse import urlparse
domain = urlparse(url).netloc
await rate_limiter.acquire(domain)
proxy = pool.get_proxy() if pool else None
proxy_url = proxy.to_url() if proxy else None
async with httpx.AsyncClient(
proxy=proxy_url,
headers=get_browser_headers(),
http2=True,
follow_redirects=True,
timeout=CONSERVATIVE_TIMEOUT,
) as client:
try:
response = await async_fetch_with_retry(client, url)
return {
"url": url,
"status": response.status_code,
"content": response.text,
"headers": dict(response.headers),
"error": None,
}
except Exception as e:
return {
"url": url,
"status": None,
"content": None,
"headers": {},
"error": str(e),
}
tasks = [fetch_one(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=False)
return list(results)
# Example usage
async def main_example():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/headers",
"https://httpbin.org/user-agent",
"https://httpbin.org/ip",
"https://example.com",
]
results = await scrape_urls_concurrent(
urls=urls,
max_concurrent=3,
requests_per_second=2.0,
)
for r in results:
if r["error"]:
print(f"ERROR {r['url']}: {r['error']}")
else:
print(f"OK {r['url']}: {r['status']}, {len(r['content'])} chars")
if __name__ == "__main__":
    asyncio.run(main_example())
Session and Cookie Management
Many scraping targets require you to maintain session state — either because they require login, or because they set tracking cookies on first visit that are expected on subsequent requests. httpx handles this automatically when you reuse a Client instance.
import httpx
import json
import random
import time
from typing import Optional
def create_authenticated_session(
login_url: str,
credentials: dict,
headers: Optional[dict] = None,
) -> httpx.Client:
"""
Create an httpx Client with an authenticated session.
Returns the client with cookies already set from the login response.
"""
client = httpx.Client(
headers=headers or get_browser_headers(),
follow_redirects=True,
timeout=CONSERVATIVE_TIMEOUT,
http2=True,
)
# Some sites set tracking cookies on the initial page load
# Visit homepage first to collect those cookies
homepage = login_url.split("/login")[0] if "/login" in login_url else login_url
try:
client.get(homepage)
time.sleep(random.uniform(1.0, 3.0)) # Brief pause like a real user
except Exception:
pass
# Perform login
response = client.post(login_url, data=credentials)
response.raise_for_status()
    # Heuristic login check: authenticated pages usually expose a logout link
    body = response.text.lower()
    if "logout" not in body and "sign out" not in body:
        raise ValueError(f"Login may have failed: status {response.status_code}")
return client # Client now holds the authenticated session cookies
def scrape_with_session(
authenticated_client: httpx.Client,
urls: list[str],
delay_range: tuple = (1.0, 3.0),
) -> list[dict]:
"""
Scrape a list of URLs using an existing authenticated session.
The client maintains cookies across all requests automatically.
"""
results = []
for url in urls:
try:
response = fetch_with_retry(authenticated_client, url)
results.append({
"url": url,
"status": response.status_code,
"content": response.text,
})
except Exception as e:
results.append({"url": url, "error": str(e)})
time.sleep(random.uniform(*delay_range))
return results
# Pre-loading cookies from a browser session
def load_cookies_from_browser_export(cookie_json_path: str) -> dict:
    """
    Load cookies exported from a browser (e.g., via the EditThisCookie Chrome extension).
    Returns a dict suitable for the httpx cookies parameter, skipping cookies whose
    expirationDate (epoch seconds) is already in the past.
    """
    import time

    with open(cookie_json_path) as f:
        cookie_list = json.load(f)
    now = time.time()
    return {
        cookie["name"]: cookie["value"]
        for cookie in cookie_list
        if cookie.get("expirationDate", now + 1) > now
    }
# Usage with pre-loaded cookies
cookies = load_cookies_from_browser_export("exported_cookies.json")
client = httpx.Client(
headers=get_browser_headers(),
cookies=cookies,
http2=True,
)
Use Case 1: E-Commerce Price Monitoring
Monitor prices across multiple e-commerce sites and alert when prices drop below thresholds.
import asyncio
import httpx
import json
import random
import re
import sqlite3
import time
from bs4 import BeautifulSoup
from datetime import datetime
def setup_price_db(db_path: str = "prices.db") -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS price_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT,
product_name TEXT,
price_raw TEXT,
price_numeric REAL,
currency TEXT,
in_stock INTEGER,
scraped_at TEXT
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_url ON price_records(url)")
conn.commit()
return conn
def extract_price(text: str) -> tuple[float, str]:
    """Extract numeric price and currency from text like '$29.99' or '€ 24,99'."""
    text = text.strip()
    currency_map = {"$": "USD", "€": "EUR", "£": "GBP", "¥": "JPY", "₹": "INR"}
    currency = "USD"  # default
    for symbol, code in currency_map.items():
        if symbol in text:
            currency = code
            break
    match = re.search(r"\d[\d.,]*", text)
    if not match:
        return 0.0, currency
    num = match.group().rstrip(".,")
    if re.search(r",\d{1,2}$", num):
        # Decimal comma (e.g. '24,99'): dots are thousands separators
        num = num.replace(".", "").replace(",", ".")
    else:
        # Decimal point or plain integer: commas are thousands separators
        num = num.replace(",", "")
    return float(num), currency
async def monitor_product_prices(
products: list[dict],
db_path: str = "prices.db",
alert_threshold_pct: float = 0.10, # Alert if price drops >10%
) -> list[dict]:
"""
Monitor prices for a list of products.
products format: [{"name": str, "url": str, "price_selector": str, "stock_selector": str}]
"""
conn = setup_price_db(db_path)
alerts = []
async with httpx.AsyncClient(
headers=get_browser_headers(),
http2=True,
follow_redirects=True,
timeout=CONSERVATIVE_TIMEOUT,
) as client:
for product in products:
try:
response = await async_fetch_with_retry(client, product["url"])
soup = BeautifulSoup(response.text, "html.parser")
# Extract price
price_el = soup.select_one(product.get("price_selector", ".price"))
price_text = price_el.get_text(strip=True) if price_el else ""
price_numeric, currency = extract_price(price_text)
# Check stock
stock_el = soup.select_one(product.get("stock_selector", ".stock, .availability"))
in_stock = 1
if stock_el:
stock_text = stock_el.get_text(strip=True).lower()
in_stock = 0 if any(w in stock_text for w in ["out of stock", "unavailable", "sold out"]) else 1
# Save to DB
conn.execute("""
INSERT INTO price_records (url, product_name, price_raw, price_numeric, currency, in_stock, scraped_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (product["url"], product["name"], price_text, price_numeric, currency, in_stock, datetime.now().isoformat()))
conn.commit()
# Check for price drop vs previous record
prev = conn.execute("""
SELECT price_numeric FROM price_records
WHERE url = ? ORDER BY id DESC LIMIT 1 OFFSET 1
""", (product["url"],)).fetchone()
if prev and prev[0] > 0 and price_numeric > 0:
drop_pct = (prev[0] - price_numeric) / prev[0]
if drop_pct >= alert_threshold_pct:
alerts.append({
"product": product["name"],
"url": product["url"],
"old_price": prev[0],
"new_price": price_numeric,
"drop_pct": round(drop_pct * 100, 1),
"currency": currency,
})
print(f"ALERT: {product['name']} dropped {drop_pct*100:.1f}%: {prev[0]} -> {price_numeric} {currency}")
except Exception as e:
print(f"Error monitoring {product['url']}: {e}")
await asyncio.sleep(random.uniform(2.0, 5.0))
conn.close()
return alerts
Use Case 2: News Aggregator with Full-Text Extraction
Scrape articles from multiple news sources and extract clean full-text content.
import httpx
import asyncio
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
async def extract_article(client: httpx.AsyncClient, url: str) -> dict:
"""Extract article content from a news page."""
try:
response = await async_fetch_with_retry(client, url)
soup = BeautifulSoup(response.text, "html.parser")
# Remove navigation, ads, sidebars
for tag in soup.select("nav, aside, .ad, .advertisement, .sidebar, footer, header, script, style"):
tag.decompose()
# Try common article content selectors
content_selectors = [
"article",
'[class*="article-body"]',
'[class*="story-body"]',
'[class*="post-content"]',
".content",
"main",
]
content_el = None
for selector in content_selectors:
content_el = soup.select_one(selector)
if content_el and len(content_el.get_text(strip=True)) > 200:
break
if not content_el:
content_el = soup.body
# Extract metadata
title = ""
title_el = soup.select_one("h1, .headline, .article-title")
if title_el:
title = title_el.get_text(strip=True)
# Try OpenGraph/meta tags for publication date
published_date = ""
date_meta = soup.find("meta", {"property": "article:published_time"}) or \
soup.find("meta", {"name": "publish-date"})
if date_meta:
published_date = date_meta.get("content", "")
# Get clean text
paragraphs = content_el.find_all("p") if content_el else []
full_text = "\n\n".join(p.get_text(strip=True) for p in paragraphs if len(p.get_text(strip=True)) > 30)
return {
"url": url,
"title": title,
"published_date": published_date,
"full_text": full_text,
"word_count": len(full_text.split()),
"domain": urlparse(url).netloc,
}
except Exception as e:
return {"url": url, "error": str(e)}
async def aggregate_news(
rss_urls: list[str],
max_articles_per_feed: int = 20,
) -> list[dict]:
"""
Fetch RSS feeds and extract full article content for each story.
"""
import xml.etree.ElementTree as ET
articles = []
async with httpx.AsyncClient(
headers=get_browser_headers(),
http2=True,
follow_redirects=True,
timeout=CONSERVATIVE_TIMEOUT,
) as client:
# Fetch RSS feeds
all_article_urls = []
for rss_url in rss_urls:
try:
response = await client.get(rss_url)
root = ET.fromstring(response.text)
# Extract article URLs from RSS items
for item in root.findall(".//item")[:max_articles_per_feed]:
link_el = item.find("link")
if link_el is not None and link_el.text:
all_article_urls.append(link_el.text.strip())
except Exception as e:
print(f"Error fetching RSS {rss_url}: {e}")
print(f"Extracting {len(all_article_urls)} articles...")
# Extract articles with concurrency limit
semaphore = asyncio.Semaphore(5)
rate_limiter = RateLimiter(requests_per_second=3.0)
async def fetch_article(url: str) -> dict:
async with semaphore:
domain = urlparse(url).netloc
await rate_limiter.acquire(domain)
return await extract_article(client, url)
tasks = [fetch_article(url) for url in all_article_urls]
articles = await asyncio.gather(*tasks)
# Filter out failures and short articles
return [a for a in articles if a.get("word_count", 0) > 100]
Use Case 3: API Data Collection with Pagination
Many sites expose internal JSON APIs. httpx with async makes paginated API collection fast and clean.
import asyncio
import httpx
import json
import random
from typing import AsyncIterator, Optional
async def paginated_api_fetch(
base_url: str,
params: dict,
page_key: str = "page",
total_key: str = "total",
items_key: str = "items",
max_pages: int = 100,
headers: Optional[dict] = None,
) -> AsyncIterator[list]:
"""
Async generator that yields pages of results from a paginated JSON API.
Usage:
async for page in paginated_api_fetch(url, params):
for item in page:
process(item)
"""
async with httpx.AsyncClient(
headers=headers or CHROME_XHR_HEADERS,
http2=True,
follow_redirects=True,
timeout=FAST_TIMEOUT,
) as client:
page_num = 1
total_fetched = 0
while page_num <= max_pages:
request_params = {**params, page_key: page_num}
try:
response = await async_fetch_with_retry(client, base_url, params=request_params)
data = response.json()
items = data.get(items_key, [])
if not items:
break
yield items
total_fetched += len(items)
# Check if we've gotten all results
total = data.get(total_key, 0)
if total > 0 and total_fetched >= total:
break
page_num += 1
await asyncio.sleep(random.uniform(0.5, 1.5))
except Exception as e:
print(f"Error fetching page {page_num}: {e}")
break
# Collect all results from a paginated endpoint
async def collect_all(base_url: str, params: dict) -> list:
all_items = []
async for page in paginated_api_fetch(base_url, params):
all_items.extend(page)
print(f"Collected {len(all_items)} items so far...")
return all_items
Use Case 4: Structured Data Extraction Pipeline
A general-purpose scraper that extracts structured data from pages using CSS selectors, with schema validation.
import asyncio
import httpx
import json
import time
from bs4 import BeautifulSoup
from dataclasses import dataclass, field, asdict
from typing import Optional
@dataclass
class ExtractionSchema:
"""Define what to extract from a page using CSS selectors."""
name: str
url_pattern: str
fields: dict # field_name -> selector string or callable
list_selector: Optional[str] = None # For pages with repeating items
@dataclass
class ExtractedRecord:
"""A single record extracted from a page."""
url: str
schema_name: str
data: dict
errors: list = field(default_factory=list)
scraped_at: float = field(default_factory=time.time)
def extract_from_page(html: str, schema: ExtractionSchema) -> list[dict]:
    """Apply an extraction schema to HTML content."""
    soup = BeautifulSoup(html, "html.parser")
    records = []
    if schema.list_selector:
        # Extract multiple records from a listing page
        containers = soup.select(schema.list_selector)
        for container in containers:
            record = {}
            for field_name, selector in schema.fields.items():
                if callable(selector):
                    record[field_name] = selector(container)
                else:
                    el = container.select_one(selector)
                    record[field_name] = el.get_text(strip=True) if el else ""
            records.append(record)
    else:
        # Extract a single record from a detail page
        record = {}
        for field_name, selector in schema.fields.items():
            if callable(selector):
                record[field_name] = selector(soup)
            else:
                el = soup.select_one(selector)
                record[field_name] = el.get_text(strip=True) if el else ""
        records.append(record)
    return records
# Example: define a schema for a job listings site
JOB_LISTINGS_SCHEMA = ExtractionSchema(
    name="job_listings",
    url_pattern="https://jobs.example.com/search*",
    list_selector=".job-card, .listing-item",
    fields={
        "title": "h2, .job-title",
        "company": ".company-name",
        "location": ".job-location",
        "salary": ".salary, .compensation",
        "posted_date": ".post-date, time",
        "job_url": lambda el: (el.select_one("a") or {}).get("href", ""),
    },
)
async def run_extraction_pipeline(
    urls: list[str],
    schema: ExtractionSchema,
    output_file: str = "extracted_data.jsonl",
) -> int:
    """Run the extraction pipeline and write results to a JSONL file."""
    count = 0
    with open(output_file, "a") as f:
        async for result in scrape_and_extract(urls, schema):
            if result.data:
                f.write(json.dumps(asdict(result)) + "\n")
                f.flush()
                count += len(result.data) if isinstance(result.data, list) else 1
    return count
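The JSONL append-and-flush pattern is easy to verify in isolation. A stdlib-only sketch — `Record`, `write_jsonl`, and `read_jsonl` are simplified stand-ins for the pipeline pieces above, using a temporary file instead of a real output path:

```python
import json
import tempfile
from dataclasses import dataclass, asdict, field

@dataclass
class Record:
    url: str
    data: list
    errors: list = field(default_factory=list)

def write_jsonl(path: str, records: list) -> int:
    """Append non-empty records as one JSON object per line; return item count."""
    count = 0
    with open(path, "a") as f:
        for rec in records:
            if rec.data:  # skip empty extractions, as the pipeline does
                f.write(json.dumps(asdict(rec)) + "\n")
                count += len(rec.data)
    return count

def read_jsonl(path: str) -> list:
    with open(path) as f:
        return [json.loads(line) for line in f]

with tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False) as tmp:
    path = tmp.name
n = write_jsonl(path, [Record("https://a", [{"t": 1}, {"t": 2}]), Record("https://b", [])])
print(n, len(read_jsonl(path)))  # 2 1
```

One line per record means a crashed run loses at most the final unflushed line, and downstream tools can stream the file without loading it whole.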
async def scrape_and_extract(urls: list[str], schema: ExtractionSchema):
    """Async generator that scrapes URLs and yields ExtractedRecord objects."""
    semaphore = asyncio.Semaphore(5)
    async with httpx.AsyncClient(
        headers=get_browser_headers(),
        http2=True,
        follow_redirects=True,
        timeout=CONSERVATIVE_TIMEOUT,
    ) as client:
        async def process_url(url: str) -> ExtractedRecord:
            async with semaphore:
                try:
                    response = await async_fetch_with_retry(client, url)
                    records = extract_from_page(response.text, schema)
                    return ExtractedRecord(url=url, schema_name=schema.name, data=records)
                except Exception as e:
                    return ExtractedRecord(url=url, schema_name=schema.name, data=[], errors=[str(e)])

        tasks = [process_url(url) for url in urls]
        for coro in asyncio.as_completed(tasks):
            yield await coro
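The `Semaphore(5)` is what keeps all those tasks from hitting the target at once: every worker is created immediately, but only five can hold the semaphore at a time. A self-contained demonstration with dummy workers — `demo_semaphore` is illustrative only, with `peak` tracking the highest observed concurrency:

```python
import asyncio

async def demo_semaphore(n_tasks: int = 20, limit: int = 5) -> int:
    """Run n_tasks dummy workers under a semaphore; return peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker():
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated network I/O
            active -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak

peak = asyncio.run(demo_semaphore())
print(peak)  # never exceeds 5
```

Bounding concurrency at the semaphore rather than by chunking the URL list means a slow response on one URL never stalls the other four slots.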
Output Schema
A typical result record from a fully instrumented version of this pipeline (fields like status, timing, and proxy come from instrumenting the fetch, beyond what ExtractedRecord itself carries):
{
  "url": "https://example.com/product/123",
  "status": 200,
  "content_length": 45231,
  "headers": {
    "content-type": "text/html; charset=utf-8",
    "server": "nginx",
    "x-cache": "HIT"
  },
  "extracted": {
    "title": "Product Name Here",
    "price": "$29.99",
    "in_stock": true,
    "description": "Full product description...",
    "rating": "4.5",
    "review_count": "234"
  },
  "proxy_used": "US-residential",
  "scraped_at": 1743436800.0,
  "duration_ms": 1247,
  "retries": 0,
  "error": null
}
```
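Before downstream processing, it's worth checking that each record actually matches this shape. A minimal stdlib-only sketch — `validate_record` and `REQUIRED_KEYS` are illustrative, with the key list mirroring the sample record above:

```python
import json

REQUIRED_KEYS = {"url", "status", "extracted", "scraped_at"}

def validate_record(raw: str) -> tuple[bool, list]:
    """Parse one JSONL line and report which required keys are missing."""
    try:
        record = json.loads(raw)
    except json.JSONDecodeError as e:
        return False, [f"invalid JSON: {e}"]
    missing = sorted(REQUIRED_KEYS - record.keys())
    return (not missing), missing

line = '{"url": "https://example.com/product/123", "status": 200, "extracted": {"title": "X"}, "scraped_at": 1743436800.0}'
ok, missing = validate_record(line)
print(ok, missing)  # True []
```

Catching a malformed line at ingest time is much cheaper than discovering it three steps later in an analytics job.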
CAPTCHA Detection and Handling
CAPTCHA encounters are inevitable at scale. The key is detecting them quickly and deciding whether to solve or skip.
import httpx
import asyncio
from typing import Optional

CAPTCHA_SIGNALS = [
    # Text signals in page body
    ("text", "solve the captcha"),
    ("text", "prove you're human"),
    ("text", "verify you are human"),
    ("text", "unusual traffic"),
    ("text", "automated requests"),
    ("text", "bot verification"),
    # URL-based signals
    ("url", "captcha"),
    ("url", "challenge"),
    ("url", "verify"),
    # Status codes
    ("status", 403),
    ("status", 429),
    ("status", 503),
]
def detect_captcha(response: httpx.Response) -> tuple[bool, str]:
    """
    Detect if a response is a CAPTCHA challenge.
    Returns (is_captcha, reason).
    """
    # Check status code
    if response.status_code in {403, 429, 503}:
        return True, f"HTTP {response.status_code}"

    # Check response URL (may have been redirected to a challenge page)
    response_url = str(response.url).lower()
    url_signals = ["captcha", "/challenge", "/verify", "/blocked"]
    for signal in url_signals:
        if signal in response_url:
            return True, f"Redirect to {signal}"

    # Check page content (only for HTML responses)
    content_type = response.headers.get("content-type", "")
    if "text/html" in content_type:
        preview = response.text[:3000].lower()
        # Generic text signals from CAPTCHA_SIGNALS above
        for kind, signal in CAPTCHA_SIGNALS:
            if kind == "text" and signal in preview:
                return True, f"Text signal: {signal}"
        # Vendor-specific challenge markers
        vendor_signals = [
            ("captcha", "captcha form"),
            ("cf-challenge", "Cloudflare challenge"),
            ("datadome", "DataDome challenge"),
            ("px-captcha", "PerimeterX challenge"),
            ("please enable js", "JS requirement"),
            ("robot", "robot check"),
        ]
        for signal, reason in vendor_signals:
            if signal in preview:
                return True, reason
    return False, ""
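Because the detection logic is pure string matching, it can be unit-tested without httpx or a live site. A dependency-free sketch of the same checks — `looks_like_captcha` and its signal tuples are illustrative stand-ins taking status, final URL, and body as plain values:

```python
BLOCK_STATUSES = {403, 429, 503}
URL_SIGNALS = ("captcha", "/challenge", "/verify", "/blocked")
TEXT_SIGNALS = ("captcha", "cf-challenge", "datadome", "px-captcha", "unusual traffic")

def looks_like_captcha(status: int, final_url: str, body: str) -> tuple[bool, str]:
    """Pure-function version of CAPTCHA detection: (is_captcha, reason)."""
    if status in BLOCK_STATUSES:
        return True, f"HTTP {status}"
    url = final_url.lower()
    for signal in URL_SIGNALS:
        if signal in url:
            return True, f"Redirect to {signal}"
    preview = body[:3000].lower()  # only scan the start of the page
    for signal in TEXT_SIGNALS:
        if signal in preview:
            return True, f"Text signal: {signal}"
    return False, ""

print(looks_like_captcha(200, "https://x.com/page", "<html>Normal page</html>"))
print(looks_like_captcha(200, "https://x.com/challenge?r=1", ""))
```

Keeping the matching logic separate from the HTTP layer also makes it trivial to replay saved block pages through the detector when tuning the signal lists.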
async def fetch_with_captcha_handling(
    client: httpx.AsyncClient,
    url: str,
    proxy_pool: Optional[ResidentialProxyPool] = None,
    max_attempts: int = 3,
) -> Optional[httpx.Response]:
    """
    Fetch a URL with CAPTCHA detection. On CAPTCHA, rotate proxy and retry.
    Returns None if all attempts hit CAPTCHAs.
    """
    for attempt in range(max_attempts):
        # Rotate proxy on retry. Note: httpx configures proxies at client
        # construction, so a production version would build a fresh client
        # around new_proxy here rather than reusing `client`.
        if attempt > 0 and proxy_pool:
            new_proxy = proxy_pool.get_proxy()
            print(f"Rotating proxy to {new_proxy.country} for attempt {attempt + 1}")
        try:
            response = await client.get(url)
            is_captcha, reason = detect_captcha(response)
            if is_captcha:
                print(f"CAPTCHA detected ({reason}) on attempt {attempt + 1}/{max_attempts}: {url}")
                # Exponential backoff before the next attempt
                await asyncio.sleep(2 ** attempt * 5)
                continue
            return response
        except Exception as e:
            print(f"Error on attempt {attempt + 1}: {e}")
            await asyncio.sleep(2 ** attempt * 3)
    print(f"All {max_attempts} attempts failed for {url}")
    return None
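The backoff in `fetch_with_captcha_handling` grows as `2 ** attempt * base`, so CAPTCHA retries wait 5, 10, then 20 seconds. Adding jitter on top avoids many workers retrying in lockstep. A small sketch of both — `backoff_delays` and `jittered` are illustrative helpers, not part of the code above:

```python
import random

def backoff_delays(max_attempts: int, base: float) -> list:
    """Deterministic exponential schedule: base, 2*base, 4*base, ..."""
    return [2 ** attempt * base for attempt in range(max_attempts)]

def jittered(delay: float, spread: float = 0.5) -> float:
    """Randomize a delay within +/- spread of its nominal value."""
    return delay * random.uniform(1 - spread, 1 + spread)

print(backoff_delays(3, 5))  # [5, 10, 20]
```

With `spread=0.5`, a nominal 10-second wait lands anywhere between 5 and 15 seconds, which breaks up the synchronized retry bursts that anti-bot systems flag.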
Wrapping Up
httpx is the right foundation for Python scraping in 2026. The core principle is layering defenses: start with proper browser headers and HTTP/2, add granular timeouts, layer in retry logic with tenacity, add residential proxies for protected targets, and implement rate limiting that produces human-like request patterns.
The progression for a new scraping project: start with bare httpx and good headers, verify you can fetch the target pages, add retry logic, then add proxies only if you hit IP-based blocks. Don't over-engineer upfront — most sites don't require residential proxies, and most data doesn't require JavaScript execution. Reach for Playwright only when the data genuinely requires JS rendering. Use curl_cffi only when TLS fingerprinting is the blocking issue. Keep the stack as simple as the target site allows, and add complexity only when a specific block forces it.