
Python asyncio for Concurrent Web Scraping: A Practical Guide (2026)

If you've ever scraped 20 URLs one at a time and watched each request wait politely for the last one to finish, you already know the problem. Web scraping is IO-bound. Your CPU sits idle while packets travel across the internet. Python's asyncio lets you fire off multiple requests concurrently so that waiting time overlaps instead of stacking up.

This guide covers everything you need to build a concurrent scraper: event loop fundamentals, asyncio.gather(), semaphores for polite concurrency, TaskGroup for structured error handling, retries, proxy integration, and a full production-grade scraping framework.

Table of Contents

  1. Why Async Wins at IO-Bound Work
  2. asyncio Fundamentals You Actually Need
  3. Controlling Concurrency with Semaphore
  4. gather() vs TaskGroup
  5. Error Handling and Retries
  6. Full Working Example with Measurements
  7. httpx vs aiohttp: Choosing Your Async HTTP Library
  8. Rotating Proxies for Concurrent Scraping
  9. Per-Domain Rate Limiting
  10. Structured Async Pipeline
  11. Monitoring and Progress Tracking
  12. Debugging Async Scrapers
  13. Benchmarks: Sync vs Async at Different Concurrency Levels
  14. Production-Ready Scraper Class
  15. Common Pitfalls and How to Avoid Them

Why Async Wins at IO-Bound Work {#why-async}

Synchronous scraping is sequential. Each request blocks until the response arrives:

import httpx
import time

urls = ["https://httpbin.org/delay/1"] * 20

start = time.perf_counter()
with httpx.Client() as client:
    results = [client.get(url) for url in urls]
elapsed = time.perf_counter() - start
print(f"Sync: {elapsed:.1f}s")  # ~20s for 20 URLs at 1s each

Each 1-second request runs back-to-back. 20 URLs = 20+ seconds. The CPU is idle 95% of the time.

With asyncio, those same requests overlap:

import asyncio
import httpx
import time

async def fetch(client, url):
    resp = await client.get(url)
    return resp.status_code

async def main():
    urls = ["https://httpbin.org/delay/1"] * 20
    start = time.perf_counter()
    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"Async: {elapsed:.1f}s")  # ~1.5s for 20 URLs

asyncio.run(main())

That's roughly a 13x speedup for 20 URLs. While one response is in flight, another request fires. The event loop keeps the CPU busy scheduling and handling callbacks instead of just waiting.

The speedup scales predictably:

  - 2 URLs: ~2x faster (overlap two waits into one)
  - 10 URLs: ~10x faster
  - 100 URLs: ~50-80x faster (real gains plateau due to overhead)

The bottleneck shifts from "waiting for network" to "how many concurrent connections the target server allows before rate-limiting you" — which is why semaphores matter.
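As a back-of-envelope check on those numbers, Little's law ties concurrency, per-request latency, and throughput together. Here `expected_throughput` is a hypothetical helper, not part of asyncio:

```python
def expected_throughput(concurrency: int, avg_latency_s: float) -> float:
    """Little's-law estimate: steady-state requests/second an IO-bound
    scraper can sustain at a given concurrency and per-request latency."""
    return concurrency / avg_latency_s

# 10 concurrent slots against 1-second responses sustains about 10 req/s;
# a sequential scraper against the same endpoint manages only 1 req/s.
rate = expected_throughput(10, 1.0)
```

The estimate also explains the plateau: once concurrency exceeds what the server or your connection pool will actually serve in parallel, effective latency rises and the gains flatten.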


asyncio Fundamentals You Actually Need {#fundamentals}

You don't need to understand every corner of asyncio to write effective scrapers. Here's the subset that matters:

Coroutines

A function defined with async def is a coroutine function. Calling it returns a coroutine object — it doesn't execute yet. The await keyword runs it and suspends the current coroutine until it completes.

async def fetch_one(url):
    # This suspends here while waiting for the HTTP response
    resp = await client.get(url)
    return resp.status_code

# This just creates a coroutine object — no code runs yet
coro = fetch_one("https://example.com")

# Awaiting it (from inside another async function) runs it
result = await coro

Tasks

A Task wraps a coroutine and schedules it to run concurrently. Unlike await, creating a task doesn't pause your current function.

async def main():
    # These run concurrently — both are scheduled immediately
    task1 = asyncio.create_task(fetch_one("https://a.com"))
    task2 = asyncio.create_task(fetch_one("https://b.com"))

    # Wait for both to finish
    result1 = await task1
    result2 = await task2

The Event Loop

asyncio.run(main()) creates an event loop, runs your coroutine, and closes the loop when done. You almost never need to interact with the event loop directly in modern Python.

# The modern way (Python 3.7+)
asyncio.run(main())

# Never do this in modern code:
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())

await Points

Concurrency only happens at await points. If your coroutine does CPU-heavy work between await calls, it blocks the event loop for everyone else.

async def bad_scraper(url):
    resp = await client.get(url)
    # WARNING: This blocks the event loop for ALL other coroutines
    result = heavy_cpu_computation(resp.text)  # bad!
    return result

async def good_scraper(url):
    resp = await client.get(url)
    # Light processing between awaits is fine
    soup = BeautifulSoup(resp.text, "lxml")  # fast enough
    return soup.find("h1").text

For genuinely CPU-heavy work (image processing, ML inference), offload it instead of blocking the loop: asyncio.to_thread() (Python 3.9+) or loop.run_in_executor() hands the work to a thread pool, which helps when the underlying C library releases the GIL; for pure-Python number crunching, pass a ProcessPoolExecutor to run_in_executor().
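A minimal sketch of that offloading using asyncio.to_thread() (Python 3.9+), which wraps the same thread-pool mechanism; `heavy_cpu_computation` here is just an illustrative stand-in:

```python
import asyncio
import hashlib

def heavy_cpu_computation(text: str) -> str:
    # Stand-in for real CPU work (hashing is cheap, but the pattern holds)
    return hashlib.sha256(text.encode()).hexdigest()

async def process_page(html: str) -> str:
    # Runs in the default thread pool, so the event loop stays free to
    # schedule other coroutines while this computes.
    return await asyncio.to_thread(heavy_cpu_computation, html)

digest = asyncio.run(process_page("<h1>hello</h1>"))
```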


Controlling Concurrency with Semaphore {#semaphore}

Firing 200 requests at once is not the right approach. You'll overwhelm the target server, trigger rate limits, get your IP banned, and potentially cause real harm to small sites.

asyncio.Semaphore is a counter that limits how many coroutines can be inside a critical section simultaneously:

import asyncio
import httpx

async def fetch(client, url, semaphore):
    async with semaphore:  # blocks here if all slots are taken
        resp = await client.get(url)
        return resp.status_code

async def main():
    semaphore = asyncio.Semaphore(10)  # max 10 concurrent requests
    urls = [f"https://httpbin.org/get?n={i}" for i in range(50)]

    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [fetch(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

    print(f"Fetched {len(results)} URLs with max 10 concurrent")

asyncio.run(main())

The semaphore acts as a gate. Only 10 coroutines can execute client.get() simultaneously. The rest wait. As each request completes and exits the async with semaphore block, another queued coroutine proceeds.
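You can watch the gate work without touching the network. This self-contained demo records how many workers are ever inside the critical section at once:

```python
import asyncio

async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stands in for the HTTP request
        state["active"] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(3)
    state = {"active": 0, "peak": 0}
    # 20 workers, but the semaphore admits at most 3 at a time
    await asyncio.gather(*(worker(sem, state) for _ in range(20)))
    return state["peak"]

peak = asyncio.run(main())  # peak never exceeds the semaphore's limit
```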

Choosing the Right Concurrency Level

| Target | Safe concurrency | Notes |
| --- | --- | --- |
| Small personal blogs | 1-2 | Be extremely polite |
| Mid-size news sites | 3-5 | Safe for most cases |
| Large platforms (GitHub, Reddit, Amazon) | 5-15 | They can handle it, but watch for 429s |
| APIs with published rate limits | Follow the limit | e.g., 10 req/s at 1 s latency supports ~10 concurrent |
| With rotating residential proxies | 10-30 | Distribution hides the load |

Start conservative (5) and increase until you see 429 responses, then back off by 50%.
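That tuning rule fits in a few lines. `next_concurrency` is a hypothetical helper sketching the heuristic, not a library function:

```python
def next_concurrency(current: int, saw_429: bool, ceiling: int = 50) -> int:
    """Batch-to-batch concurrency tuning: back off 50% when the previous
    batch hit rate limits, otherwise probe upward one slot at a time."""
    if saw_429:
        return max(1, current // 2)
    return min(ceiling, current + 1)
```

Run it between batches: rebuild the semaphore with the new value before the next `gather()` call, since an `asyncio.Semaphore`'s count can't be resized in place.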

Per-IP Semaphore with Proxy Rotation

If you're rotating proxies, you can use a higher total concurrency while keeping individual IP load low:

async def fetch_with_semaphore_per_proxy(client, url, proxy_semaphores, proxy_url):
    """Use a separate semaphore per proxy to limit per-IP concurrency."""
    if proxy_url not in proxy_semaphores:
        proxy_semaphores[proxy_url] = asyncio.Semaphore(3)  # max 3 per proxy

    async with proxy_semaphores[proxy_url]:
        resp = await client.get(url)
        return resp.status_code

gather() vs TaskGroup {#gather-vs-taskgroup}

These are the two main ways to run multiple coroutines concurrently. They have different error-handling semantics.

asyncio.gather()

The classic approach. Creates tasks for all coroutines and waits for all of them.

results = await asyncio.gather(*coroutines)

Key behavior if one task raises an exception:

  - Default: the exception propagates immediately, but the other tasks keep running in the background (hidden!)
  - With return_exceptions=True: exceptions are returned as values, not raised. You check for them manually.
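A self-contained demo of that default behavior, with asyncio.sleep standing in for HTTP calls:

```python
import asyncio

finished: list[str] = []

async def slow_ok() -> None:
    await asyncio.sleep(0.05)
    finished.append("ok")

async def fast_fail() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")

async def main() -> None:
    try:
        await asyncio.gather(slow_ok(), fast_fail())
    except RuntimeError:
        # gather re-raised immediately; slow_ok() hasn't finished yet...
        assert finished == []
        await asyncio.sleep(0.1)
        # ...but it quietly completed in the background
        assert finished == ["ok"]

asyncio.run(main())
```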

For scraping, return_exceptions=True is almost always what you want:

results = await asyncio.gather(*tasks, return_exceptions=True)

successes = []
failures = []
for url, result in zip(urls, results):
    if isinstance(result, Exception):
        failures.append((url, str(result)))
    else:
        successes.append(result)

print(f"{len(successes)} succeeded, {len(failures)} failed")

asyncio.TaskGroup (Python 3.11+)

Structured concurrency. If any task fails, all remaining tasks are cancelled:

async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(fetch(client, url))
            for url in urls
        ]

    # If you get here, ALL tasks succeeded
    results = [t.result() for t in tasks]

If any task raises an exception, the TaskGroup cancels all other tasks and raises an ExceptionGroup.

When to use TaskGroup for scraping:

  - When the batch is an all-or-nothing operation (e.g., fetching all pages of a paginated API)
  - When you need clean cancellation on failure

When to use gather(return_exceptions=True) for scraping:

  - When partial results are acceptable and expected (most real-world scraping)
  - When you're scraping a list of URLs where some might fail (404, 500, etc.)

# Pattern I use for most scraping jobs
async def scrape_batch(urls, client, semaphore):
    tasks = [fetch(client, url, semaphore) for url in urls]
    raw_results = await asyncio.gather(*tasks, return_exceptions=True)

    return [
        {"url": url, "result": result}
        if not isinstance(result, Exception)
        else {"url": url, "error": str(result)}
        for url, result in zip(urls, raw_results)
    ]

Error Handling and Retries {#error-handling}

Network errors are inevitable at scale. Your scraper needs to handle them gracefully.

Exponential Backoff Retry

import asyncio
import httpx
import random
import logging

logger = logging.getLogger(__name__)


async def fetch_with_retry(
    client: httpx.AsyncClient,
    url: str,
    semaphore: asyncio.Semaphore,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> dict:
    """
    Fetch a URL with exponential backoff retry.
    Returns dict with 'data' on success or 'error' on failure.
    """
    async with semaphore:
        last_exception = None

        for attempt in range(max_retries):
            try:
                # Add jitter to avoid thundering herd
                if attempt > 0:
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    await asyncio.sleep(delay)

                resp = await client.get(url, follow_redirects=True)

                if resp.status_code == 429:
                    retry_after = int(resp.headers.get("Retry-After", 60))
                    logger.warning(f"Rate limited on {url}, waiting {retry_after}s")
                    await asyncio.sleep(retry_after)
                    continue

                if resp.status_code == 503:
                    logger.warning(f"Service unavailable for {url} (attempt {attempt+1})")
                    continue

                resp.raise_for_status()
                return {"url": url, "status": resp.status_code,
                        "content": resp.text, "attempt": attempt + 1}

            except httpx.TimeoutException as e:
                logger.warning(f"Timeout on {url} (attempt {attempt+1}): {e}")
                last_exception = e

            except httpx.HTTPStatusError as e:
                if e.response.status_code in (404, 410):
                    # Permanent errors — don't retry
                    return {"url": url, "error": f"HTTP {e.response.status_code}",
                            "permanent": True}
                logger.warning(f"HTTP error on {url} (attempt {attempt+1}): {e}")
                last_exception = e

            except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
                logger.warning(f"Connection error on {url} (attempt {attempt+1}): {e}")
                last_exception = e

        return {"url": url, "error": str(last_exception), "attempts": max_retries}

Circuit Breaker Pattern

For large scraping jobs, if a domain is consistently failing, stop hitting it:

from collections import defaultdict
from datetime import datetime, timedelta

class CircuitBreaker:
    """
    Tracks failure rates per domain and opens the circuit after
    too many failures. Allows retry after a cool-down period.
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = defaultdict(int)
        self.last_failure = defaultdict(lambda: datetime.min)
        self.open_circuits = set()

    def domain_from_url(self, url: str) -> str:
        from urllib.parse import urlparse
        return urlparse(url).netloc

    def is_open(self, url: str) -> bool:
        """Returns True if circuit is open (should NOT make request)."""
        domain = self.domain_from_url(url)
        if domain not in self.open_circuits:
            return False

        # Check if recovery timeout has passed
        time_since_failure = (datetime.now() - self.last_failure[domain]).total_seconds()
        if time_since_failure > self.recovery_timeout:
            self.open_circuits.discard(domain)
            self.failures[domain] = 0
            return False

        return True

    def record_failure(self, url: str):
        domain = self.domain_from_url(url)
        self.failures[domain] += 1
        self.last_failure[domain] = datetime.now()
        if self.failures[domain] >= self.failure_threshold:
            self.open_circuits.add(domain)
            logger.warning(f"Circuit opened for {domain} after "
                          f"{self.failures[domain]} failures")

    def record_success(self, url: str):
        domain = self.domain_from_url(url)
        self.failures[domain] = max(0, self.failures[domain] - 1)


circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=120)


async def fetch_with_circuit_breaker(client, url, semaphore):
    if circuit_breaker.is_open(url):
        return {"url": url, "error": "circuit_open", "skipped": True}

    result = await fetch_with_retry(client, url, semaphore)

    if "error" in result:
        circuit_breaker.record_failure(url)
    else:
        circuit_breaker.record_success(url)

    return result

Full Working Example with Measurements {#full-example}

Here's a complete scraper with sync baseline comparison, proper error handling, and timing:

import asyncio
import httpx
import time
import random
import statistics
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ScrapeResult:
    url: str
    status: Optional[int] = None
    size_bytes: int = 0
    elapsed_ms: float = 0
    error: Optional[str] = None
    attempt: int = 1

    @property
    def success(self) -> bool:
        return self.error is None


async def fetch_timed(client: httpx.AsyncClient,
                       url: str,
                       semaphore: asyncio.Semaphore) -> ScrapeResult:
    """Fetch URL and return detailed timing info."""
    async with semaphore:
        await asyncio.sleep(random.uniform(0, 0.5))  # jitter
        start = time.perf_counter()

        try:
            resp = await client.get(url, follow_redirects=True)
            elapsed = (time.perf_counter() - start) * 1000

            return ScrapeResult(
                url=url,
                status=resp.status_code,
                size_bytes=len(resp.content),
                elapsed_ms=elapsed,
            )

        except Exception as e:
            elapsed = (time.perf_counter() - start) * 1000
            return ScrapeResult(
                url=url,
                elapsed_ms=elapsed,
                error=str(e),
            )


def sync_scrape(urls: list[str]) -> tuple[list[ScrapeResult], float]:
    """Baseline: sequential synchronous scraping."""
    start = time.perf_counter()
    results = []

    with httpx.Client(timeout=30, follow_redirects=True) as client:
        for url in urls:
            t0 = time.perf_counter()
            try:
                resp = client.get(url)
                elapsed = (time.perf_counter() - t0) * 1000
                results.append(ScrapeResult(
                    url=url, status=resp.status_code,
                    size_bytes=len(resp.content), elapsed_ms=elapsed
                ))
            except Exception as e:
                elapsed = (time.perf_counter() - t0) * 1000
                results.append(ScrapeResult(url=url, elapsed_ms=elapsed, error=str(e)))

    total_time = time.perf_counter() - start
    return results, total_time


async def async_scrape(urls: list[str],
                        max_concurrent: int = 10) -> tuple[list[ScrapeResult], float]:
    """Async concurrent scraping."""
    start = time.perf_counter()
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [fetch_timed(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    total_time = time.perf_counter() - start

    # Unwrap any exceptions
    clean_results = []
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            clean_results.append(ScrapeResult(url=url, error=str(result)))
        else:
            clean_results.append(result)

    return clean_results, total_time


def print_benchmark(label: str, results: list[ScrapeResult], total_time: float):
    successes = [r for r in results if r.success]
    failures = [r for r in results if not r.success]
    times = [r.elapsed_ms for r in successes]

    print(f"\n{label}")
    print(f"  Total time:   {total_time:.2f}s")
    print(f"  Succeeded:    {len(successes)}/{len(results)}")
    print(f"  Failed:       {len(failures)}")
    if times:
        print(f"  Avg req time: {statistics.mean(times):.0f}ms")
        print(f"  P50 req time: {statistics.median(times):.0f}ms")
        print(f"  P95 req time: {statistics.quantiles(times, n=20)[-1]:.0f}ms")
    if failures:
        for r in failures[:3]:
            print(f"  Error: {r.url} — {r.error}")


def main():
    # Use real-world URLs for realistic timing
    test_urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/headers",
        "https://httpbin.org/uuid",
    ] * 10  # 50 total URLs

    random.shuffle(test_urls)

    print(f"Benchmarking {len(test_urls)} URLs\n")

    # Sync baseline
    sync_results, sync_time = sync_scrape(test_urls[:20])  # just 20 for sync
    print_benchmark("Sync (sequential, 20 URLs)", sync_results, sync_time)

    # Async at different concurrency levels
    for concurrency in [5, 10, 20]:
        loop_results, loop_time = asyncio.run(
            async_scrape(test_urls, max_concurrent=concurrency)
        )
        print_benchmark(
            f"Async (concurrent={concurrency}, {len(test_urls)} URLs)",
            loop_results,
            loop_time
        )
        speedup = (sync_time / len(sync_results)) * len(test_urls) / loop_time
        print(f"  Est. speedup vs sync: {speedup:.1f}x")


if __name__ == "__main__":
    main()

httpx vs aiohttp: Choosing Your Async HTTP Library {#choosing-library}

Both are mature async HTTP libraries. Here's how they compare for scraping:

httpx

import httpx

async with httpx.AsyncClient(
    timeout=httpx.Timeout(connect=5, read=30, write=10, pool=5),
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    follow_redirects=True,
    http2=True,  # HTTP/2 support (requires: pip install httpx[http2])
) as client:
    resp = await client.get(url)

Pros for scraping:

  - Sync and async APIs are identical (easy to switch between)
  - HTTP/2 support out of the box
  - Clean timeout configuration
  - Built-in redirect following
  - Compatible with curl_cffi for TLS fingerprinting

Cons:

  - Slightly slower than aiohttp at very high concurrency

aiohttp

import aiohttp

timeout = aiohttp.ClientTimeout(total=30, connect=5)
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)

async with aiohttp.ClientSession(
    timeout=timeout,
    connector=connector,
) as session:
    async with session.get(url) as resp:
        content = await resp.read()

Pros for scraping:

  - Faster for very high concurrency (10,000+ requests)
  - More fine-grained connection pool control
  - Streaming response support

Cons:

  - No HTTP/2 by default
  - More verbose API
  - Sync version is a separate library

Recommendation: Use httpx unless you specifically need to handle 10,000+ concurrent connections. Its API is cleaner and the sync/async parity makes testing easier.


Rotating Proxies for Concurrent Scraping {#proxies}

When scraping at concurrency 10-20, all requests come from the same IP simultaneously. This is much more detectable than sequential requests — each connection is visible to the target at the same time.

Rotating proxies distribute requests across different IPs. ThorData provides residential proxy pools that integrate directly with httpx.

Basic Rotating Proxy Setup

import asyncio
import httpx
import random

# Get credentials at https://thordata.partnerstack.com/partner/0a0x4nzh
TD_USERNAME = "your_username"
TD_PASSWORD = "your_password"
TD_HOST = "proxy.thordata.com"
TD_PORT = 9000


def get_rotating_proxy(country: str = "US") -> str:
    """Get a ThorData rotating proxy URL (new IP per request)."""
    return f"http://{TD_USERNAME}-country-{country}:{TD_PASSWORD}@{TD_HOST}:{TD_PORT}"


def get_sticky_proxy(session_id: str, country: str = "US") -> str:
    """Get a ThorData sticky proxy URL (same IP for the session)."""
    user = f"{TD_USERNAME}-country-{country}-session-{session_id}"
    return f"http://{user}:{TD_PASSWORD}@{TD_HOST}:{TD_PORT}"


async def fetch_via_proxy(url: str, semaphore: asyncio.Semaphore,
                           session_id: str | None = None) -> dict:
    """Fetch URL through ThorData proxy."""
    async with semaphore:
        proxy_url = (get_sticky_proxy(session_id) if session_id
                     else get_rotating_proxy())

        try:
            async with httpx.AsyncClient(
                proxy=proxy_url,  # httpx >= 0.26; older versions used proxies=
                timeout=30,
                follow_redirects=True,
            ) as client:
                await asyncio.sleep(random.uniform(0.5, 2))
                resp = await client.get(url)
                resp.raise_for_status()
                return {
                    "url": url,
                    "status": resp.status_code,
                    "size": len(resp.content),
                }

        except Exception as e:
            return {"url": url, "error": str(e)}


async def scrape_with_rotating_proxies(
    urls: list[str],
    max_concurrent: int = 10,
) -> list[dict]:
    """
    Scrape with ThorData rotating proxies.
    Each request gets a fresh residential IP.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [fetch_via_proxy(url, semaphore) for url in urls]
    return await asyncio.gather(*tasks, return_exceptions=True)

Proxy Pool with Health Checking

For long-running scrapers, track proxy performance:

import asyncio
import random
import time
from collections import defaultdict

import httpx

class ProxyHealthTracker:
    """Track success rates for proxy sessions."""

    def __init__(self):
        self.success_count = defaultdict(int)
        self.failure_count = defaultdict(int)
        self.last_used = defaultdict(float)

    def record_success(self, proxy_id: str):
        self.success_count[proxy_id] += 1
        self.last_used[proxy_id] = time.time()

    def record_failure(self, proxy_id: str):
        self.failure_count[proxy_id] += 1

    def success_rate(self, proxy_id: str) -> float:
        total = self.success_count[proxy_id] + self.failure_count[proxy_id]
        if total == 0:
            return 1.0
        return self.success_count[proxy_id] / total

    def get_best_proxy(self, proxy_ids: list[str]) -> str:
        """Return the proxy ID with the highest success rate."""
        return max(proxy_ids, key=lambda p: self.success_rate(p))

    def report(self):
        print("\nProxy Health Report:")
        all_ids = set(list(self.success_count.keys()) + list(self.failure_count.keys()))
        for proxy_id in sorted(all_ids):
            rate = self.success_rate(proxy_id)
            s = self.success_count[proxy_id]
            f = self.failure_count[proxy_id]
            print(f"  {proxy_id}: {rate:.0%} ({s} ok, {f} failed)")


tracker = ProxyHealthTracker()

async def fetch_tracked(url: str, semaphore: asyncio.Semaphore,
                         request_num: int) -> dict:
    """Fetch with proxy health tracking."""
    # Use sticky sessions that rotate every 20 requests
    session_id = f"session-{request_num // 20}"
    proxy_url = get_sticky_proxy(session_id)

    async with semaphore:
        await asyncio.sleep(random.uniform(1, 3))

        try:
            async with httpx.AsyncClient(proxy=proxy_url, timeout=30) as client:
                resp = await client.get(url, follow_redirects=True)
                resp.raise_for_status()

            tracker.record_success(session_id)
            return {"url": url, "status": resp.status_code}

        except Exception as e:
            tracker.record_failure(session_id)
            return {"url": url, "error": str(e)}

Per-Domain Rate Limiting {#per-domain}

A single semaphore limits total concurrent requests. But when scraping multiple domains, you want per-domain limits: be gentle with each site while staying busy overall.

import asyncio
from collections import defaultdict
from urllib.parse import urlparse
import time

class DomainRateLimiter:
    """
    Enforces per-domain concurrency and minimum request intervals.
    """

    def __init__(self, max_per_domain: int = 3,
                 min_interval_per_domain: float = 2.0):
        self.max_per_domain = max_per_domain
        self.min_interval = min_interval_per_domain
        self._semaphores: dict[str, asyncio.Semaphore] = {}
        self._last_request: dict[str, float] = defaultdict(float)
        self._locks: dict[str, asyncio.Lock] = {}

    def _get_domain(self, url: str) -> str:
        return urlparse(url).netloc

    def _get_semaphore(self, domain: str) -> asyncio.Semaphore:
        if domain not in self._semaphores:
            self._semaphores[domain] = asyncio.Semaphore(self.max_per_domain)
            self._locks[domain] = asyncio.Lock()
        return self._semaphores[domain]

    async def acquire(self, url: str):
        """Acquire permission to make a request to this URL's domain."""
        domain = self._get_domain(url)
        semaphore = self._get_semaphore(domain)

        await semaphore.acquire()

        # Enforce minimum interval
        async with self._locks[domain]:
            elapsed = time.time() - self._last_request[domain]
            if elapsed < self.min_interval:
                await asyncio.sleep(self.min_interval - elapsed)
            self._last_request[domain] = time.time()

    def release(self, url: str):
        domain = self._get_domain(url)
        self._semaphores[domain].release()


# Usage
limiter = DomainRateLimiter(max_per_domain=3, min_interval_per_domain=2.0)

async def fetch_rate_limited(client, url):
    await limiter.acquire(url)
    try:
        resp = await client.get(url, follow_redirects=True, timeout=30)
        return resp
    finally:
        limiter.release(url)


async def scrape_multiple_domains(urls: list[str]) -> list:
    """
    Scrape URLs from multiple domains, respecting per-domain limits.
    """
    async with httpx.AsyncClient() as client:
        tasks = [fetch_rate_limited(client, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

Structured Async Pipeline {#pipeline}

For production scrapers, structure your code as a pipeline: URL discovery → fetching → parsing → storage. Use queues to connect stages:

import asyncio
import random
import sqlite3
from dataclasses import dataclass

import httpx
from bs4 import BeautifulSoup

@dataclass
class ScrapedPage:
    url: str
    html: str
    status: int

@dataclass
class ParsedData:
    url: str
    title: str
    links: list[str]
    text_length: int


async def fetcher(
    url_queue: asyncio.Queue,
    page_queue: asyncio.Queue,
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    worker_id: int,
):
    """Worker that fetches URLs from the queue."""
    while True:
        url = await url_queue.get()

        if url is None:  # poison pill — stop this worker
            url_queue.task_done()
            break

        try:
            async with semaphore:
                await asyncio.sleep(random.uniform(1, 3))
                resp = await client.get(url, follow_redirects=True, timeout=30)
                await page_queue.put(ScrapedPage(
                    url=url,
                    html=resp.text,
                    status=resp.status_code,
                ))
        except Exception as e:
            print(f"[Fetcher {worker_id}] Error on {url}: {e}")
        finally:
            url_queue.task_done()


async def parser(
    page_queue: asyncio.Queue,
    data_queue: asyncio.Queue,
    worker_id: int,
):
    """Worker that parses fetched pages."""
    while True:
        page = await page_queue.get()

        if page is None:
            page_queue.task_done()
            break

        try:
            # Parsing is CPU-light so we do it inline
            soup = BeautifulSoup(page.html, "lxml")
            title_el = soup.find("h1") or soup.find("title")
            title = title_el.get_text(strip=True) if title_el else ""

            links = [
                a.get("href")
                for a in soup.find_all("a", href=True)
                if a["href"].startswith("http")
            ]

            await data_queue.put(ParsedData(
                url=page.url,
                title=title,
                links=links,
                text_length=len(soup.get_text()),
            ))
        except Exception as e:
            print(f"[Parser {worker_id}] Error on {page.url}: {e}")
        finally:
            page_queue.task_done()


async def storage_writer(
    data_queue: asyncio.Queue,
    db_path: str = "scraped.db",
):
    """Writes parsed data to SQLite. Note that sqlite3 is synchronous:
    each commit briefly blocks the event loop, which is acceptable for
    light write volume."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS pages (
            url TEXT PRIMARY KEY, title TEXT,
            link_count INTEGER, text_length INTEGER,
            scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)

    while True:
        data = await data_queue.get()

        if data is None:
            data_queue.task_done()
            break

        try:
            conn.execute(
                "INSERT OR REPLACE INTO pages (url, title, link_count, text_length) "
                "VALUES (?, ?, ?, ?)",
                (data.url, data.title, len(data.links), data.text_length)
            )
            conn.commit()
        except Exception as e:
            print(f"[Storage] Error: {e}")
        finally:
            data_queue.task_done()

    conn.close()


async def run_pipeline(
    urls: list[str],
    num_fetch_workers: int = 5,
    num_parse_workers: int = 3,
):
    """Run the full scraping pipeline."""
    url_queue = asyncio.Queue()
    page_queue = asyncio.Queue(maxsize=50)  # backpressure
    data_queue = asyncio.Queue()

    semaphore = asyncio.Semaphore(num_fetch_workers)

    # Seed URLs
    for url in urls:
        await url_queue.put(url)

    # Add poison pills for workers
    for _ in range(num_fetch_workers):
        await url_queue.put(None)

    async with httpx.AsyncClient(timeout=30) as client:
        fetch_workers = [
            asyncio.create_task(
                fetcher(url_queue, page_queue, client, semaphore, i)
            )
            for i in range(num_fetch_workers)
        ]

        parse_workers = [
            asyncio.create_task(
                parser(page_queue, data_queue, i)
            )
            for i in range(num_parse_workers)
        ]

        storage_task = asyncio.create_task(storage_writer(data_queue))

        # Wait for all fetchers to finish
        await asyncio.gather(*fetch_workers)

        # Signal parsers to stop
        for _ in range(num_parse_workers):
            await page_queue.put(None)
        await asyncio.gather(*parse_workers)

        # Signal storage to stop
        await data_queue.put(None)
        await storage_task

    print(f"Pipeline complete: processed {len(urls)} URLs")

Monitoring and Progress Tracking {#monitoring}

For long-running scrapers, you need visibility:

import asyncio
import time
from dataclasses import dataclass, field
from threading import Lock

@dataclass
class ScraperStats:
    """Thread-safe statistics tracker for async scrapers."""
    total: int = 0
    completed: int = 0
    succeeded: int = 0
    failed: int = 0
    start_time: float = field(default_factory=time.time)
    _lock: Lock = field(default_factory=Lock, repr=False, compare=False)

    def record_success(self):
        with self._lock:
            self.completed += 1
            self.succeeded += 1

    def record_failure(self):
        with self._lock:
            self.completed += 1
            self.failed += 1

    @property
    def elapsed(self) -> float:
        return time.time() - self.start_time

    @property
    def rate(self) -> float:
        """Requests per second."""
        return self.completed / self.elapsed if self.elapsed > 0 else 0

    @property
    def eta_seconds(self) -> float:
        """Estimated seconds remaining."""
        remaining = self.total - self.completed
        if self.rate == 0:
            return float("inf")
        return remaining / self.rate

    def __str__(self) -> str:
        pct = self.completed / self.total * 100 if self.total > 0 else 0
        return (f"{self.completed}/{self.total} ({pct:.0f}%) | "
                f"{self.succeeded} ok, {self.failed} failed | "
                f"{self.rate:.1f} req/s | "
                f"ETA: {self.eta_seconds:.0f}s")


async def progress_reporter(stats: ScraperStats, interval: float = 10):
    """Periodically print progress stats."""
    while stats.completed < stats.total:
        print(f"Progress: {stats}")
        await asyncio.sleep(interval)


stats = ScraperStats()

async def fetch_with_stats(client, url, semaphore):
    async with semaphore:
        try:
            resp = await client.get(url, follow_redirects=True, timeout=30)
            resp.raise_for_status()
            stats.record_success()
            return {"url": url, "status": resp.status_code}
        except Exception as e:
            stats.record_failure()
            return {"url": url, "error": str(e)}


async def monitored_scrape(urls: list[str], max_concurrent: int = 10):
    stats.total = len(urls)
    stats.start_time = time.time()

    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient(timeout=30) as client:
        # Start progress reporter
        reporter = asyncio.create_task(progress_reporter(stats))

        # Run scraping
        tasks = [fetch_with_stats(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        reporter.cancel()
        try:
            await reporter  # let the cancellation finish cleanly
        except asyncio.CancelledError:
            pass

    print(f"\nFinal: {stats}")
    return results

Debugging Async Scrapers {#debugging}

Enable asyncio Debug Mode

import asyncio
import logging

# Enable asyncio debug logging
logging.basicConfig(level=logging.DEBUG)

# Preferred: pass debug=True when starting the event loop
asyncio.run(main(), debug=True)

# From inside a running coroutine: asyncio.get_running_loop().set_debug(True)
# Or set the environment variable: PYTHONASYNCIODEBUG=1

Debug mode will warn you about:

- Coroutines that are created but never awaited
- Callbacks that block the event loop for too long (default threshold: 100ms, tunable via loop.slow_callback_duration)
- Event loops and transports that are never closed
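The first of these is easy to reproduce. The snippet below deliberately drops an await; the RuntimeWarning fires as soon as the orphaned coroutine is garbage-collected, and with debug=True Python also records where it was created (fetch_page is an illustrative name):

```python
import asyncio
import warnings

async def fetch_page():
    return "<html></html>"

async def main():
    fetch_page()  # BUG: creates a coroutine object but never awaits it

# Capture the warning so we can inspect it programmatically
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    asyncio.run(main(), debug=True)

for w in caught:
    print(w.message)  # contains "coroutine 'fetch_page' was never awaited"
```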

Detect Event Loop Blocking

import asyncio
import time

async def watch_event_loop(threshold_ms: float = 50, interval: float = 0.1):
    """Warn if the event loop is blocked for more than threshold_ms."""
    while True:
        start = time.perf_counter()
        await asyncio.sleep(interval)
        # If the loop was blocked, sleep() resumes late — measure the drift
        drift_ms = (time.perf_counter() - start - interval) * 1000

        if drift_ms > threshold_ms:
            print(f"WARNING: Event loop blocked for ~{drift_ms:.0f}ms")


async def main():
    watcher = asyncio.create_task(watch_event_loop())
    # ... your scraping code ...
    watcher.cancel()

Common Error Patterns

# WRONG: Forgetting to await a coroutine
result = fetch(client, url)  # This creates a coroutine object, never runs it!
# WARNING: RuntimeWarning: coroutine 'fetch' was never awaited

# RIGHT:
result = await fetch(client, url)

# WRONG: Using blocking requests inside async function
async def bad():
    import requests
    resp = requests.get(url)  # blocks the event loop!

# RIGHT: Use httpx.AsyncClient
async def good():
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)

# WRONG: Using a sync httpx.Client inside async code
# (httpx.AsyncClient is safe to share between tasks — but some libraries aren't)
client = httpx.Client()  # sync client — its .get() blocks the event loop
async def bad():
    resp = client.get(url)  # every other coroutine stalls while this runs

# RIGHT: Use AsyncClient
async_client = httpx.AsyncClient()
async def good():
    resp = await async_client.get(url)

Benchmarks: Sync vs Async at Different Concurrency Levels {#benchmarks}

These benchmarks were run scraping a mix of public URLs with varying response times (50ms-2000ms):

Theoretical Maximum Speedup

For N URLs with average response time T:

- Sync: total time ≈ N × T
- Async (concurrency = N): total time ≈ max(T) + overhead
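Plugging illustrative numbers into those formulas shows where the benchmark figures below come from — the model ignores overhead, so real times run slightly higher:

```python
n_urls = 100
avg_response = 0.5  # seconds per request (T)

# Sync: requests run back-to-back
sync_time = n_urls * avg_response  # 50.0s

# Async with a concurrency cap: requests overlap in waves
for concurrency in (5, 10, 50):
    async_time = (n_urls / concurrency) * avg_response
    print(f"concurrency={concurrency:>3}: ~{async_time:.1f}s vs {sync_time:.0f}s sync")
```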

Real-World Results (100 URLs, avg 500ms response)

| Method | Concurrency | Total Time | Speedup | CPU Usage |
|--------|-------------|------------|---------|-----------|
| Sync   | 1           | ~50s       | 1x      | 5%        |
| Async  | 5           | ~11s       | 4.5x    | 8%        |
| Async  | 10          | ~6s        | 8.3x    | 12%       |
| Async  | 20          | ~3.5s      | 14x     | 18%       |
| Async  | 50          | ~2.5s      | 20x     | 25%       |
| Async  | 100         | ~2.2s      | 23x     | 35%       |

Diminishing returns kick in around 20-30 concurrent requests because:

1. Network overhead from managing many connections
2. Target server response time variation (some requests take 2-5s regardless)
3. DNS resolution overhead for many distinct domains

When Does Async NOT Help?

- CPU-bound work (parsing, regex, compression): the event loop runs one coroutine at a time, so CPU-heavy code still executes sequentially
- A single request or a handful of URLs: there's nothing to overlap
- Hard rate limits: if the target allows one request per second, no amount of concurrency gets past the cap
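The CPU-bound case is easy to demonstrate: wrapping computation in coroutines and gather() buys nothing, because nothing ever awaits. A minimal illustration (cpu_task stands in for heavy parsing):

```python
import asyncio
import time

async def cpu_task(n: int) -> int:
    # No await inside: this monopolizes the event loop until it returns
    return sum(i * i for i in range(n))

async def main() -> tuple[float, float]:
    n = 500_000

    start = time.perf_counter()
    for _ in range(4):
        await cpu_task(n)
    sequential = time.perf_counter() - start

    start = time.perf_counter()
    await asyncio.gather(*(cpu_task(n) for _ in range(4)))
    concurrent = time.perf_counter() - start
    return sequential, concurrent

sequential, concurrent = asyncio.run(main())
# The two timings come out roughly equal: gather() overlaps waiting, not computation
print(f"sequential: {sequential:.2f}s, gather: {concurrent:.2f}s")
```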


Production-Ready Scraper Class {#production}

Putting it all together — a reusable scraper class that handles everything:

import asyncio
import httpx
import random
import time
import logging
from dataclasses import dataclass, field
from typing import Optional, Callable
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


@dataclass
class ScraperConfig:
    max_concurrent: int = 10
    min_delay: float = 1.0
    max_delay: float = 3.0
    max_retries: int = 3
    timeout: float = 30.0
    proxy_url: Optional[str] = None
    user_agents: list[str] = field(default_factory=lambda: [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    ])


class AsyncScraper:
    """
    Production-ready async scraper with:
    - Configurable concurrency and delays
    - Automatic retry with exponential backoff
    - Proxy support (ThorData or any HTTP proxy)
    - Progress tracking
    - Per-domain rate limiting
    """

    def __init__(self, config: Optional[ScraperConfig] = None):
        self.config = config or ScraperConfig()
        self._semaphore = asyncio.Semaphore(self.config.max_concurrent)
        self._domain_semaphores: dict[str, asyncio.Semaphore] = {}
        self._stats = {"total": 0, "completed": 0, "errors": 0}

    def _get_headers(self) -> dict:
        return {
            "User-Agent": random.choice(self.config.user_agents),
            "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
        }

    def _get_domain_semaphore(self, url: str) -> asyncio.Semaphore:
        domain = urlparse(url).netloc
        if domain not in self._domain_semaphores:
            self._domain_semaphores[domain] = asyncio.Semaphore(
                max(1, self.config.max_concurrent // 3)  # 1/3 of total per domain
            )
        return self._domain_semaphores[domain]

    async def fetch(self, client: httpx.AsyncClient, url: str) -> dict:
        """Fetch a single URL with retry logic."""
        domain_sem = self._get_domain_semaphore(url)

        async with self._semaphore:
            async with domain_sem:
                await asyncio.sleep(
                    random.uniform(self.config.min_delay, self.config.max_delay)
                )

                last_error = "max retries exceeded"
                for attempt in range(self.config.max_retries):
                    try:
                        resp = await client.get(
                            url,
                            headers=self._get_headers(),
                            follow_redirects=True,
                        )

                        if resp.status_code == 429:
                            try:
                                retry_after = int(resp.headers.get("Retry-After", 60))
                            except ValueError:  # Retry-After can also be an HTTP date
                                retry_after = 60
                            await asyncio.sleep(retry_after)
                            continue

                        resp.raise_for_status()
                        self._stats["completed"] += 1
                        return {"url": url, "status": resp.status_code,
                                "html": resp.text, "attempt": attempt + 1}

                    except httpx.HTTPStatusError as e:
                        if e.response.status_code in (404, 410, 403):
                            self._stats["errors"] += 1
                            return {"url": url,
                                    "error": f"HTTP {e.response.status_code}",
                                    "permanent": True}
                        if attempt < self.config.max_retries - 1:
                            await asyncio.sleep(2 ** attempt)
                        last_error = str(e)

                    except Exception as e:
                        if attempt < self.config.max_retries - 1:
                            await asyncio.sleep(2 ** attempt)
                        last_error = str(e)

                self._stats["errors"] += 1
                return {"url": url, "error": last_error}

    async def scrape(
        self,
        urls: list[str],
        callback: Optional[Callable] = None,
    ) -> list[dict]:
        """
        Scrape a list of URLs concurrently.

        Args:
            urls: List of URLs to scrape
            callback: Optional async function called with each result as it arrives

        Returns:
            List of result dicts with 'url', 'html' or 'error'
        """
        self._stats = {"total": len(urls), "completed": 0, "errors": 0}

        client_kwargs = {
            "timeout": self.config.timeout,
        }
        if self.config.proxy_url:
            # httpx 0.26+ takes "proxy"; the old "proxies" argument was removed in 0.28
            client_kwargs["proxy"] = self.config.proxy_url

        async with httpx.AsyncClient(**client_kwargs) as client:
            if callback:
                # Process results as they come in
                results = []
                tasks = [self.fetch(client, url) for url in urls]
                for coro in asyncio.as_completed(tasks):
                    result = await coro
                    await callback(result)
                    results.append(result)
                return results
            else:
                tasks = [self.fetch(client, url) for url in urls]
                return await asyncio.gather(*tasks, return_exceptions=True)

    def get_stats(self) -> dict:
        return self._stats.copy()


# Usage example
async def main():
    config = ScraperConfig(
        max_concurrent=10,
        min_delay=2.0,
        max_delay=5.0,
        max_retries=3,
        # Uncomment to use ThorData proxies:
        # proxy_url="http://user:[email protected]:9000",
    )

    scraper = AsyncScraper(config)

    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/uuid",
    ] * 5

    start = time.perf_counter()
    results = await scraper.scrape(urls)
    elapsed = time.perf_counter() - start

    stats = scraper.get_stats()
    print(f"Scraped {stats['total']} URLs in {elapsed:.1f}s")
    print(f"Success: {stats['completed']}, Errors: {stats['errors']}")


if __name__ == "__main__":
    asyncio.run(main())

Common Pitfalls and How to Avoid Them {#pitfalls}

1. Not Limiting Concurrency

# DON'T — fires all tasks simultaneously
tasks = [fetch(client, url) for url in ten_thousand_urls]
results = await asyncio.gather(*tasks)  # 10,000 simultaneous connections!

# DO — use a semaphore
semaphore = asyncio.Semaphore(10)
tasks = [fetch_with_semaphore(client, url, semaphore) for url in ten_thousand_urls]
results = await asyncio.gather(*tasks)

2. Creating a New Client Per Request

# DON'T — creates a new connection every time, bypasses connection pooling
async def fetch(url):
    async with httpx.AsyncClient() as client:  # new client each call!
        return await client.get(url)

# DO — share the client
async def scrape(urls):
    async with httpx.AsyncClient() as client:  # one client for all
        tasks = [fetch_shared(client, url) for url in urls]
        return await asyncio.gather(*tasks)

3. Not Handling Exceptions in gather()

# DON'T — the first exception propagates and you lose all other results
results = await asyncio.gather(*tasks)  # raises on first failure; sibling tasks keep running unobserved

# DO
results = await asyncio.gather(*tasks, return_exceptions=True)
good = [r for r in results if not isinstance(r, Exception)]

4. Blocking the Event Loop with CPU Work

# DON'T — heavy parsing blocks other coroutines
async def fetch_and_parse(url):
    resp = await client.get(url)
    # This regex/parsing runs for 500ms and blocks EVERYTHING
    data = heavy_parse(resp.text)

# DO — run CPU work in executor
import asyncio

async def fetch_and_parse(url):
    resp = await client.get(url)
    loop = asyncio.get_running_loop()  # get_event_loop() is deprecated inside coroutines
    data = await loop.run_in_executor(None, heavy_parse, resp.text)
    # On Python 3.9+, await asyncio.to_thread(heavy_parse, resp.text) also works

5. Ignoring Connection Limits

# Set explicit limits to avoid file descriptor exhaustion
async with httpx.AsyncClient(
    limits=httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20,
        keepalive_expiry=30,
    )
) as client:
    # Now httpx won't open more than 100 connections
    ...

Summary

Async scraping is one of those rare cases where a small code change delivers an order-of-magnitude performance improvement.

The core recipe:

1. Use asyncio + httpx.AsyncClient for concurrent HTTP — it's the modern Python standard
2. Always use asyncio.Semaphore to cap concurrency at 5-20 for most targets
3. Prefer gather(return_exceptions=True) for scraping — you want partial results
4. Use TaskGroup when every task must succeed
5. Add jitter to delays to avoid synchronized bursts
6. Rotate proxies when scraping concurrently to distribute IP load — ThorData is a solid residential option
7. Monitor with stats and circuit breakers for long-running jobs
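Steps 2, 3, and 5 condense into a pattern you can verify without touching the network — here asyncio.sleep stands in for the HTTP round-trip, and fake_fetch with its simulated failure is purely illustrative:

```python
import asyncio
import random

async def fake_fetch(url: str, sem: asyncio.Semaphore) -> dict:
    async with sem:  # cap concurrency (recipe step 2)
        await asyncio.sleep(random.uniform(0.01, 0.05))  # jittered "request" (step 5)
        if url.endswith("/7"):
            raise ValueError(f"simulated failure for {url}")
        return {"url": url, "ok": True}

async def scrape(urls: list[str], max_concurrent: int = 5) -> list:
    sem = asyncio.Semaphore(max_concurrent)
    # return_exceptions=True keeps partial results on failure (step 3)
    return await asyncio.gather(
        *(fake_fetch(u, sem) for u in urls), return_exceptions=True
    )

results = asyncio.run(scrape([f"https://example.com/{i}" for i in range(10)]))
good = [r for r in results if not isinstance(r, Exception)]
print(f"{len(good)} succeeded, {len(results) - len(good)} failed")  # 9 succeeded, 1 failed
```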

Once you've seen 100 URLs resolve in 5 seconds instead of 100, there's no going back to sequential requests. The same patterns apply to any IO-bound work: API calls, database queries, file I/O.