Python asyncio for Concurrent Web Scraping: A Practical Guide (2026)
If you've ever scraped 20 URLs one at a time and watched each request wait politely for the last one to finish, you already know the problem. Web scraping is IO-bound. Your CPU sits idle while packets travel across the internet. Python's asyncio lets you fire off multiple requests concurrently so that waiting time overlaps instead of stacking up.
This guide covers everything you need to build a concurrent scraper: event loop fundamentals, asyncio.gather(), semaphores for polite concurrency, TaskGroup for structured error handling, retries, proxy integration, and a full production-grade scraping framework.
Table of Contents
- Why Async Wins at IO-Bound Work
- asyncio Fundamentals You Actually Need
- Controlling Concurrency with Semaphore
- gather() vs TaskGroup
- Error Handling and Retries
- Full Working Example with Measurements
- httpx vs aiohttp: Choosing Your Async HTTP Library
- Rotating Proxies for Concurrent Scraping
- Per-Domain Rate Limiting
- Structured Async Pipeline
- Monitoring and Progress Tracking
- Debugging Async Scrapers
- Benchmarks: Sync vs Async at Different Concurrency Levels
- Production-Ready Scraper Class
- Common Pitfalls and How to Avoid Them
Why Async Wins at IO-Bound Work {#why-async}
Synchronous scraping is sequential. Each request blocks until the response arrives:
```python
import httpx
import time

urls = ["https://httpbin.org/delay/1" for _ in range(20)]

start = time.perf_counter()
with httpx.Client() as client:
    results = [client.get(url) for url in urls]
elapsed = time.perf_counter() - start
print(f"Sync: {elapsed:.1f}s")  # ~20s for 20 URLs at 1s each
```
Each 1-second request runs back-to-back. 20 URLs = 20+ seconds. The CPU is idle 95% of the time.
With asyncio, those same requests overlap:
```python
import asyncio
import httpx
import time

async def fetch(client, url):
    resp = await client.get(url)
    return resp.status_code

async def main():
    urls = ["https://httpbin.org/delay/1" for _ in range(20)]
    start = time.perf_counter()
    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"Async: {elapsed:.1f}s")  # ~1.5s for 20 URLs

asyncio.run(main())
```
That's roughly a 13x speedup for 20 URLs. While one response is in flight, another request fires. The event loop keeps the CPU busy scheduling and handling callbacks instead of just waiting.
The speedup scales predictably:

- 2 URLs: ~2x faster (overlap two waits into one)
- 10 URLs: ~10x faster
- 100 URLs: ~50-80x faster (real gains plateau due to overhead)
The bottleneck shifts from "waiting for network" to "how many concurrent connections the target server allows before rate-limiting you" — which is why semaphores matter.
asyncio Fundamentals You Actually Need {#fundamentals}
You don't need to understand every corner of asyncio to write effective scrapers. Here's the subset that matters:
Coroutines
A function defined with async def is a coroutine function. Calling it returns a coroutine object — it doesn't execute yet. The await keyword runs it and suspends the current coroutine until it completes.
```python
async def fetch_one(url):
    # This suspends here while waiting for the HTTP response
    resp = await client.get(url)
    return resp.status_code

# This just creates a coroutine object — no code runs yet
coro = fetch_one("https://example.com")

# This runs it
result = await coro
```
Tasks
A Task wraps a coroutine and schedules it to run concurrently. Unlike await, creating a task doesn't pause your current function.
```python
async def main():
    # These run concurrently — both are scheduled immediately
    task1 = asyncio.create_task(fetch_one("https://a.com"))
    task2 = asyncio.create_task(fetch_one("https://b.com"))

    # Wait for both to finish
    result1 = await task1
    result2 = await task2
```
The Event Loop
asyncio.run(main()) creates an event loop, runs your coroutine, and closes the loop when done. You almost never need to interact with the event loop directly in modern Python.
```python
# The modern way (Python 3.7+)
asyncio.run(main())

# Never do this in modern code:
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
```
await Points
Concurrency only happens at await points. If your coroutine does CPU-heavy work between await calls, it blocks the event loop for everyone else.
```python
async def bad_scraper(url):
    resp = await client.get(url)
    # WARNING: This blocks the event loop for ALL other coroutines
    result = heavy_cpu_computation(resp.text)  # bad!
    return result

async def good_scraper(url):
    resp = await client.get(url)
    # Light processing between awaits is fine
    soup = BeautifulSoup(resp.text, "lxml")  # fast enough
    return soup.find("h1").text
```
For genuinely CPU-heavy work (image processing, ML inference), use loop.run_in_executor() to offload to a thread pool.
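As a sketch of that offloading pattern (the worker function here is illustrative, not from the article): on Python 3.9+, `asyncio.to_thread()` wraps `loop.run_in_executor()` with the default thread pool, so the heavy call runs off the event loop:

```python
import asyncio
import hashlib

def heavy_cpu_work(data: bytes) -> str:
    """Stand-in for expensive processing (image work, ML inference, etc.)."""
    return hashlib.sha256(data * 10_000).hexdigest()

async def process_page(data: bytes) -> str:
    # Runs heavy_cpu_work in a worker thread; the event loop stays responsive.
    # asyncio.to_thread (3.9+) is shorthand for loop.run_in_executor(None, ...).
    return await asyncio.to_thread(heavy_cpu_work, data)

async def main() -> list[str]:
    return await asyncio.gather(*(process_page(f"page-{i}".encode()) for i in range(4)))

digests = asyncio.run(main())
print(f"Processed {len(digests)} pages without blocking the loop")
```

Note that threads only help here when the heavy call releases the GIL (as `hashlib` and most C-extension parsers do); for pure-Python CPU work, a `ProcessPoolExecutor` is the safer bet.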
Controlling Concurrency with Semaphore {#semaphore}
Firing 200 requests at once is not the right approach. You'll overwhelm the target server, trigger rate limits, get your IP banned, and potentially cause real harm to small sites.
asyncio.Semaphore is a counter that limits how many coroutines can be inside a critical section simultaneously:
```python
import asyncio
import httpx

async def fetch(client, url, semaphore):
    async with semaphore:  # blocks here if all slots are taken
        resp = await client.get(url)
        return resp.status_code

async def main():
    semaphore = asyncio.Semaphore(10)  # max 10 concurrent requests
    urls = [f"https://httpbin.org/get?n={i}" for i in range(50)]
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [fetch(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs with max 10 concurrent")

asyncio.run(main())
```
The semaphore acts as a gate. Only 10 coroutines can execute client.get() simultaneously. The rest wait. As each request completes and exits the async with semaphore block, another queued coroutine proceeds.
Choosing the Right Concurrency Level
| Target | Safe Concurrency | Notes |
|---|---|---|
| Small personal blogs | 1-2 | Be extremely polite |
| Mid-size news sites | 3-5 | Safe for most cases |
| Large platforms (GitHub, Reddit, Amazon) | 5-15 | They can handle it, but watch for 429s |
| APIs with published rate limits | Follow the limit | e.g., 10 req/s = max 10 concurrent |
| With rotating residential proxies | 10-30 | Distribution hides the load |
Start conservative (5) and increase until you see 429 responses, then back off by 50%.
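That heuristic can be sketched as a tiny additive-increase/multiplicative-decrease helper (a hypothetical class, not a library API); rebuild your `Semaphore` with the new limit between batches, since a live semaphore can't easily be shrunk:

```python
class AdaptiveConcurrency:
    """AIMD tuner: +1 after a clean batch, halve when a batch saw any 429."""

    def __init__(self, start: int = 5, ceiling: int = 50):
        self.limit = start
        self.ceiling = ceiling

    def after_batch(self, saw_429: bool) -> int:
        if saw_429:
            self.limit = max(1, self.limit // 2)  # back off by 50%
        else:
            self.limit = min(self.ceiling, self.limit + 1)
        return self.limit

tuner = AdaptiveConcurrency(start=5)
print(tuner.after_batch(saw_429=False))  # 6
print(tuner.after_batch(saw_429=True))   # 3
```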
Per-IP Semaphore with Proxy Rotation
If you're rotating proxies, you can use a higher total concurrency while keeping individual IP load low:
```python
async def fetch_with_semaphore_per_proxy(client, url, proxy_semaphores, proxy_url):
    """Use a separate semaphore per proxy to limit per-IP concurrency."""
    if proxy_url not in proxy_semaphores:
        proxy_semaphores[proxy_url] = asyncio.Semaphore(3)  # max 3 per proxy
    async with proxy_semaphores[proxy_url]:
        resp = await client.get(url)
        return resp.status_code
```
gather() vs TaskGroup {#gather-vs-taskgroup}
These are the two main ways to run multiple coroutines concurrently. They have different error-handling semantics.
asyncio.gather()
The classic approach. Creates tasks for all coroutines and waits for all of them.
```python
results = await asyncio.gather(*coroutines)
```
Key behavior: If one task raises an exception:
- Default: the exception propagates immediately, but other tasks keep running in the background (hidden!)
- With return_exceptions=True: exceptions are returned as values, not raised. You check for them manually.
For scraping, return_exceptions=True is almost always what you want:
```python
results = await asyncio.gather(*tasks, return_exceptions=True)

successes = []
failures = []
for url, result in zip(urls, results):
    if isinstance(result, Exception):
        failures.append((url, str(result)))
    else:
        successes.append(result)

print(f"{len(successes)} succeeded, {len(failures)} failed")
```
asyncio.TaskGroup (Python 3.11+)
Structured concurrency. If any task fails, all remaining tasks are cancelled:
```python
async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(fetch(client, url))
            for url in urls
        ]
    # If you get here, ALL tasks succeeded
    results = [t.result() for t in tasks]
```
If any task raises an exception, the TaskGroup cancels all other tasks and raises an ExceptionGroup.
When to use TaskGroup for scraping:

- When the batch is an all-or-nothing operation (e.g., fetching all pages of a paginated API)
- When you need clean cancellation on failure
When to use gather(return_exceptions=True) for scraping:

- When partial results are acceptable and expected (most real-world scraping)
- When you're scraping a list of URLs where some might fail (404, 500, etc.)
```python
# Pattern I use for most scraping jobs
async def scrape_batch(urls, client, semaphore):
    tasks = [fetch(client, url, semaphore) for url in urls]
    raw_results = await asyncio.gather(*tasks, return_exceptions=True)
    return [
        {"url": url, "result": result}
        if not isinstance(result, Exception)
        else {"url": url, "error": str(result)}
        for url, result in zip(urls, raw_results)
    ]
```
Error Handling and Retries {#error-handling}
Network errors are inevitable at scale. Your scraper needs to handle them gracefully.
Exponential Backoff Retry
```python
import asyncio
import httpx
import random
import logging

logger = logging.getLogger(__name__)

async def fetch_with_retry(
    client: httpx.AsyncClient,
    url: str,
    semaphore: asyncio.Semaphore,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> dict:
    """
    Fetch a URL with exponential backoff retry.
    Returns dict with 'content' on success or 'error' on failure.
    """
    async with semaphore:
        last_exception = None
        for attempt in range(max_retries):
            try:
                # Add jitter to avoid thundering herd
                if attempt > 0:
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    await asyncio.sleep(delay)
                resp = await client.get(url, follow_redirects=True)
                if resp.status_code == 429:
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    logger.warning(f"Rate limited on {url}, waiting {retry_after}s")
                    await asyncio.sleep(retry_after)
                    continue
                if resp.status_code == 503:
                    logger.warning(f"Service unavailable for {url} (attempt {attempt+1})")
                    continue
                resp.raise_for_status()
                return {"url": url, "status": resp.status_code,
                        "content": resp.text, "attempt": attempt + 1}
            except httpx.TimeoutException as e:
                logger.warning(f"Timeout on {url} (attempt {attempt+1}): {e}")
                last_exception = e
            except httpx.HTTPStatusError as e:
                if e.response.status_code in (404, 410):
                    # Permanent errors — don't retry
                    return {"url": url, "error": f"HTTP {e.response.status_code}",
                            "permanent": True}
                logger.warning(f"HTTP error on {url} (attempt {attempt+1}): {e}")
                last_exception = e
            except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
                logger.warning(f"Connection error on {url} (attempt {attempt+1}): {e}")
                last_exception = e
        return {"url": url, "error": str(last_exception), "attempts": max_retries}
```
Circuit Breaker Pattern
For large scraping jobs, if a domain is consistently failing, stop hitting it:
```python
from collections import defaultdict
from datetime import datetime
from urllib.parse import urlparse

class CircuitBreaker:
    """
    Tracks failure rates per domain and opens the circuit after
    too many failures. Allows retry after a cool-down period.
    """
    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = defaultdict(int)
        self.last_failure = defaultdict(lambda: datetime.min)
        self.open_circuits = set()

    def domain_from_url(self, url: str) -> str:
        return urlparse(url).netloc

    def is_open(self, url: str) -> bool:
        """Returns True if circuit is open (should NOT make request)."""
        domain = self.domain_from_url(url)
        if domain not in self.open_circuits:
            return False
        # Check if recovery timeout has passed
        time_since_failure = (datetime.now() - self.last_failure[domain]).total_seconds()
        if time_since_failure > self.recovery_timeout:
            self.open_circuits.discard(domain)
            self.failures[domain] = 0
            return False
        return True

    def record_failure(self, url: str):
        domain = self.domain_from_url(url)
        self.failures[domain] += 1
        self.last_failure[domain] = datetime.now()
        if self.failures[domain] >= self.failure_threshold:
            self.open_circuits.add(domain)
            logger.warning(f"Circuit opened for {domain} after "
                           f"{self.failures[domain]} failures")

    def record_success(self, url: str):
        domain = self.domain_from_url(url)
        self.failures[domain] = max(0, self.failures[domain] - 1)

circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=120)

async def fetch_with_circuit_breaker(client, url, semaphore):
    if circuit_breaker.is_open(url):
        return {"url": url, "error": "circuit_open", "skipped": True}
    result = await fetch_with_retry(client, url, semaphore)
    if "error" in result:
        circuit_breaker.record_failure(url)
    else:
        circuit_breaker.record_success(url)
    return result
```
Full Working Example with Measurements {#full-example}
Here's a complete scraper with sync baseline comparison, proper error handling, and timing:
```python
import asyncio
import httpx
import time
import random
import statistics
from dataclasses import dataclass
from typing import Optional

@dataclass
class ScrapeResult:
    url: str
    status: Optional[int] = None
    size_bytes: int = 0
    elapsed_ms: float = 0
    error: Optional[str] = None
    attempt: int = 1

    @property
    def success(self) -> bool:
        return self.error is None

async def fetch_timed(client: httpx.AsyncClient,
                      url: str,
                      semaphore: asyncio.Semaphore) -> ScrapeResult:
    """Fetch URL and return detailed timing info."""
    async with semaphore:
        await asyncio.sleep(random.uniform(0, 0.5))  # jitter
        start = time.perf_counter()
        try:
            resp = await client.get(url, follow_redirects=True)
            elapsed = (time.perf_counter() - start) * 1000
            return ScrapeResult(
                url=url,
                status=resp.status_code,
                size_bytes=len(resp.content),
                elapsed_ms=elapsed,
            )
        except Exception as e:
            elapsed = (time.perf_counter() - start) * 1000
            return ScrapeResult(
                url=url,
                elapsed_ms=elapsed,
                error=str(e),
            )

def sync_scrape(urls: list[str]) -> tuple[list[ScrapeResult], float]:
    """Baseline: sequential synchronous scraping."""
    start = time.perf_counter()
    results = []
    with httpx.Client(timeout=30, follow_redirects=True) as client:
        for url in urls:
            t0 = time.perf_counter()
            try:
                resp = client.get(url)
                elapsed = (time.perf_counter() - t0) * 1000
                results.append(ScrapeResult(
                    url=url, status=resp.status_code,
                    size_bytes=len(resp.content), elapsed_ms=elapsed
                ))
            except Exception as e:
                elapsed = (time.perf_counter() - t0) * 1000
                results.append(ScrapeResult(url=url, elapsed_ms=elapsed, error=str(e)))
    total_time = time.perf_counter() - start
    return results, total_time

async def async_scrape(urls: list[str],
                       max_concurrent: int = 10) -> tuple[list[ScrapeResult], float]:
    """Async concurrent scraping."""
    start = time.perf_counter()
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [fetch_timed(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    total_time = time.perf_counter() - start
    # Unwrap any exceptions
    clean_results = []
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            clean_results.append(ScrapeResult(url=url, error=str(result)))
        else:
            clean_results.append(result)
    return clean_results, total_time

def print_benchmark(label: str, results: list[ScrapeResult], total_time: float):
    successes = [r for r in results if r.success]
    failures = [r for r in results if not r.success]
    times = [r.elapsed_ms for r in successes]
    print(f"\n{label}")
    print(f"  Total time: {total_time:.2f}s")
    print(f"  Succeeded: {len(successes)}/{len(results)}")
    print(f"  Failed: {len(failures)}")
    if times:
        print(f"  Avg req time: {statistics.mean(times):.0f}ms")
        print(f"  P50 req time: {statistics.median(times):.0f}ms")
        print(f"  P95 req time: {statistics.quantiles(times, n=20)[-1]:.0f}ms")
    if failures:
        for r in failures[:3]:
            print(f"  Error: {r.url} — {r.error}")

def main():
    # Use real-world URLs for realistic timing
    test_urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers",
        "https://httpbin.org/uuid",
    ] * 10  # 50 total URLs
    random.shuffle(test_urls)
    print(f"Benchmarking {len(test_urls)} URLs\n")

    # Sync baseline
    sync_results, sync_time = sync_scrape(test_urls[:20])  # just 20 for sync
    print_benchmark("Sync (sequential, 20 URLs)", sync_results, sync_time)

    # Async at different concurrency levels
    for concurrency in [5, 10, 20]:
        loop_results, loop_time = asyncio.run(
            async_scrape(test_urls, max_concurrent=concurrency)
        )
        print_benchmark(
            f"Async (concurrent={concurrency}, {len(test_urls)} URLs)",
            loop_results,
            loop_time
        )
        speedup = (sync_time / len(sync_results)) * len(test_urls) / loop_time
        print(f"  Est. speedup vs sync: {speedup:.1f}x")

if __name__ == "__main__":
    main()
```
httpx vs aiohttp: Choosing Your Async HTTP Library {#choosing-library}
Both are mature async HTTP libraries. Here's how they compare for scraping:
httpx
```python
import httpx

async with httpx.AsyncClient(
    timeout=httpx.Timeout(connect=5, read=30, write=10, pool=5),
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    follow_redirects=True,
    http2=True,  # HTTP/2 support
) as client:
    resp = await client.get(url)
```
Pros for scraping:
- Sync and async APIs are identical (easy to switch between)
- HTTP/2 support out of the box
- Clean timeout configuration
- Built-in redirect following
- Compatible with curl_cffi for TLS fingerprinting
Cons:

- Slightly slower than aiohttp at very high concurrency
aiohttp
```python
import aiohttp

timeout = aiohttp.ClientTimeout(total=30, connect=5)
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)

async with aiohttp.ClientSession(
    timeout=timeout,
    connector=connector,
) as session:
    async with session.get(url) as resp:
        content = await resp.read()
```
Pros for scraping:

- Faster for very high concurrency (10,000+ requests)
- More fine-grained connection pool control
- Streaming response support
Cons:

- No HTTP/2 by default
- More verbose API
- Sync version is a separate library
Recommendation: Use httpx unless you specifically need to handle 10,000+ concurrent connections. Its API is cleaner and the sync/async parity makes testing easier.
Rotating Proxies for Concurrent Scraping {#proxies}
When scraping at concurrency 10-20, all requests come from the same IP simultaneously. This is much more detectable than sequential requests — each connection is visible to the target at the same time.
Rotating proxies distribute requests across different IPs. ThorData provides residential proxy pools that integrate directly with httpx.
Basic Rotating Proxy Setup
```python
import asyncio
import httpx
import random

# Get credentials at https://thordata.partnerstack.com/partner/0a0x4nzh
TD_USERNAME = "your_username"
TD_PASSWORD = "your_password"
TD_HOST = "proxy.thordata.com"
TD_PORT = 9000

def get_rotating_proxy(country: str = "US") -> str:
    """Get a ThorData rotating proxy URL (new IP per request)."""
    return f"http://{TD_USERNAME}-country-{country}:{TD_PASSWORD}@{TD_HOST}:{TD_PORT}"

def get_sticky_proxy(session_id: str, country: str = "US") -> str:
    """Get a ThorData sticky proxy URL (same IP for the session)."""
    user = f"{TD_USERNAME}-country-{country}-session-{session_id}"
    return f"http://{user}:{TD_PASSWORD}@{TD_HOST}:{TD_PORT}"

async def fetch_via_proxy(url: str, semaphore: asyncio.Semaphore,
                          session_id: str | None = None) -> dict:
    """Fetch URL through a ThorData proxy."""
    async with semaphore:
        proxy_url = (get_sticky_proxy(session_id) if session_id
                     else get_rotating_proxy())
        try:
            # httpx >= 0.26 takes `proxy=`; older versions used `proxies=`
            async with httpx.AsyncClient(
                proxy=proxy_url,
                timeout=30,
                follow_redirects=True,
            ) as client:
                await asyncio.sleep(random.uniform(0.5, 2))
                resp = await client.get(url)
                resp.raise_for_status()
                return {
                    "url": url,
                    "status": resp.status_code,
                    "size": len(resp.content),
                }
        except Exception as e:
            return {"url": url, "error": str(e)}

async def scrape_with_rotating_proxies(
    urls: list[str],
    max_concurrent: int = 10,
) -> list[dict]:
    """
    Scrape with ThorData rotating proxies.
    Each request gets a fresh residential IP.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [fetch_via_proxy(url, semaphore) for url in urls]
    return await asyncio.gather(*tasks, return_exceptions=True)
```
Proxy Pool with Health Checking
For long-running scrapers, track proxy performance:
```python
import asyncio
import random
import time
from collections import defaultdict

import httpx

class ProxyHealthTracker:
    """Track success rates for proxy sessions."""
    def __init__(self):
        self.success_count = defaultdict(int)
        self.failure_count = defaultdict(int)
        self.last_used = defaultdict(float)

    def record_success(self, proxy_id: str):
        self.success_count[proxy_id] += 1
        self.last_used[proxy_id] = time.time()

    def record_failure(self, proxy_id: str):
        self.failure_count[proxy_id] += 1

    def success_rate(self, proxy_id: str) -> float:
        total = self.success_count[proxy_id] + self.failure_count[proxy_id]
        if total == 0:
            return 1.0
        return self.success_count[proxy_id] / total

    def get_best_proxy(self, proxy_ids: list[str]) -> str:
        """Return the proxy ID with the highest success rate."""
        return max(proxy_ids, key=lambda p: self.success_rate(p))

    def report(self):
        print("\nProxy Health Report:")
        all_ids = set(self.success_count) | set(self.failure_count)
        for proxy_id in sorted(all_ids):
            rate = self.success_rate(proxy_id)
            s = self.success_count[proxy_id]
            f = self.failure_count[proxy_id]
            print(f"  {proxy_id}: {rate:.0%} ({s} ok, {f} failed)")

tracker = ProxyHealthTracker()

async def fetch_tracked(url: str, semaphore: asyncio.Semaphore,
                        request_num: int) -> dict:
    """Fetch with proxy health tracking."""
    # Use sticky sessions that rotate every 20 requests
    session_id = f"session-{request_num // 20}"
    proxy_url = get_sticky_proxy(session_id)
    async with semaphore:
        await asyncio.sleep(random.uniform(1, 3))
        try:
            async with httpx.AsyncClient(proxy=proxy_url, timeout=30) as client:
                resp = await client.get(url, follow_redirects=True)
                resp.raise_for_status()
                tracker.record_success(session_id)
                return {"url": url, "status": resp.status_code}
        except Exception as e:
            tracker.record_failure(session_id)
            return {"url": url, "error": str(e)}
```
Per-Domain Rate Limiting {#per-domain}
A single semaphore limits total concurrent requests. But when scraping multiple domains, you want per-domain limits: be gentle with each site while staying busy overall.
```python
import asyncio
import time
from collections import defaultdict
from urllib.parse import urlparse

import httpx

class DomainRateLimiter:
    """
    Enforces per-domain concurrency and minimum request intervals.
    """
    def __init__(self, max_per_domain: int = 3,
                 min_interval_per_domain: float = 2.0):
        self.max_per_domain = max_per_domain
        self.min_interval = min_interval_per_domain
        self._semaphores: dict[str, asyncio.Semaphore] = {}
        self._last_request: dict[str, float] = defaultdict(float)
        self._locks: dict[str, asyncio.Lock] = {}

    def _get_domain(self, url: str) -> str:
        return urlparse(url).netloc

    def _get_semaphore(self, domain: str) -> asyncio.Semaphore:
        if domain not in self._semaphores:
            self._semaphores[domain] = asyncio.Semaphore(self.max_per_domain)
            self._locks[domain] = asyncio.Lock()
        return self._semaphores[domain]

    async def acquire(self, url: str):
        """Acquire permission to make a request to this URL's domain."""
        domain = self._get_domain(url)
        semaphore = self._get_semaphore(domain)
        await semaphore.acquire()
        # Enforce minimum interval
        async with self._locks[domain]:
            elapsed = time.time() - self._last_request[domain]
            if elapsed < self.min_interval:
                await asyncio.sleep(self.min_interval - elapsed)
            self._last_request[domain] = time.time()

    def release(self, url: str):
        domain = self._get_domain(url)
        self._semaphores[domain].release()

# Usage
limiter = DomainRateLimiter(max_per_domain=3, min_interval_per_domain=2.0)

async def fetch_rate_limited(client, url):
    await limiter.acquire(url)
    try:
        resp = await client.get(url, follow_redirects=True, timeout=30)
        return resp
    finally:
        limiter.release(url)

async def scrape_multiple_domains(urls: list[str]) -> list:
    """
    Scrape URLs from multiple domains, respecting per-domain limits.
    """
    async with httpx.AsyncClient() as client:
        tasks = [fetch_rate_limited(client, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
```
Structured Async Pipeline {#pipeline}
For production scrapers, structure your code as a pipeline: URL discovery → fetching → parsing → storage. Use queues to connect stages:
```python
import asyncio
import random
import sqlite3
from dataclasses import dataclass

import httpx
from bs4 import BeautifulSoup

@dataclass
class ScrapedPage:
    url: str
    html: str
    status: int

@dataclass
class ParsedData:
    url: str
    title: str
    links: list[str]
    text_length: int

async def fetcher(
    url_queue: asyncio.Queue,
    page_queue: asyncio.Queue,
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    worker_id: int,
):
    """Worker that fetches URLs from the queue."""
    while True:
        url = await url_queue.get()
        if url is None:  # poison pill — stop this worker
            url_queue.task_done()
            break
        try:
            async with semaphore:
                await asyncio.sleep(random.uniform(1, 3))
                resp = await client.get(url, follow_redirects=True, timeout=30)
                await page_queue.put(ScrapedPage(
                    url=url,
                    html=resp.text,
                    status=resp.status_code,
                ))
        except Exception as e:
            print(f"[Fetcher {worker_id}] Error on {url}: {e}")
        finally:
            url_queue.task_done()

async def parser(
    page_queue: asyncio.Queue,
    data_queue: asyncio.Queue,
    worker_id: int,
):
    """Worker that parses fetched pages."""
    while True:
        page = await page_queue.get()
        if page is None:
            page_queue.task_done()
            break
        try:
            # Parsing is CPU-light so we do it inline
            soup = BeautifulSoup(page.html, "lxml")
            title_el = soup.find("h1") or soup.find("title")
            title = title_el.get_text(strip=True) if title_el else ""
            links = [
                a.get("href")
                for a in soup.find_all("a", href=True)
                if a["href"].startswith("http")
            ]
            await data_queue.put(ParsedData(
                url=page.url,
                title=title,
                links=links,
                text_length=len(soup.get_text()),
            ))
        except Exception as e:
            print(f"[Parser {worker_id}] Error on {page.url}: {e}")
        finally:
            page_queue.task_done()

async def storage_writer(
    data_queue: asyncio.Queue,
    db_path: str = "scraped.db",
):
    """Writes parsed data to SQLite."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS pages (
            url TEXT PRIMARY KEY, title TEXT,
            link_count INTEGER, text_length INTEGER,
            scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)
    while True:
        data = await data_queue.get()
        if data is None:
            data_queue.task_done()
            break
        try:
            conn.execute(
                "INSERT OR REPLACE INTO pages (url, title, link_count, text_length) "
                "VALUES (?, ?, ?, ?)",
                (data.url, data.title, len(data.links), data.text_length)
            )
            conn.commit()
        except Exception as e:
            print(f"[Storage] Error: {e}")
        finally:
            data_queue.task_done()
    conn.close()

async def run_pipeline(
    urls: list[str],
    num_fetch_workers: int = 5,
    num_parse_workers: int = 3,
):
    """Run the full scraping pipeline."""
    url_queue = asyncio.Queue()
    page_queue = asyncio.Queue(maxsize=50)  # backpressure
    data_queue = asyncio.Queue()
    semaphore = asyncio.Semaphore(num_fetch_workers)

    # Seed URLs
    for url in urls:
        await url_queue.put(url)
    # Add poison pills for workers
    for _ in range(num_fetch_workers):
        await url_queue.put(None)

    async with httpx.AsyncClient(timeout=30) as client:
        fetch_workers = [
            asyncio.create_task(
                fetcher(url_queue, page_queue, client, semaphore, i)
            )
            for i in range(num_fetch_workers)
        ]
        parse_workers = [
            asyncio.create_task(
                parser(page_queue, data_queue, i)
            )
            for i in range(num_parse_workers)
        ]
        storage_task = asyncio.create_task(storage_writer(data_queue))

        # Wait for all fetchers to finish
        await asyncio.gather(*fetch_workers)
        # Signal parsers to stop
        for _ in range(num_parse_workers):
            await page_queue.put(None)
        await asyncio.gather(*parse_workers)
        # Signal storage to stop
        await data_queue.put(None)
        await storage_task

    print(f"Pipeline complete: processed {len(urls)} URLs")
```
Monitoring and Progress Tracking {#monitoring}
For long-running scrapers, you need visibility:
```python
import asyncio
import time
from dataclasses import dataclass, field
from threading import Lock

import httpx

@dataclass
class ScraperStats:
    """Thread-safe statistics tracker for async scrapers."""
    total: int = 0
    completed: int = 0
    succeeded: int = 0
    failed: int = 0
    start_time: float = field(default_factory=time.time)
    _lock: Lock = field(default_factory=Lock, repr=False, compare=False)

    def record_success(self):
        with self._lock:
            self.completed += 1
            self.succeeded += 1

    def record_failure(self):
        with self._lock:
            self.completed += 1
            self.failed += 1

    @property
    def elapsed(self) -> float:
        return time.time() - self.start_time

    @property
    def rate(self) -> float:
        """Requests per second."""
        return self.completed / self.elapsed if self.elapsed > 0 else 0

    @property
    def eta_seconds(self) -> float:
        """Estimated seconds remaining."""
        remaining = self.total - self.completed
        if self.rate == 0:
            return float("inf")
        return remaining / self.rate

    def __str__(self) -> str:
        pct = self.completed / self.total * 100 if self.total > 0 else 0
        return (f"{self.completed}/{self.total} ({pct:.0f}%) | "
                f"{self.succeeded} ok, {self.failed} failed | "
                f"{self.rate:.1f} req/s | "
                f"ETA: {self.eta_seconds:.0f}s")

async def progress_reporter(stats: ScraperStats, interval: float = 10):
    """Periodically print progress stats."""
    while stats.completed < stats.total:
        print(f"Progress: {stats}")
        await asyncio.sleep(interval)

stats = ScraperStats()

async def fetch_with_stats(client, url, semaphore):
    async with semaphore:
        try:
            resp = await client.get(url, follow_redirects=True, timeout=30)
            resp.raise_for_status()
            stats.record_success()
            return {"url": url, "status": resp.status_code}
        except Exception as e:
            stats.record_failure()
            return {"url": url, "error": str(e)}

async def monitored_scrape(urls: list[str], max_concurrent: int = 10):
    stats.total = len(urls)
    stats.start_time = time.time()
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient(timeout=30) as client:
        # Start progress reporter
        reporter = asyncio.create_task(progress_reporter(stats))
        # Run scraping
        tasks = [fetch_with_stats(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        reporter.cancel()
    print(f"\nFinal: {stats}")
    return results
```
Debugging Async Scrapers {#debugging}
Enable asyncio Debug Mode
```python
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

# Enable asyncio debug mode (no need to touch the event loop directly)
asyncio.run(main(), debug=True)

# Or set the environment variable: PYTHONASYNCIODEBUG=1
```
Debug mode will warn you about:
- Coroutines that are created but never awaited
- Callbacks that block the event loop for more than 100ms (the `slow_callback_duration` threshold)
- Improperly closed resources
Detect Event Loop Blocking
```python
import asyncio
import time

async def watch_event_loop(threshold_ms: float = 50):
    """Warn if the event loop is blocked for more than threshold_ms."""
    while True:
        start = time.perf_counter()
        await asyncio.sleep(0)  # yield to the event loop
        elapsed_ms = (time.perf_counter() - start) * 1000
        if elapsed_ms > threshold_ms:
            print(f"WARNING: Event loop blocked for {elapsed_ms:.0f}ms")
        await asyncio.sleep(0.1)  # check every 100ms

async def main():
    watcher = asyncio.create_task(watch_event_loop())
    # ... your scraping code ...
    watcher.cancel()
```
Common Error Patterns
# WRONG: Forgetting to await a coroutine
result = fetch(client, url) # This creates a coroutine object, never runs it!
# WARNING: RuntimeWarning: coroutine 'fetch' was never awaited
# RIGHT:
result = await fetch(client, url)
# WRONG: Using blocking requests inside async function
async def bad():
import requests
resp = requests.get(url) # blocks the event loop!
# RIGHT: Use httpx.AsyncClient
async def good():
async with httpx.AsyncClient() as client:
resp = await client.get(url)
# WRONG: Sharing a single Client between concurrent requests without care
# (httpx.AsyncClient is safe to share — but some libraries aren't)
client = httpx.Client() # sync client — NOT safe for async!
async def bad():
resp = client.get(url) # this blocks
# RIGHT: Use AsyncClient
async_client = httpx.AsyncClient()
async def good():
resp = await async_client.get(url)
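The "never awaited" warning is easy to reproduce on demand; a quick demonstration (CPython-specific, since it relies on the coroutine object being garbage-collected when the last reference drops):

```python
import gc
import warnings

async def fetch():
    return 42

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    coro = fetch()   # coroutine object created, but never awaited
    del coro         # dropping the last reference triggers the RuntimeWarning
    gc.collect()     # belt and braces for non-refcounting interpreters

print([str(w.message) for w in caught])
```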
Benchmarks: Sync vs Async at Different Concurrency Levels {#benchmarks}
These benchmarks were run scraping a mix of public URLs with varying response times (50ms-2000ms):
Theoretical Maximum Speedup
For N URLs with average response time T:
- Sync: total time ≈ N × T
- Async (concurrency = N): total time ≈ max(T) + overhead
Real-World Results (100 URLs, avg 500ms response)
| Method | Concurrency | Total Time | Speedup | CPU Usage |
|---|---|---|---|---|
| Sync | 1 | ~50s | 1x | 5% |
| Async | 5 | ~11s | 4.5x | 8% |
| Async | 10 | ~6s | 8.3x | 12% |
| Async | 20 | ~3.5s | 14x | 18% |
| Async | 50 | ~2.5s | 20x | 25% |
| Async | 100 | ~2.2s | 23x | 35% |
Diminishing returns kick in around 20-30 concurrent requests because:
1. Network overhead from managing many connections
2. Target server response time variation (some requests take 2-5s regardless)
3. DNS resolution overhead for many distinct domains
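A back-of-the-envelope model (not part of the benchmark harness) helps read the table: with concurrency c, requests run in roughly ceil(N/c) waves of the average response time.

```python
import math

def ideal_async_time(n_urls: int, avg_resp_s: float, concurrency: int) -> float:
    """Idealized wave model: ignores connection setup, DNS, and slow outliers."""
    return math.ceil(n_urls / concurrency) * avg_resp_s

# 100 URLs at ~500ms average, mirroring the table's setup
for c in (1, 5, 10, 20, 50, 100):
    print(f"concurrency={c:3d}: ideal ~{ideal_async_time(100, 0.5, c):4.1f}s")
```

The measured times sit above these ideals at every concurrency level; the gap is exactly the overhead the three points above describe.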
When Does Async NOT Help?
- CPU-bound work: If parsing HTML takes 500ms and each response is only 50ms, async barely helps
- Single URL with large response: Streaming a single 100MB file is IO-bound but not concurrent
- APIs with strict rate limits: If the API allows 1 req/s, async concurrency doesn't help
Production-Ready Scraper Class {#production}
Putting it all together — a reusable scraper class that handles everything:
import asyncio
import httpx
import random
import time
import logging
from dataclasses import dataclass, field
from typing import Optional, Callable
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
@dataclass
class ScraperConfig:
max_concurrent: int = 10
min_delay: float = 1.0
max_delay: float = 3.0
max_retries: int = 3
timeout: float = 30.0
proxy_url: Optional[str] = None
user_agents: list[str] = field(default_factory=lambda: [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
])
class AsyncScraper:
"""
Production-ready async scraper with:
- Configurable concurrency and delays
- Automatic retry with exponential backoff
- Proxy support (ThorData or any HTTP proxy)
- Progress tracking
- Per-domain rate limiting
"""
    def __init__(self, config: Optional[ScraperConfig] = None):
self.config = config or ScraperConfig()
self._semaphore = asyncio.Semaphore(self.config.max_concurrent)
self._domain_semaphores: dict[str, asyncio.Semaphore] = {}
self._stats = {"total": 0, "completed": 0, "errors": 0}
def _get_headers(self) -> dict:
return {
"User-Agent": random.choice(self.config.user_agents),
"Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
}
def _get_domain_semaphore(self, url: str) -> asyncio.Semaphore:
domain = urlparse(url).netloc
if domain not in self._domain_semaphores:
self._domain_semaphores[domain] = asyncio.Semaphore(
max(1, self.config.max_concurrent // 3) # 1/3 of total per domain
)
return self._domain_semaphores[domain]
async def fetch(self, client: httpx.AsyncClient, url: str) -> dict:
"""Fetch a single URL with retry logic."""
domain_sem = self._get_domain_semaphore(url)
async with self._semaphore:
async with domain_sem:
await asyncio.sleep(
random.uniform(self.config.min_delay, self.config.max_delay)
)
                last_error = "max retries exceeded"
                for attempt in range(self.config.max_retries):
try:
resp = await client.get(
url,
headers=self._get_headers(),
follow_redirects=True,
)
if resp.status_code == 429:
                            # NOTE: Retry-After may also be an HTTP date; we assume seconds here
                            retry_after = int(resp.headers.get("Retry-After", 60))
await asyncio.sleep(retry_after)
continue
resp.raise_for_status()
self._stats["completed"] += 1
return {"url": url, "status": resp.status_code,
"html": resp.text, "attempt": attempt + 1}
except httpx.HTTPStatusError as e:
if e.response.status_code in (404, 410, 403):
self._stats["errors"] += 1
return {"url": url,
"error": f"HTTP {e.response.status_code}",
"permanent": True}
if attempt < self.config.max_retries - 1:
await asyncio.sleep(2 ** attempt)
last_error = str(e)
except Exception as e:
if attempt < self.config.max_retries - 1:
await asyncio.sleep(2 ** attempt)
last_error = str(e)
self._stats["errors"] += 1
return {"url": url, "error": last_error}
async def scrape(
self,
urls: list[str],
callback: Optional[Callable] = None,
) -> list[dict]:
"""
Scrape a list of URLs concurrently.
Args:
urls: List of URLs to scrape
callback: Optional async function called with each result as it arrives
Returns:
List of result dicts with 'url', 'html' or 'error'
"""
self._stats = {"total": len(urls), "completed": 0, "errors": 0}
client_kwargs = {
"timeout": self.config.timeout,
}
        if self.config.proxy_url:
            # httpx >= 0.26 takes a single "proxy" argument ("proxies" was removed in 0.28)
            client_kwargs["proxy"] = self.config.proxy_url
async with httpx.AsyncClient(**client_kwargs) as client:
if callback:
# Process results as they come in
results = []
tasks = [self.fetch(client, url) for url in urls]
for coro in asyncio.as_completed(tasks):
result = await coro
await callback(result)
results.append(result)
return results
else:
tasks = [self.fetch(client, url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
def get_stats(self) -> dict:
return self._stats.copy()
# Usage example
async def main():
config = ScraperConfig(
max_concurrent=10,
min_delay=2.0,
max_delay=5.0,
max_retries=3,
# Uncomment to use ThorData proxies:
# proxy_url="http://user:[email protected]:9000",
)
scraper = AsyncScraper(config)
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/uuid",
] * 5
start = time.perf_counter()
results = await scraper.scrape(urls)
elapsed = time.perf_counter() - start
stats = scraper.get_stats()
print(f"Scraped {stats['total']} URLs in {elapsed:.1f}s")
print(f"Success: {stats['completed']}, Errors: {stats['errors']}")
asyncio.run(main())
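The callback parameter of scrape() delivers results as they finish rather than at the end of the batch. The underlying pattern (asyncio.as_completed plus an awaited callback) can be sketched with stub coroutines standing in for real requests:

```python
import asyncio

async def fake_fetch(url: str) -> dict:
    """Stand-in for AsyncScraper.fetch; returns a result dict after simulated IO."""
    await asyncio.sleep(0.01)
    return {"url": url, "status": 200}

async def on_result(result: dict) -> None:
    # In a real run this might write to a database or append to a file
    print(f"done: {result['url']}")

async def main() -> list[dict]:
    urls = [f"https://example.com/page/{i}" for i in range(5)]
    results = []
    for coro in asyncio.as_completed([fake_fetch(u) for u in urls]):
        result = await coro        # earliest-finished result first
        await on_result(result)    # callback fires as each result arrives
        results.append(result)
    return results

results = asyncio.run(main())
```

This is useful for long jobs: partial results are persisted even if the process dies halfway through.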
Common Pitfalls and How to Avoid Them {#pitfalls}
1. Not Limiting Concurrency
# DON'T — fires all tasks simultaneously
tasks = [fetch(client, url) for url in ten_thousand_urls]
results = await asyncio.gather(*tasks) # 10,000 simultaneous connections!
# DO — use a semaphore
semaphore = asyncio.Semaphore(10)
tasks = [fetch_with_semaphore(client, url, semaphore) for url in ten_thousand_urls]
results = await asyncio.gather(*tasks)
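The fetch_with_semaphore helper referenced above is not defined in the snippet; a minimal version looks like this (the fetch stub simulates IO in place of a real request, so the example runs without a network):

```python
import asyncio

async def fetch(client, url: str) -> str:
    """Stand-in for an HTTP call; replace the sleep with `await client.get(url)`."""
    await asyncio.sleep(0.05)
    return f"ok:{url}"

async def fetch_with_semaphore(client, url: str,
                               semaphore: asyncio.Semaphore) -> str:
    async with semaphore:  # at most N fetches hold the semaphore at once
        return await fetch(client, url)

async def main() -> list[str]:
    semaphore = asyncio.Semaphore(10)
    urls = [f"https://example.com/{i}" for i in range(50)]
    tasks = [fetch_with_semaphore(None, url, semaphore) for url in urls]
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
```

All 50 coroutines are created up front, but the semaphore ensures only 10 are in flight at any moment.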
2. Creating a New Client Per Request
# DON'T — creates a new connection every time, bypasses connection pooling
async def fetch(url):
async with httpx.AsyncClient() as client: # new client each call!
return await client.get(url)
# DO — share the client
async def scrape(urls):
async with httpx.AsyncClient() as client: # one client for all
tasks = [fetch_shared(client, url) for url in urls]
return await asyncio.gather(*tasks)
3. Not Handling Exceptions in gather()
# DON'T — the first exception propagates and the other results are lost
results = await asyncio.gather(*tasks)  # raises on the first exception
# DO
results = await asyncio.gather(*tasks, return_exceptions=True)
good = [r for r in results if not isinstance(r, Exception)]
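A self-contained demonstration of the difference, using trivial coroutines in place of real fetches:

```python
import asyncio

async def ok(n: int) -> int:
    return n

async def boom() -> int:
    raise ValueError("request failed")

async def main():
    # With return_exceptions=True, the exception is returned in-place,
    # so the other results survive
    results = await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)
    good = [r for r in results if not isinstance(r, Exception)]
    bad = [r for r in results if isinstance(r, Exception)]
    return good, bad

good, bad = asyncio.run(main())
print(good)  # [1, 3]
```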
4. Blocking the Event Loop with CPU Work
# DON'T — heavy parsing blocks other coroutines
async def fetch_and_parse(url):
resp = await client.get(url)
# This regex/parsing runs for 500ms and blocks EVERYTHING
data = heavy_parse(resp.text)
# DO — run CPU work in executor
import asyncio
async def fetch_and_parse(url):
resp = await client.get(url)
    loop = asyncio.get_running_loop()
data = await loop.run_in_executor(None, heavy_parse, resp.text)
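A runnable version of the executor pattern, with a CPU-bound stand-in for heavy_parse (the function and its workload are illustrative):

```python
import asyncio

def heavy_parse(text: str) -> int:
    """CPU-bound stand-in: count occurrences of a token the slow way."""
    return sum(1 for w in text.split() if w == "item")

async def main() -> int:
    text = "item other " * 100_000
    loop = asyncio.get_running_loop()
    # Runs in the default ThreadPoolExecutor; the event loop stays responsive
    return await loop.run_in_executor(None, heavy_parse, text)

count = asyncio.run(main())
```

For truly heavy parsing, pass a concurrent.futures.ProcessPoolExecutor instead of None to sidestep the GIL as well.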
5. Ignoring Connection Limits
# Set explicit limits to avoid file descriptor exhaustion
async with httpx.AsyncClient(
limits=httpx.Limits(
max_connections=100,
max_keepalive_connections=20,
keepalive_expiry=30,
)
) as client:
# Now httpx won't open more than 100 connections
...
Summary
Async scraping is one of those rare cases where a small code change delivers an order-of-magnitude performance improvement.
The core recipe:
1. Use asyncio + httpx.AsyncClient for concurrent HTTP — it's the modern Python standard
2. Always use asyncio.Semaphore to cap concurrency at 5-20 for most targets
3. Prefer gather(return_exceptions=True) for scraping — you want partial results
4. Use TaskGroup when every task must succeed
5. Add jitter to delays to avoid synchronized bursts
6. Rotate proxies when scraping concurrently to distribute IP load — ThorData is a solid residential option
7. Monitor with stats and circuit breakers for long-running jobs
Once you've seen 100 URLs resolve in 5 seconds instead of 100, there's no going back to sequential requests. The same patterns apply to any IO-bound work: API calls, database queries, file I/O.