How to Structure a Web Scraping Project in Python (2026)
There is an enormous gap between a working scraping script and a scraping project that survives contact with production. Most tutorials stop on the near side of that gap. They show you how to get data out of a website once, on a good day, from your laptop, using a single IP address. They don't show you what to do when the site changes its HTML on a Saturday morning, when your IP gets blocked at 3am, when a spider starts returning empty results without throwing an exception, when you need to add three new target sites, or when a colleague needs to run your scraper on their machine and the output looks completely different.
This guide is about everything after "getting it to work." It's about structuring scraping projects so they're maintainable, debuggable, and observable — so that when something breaks (and it will break), you know exactly where to look. It's about building systems that can run unattended, recover from failures, rotate through proxy pools automatically, adapt when websites fight back, and produce consistent output that downstream pipelines can trust.
The patterns here come from projects that have been running in production for months or years. Some scrape hundreds of pages per day, some scrape millions. The architecture scales across both, and the problems it solves are universal: configuration drift, environment inconsistency, silent failures, IP bans, and the slow decay of selectors that happens when websites evolve.
If you're just starting out, this might feel like overkill for a script that runs once. But these patterns are cheap to put in place at the beginning and extremely expensive to retrofit later. Start with the structure, and every spider you add becomes easier than the last.
Core Principles Before Any Code
Four principles underpin every architectural decision in this guide:
Separate fetching from parsing from storing. These are three different concerns with three different failure modes. Fetch failures are network issues. Parse failures are site structure changes. Storage failures are infrastructure issues. Keeping them separate means you can diagnose and fix each independently.
Fail loudly or not at all. Silent failures — where a spider returns zero results because a selector stopped matching, but exits with status code 0 and no error message — are the worst possible failure mode for a scraping system. Every spider should either produce expected output or raise a clear exception.
Make configuration external. Everything that might change between environments (proxy URLs, rate limits, output paths, concurrency, target URLs) lives in environment variables or a config file that doesn't get committed to git. Never hardcode these values.
Log everything, observe everything. You can't debug what you can't see. Log request timing, response sizes, error types, retry counts, and items yielded. When something goes wrong, your logs should tell the story.
The Folder Layout
my_scraper/
├── spiders/
│ ├── __init__.py
│ ├── base.py # Base spider class with shared logic
│ ├── amazon.py
│ ├── linkedin.py
│ └── reddit.py
├── pipelines/
│ ├── __init__.py
│ ├── clean.py # Field normalization and validation
│ ├── deduplicate.py # Deduplication by URL or content hash
│ ├── export.py # Output to JSONL, CSV, SQLite, Postgres
│ └── notify.py # Alerts on errors or completions
├── utils/
│ ├── __init__.py
│ ├── http.py # HTTP client factory with proxy support
│ ├── parsing.py # Shared HTML/JSON parsing helpers
│ ├── retry.py # Retry decorators and backoff logic
│ ├── rate_limit.py # Adaptive rate limiting
│ └── proxy.py # Proxy pool management
├── anti_detect/
│ ├── __init__.py
│ ├── headers.py # Realistic header generation
│ ├── fingerprint.py # Browser fingerprint simulation
│ └── timing.py # Human-like timing patterns
├── config/
│ ├── __init__.py
│ └── settings.py # All configuration via env vars
├── tests/
│ ├── test_spiders.py
│ ├── test_pipelines.py
│ └── fixtures/ # Saved HTML for offline testing
├── data/
│ ├── raw/ # Raw fetched HTML (optional, for debugging)
│ └── processed/ # Clean output files
├── logs/
├── main.py # Entry point and spider orchestration
├── scheduler.py # Cron-like scheduling logic
├── requirements.txt
├── pyproject.toml # Project metadata and tool config
├── .env.example # Template for environment variables
└── .gitignore # Excludes .env, data/, logs/
This isn't arbitrary nesting. Each directory has a single responsibility and a clear boundary. The spiders/ directory contains nothing but fetching logic. Anything that transforms data lives in pipelines/. Anything that's reused across spiders lives in utils/. The anti_detect/ module is separate because it evolves independently — you update it when target sites change their detection methods, not when spider selectors change.
Configuration via Environment Variables
Hard-coding anything that might change is how you end up tracking down values scattered across 15 files when you need to change your proxy provider:
# config/settings.py
import os
from typing import Optional
from dotenv import load_dotenv
load_dotenv()
# Core scraping behavior
CONCURRENCY: int = int(os.getenv("SCRAPER_CONCURRENCY", "3"))
REQUEST_DELAY_MIN: float = float(os.getenv("REQUEST_DELAY_MIN", "2.5"))
REQUEST_DELAY_MAX: float = float(os.getenv("REQUEST_DELAY_MAX", "6.0"))
MAX_RETRIES: int = int(os.getenv("MAX_RETRIES", "4"))
REQUEST_TIMEOUT: int = int(os.getenv("REQUEST_TIMEOUT", "30"))
# Output configuration
OUTPUT_FORMAT: str = os.getenv("OUTPUT_FORMAT", "jsonl") # jsonl | csv | sqlite | postgres
OUTPUT_DIR: str = os.getenv("OUTPUT_DIR", "data/processed")
DB_PATH: str = os.getenv("DB_PATH", "data/scraper.db")
DATABASE_URL: Optional[str] = os.getenv("DATABASE_URL") # For Postgres
# Proxy configuration
PROXY_URL: Optional[str] = os.getenv("PROXY_URL")
PROXY_USERNAME: Optional[str] = os.getenv("PROXY_USERNAME")
PROXY_PASSWORD: Optional[str] = os.getenv("PROXY_PASSWORD")
PROXY_HOST: str = os.getenv("PROXY_HOST", "proxy.thordata.com")
PROXY_PORT: int = int(os.getenv("PROXY_PORT", "9000"))
PROXY_COUNTRY: str = os.getenv("PROXY_COUNTRY", "US")
# Logging
LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
LOG_FILE: Optional[str] = os.getenv("LOG_FILE", "logs/scraper.log")
# Notifications
ALERT_WEBHOOK: Optional[str] = os.getenv("ALERT_WEBHOOK") # Slack/Discord webhook
ALERT_EMAIL: Optional[str] = os.getenv("ALERT_EMAIL")
# Target-specific settings
AMAZON_DELAY_MIN: float = float(os.getenv("AMAZON_DELAY_MIN", "4.0"))
LINKEDIN_DELAY_MIN: float = float(os.getenv("LINKEDIN_DELAY_MIN", "6.0"))
def validate() -> None:
"""Validate required settings at startup."""
if CONCURRENCY < 1:
raise ValueError("SCRAPER_CONCURRENCY must be >= 1")
if REQUEST_DELAY_MIN > REQUEST_DELAY_MAX:
raise ValueError("REQUEST_DELAY_MIN must be <= REQUEST_DELAY_MAX")
if OUTPUT_FORMAT not in ("jsonl", "csv", "sqlite", "postgres"):
raise ValueError(f"Unknown OUTPUT_FORMAT: {OUTPUT_FORMAT}")
if OUTPUT_FORMAT == "postgres" and not DATABASE_URL:
raise ValueError("DATABASE_URL required when OUTPUT_FORMAT=postgres")
Your .env.example file (committed to git) documents every variable:
# .env.example — copy to .env and fill in values
# Concurrency and timing
SCRAPER_CONCURRENCY=3
REQUEST_DELAY_MIN=2.5
REQUEST_DELAY_MAX=6.0
MAX_RETRIES=4
REQUEST_TIMEOUT=30
# Proxy (ThorData residential)
# PROXY_URL overrides the individual proxy settings below when set
PROXY_URL=
PROXY_USERNAME=your_username
PROXY_PASSWORD=your_password
PROXY_HOST=proxy.thordata.com
PROXY_PORT=9000
PROXY_COUNTRY=US
# Output
OUTPUT_FORMAT=jsonl
OUTPUT_DIR=data/processed
DB_PATH=data/scraper.db
# DATABASE_URL is only needed when OUTPUT_FORMAT=postgres
DATABASE_URL=
# Logging
LOG_LEVEL=INFO
LOG_FILE=logs/scraper.log
# Notifications (optional)
ALERT_WEBHOOK=
ALERT_EMAIL=
# Target-specific delays
AMAZON_DELAY_MIN=4.0
LINKEDIN_DELAY_MIN=6.0
Centralized Logging Setup
# main.py (logging setup)
import logging
import sys
from pathlib import Path
from config.settings import LOG_LEVEL, LOG_FILE
def setup_logging() -> logging.Logger:
"""Configure structured logging for the entire project."""
Path("logs").mkdir(exist_ok=True)
log_level = getattr(logging, LOG_LEVEL.upper(), logging.INFO)
    formatter = logging.Formatter(
        fmt="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    handlers = [logging.StreamHandler(sys.stdout)]
    if LOG_FILE:
        handlers.append(logging.FileHandler(LOG_FILE, encoding="utf-8"))
    for handler in handlers:
        handler.setFormatter(formatter)
    # Configure root logger; the handlers already carry the formatter
    logging.basicConfig(
        level=log_level,
        handlers=handlers,
    )
# Quieter third-party loggers
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("playwright").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
return logging.getLogger("scraper")
# In every module:
logger = logging.getLogger(__name__)
# This gives you "spiders.amazon", "utils.http", etc. — makes log source obvious
HTTP Client Factory with Proxy Support
The HTTP client should be a centralized concern, not created ad-hoc in each spider:
# utils/http.py
import httpx
import requests
import random
import logging
from typing import Optional, Dict
from config.settings import (
PROXY_URL, PROXY_USERNAME, PROXY_PASSWORD,
PROXY_HOST, PROXY_PORT, PROXY_COUNTRY,
REQUEST_TIMEOUT,
)
logger = logging.getLogger(__name__)
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
]
def build_base_headers(referer: Optional[str] = None) -> Dict[str, str]:
"""Generate realistic browser headers."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin" if referer else "none",
"Sec-Fetch-User": "?1",
}
if referer:
headers["Referer"] = referer
return headers
def get_proxy_url(sticky_session: Optional[str] = None) -> Optional[str]:
"""
Build proxy URL from configuration.
If sticky_session is provided, uses ThorData session pinning for consistent exit IP.
"""
if PROXY_URL:
return PROXY_URL
if not (PROXY_USERNAME and PROXY_PASSWORD):
return None
if sticky_session:
user = f"{PROXY_USERNAME}-country-{PROXY_COUNTRY}-session-{sticky_session}"
else:
user = f"{PROXY_USERNAME}-country-{PROXY_COUNTRY}"
return f"http://{user}:{PROXY_PASSWORD}@{PROXY_HOST}:{PROXY_PORT}"
def make_httpx_client(
sticky_session: Optional[str] = None,
extra_headers: Optional[Dict] = None,
) -> httpx.Client:
"""Create a configured httpx client, optionally with proxy."""
headers = build_base_headers()
if extra_headers:
headers.update(extra_headers)
kwargs = {
"headers": headers,
"timeout": httpx.Timeout(REQUEST_TIMEOUT),
"follow_redirects": True,
}
proxy = get_proxy_url(sticky_session)
if proxy:
kwargs["proxy"] = proxy
logger.debug(f"Using proxy: {proxy[:50]}...")
return httpx.Client(**kwargs)
def make_requests_session(
sticky_session: Optional[str] = None,
warm_up_url: Optional[str] = None,
) -> requests.Session:
"""Create a requests Session, optionally with warm-up request for cookie establishment."""
session = requests.Session()
session.headers.update(build_base_headers())
proxy = get_proxy_url(sticky_session)
if proxy:
session.proxies = {"http": proxy, "https": proxy}
if warm_up_url:
try:
resp = session.get(warm_up_url, timeout=REQUEST_TIMEOUT)
logger.debug(f"Session warmed up: {warm_up_url} -> {resp.status_code}")
except Exception as e:
logger.warning(f"Warm-up failed: {e}")
return session
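Here is a minimal sketch of the factory in use; the target URL and session label are placeholders:
# Hypothetical one-off fetch through the client factory
from utils.http import make_httpx_client

with make_httpx_client(sticky_session="job42") as client:
    resp = client.get("https://example.com/products")
    resp.raise_for_status()
    html = resp.text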
Proxy Pool Management with ThorData
For any serious scraping operation, proxy rotation is infrastructure, not an afterthought. ThorData's residential proxy network provides ISP-assigned IP addresses that pass reputation scoring at sites that aggressively block datacenter ranges:
# utils/proxy.py
import threading
import random
import time
import logging
from typing import Optional
from config.settings import PROXY_USERNAME, PROXY_PASSWORD, PROXY_HOST, PROXY_PORT, PROXY_COUNTRY
logger = logging.getLogger(__name__)
class ProxyPool:
"""
Manages ThorData proxy sessions for web scraping.
Supports two modes:
- Rotating: new IP on every request (for high-volume, stateless scraping)
- Sticky: same IP for a duration (for multi-page workflows, login sessions)
ThorData sticky sessions persist for the configured duration,
letting you complete a full profile or paginated result set
on one IP before rotating to a fresh one for the next target.
"""
def __init__(
self,
username: str = PROXY_USERNAME or "",
password: str = PROXY_PASSWORD or "",
host: str = PROXY_HOST,
port: int = PROXY_PORT,
country: str = PROXY_COUNTRY,
):
self.username = username
self.password = password
self.host = host
self.port = port
self.country = country
self._sticky_id: Optional[str] = None
self._sticky_created: float = 0
self._lock = threading.Lock()
self._stats = {"requests": 0, "successes": 0, "failures": 0, "rotations": 0}
def rotating(self) -> Optional[str]:
"""Get a rotating proxy URL. New exit IP on every connection."""
if not (self.username and self.password):
return None
return f"http://{self.username}-country-{self.country}:{self.password}@{self.host}:{self.port}"
def sticky(self, ttl_minutes: int = 10) -> Optional[str]:
"""
Get a sticky proxy URL. Same exit IP for up to ttl_minutes.
Automatically renews the session when TTL expires.
"""
if not (self.username and self.password):
return None
with self._lock:
now = time.time()
if not self._sticky_id or (now - self._sticky_created) > ttl_minutes * 60:
self._sticky_id = f"sess{random.randint(100000, 999999)}"
self._sticky_created = now
logger.debug(f"New sticky session: {self._sticky_id}")
return (
f"http://{self.username}-country-{self.country}-session-{self._sticky_id}"
f":{self.password}@{self.host}:{self.port}"
)
def rotate(self) -> None:
"""Force rotation on the next sticky() call."""
with self._lock:
self._sticky_id = None
self._stats["rotations"] += 1
logger.debug("Proxy rotated")
def record(self, success: bool) -> None:
with self._lock:
self._stats["requests"] += 1
if success:
self._stats["successes"] += 1
else:
self._stats["failures"] += 1
@property
def stats(self) -> dict:
return dict(self._stats)
def is_configured(self) -> bool:
return bool(self.username and self.password)
# Global proxy pool instance
proxy_pool = ProxyPool()
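A quick usage sketch, assuming proxy credentials are set in the environment: rotating mode for independent one-off pages, sticky mode for multi-page flows against the same target:
# Hypothetical usage of the pool in both modes
import requests
from utils.proxy import proxy_pool

if proxy_pool.is_configured():
    # New exit IP per connection: fine for independent detail pages
    rotating = proxy_pool.rotating()
    resp = requests.get(
        "https://example.com/item/1",
        proxies={"http": rotating, "https": rotating},
        timeout=30,
    )
    proxy_pool.record(success=resp.ok)

    # Same exit IP for up to 10 minutes: login flows, paginated result sets
    sticky = proxy_pool.sticky(ttl_minutes=10)
    session = requests.Session()
    session.proxies = {"http": sticky, "https": sticky}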
The Retry Decorator Pattern
Network requests fail. Build retry logic once and apply it everywhere:
# utils/retry.py
import time
import random
import logging
from functools import wraps
from typing import Callable, Type, Tuple, Optional
from config.settings import MAX_RETRIES
logger = logging.getLogger(__name__)
def retry_with_backoff(
max_attempts: int = MAX_RETRIES,
base_delay: float = 1.0,
backoff_factor: float = 2.0,
max_delay: float = 60.0,
jitter: float = 1.0,
exceptions: Tuple[Type[Exception], ...] = (Exception,),
on_retry: Optional[Callable] = None,
):
"""
Decorator: retry on specified exceptions with exponential backoff + jitter.
Args:
max_attempts: Total attempts before re-raising
base_delay: Initial delay in seconds
backoff_factor: Multiplier applied per retry
max_delay: Maximum delay cap
jitter: Random jitter added to each delay
exceptions: Exception types that trigger retry
on_retry: Optional callback(attempt, exception) called before each retry
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
last_exc = None
for attempt in range(1, max_attempts + 1):
try:
result = func(*args, **kwargs)
if attempt > 1:
logger.info(f"{func.__name__} succeeded on attempt {attempt}")
return result
except exceptions as e:
last_exc = e
if attempt == max_attempts:
logger.error(
f"{func.__name__} failed after {max_attempts} attempts: {type(e).__name__}: {e}"
)
raise
delay = min(
base_delay * (backoff_factor ** (attempt - 1)) + random.uniform(0, jitter),
max_delay,
)
logger.warning(
f"{func.__name__} attempt {attempt}/{max_attempts} failed "
f"({type(e).__name__}), retrying in {delay:.1f}s"
)
if on_retry:
on_retry(attempt, e)
time.sleep(delay)
raise last_exc
return wrapper
return decorator
def http_retry(max_attempts: int = 4):
"""Retry decorator configured for HTTP operations."""
import httpx, requests
return retry_with_backoff(
max_attempts=max_attempts,
base_delay=2.0,
backoff_factor=2.5,
max_delay=120.0,
jitter=2.0,
exceptions=(
httpx.RequestError,
httpx.HTTPStatusError,
requests.RequestException,
ConnectionError,
TimeoutError,
),
)
def parse_retry(max_attempts: int = 3):
"""Retry decorator for parsing operations (shorter delays)."""
return retry_with_backoff(
max_attempts=max_attempts,
base_delay=0.5,
backoff_factor=2.0,
max_delay=10.0,
exceptions=(AttributeError, TypeError, KeyError, IndexError),
)
Adaptive Rate Limiting
Static delays are wasteful when the site is happy and too aggressive when it's stressed:
# utils/rate_limit.py
import time
import random
import threading
import logging
from config.settings import REQUEST_DELAY_MIN, REQUEST_DELAY_MAX
logger = logging.getLogger(__name__)
class AdaptiveRateLimiter:
"""
Rate limiter that adapts to server response patterns.
- Speeds up gradually after sustained success (floor: min_delay)
- Backs off exponentially on 429/503/CAPTCHA signals (ceiling: max_delay)
- Adds random jitter to avoid synchronized storms from parallel spiders
"""
def __init__(
self,
min_delay: float = REQUEST_DELAY_MIN,
max_delay: float = 60.0,
speedup_after: int = 10,
backoff_factor: float = 2.5,
speedup_factor: float = 0.85,
):
self.min_delay = min_delay
self.max_delay = max_delay
self.current_delay = min_delay
self._success_streak = 0
self._speedup_after = speedup_after
self._backoff_factor = backoff_factor
self._speedup_factor = speedup_factor
self._lock = threading.Lock()
def wait(self) -> None:
with self._lock:
jitter = random.gauss(0, 0.3)
delay = max(self.min_delay * 0.5, self.current_delay + jitter)
time.sleep(delay)
def success(self) -> None:
with self._lock:
self._success_streak += 1
if self._success_streak >= self._speedup_after:
self.current_delay = max(self.min_delay, self.current_delay * self._speedup_factor)
self._success_streak = 0
def throttle(self, extra_wait: float = 0) -> None:
with self._lock:
old = self.current_delay
self.current_delay = min(self.max_delay, self.current_delay * self._backoff_factor)
self._success_streak = 0
logger.warning(f"Rate limiter backed off: {old:.1f}s -> {self.current_delay:.1f}s")
if extra_wait > 0:
time.sleep(extra_wait)
def captcha(self) -> None:
self.throttle(extra_wait=random.uniform(20, 60))
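In a fetch loop the limiter sits in front of every request and reacts to what comes back. A sketch with placeholder URLs:
# Hypothetical fetch loop wired to the adaptive limiter
import requests
from utils.rate_limit import AdaptiveRateLimiter

limiter = AdaptiveRateLimiter()
for url in ["https://example.com/page/1", "https://example.com/page/2"]:
    limiter.wait()
    resp = requests.get(url, timeout=30)
    if resp.status_code in (429, 503):
        limiter.throttle(extra_wait=30)
    elif "captcha" in resp.text.lower():
        limiter.captcha()
    else:
        limiter.success()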
Anti-Detection Module
# anti_detect/headers.py
import random
from typing import Dict, Optional
BROWSER_PROFILES = [
{
"name": "chrome_windows",
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"sec_ch_ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
"sec_ch_ua_platform": '"Windows"',
"sec_ch_ua_mobile": "?0",
"accept_language": "en-US,en;q=0.9",
},
{
"name": "chrome_mac",
"user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"sec_ch_ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
"sec_ch_ua_platform": '"macOS"',
"sec_ch_ua_mobile": "?0",
"accept_language": "en-US,en;q=0.9,en-GB;q=0.8",
},
{
"name": "firefox_windows",
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
"sec_ch_ua": None,
"sec_ch_ua_platform": None,
"sec_ch_ua_mobile": None,
"accept_language": "en-US,en;q=0.5",
},
]
class HeaderGenerator:
"""Generate internally-consistent browser headers for a scraping session."""
def __init__(self, profile_name: Optional[str] = None):
if profile_name:
matching = [p for p in BROWSER_PROFILES if p["name"] == profile_name]
self._profile = matching[0] if matching else random.choice(BROWSER_PROFILES)
else:
self._profile = random.choice(BROWSER_PROFILES)
def get(
self,
url: str,
referer: Optional[str] = None,
is_xhr: bool = False,
) -> Dict[str, str]:
"""Generate headers for a specific request."""
if is_xhr:
accept = "application/json, text/plain, */*"
fetch_dest = "empty"
fetch_mode = "cors"
else:
accept = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8"
fetch_dest = "document"
fetch_mode = "navigate"
headers: Dict[str, str] = {
"User-Agent": self._profile["user_agent"],
"Accept": accept,
"Accept-Language": self._profile["accept_language"],
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Sec-Fetch-Dest": fetch_dest,
"Sec-Fetch-Mode": fetch_mode,
"Sec-Fetch-Site": "same-origin" if referer else "none",
}
if not is_xhr:
headers["Upgrade-Insecure-Requests"] = "1"
headers["Sec-Fetch-User"] = "?1"
if self._profile.get("sec_ch_ua"):
headers["Sec-CH-UA"] = self._profile["sec_ch_ua"]
headers["Sec-CH-UA-Mobile"] = self._profile["sec_ch_ua_mobile"]
headers["Sec-CH-UA-Platform"] = self._profile["sec_ch_ua_platform"]
if referer:
headers["Referer"] = referer
return headers
# anti_detect/timing.py
import time
import random
class HumanTimingSimulator:
"""
Simulate human browsing timing patterns.
Humans don't wait exactly N seconds between pages.
They vary based on reading time, distractions, and page complexity.
This variation is a meaningful signal — perfectly regular timing is a bot marker.
"""
@staticmethod
def between_pages(category: str = "normal") -> float:
"""
Wait between navigation events.
Categories:
- fast: Quick browsing (1-3s)
- normal: Typical reading (3-7s)
- slow: Careful reading/distracted (6-15s)
- very_slow: Long form content or inattentive (10-30s)
"""
ranges = {
"fast": (1.0, 3.0),
"normal": (3.0, 7.0),
"slow": (6.0, 15.0),
"very_slow": (10.0, 30.0),
}
lo, hi = ranges.get(category, ranges["normal"])
alpha, beta_val = 2, 3
raw = random.betavariate(alpha, beta_val)
return lo + (hi - lo) * raw
@staticmethod
def wait(category: str = "normal") -> None:
delay = HumanTimingSimulator.between_pages(category)
time.sleep(delay)
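Used together, a session picks one header profile and keeps it for its whole lifetime, while timing varies per page. A sketch with placeholder URLs:
# Hypothetical crawl of a few pages with one consistent header profile
import requests
from anti_detect.headers import HeaderGenerator
from anti_detect.timing import HumanTimingSimulator

header_gen = HeaderGenerator()
session = requests.Session()
previous_url = None
for url in ["https://example.com/products?page=1", "https://example.com/products?page=2"]:
    session.headers.update(header_gen.get(url, referer=previous_url))
    resp = session.get(url, timeout=30)
    previous_url = url
    HumanTimingSimulator.wait("normal")  # reading pause before the next navigation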
The Base Spider Class
All spiders inherit from a common base that provides HTTP, rate limiting, and proxy handling:
# spiders/base.py
import logging
import time
import random
from abc import ABC, abstractmethod
from typing import Iterator, Dict, Any, Optional
from utils.http import make_requests_session
from utils.rate_limit import AdaptiveRateLimiter
from utils.proxy import proxy_pool
from utils.retry import http_retry
from anti_detect.headers import HeaderGenerator
from anti_detect.timing import HumanTimingSimulator
import requests
logger = logging.getLogger(__name__)
class BaseSpider(ABC):
"""
Abstract base for all spiders.
Provides:
- Configured HTTP client with proxy support
- Adaptive rate limiting
- Retry logic via decorator
- Standard request/response logging
- CAPTCHA and block detection
- Item counting and progress reporting
"""
name: str = "base"
start_url: Optional[str] = None
use_sticky_proxy: bool = True
proxy_ttl_minutes: int = 10
CAPTCHA_SIGNALS = ["captcha", "cf-challenge", "unusual traffic", "access denied", "ray id"]
BLOCK_SIGNALS = ["403 forbidden", "ip blocked", "banned", "your ip has been"]
def __init__(self):
self._rate_limiter = AdaptiveRateLimiter()
self._header_gen = HeaderGenerator()
self._timer = HumanTimingSimulator()
self._items_yielded = 0
self._requests_made = 0
self._errors = 0
self._session_id = f"{self.name}_{random.randint(10000, 99999)}"
        self._session = make_requests_session(
            sticky_session=self._session_id if self.use_sticky_proxy else None,
            warm_up_url=self.start_url,
        )
logger.info(f"Spider {self.name!r} initialized")
def is_blocked(self, html: str, status: int) -> str:
"""Returns: 'ok', 'captcha', 'blocked', 'rate_limit'"""
if status == 429:
return "rate_limit"
if status == 403:
return "blocked"
lower = html.lower()
if any(s in lower for s in self.CAPTCHA_SIGNALS):
return "captcha"
if any(s in lower for s in self.BLOCK_SIGNALS):
return "blocked"
return "ok"
@http_retry(max_attempts=4)
def fetch(self, url: str, referer: Optional[str] = None, params: Optional[Dict] = None) -> str:
"""Fetch a URL and return the response text."""
self._rate_limiter.wait()
headers = self._header_gen.get(url, referer=referer)
self._session.headers.update(headers)
resp = self._session.get(url, params=params, timeout=30)
self._requests_made += 1
block_type = self.is_blocked(resp.text, resp.status_code)
if block_type == "rate_limit":
retry_after = int(resp.headers.get("Retry-After", 60))
self._rate_limiter.throttle(extra_wait=retry_after)
            self._rotate_proxy()
raise requests.RequestException("Rate limited")
if block_type == "captcha":
self._rate_limiter.captcha()
            self._rotate_proxy()
raise requests.RequestException("CAPTCHA detected")
if block_type == "blocked":
self._rate_limiter.throttle(extra_wait=30)
            self._rotate_proxy()
raise requests.RequestException("IP blocked")
resp.raise_for_status()
self._rate_limiter.success()
logger.debug(f"Fetched {url} -> {resp.status_code} ({len(resp.content)}B)")
return resp.text
@abstractmethod
def parse(self, html: str, url: str) -> Iterator[Dict[str, Any]]:
pass
    def run(self) -> Iterator[Dict[str, Any]]:
        if not self.start_url:
            raise NotImplementedError("Either set start_url or override run()")
        html = self.fetch(self.start_url)
        for item in self.parse(html, self.start_url):
            self._items_yielded += 1
            yield item
@property
def stats(self) -> Dict[str, int]:
return {"requests": self._requests_made, "items": self._items_yielded, "errors": self._errors}
Playwright Integration for JavaScript-Heavy Sites
For sites that require full browser execution — SPAs, heavy JS renderers, sites with bot challenges:
# utils/playwright_runner.py
import asyncio
import random
import logging
from typing import Optional, Dict, Any, List
from playwright.async_api import async_playwright, Browser, BrowserContext, Page
from utils.proxy import proxy_pool
logger = logging.getLogger(__name__)
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
]
async def make_stealth_browser(proxy_url: Optional[str] = None) -> Browser:
"""Launch Playwright browser with anti-detection measures."""
playwright = await async_playwright().start()
args = [
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-dev-shm-usage",
"--window-size=1920,1080",
"--lang=en-US",
]
kwargs: Dict[str, Any] = {"headless": True, "args": args}
if proxy_url:
from urllib.parse import urlparse
parsed = urlparse(proxy_url)
kwargs["proxy"] = {
"server": f"http://{parsed.hostname}:{parsed.port}",
"username": parsed.username or "",
"password": parsed.password or "",
}
return await playwright.chromium.launch(**kwargs)
async def make_stealth_context(browser: Browser) -> BrowserContext:
"""Create browser context with realistic fingerprint."""
context = await browser.new_context(
viewport={"width": 1920, "height": 1080},
locale="en-US",
timezone_id="America/New_York",
user_agent=random.choice(USER_AGENTS),
)
await context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', { get: () => ({length: 3}) });
Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
window.chrome = { runtime: {} };
Object.defineProperty(navigator, 'hardwareConcurrency', { get: () => 8 });
""")
return context
class PlaywrightScraper:
"""
Base class for Playwright-based spiders.
Use when target requires JavaScript rendering or bot challenges.
"""
def __init__(self, proxy_url: Optional[str] = None):
self.proxy_url = proxy_url or proxy_pool.sticky()
async def scrape(self, url: str) -> str:
"""Fetch page HTML after full JS execution."""
browser = await make_stealth_browser(self.proxy_url)
try:
context = await make_stealth_context(browser)
page = await context.new_page()
await page.goto(url, wait_until="networkidle")
await asyncio.sleep(random.uniform(1.5, 3.0))
return await page.content()
finally:
await browser.close()
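Calling it from synchronous orchestration code is straightforward (a sketch; the URL is a placeholder):
# Hypothetical usage from a synchronous entry point
import asyncio
from utils.playwright_runner import PlaywrightScraper

scraper = PlaywrightScraper()
html = asyncio.run(scraper.scrape("https://example.com/spa-dashboard"))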
Pipeline Layer
# pipelines/export.py
import json
import sqlite3
import logging
from pathlib import Path
from typing import Dict, Any
from datetime import datetime
from config.settings import OUTPUT_FORMAT, OUTPUT_DIR, DB_PATH
logger = logging.getLogger(__name__)
class JsonlExporter:
"""Export items to newline-delimited JSON."""
def __init__(self, filename: str):
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
self.path = Path(OUTPUT_DIR) / filename
self._file = None
self._count = 0
def __enter__(self):
self._file = open(self.path, "w", encoding="utf-8")
return self
def __exit__(self, *args):
if self._file:
self._file.close()
logger.info(f"Exported {self._count} items to {self.path}")
def write(self, item: Dict[str, Any]) -> None:
item["_scraped_at"] = datetime.utcnow().isoformat()
self._file.write(json.dumps(item, ensure_ascii=False, default=str) + "\n")
self._count += 1
if self._count % 100 == 0:
self._file.flush()
class SqliteExporter:
"""Export items to SQLite, auto-creating table from first item's schema."""
def __init__(self, table: str, db_path: str = DB_PATH):
self.table = table
self.db_path = db_path
self._conn = None
self._columns = None
self._count = 0
def __enter__(self):
self._conn = sqlite3.connect(self.db_path)
self._conn.execute("PRAGMA journal_mode=WAL")
return self
def __exit__(self, *args):
if self._conn:
self._conn.commit()
self._conn.close()
logger.info(f"Saved {self._count} items to {self.db_path}:{self.table}")
def write(self, item: Dict[str, Any]) -> None:
item["scraped_at"] = datetime.utcnow().isoformat()
if self._columns is None:
self._columns = list(item.keys())
cols_def = ", ".join(f'"{c}" TEXT' for c in self._columns)
self._conn.execute(
f'CREATE TABLE IF NOT EXISTS "{self.table}" (id INTEGER PRIMARY KEY AUTOINCREMENT, {cols_def})'
)
self._conn.commit()
placeholders = ", ".join("?" * len(self._columns))
col_names = ", ".join(f'"{c}"' for c in self._columns)
values = [str(item.get(c, "")) for c in self._columns]
self._conn.execute(
f'INSERT INTO "{self.table}" ({col_names}) VALUES ({placeholders})',
values,
)
self._count += 1
if self._count % 50 == 0:
self._conn.commit()
def get_exporter(spider_name: str):
"""Factory: return the appropriate exporter based on OUTPUT_FORMAT."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if OUTPUT_FORMAT == "jsonl":
return JsonlExporter(f"{spider_name}_{timestamp}.jsonl")
elif OUTPUT_FORMAT == "sqlite":
return SqliteExporter(table=spider_name)
else:
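        # CSV and Postgres exporters (allowed by settings.py) would follow the same context-manager pattern; not shown here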
raise ValueError(f"Unsupported OUTPUT_FORMAT: {OUTPUT_FORMAT}")
Deduplication and Validation Pipeline
# pipelines/clean.py
import hashlib
import logging
import re
from typing import Dict, Any, Optional, Set
logger = logging.getLogger(__name__)
class Deduplicator:
"""Track seen items by fingerprint to avoid writing duplicates."""
def __init__(self, key_fields: list):
self.key_fields = key_fields
self._seen: Set[str] = set()
self._total = 0
self._dupes = 0
def is_duplicate(self, item: Dict[str, Any]) -> bool:
self._total += 1
fingerprint = hashlib.md5(
"|".join(str(item.get(f, "")) for f in self.key_fields).encode()
).hexdigest()
if fingerprint in self._seen:
self._dupes += 1
return True
self._seen.add(fingerprint)
return False
@property
def stats(self):
return {"total": self._total, "duplicates": self._dupes, "unique": self._total - self._dupes}
def validate_item(item: Dict[str, Any], required_fields: list) -> Optional[Dict]:
"""Validate item has required fields with non-empty values."""
for field in required_fields:
val = item.get(field)
if val is None or (isinstance(val, str) and not val.strip()):
return None
return item
def clean_text(text: str) -> str:
"""Normalize whitespace and remove control characters."""
text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
Real-World Use Cases with Code
Use Case 1: Multi-Spider Orchestration
# main.py
import argparse
import logging
import sys
import time
from config.settings import validate
from utils.proxy import proxy_pool
def setup_logging():
from config.settings import LOG_LEVEL, LOG_FILE
from pathlib import Path
Path("logs").mkdir(exist_ok=True)
handlers = [logging.StreamHandler(sys.stdout)]
if LOG_FILE:
handlers.append(logging.FileHandler(LOG_FILE, encoding="utf-8"))
logging.basicConfig(
        level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
handlers=handlers,
)
logger = logging.getLogger("main")
def run_spider(spider_name: str, **kwargs) -> dict:
from spiders.reddit import RedditSpider
from pipelines.export import get_exporter
from pipelines.clean import Deduplicator, validate_item, clean_text
SPIDERS = {"reddit": RedditSpider}
spider = SPIDERS[spider_name](**kwargs)
dedup = Deduplicator(key_fields=["url", "permalink"])
with get_exporter(spider_name) as exporter:
for item in spider.run():
for key, val in item.items():
if isinstance(val, str):
item[key] = clean_text(val)
if dedup.is_duplicate(item):
continue
if validate_item(item, required_fields=["title"]) is None:
continue
exporter.write(item)
return {**spider.stats, **dedup.stats, "proxy": proxy_pool.stats}
Use Case 2: Parallel Spider Runner
import concurrent.futures
import threading
from typing import Dict
def run_spiders_parallel(
spider_configs: list,
max_workers: int = 3,
) -> Dict[str, dict]:
"""
Run multiple spider configurations in parallel.
Each spider gets its own thread, rate limiter, and proxy session.
"""
results = {}
lock = threading.Lock()
def run_one(config: dict) -> None:
name = config.pop("spider")
stats = run_spider(name, **config)
with lock:
results[name] = stats
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(run_one, cfg) for cfg in spider_configs]
concurrent.futures.wait(futures)
return results
# Usage
configs = [
{"spider": "reddit", "subreddit": "python", "pages": 5},
{"spider": "reddit", "subreddit": "datascience", "pages": 5},
{"spider": "reddit", "subreddit": "MachineLearning", "pages": 3},
]
results = run_spiders_parallel(configs, max_workers=2)
Use Case 3: Incremental Scraping with Change Detection
import json
import sqlite3
import hashlib
from datetime import datetime
from typing import Dict
from pipelines.export import get_exporter
def scrape_with_change_detection(
spider_name: str,
db_path: str = "data/scraper.db",
**spider_kwargs,
) -> Dict[str, int]:
"""
Only write items that are new or have changed since last run.
Uses content hash to detect updates.
"""
conn = sqlite3.connect(db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS seen_items (
fingerprint TEXT PRIMARY KEY,
first_seen TEXT,
last_seen TEXT,
update_count INTEGER DEFAULT 0
)
""")
from spiders.reddit import RedditSpider
SPIDERS = {"reddit": RedditSpider}
spider = SPIDERS[spider_name](**spider_kwargs)
    new_count = skipped_count = 0
with get_exporter(f"{spider_name}_incremental") as exporter:
for item in spider.run():
# Content hash for change detection
content_hash = hashlib.sha256(
json.dumps(item, sort_keys=True, default=str).encode()
).hexdigest()
existing = conn.execute(
"SELECT fingerprint FROM seen_items WHERE fingerprint=?",
(content_hash,)
).fetchone()
if not existing:
exporter.write(item)
conn.execute(
"INSERT INTO seen_items VALUES (?, ?, ?, 0)",
(content_hash, datetime.utcnow().isoformat(), datetime.utcnow().isoformat()),
)
new_count += 1
else:
skipped_count += 1
conn.commit()
conn.close()
return {"new": new_count, "skipped": skipped_count}
Use Case 4: Error Recovery and Resumable Scraping
import json
from pathlib import Path
from typing import Dict, List, Optional
class ResumableJobQueue:
"""
Persist scraping jobs to disk so interrupted runs can resume.
Useful for large batch jobs that run for hours.
"""
def __init__(self, queue_path: str = "data/job_queue.jsonl"):
self.queue_path = Path(queue_path)
self.queue_path.parent.mkdir(parents=True, exist_ok=True)
def add_jobs(self, jobs: List[Dict]) -> None:
with open(self.queue_path, "a") as f:
for job in jobs:
f.write(json.dumps({**job, "status": "pending"}) + "\n")
def next_pending(self) -> Optional[Dict]:
jobs = self._load_all()
for job in jobs:
if job.get("status") == "pending":
return job
return None
def mark_done(self, job_id: str) -> None:
self._update_status(job_id, "done")
def mark_failed(self, job_id: str, error: str) -> None:
self._update_status(job_id, "failed", error=error)
def _load_all(self) -> List[Dict]:
if not self.queue_path.exists():
return []
with open(self.queue_path) as f:
return [json.loads(line) for line in f if line.strip()]
def _update_status(self, job_id: str, status: str, **extra) -> None:
jobs = self._load_all()
for job in jobs:
if job.get("id") == job_id:
job["status"] = status
job.update(extra)
with open(self.queue_path, "w") as f:
for job in jobs:
f.write(json.dumps(job) + "\n")
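A sketch of the drain loop that makes resumption work; it reuses run_spider from main.py, and the job fields are illustrative:
# Hypothetical drain loop: a crash mid-run resumes from whatever is still pending
from main import run_spider

queue = ResumableJobQueue()
# Seed the queue once; on later runs, skip this step and just drain
queue.add_jobs([
    {"id": "reddit-python", "spider": "reddit", "subreddit": "python", "pages": 5},
    {"id": "reddit-datascience", "spider": "reddit", "subreddit": "datascience", "pages": 5},
])

while (job := queue.next_pending()) is not None:
    try:
        run_spider(job["spider"], subreddit=job["subreddit"], pages=job["pages"])
        queue.mark_done(job["id"])
    except Exception as exc:
        queue.mark_failed(job["id"], error=str(exc))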
Use Case 5: Output Schema Validation
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional, List, Dict, Any
from pipelines.clean import clean_text
@dataclass
class ScrapedItem:
"""
Enforce output schema via dataclass.
    Missing required fields surface as a TypeError at construction time, not silently.
"""
url: str
title: str
scraped_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
# Optional enrichment fields
description: Optional[str] = None
published_date: Optional[str] = None
author: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_raw(cls, raw: Dict) -> "ScrapedItem":
"""Construct from raw spider output, normalizing types."""
return cls(
url=str(raw.get("url", "")),
title=clean_text(str(raw.get("title", ""))),
description=clean_text(str(raw["description"])) if raw.get("description") else None,
published_date=str(raw.get("date", raw.get("published", ""))),
author=str(raw.get("author", raw.get("by", ""))),
tags=[str(t) for t in raw.get("tags", [])],
)
Use Case 6: Health Monitoring and Alerting
import logging
import requests as req_lib
from typing import Optional, Dict, List
logger = logging.getLogger(__name__)
class SpiderHealthMonitor:
"""
Monitor spider health and send alerts on anomalies.
Catches silent failures like consistently empty output.
"""
def __init__(self, webhook_url: Optional[str] = None, min_items_per_run: int = 10):
self.webhook_url = webhook_url
self.min_items = min_items_per_run
def check_run(self, spider_name: str, stats: Dict) -> None:
issues = []
if stats.get("items", 0) < self.min_items:
issues.append(f"Low yield: {stats.get('items', 0)} items (min: {self.min_items})")
error_rate = stats.get("errors", 0) / max(1, stats.get("requests", 1))
if error_rate > 0.3:
issues.append(f"High error rate: {error_rate:.0%}")
if issues and self.webhook_url:
self._send_alert(spider_name, issues)
elif issues:
logger.warning(f"Spider {spider_name} health issues: {issues}")
def _send_alert(self, spider_name: str, issues: List[str]) -> None:
message = f"Spider alert: {spider_name}\n" + "\n".join(f"- {i}" for i in issues)
try:
req_lib.post(self.webhook_url, json={"text": message}, timeout=10)
except Exception as e:
logger.error(f"Failed to send alert: {e}")
Use Case 7: Scheduled Production Run
# scheduler.py
import schedule
import time
import logging
from main import setup_logging, run_spider, validate
logger = logging.getLogger("scheduler")
def scheduled_reddit_scrape():
logger.info("Starting scheduled Reddit scrape")
try:
stats = run_spider("reddit", subreddit="python", pages=10)
logger.info(f"Scheduled run complete: {stats}")
except Exception as e:
logger.error(f"Scheduled run failed: {e}")
def main():
setup_logging()
validate()
# Run every 6 hours
schedule.every(6).hours.do(scheduled_reddit_scrape)
# Run immediately on start too
scheduled_reddit_scrape()
logger.info("Scheduler started")
while True:
schedule.run_pending()
time.sleep(60)
if __name__ == "__main__":
main()
Testing Strategy
# tests/test_spiders.py
import pytest
import json
from pathlib import Path
from unittest.mock import patch
FIXTURE_DIR = Path(__file__).parent / "fixtures"
def test_reddit_spider_parses_json(tmp_path):
"""Test Reddit spider parses API JSON correctly."""
from spiders.reddit import RedditSpider
mock_response = json.dumps({
"data": {
"children": [
{"data": {
"title": "Test Post",
"score": 100,
"url": "https://example.com",
"permalink": "/r/test/comments/abc/test_post/",
"author": "testuser",
"num_comments": 5,
"upvote_ratio": 0.95,
"selftext": "",
"created_utc": 1700000000,
"subreddit": "python",
"is_self": False,
"domain": "example.com",
"link_flair_text": None,
}}
],
"after": None,
}
})
spider = RedditSpider(subreddit="python", pages=1)
with patch.object(spider, "fetch", return_value=mock_response):
items = list(spider.run())
assert len(items) == 1
assert items[0]["title"] == "Test Post"
assert items[0]["score"] == 100
def test_deduplicator_catches_duplicates():
from pipelines.clean import Deduplicator
dedup = Deduplicator(key_fields=["url"])
item = {"url": "https://example.com/1", "title": "Test"}
assert not dedup.is_duplicate(item)
assert dedup.is_duplicate(item)
assert dedup.stats["duplicates"] == 1
assert dedup.stats["unique"] == 1
def test_validate_item_rejects_empty():
from pipelines.clean import validate_item
assert validate_item({"title": ""}, ["title"]) is None
assert validate_item({"title": "Hello"}, ["title"]) is not None
def test_adaptive_rate_limiter_backs_off():
from utils.rate_limit import AdaptiveRateLimiter
limiter = AdaptiveRateLimiter(min_delay=1.0)
initial_delay = limiter.current_delay
limiter.throttle()
assert limiter.current_delay > initial_delay
def test_proxy_pool_builds_urls():
from utils.proxy import ProxyPool
pool = ProxyPool(username="user", password="pass", host="proxy.example.com", port=9000, country="US")
rotating = pool.rotating()
assert "user-country-US" in rotating
assert "proxy.example.com:9000" in rotating
sticky1 = pool.sticky()
sticky2 = pool.sticky()
assert sticky1 == sticky2 # Same session within TTL
pool.rotate()
sticky3 = pool.sticky()
assert sticky3 != sticky1 # Different session after rotation
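The fixtures/ directory is what makes offline parser tests possible: capture a real response once, commit it, and assert against it without network access. A sketch, assuming a previously saved fixture file:
def test_spider_parses_saved_fixture():
    """Offline test against saved content in tests/fixtures/ (hypothetical file name)."""
    from spiders.reddit import RedditSpider
    fixture = FIXTURE_DIR / "reddit_python_hot.json"  # captured once from a real run
    if not fixture.exists():
        pytest.skip("fixture not captured yet")
    spider = RedditSpider(subreddit="python", pages=1)
    with patch.object(spider, "fetch", return_value=fixture.read_text(encoding="utf-8")):
        items = list(spider.run())
    assert items, "parsing a known-good fixture should yield items"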
Scheduling with Systemd Timers
For long-running production scrapers, systemd timers are more robust than cron — they handle missed runs, have restart policies, and integrate with journald:
# /etc/systemd/system/scraper.service
[Unit]
Description=Web Scraper
After=network.target
[Service]
Type=oneshot
User=scraper
WorkingDirectory=/opt/my_scraper
ExecStart=/opt/my_scraper/.venv/bin/python main.py reddit --subreddit python
EnvironmentFile=/opt/my_scraper/.env
StandardOutput=journal
StandardError=journal
# /etc/systemd/system/scraper.timer
[Unit]
Description=Run web scraper every 6 hours
[Timer]
OnBootSec=5min
OnUnitActiveSec=6h
Persistent=true
[Install]
WantedBy=timers.target
systemctl enable --now scraper.timer
journalctl -u scraper.service -f
Dependencies with uv
uv venv --python 3.14
source .venv/bin/activate
uv pip install httpx requests beautifulsoup4 lxml playwright tenacity python-dotenv pandas schedule
playwright install chromium
uv pip freeze > requirements.txt
Common Pitfalls This Structure Prevents
Silent failures: is_blocked() detects soft failures — CAPTCHAs that return 200 OK, rate-limit pages that don't use 429. validate_item() catches spiders that return empty records.
Configuration drift: Single source of truth for all settings. validate() at startup catches contradictions before any request is made.
Data loss on crash: JSONL exporter flushes every 100 items. SQLite exporter commits every 50. A crash after 10,000 items loses at most 100.
Proxy mismanagement: ProxyPool handles all URL construction, TTL tracking, and rotation. No spider constructs a proxy URL directly.
Non-reproducible environments: requirements.txt is always from pip freeze. .env.example documents everything. The directory structure is the contract.
The goal is not engineering for its own sake. It's operational reliability: when something breaks at 3am, you can find it, understand it, and fix it in minutes rather than hours.