
How to Scrape Dev.to Articles with Python (2026 Guide)


Table of Contents

  1. Introduction
  2. What is Dev.to and Why Scrape It?
  3. Understanding the Dev.to/Forem API Architecture
  4. Setting Up Your Python Environment
  5. Basic Article Scraping
  6. Scraping User Profiles and Their Articles
  7. Collecting Comments and Reactions
  8. Handling Pagination and Scale
  9. HTML Scraping for Advanced Content
  10. Using Playwright for JavaScript-Rendered Content
  11. Building a Complete Data Pipeline
  12. Storage: SQLite Schema and Examples
  13. Exporting to CSV and JSON
  14. Rate Limiting and Retry Strategies
  15. Proxy Rotation with ThorData
  16. Anti-Detection and Stealth Techniques
  17. Async Scraping with httpx
  18. Scheduling Recurring Scrapes
  19. Real-World Use Cases and Business Applications
  20. Legal and Ethical Considerations
  21. Production-Ready Complete Scraper
  22. Troubleshooting Guide

1. Introduction

In 2026, Dev.to remains one of the most valuable sources of technical content on the internet. With hundreds of thousands of developers sharing articles, code snippets, and insights daily, the platform generates a constant stream of data about what matters in software development.

Whether you're building a machine learning model to predict trending topics, conducting competitive intelligence analysis, researching content gaps in your niche, or building training datasets for NLP applications, scraping Dev.to programmatically gives you access to structured, real-time data that would take months to collect manually.

This guide walks you through every aspect of scraping Dev.to efficiently, ethically, and at scale. We'll cover the official API (which is more reliable than you'd expect), HTML scraping for content not exposed via API, proxy rotation strategies, and anti-detection techniques that let you collect large datasets without triggering rate limits or getting blocked.

By the end, you'll have a production-ready scraper that can collect thousands of articles, comments, and reactions while respecting the platform's infrastructure and terms of service.

2. What is Dev.to and Why Scrape It?

What is Dev.to?

Dev.to is a community platform built on Forem, an open-source publishing engine. It's one of the largest communities of software developers on the web, with millions of monthly active users.

Unlike corporate tech news sites, Dev.to content reflects what working developers actually care about. This makes it invaluable for research and intelligence gathering.

Why Scrape Dev.to?

Content Gap Analysis: Analyze 10,000 articles in your niche to find topics you're not covering. Identify patterns in what performs well (word count, formatting, keywords) and adapt your content strategy accordingly.

Trend Detection: Monitor emerging technologies before they hit mainstream tech news. Track topic velocity: is Rust adoption accelerating or plateauing? Are developers abandoning certain frameworks? This kind of real-time signal feeds newsletters, analyst reports, and product roadmaps.

Competitive Intelligence: Track what your competitors are publishing, how quickly their content gets traction, and what their audience responds to. Build a database of competitors' articles and analyze their publishing schedule and topic choices.

Author Outreach: Identify subject matter experts in your target market by finding the most-followed authors discussing specific technologies. Use follower counts and article performance to prioritize outreach lists.

Training Data for NLP/ML Models: Build datasets of real technical articles, comments, and discussions to train custom classifiers. Use comment data to identify sentiment around specific tools or languages. Create embeddings for semantic search.

Audience Research: Understand which topics, technologies, and writing styles resonate with your target audience. Track how engagement evolves with article length, code examples, and publishing time.

Content Syndication: Curate the best weekly articles for newsletters or Slack channels. Automate discovery of high-quality content in specific tags.

Job Market Intelligence: Analyze which technologies and skills are trending based on what developers are learning and discussing. Use this to guide training programs or hiring strategies.

What You'll Learn in This Guide

This is not a theoretical guide. Every section below builds real, working code, from a basic API client through pagination, storage, proxy rotation, and a production-ready scraper.


3. Understanding the Dev.to/Forem API Architecture

Official API Endpoints

Dev.to exposes a well-documented REST API with the following key endpoints:

GET /api/articles                         # List articles (filter via ?tag=, ?username=, ?state=)
GET /api/articles/latest                  # Latest articles, newest first
GET /api/articles/:id                     # Get single article
GET /api/articles/:username/:slug         # Get article by author and slug
GET /api/comments?a_id=:article_id        # Get comments on an article (threaded)
GET /api/users/by_username?url=:username  # Get user profile
GET /api/tags                             # List tags

Unlike many sites, Dev.to doesn't require authentication for most endpoints. However, authenticated requests get higher rate limits and access to private/draft articles if you own them.

Rate Limiting

Dev.to implements rate limiting based on IP address and API key; unauthenticated requests get a stricter budget.

For production scraping at scale, you'll want to:

  1. Use an API key (free from your Dev.to settings)
  2. Implement exponential backoff when hitting limits
  3. Use residential proxies to distribute requests across multiple IPs
  4. Respect the X-RateLimit-* headers returned by the API
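Point 2 is simple to reason about: each retry waits twice as long as the previous one. A tiny sketch (the helper name is mine; the same schedule is implemented inline later in this guide):

```python
def backoff_schedule(base_delay: float, max_retries: int) -> list:
    """Seconds to wait before each retry: base_delay * 2^attempt."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]

# With the defaults used later in this guide (base 1s, 5 retries):
print(backoff_schedule(1, 5))  # [1, 2, 4, 8, 16]
```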

Response Format

All API responses are JSON with a consistent structure:

[
  {
    "id": 1234567,
    "title": "Getting Started with Rust",
    "description": "A practical guide to Rust...",
    "slug": "getting-started-with-rust-abcd",
    "path": "/user/getting-started-with-rust-abcd",
    "url": "https://dev.to/user/getting-started-with-rust-abcd",
    "comments_count": 42,
    "positive_reactions_count": 256,
    "created_at": "2026-01-15T10:30:00Z",
    "published_at": "2026-01-15T10:30:00Z",
    "last_comment_at": "2026-03-20T15:45:00Z",
    "user": {
      "name": "Jane Developer",
      "username": "janedev",
      "twitter_username": "janedev",
      "github_username": "janedev",
      "website_url": "https://janedev.com",
      "profile_image": "https://...",
      "profile_image_90": "https://..."
    },
    "tags": ["rust", "beginners", "webdev"],
    "reading_time_minutes": 8
  }
]

Key fields to understand: id (the numeric ID used by the single-article endpoint), positive_reactions_count (the sum of positive reactions such as hearts and unicorns), comments_count, published_at (ISO 8601, UTC), tags, and reading_time_minutes.
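For downstream analysis it helps to flatten each article object into a single row. A minimal helper based on the sample response above (the field selection is mine):

```python
def flatten_article(article: dict) -> dict:
    """Reduce an API article object to the fields most analyses need."""
    return {
        "id": article["id"],
        "title": article["title"],
        "url": article["url"],
        "author": article["user"]["username"],
        "reactions": article["positive_reactions_count"],
        "comments": article["comments_count"],
        "published_at": article["published_at"],
        "tags": ",".join(article["tags"]),          # list -> CSV-friendly string
        "reading_time_minutes": article["reading_time_minutes"],
    }
```

Rows in this shape drop straight into the SQLite and CSV exports covered later.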


4. Setting Up Your Python Environment

Install Dependencies

Use Python 3.14 with uv for dependency management:

uv venv --python 3.14
source .venv/bin/activate
uv pip install requests httpx playwright beautifulsoup4 python-dateutil aiohttp tqdm
playwright install chromium

Note that sqlite3 ships with Python's standard library, so it needs no separate install.

Create Configuration File

Centralize settings in a config module, and read secrets like the API key from environment variables rather than hard-coding them:

# config.py
import os
from pathlib import Path

class Config:
    # Dev.to API
    DEVTO_API_KEY = os.getenv("DEVTO_API_KEY", "")
    DEVTO_BASE_URL = "https://dev.to/api"

    # Rate limiting
    RATE_LIMIT_DELAY = 1.5  # seconds between requests (unauthenticated)
    AUTH_RATE_LIMIT_DELAY = 0.05  # seconds between requests (authenticated)
    MAX_RETRIES = 5
    RETRY_BASE_DELAY = 1  # seconds, multiplies exponentially

    # Data storage
    DB_PATH = Path("devto_articles.db")
    OUTPUT_DIR = Path("output")

    # Proxy settings (optional)
    USE_PROXIES = False
    PROXY_LIST = []

    # Logging
    LOG_LEVEL = "INFO"
    LOG_FILE = "scraper.log"

# Create output directory
Config.OUTPUT_DIR.mkdir(exist_ok=True)

Initialize Logging

Proper logging is essential for debugging and monitoring long-running scrapes:

# logger.py
import logging
from config import Config

def setup_logging():
    """Configure logging to file and console."""
    logger = logging.getLogger("devto_scraper")
    if logger.handlers:
        # Already configured (e.g., module imported more than once)
        return logger
    logger.setLevel(Config.LOG_LEVEL)

    # File handler
    file_handler = logging.FileHandler(Config.LOG_FILE)
    file_handler.setLevel(logging.DEBUG)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(Config.LOG_LEVEL)

    # Formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

logger = setup_logging()

5. Basic Article Scraping

Fetch Articles from All Articles Feed

The simplest approach is to fetch articles from the main feed, which includes articles from across the platform:

# scraper_basic.py
import requests
import time
from typing import List, Dict, Optional
from config import Config
from logger import logger

class DevtoScraper:
    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize scraper with optional API key for higher rate limits.
        """
        self.api_key = api_key or Config.DEVTO_API_KEY
        self.base_url = Config.DEVTO_BASE_URL
        self.session = requests.Session()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        }
        if self.api_key:
            self.headers["api-key"] = self.api_key
        self.last_request_time = 0

    def _apply_rate_limit(self):
        """Respect rate limits between requests."""
        delay = Config.AUTH_RATE_LIMIT_DELAY if self.api_key else Config.RATE_LIMIT_DELAY
        elapsed = time.time() - self.last_request_time
        if elapsed < delay:
            time.sleep(delay - elapsed)

    def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """
        Make HTTP request with rate limiting and retry logic.
        """
        url = f"{self.base_url}{endpoint}"
        params = params or {}

        for attempt in range(Config.MAX_RETRIES):
            self._apply_rate_limit()
            try:
                response = self.session.get(url, headers=self.headers, params=params, timeout=10)
                self.last_request_time = time.time()

                # Log rate limit info
                if "X-RateLimit-Remaining" in response.headers:
                    remaining = response.headers.get("X-RateLimit-Remaining")
                    logger.debug(f"Rate limit remaining: {remaining}")

                if response.status_code == 429:
                    # Rate limited - back off exponentially
                    wait_time = Config.RETRY_BASE_DELAY * (2 ** attempt)
                    logger.warning(f"Rate limited. Waiting {wait_time}s before retry {attempt + 1}/{Config.MAX_RETRIES}")
                    time.sleep(wait_time)
                    continue

                response.raise_for_status()
                return response.json()

            except requests.exceptions.RequestException as e:
                if attempt == Config.MAX_RETRIES - 1:
                    logger.error(f"Request failed after {Config.MAX_RETRIES} attempts: {e}")
                    raise
                wait_time = Config.RETRY_BASE_DELAY * (2 ** attempt)
                logger.warning(f"Request failed: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)

        raise Exception(f"Failed to get {url} after {Config.MAX_RETRIES} attempts")

    def get_articles(self, page: int = 1, per_page: int = 30, state: Optional[str] = None) -> List[Dict]:
        """
        Fetch articles from the main feed.

        Args:
            page: Page number (1-indexed)
            per_page: Articles per page (max 1000)
            state: Optional filter: 'fresh', 'rising', or 'all'

        Returns:
            List of article dictionaries
        """
        params = {
            "page": page,
            "per_page": min(per_page, 1000),
        }
        if state:
            params["state"] = state

        logger.info(f"Fetching articles: page {page}, per_page {per_page}")
        articles = self._make_request("/articles", params)
        logger.info(f"Retrieved {len(articles)} articles")
        return articles

    def get_articles_by_tag(self, tag: str, page: int = 1, per_page: int = 30) -> List[Dict]:
        """
        Fetch articles filtered by tag.

        Args:
            tag: Tag name (e.g., 'python', 'javascript')
            page: Page number (1-indexed)
            per_page: Articles per page

        Returns:
            List of article dictionaries
        """
        params = {
            "tag": tag,
            "page": page,
            "per_page": min(per_page, 1000),
        }

        logger.info(f"Fetching articles with tag '{tag}': page {page}")
        articles = self._make_request("/articles", params)
        logger.info(f"Retrieved {len(articles)} articles with tag '{tag}'")
        return articles

    def get_article(self, article_id: int) -> Dict:
        """
        Fetch a single article by ID.

        Args:
            article_id: The article's numeric ID

        Returns:
            Article dictionary with full content
        """
        logger.info(f"Fetching article {article_id}")
        article = self._make_request(f"/articles/{article_id}")
        return article

    def get_article_by_slug(self, username: str, slug: str) -> Dict:
        """
        Fetch article by username and slug (alternative to ID).

        Args:
            username: Article author's username
            slug: Article slug (URL-safe identifier)

        Returns:
            Article dictionary
        """
        logger.info(f"Fetching article {username}/{slug}")
        article = self._make_request(f"/articles/{username}/{slug}")
        return article


# Example usage
if __name__ == "__main__":
    scraper = DevtoScraper(api_key=Config.DEVTO_API_KEY)

    # Get first page of articles
    articles = scraper.get_articles(page=1, per_page=30)
    for article in articles:
        print(f"{article['title']} by {article['user']['username']}")

    # Get Python articles
    python_articles = scraper.get_articles_by_tag("python", page=1, per_page=50)
    print(f"\nFound {len(python_articles)} Python articles")

    # Get a single article's details
    if articles:
        article_id = articles[0]['id']
        full_article = scraper.get_article(article_id)
        print(f"\nFull article content:\n{full_article.get('body_html', '')[:500]}")

Understanding Pagination

Dev.to's API paginates through the page and per_page parameters. It returns an empty array once you run past the last page, so you can iterate until you get an empty response:

def fetch_all_articles_paginated(scraper, tag: str = None, max_pages: int = None):
    """
    Fetch all articles from a tag, handling pagination automatically.
    """
    all_articles = []
    page = 1

    while True:
        if max_pages and page > max_pages:
            break

        if tag:
            articles = scraper.get_articles_by_tag(tag, page=page, per_page=100)
        else:
            articles = scraper.get_articles(page=page, per_page=100)

        if not articles:
            logger.info(f"Reached end of pagination at page {page}")
            break

        all_articles.extend(articles)
        logger.info(f"Collected {len(all_articles)} total articles")
        page += 1

    return all_articles

# Fetch all Python articles
python_articles = fetch_all_articles_paginated(scraper, tag="python", max_pages=50)
print(f"Total Python articles: {len(python_articles)}")

6. Scraping User Profiles and Their Articles

Fetch User Profile Data

User profiles contain valuable metadata like follower counts, join date, and social links:

# In DevtoScraper class
def get_user(self, username: str) -> Dict:
    """
    Fetch user profile by username.

    Args:
        username: Dev.to username

    Returns:
        User profile dictionary
    """
    logger.info(f"Fetching user profile: {username}")
    # The API looks up users by username via the by_username endpoint
    user = self._make_request("/users/by_username", {"url": username})
    return user

def get_user_articles(self, username: str, page: int = 1, per_page: int = 30) -> List[Dict]:
    """
    Fetch all articles by a specific user.

    Args:
        username: Dev.to username
        page: Page number (1-indexed)
        per_page: Articles per page

    Returns:
        List of user's articles
    """
    logger.info(f"Fetching articles for user {username}: page {page}")
    # User articles come from the articles endpoint filtered by username
    articles = self._make_request("/articles", {
        "username": username,
        "page": page,
        "per_page": min(per_page, 1000)
    })
    return articles

Build Author Profiles Database

For competitive intelligence or author outreach, build a database of author profiles:

def build_author_database(scraper, tag: str, output_file: str = "authors.json"):
    """
    Extract all unique authors from a tag feed and fetch their profiles.
    """
    import json

    all_articles = fetch_all_articles_paginated(scraper, tag=tag, max_pages=100)

    # Extract unique authors
    authors = {}
    for article in all_articles:
        username = article['user']['username']
        if username not in authors:
            authors[username] = None

    logger.info(f"Found {len(authors)} unique authors")

    # Fetch full profiles
    author_profiles = {}
    for i, username in enumerate(authors.keys(), 1):
        try:
            profile = scraper.get_user(username)
            author_profiles[username] = {
                "name": profile.get("name"),
                "username": profile.get("username"),
                "bio": profile.get("bio"),
                "joined": profile.get("created_at"),
                "location": profile.get("location"),
                "website": profile.get("website_url"),
                "twitter": profile.get("twitter_username"),
                "github": profile.get("github_username"),
                "image_url": profile.get("profile_image")
            }
            logger.info(f"Fetched profile {i}/{len(authors)}: {username}")
        except Exception as e:
            logger.error(f"Failed to fetch profile for {username}: {e}")

    # Save to file
    with open(output_file, 'w') as f:
        json.dump(author_profiles, f, indent=2)

    logger.info(f"Saved {len(author_profiles)} author profiles to {output_file}")
    return author_profiles

Example: Top Authors in Your Niche

To find influencers in a specific technology, analyze article performance by author:

def find_top_authors(scraper, tag: str, min_reactions: int = 100):
    """
    Find the most engaging authors in a tag.
    """
    all_articles = fetch_all_articles_paginated(scraper, tag=tag, max_pages=50)

    author_stats = {}
    for article in all_articles:
        username = article['user']['username']
        reactions = article['positive_reactions_count']

        if username not in author_stats:
            author_stats[username] = {
                "articles": 0,
                "total_reactions": 0,
                "avg_reactions": 0,
                "top_article": None,
                "top_reactions": 0
            }

        author_stats[username]["articles"] += 1
        author_stats[username]["total_reactions"] += reactions

        if reactions > author_stats[username]["top_reactions"]:
            author_stats[username]["top_article"] = article['title']
            author_stats[username]["top_reactions"] = reactions

    # Calculate averages and filter
    for username, stats in author_stats.items():
        stats["avg_reactions"] = stats["total_reactions"] / stats["articles"]

    # Sort by average reactions
    top_authors = sorted(
        author_stats.items(),
        key=lambda x: x[1]["avg_reactions"],
        reverse=True
    )

    # Filter by minimum reactions
    filtered = [
        (username, stats) for username, stats in top_authors
        if stats["avg_reactions"] >= min_reactions
    ]

    logger.info(f"Found {len(filtered)} authors with avg {min_reactions}+ reactions")

    for username, stats in filtered[:20]:
        logger.info(
            f"{username}: {stats['articles']} articles, "
            f"avg {stats['avg_reactions']:.0f} reactions"
        )

    return dict(filtered)

7. Collecting Comments and Reactions

Fetch Article Comments

Comments are raw material for sentiment analysis, reader questions, and engagement metrics:

# In DevtoScraper class
def get_article_comments(self, article_id: int) -> List[Dict]:
    """
    Fetch the comments on a specific article.

    The /comments endpoint returns the article's full comment tree in one
    response (replies nested under 'children'), so no pagination is needed.

    Args:
        article_id: The article's numeric ID

    Returns:
        List of top-level comment dictionaries with nested replies
    """
    logger.info(f"Fetching comments for article {article_id}")
    return self._make_request("/comments", {"a_id": article_id})

def fetch_all_article_comments(scraper, article_id: int) -> List[Dict]:
    """
    Convenience wrapper; the endpoint already returns every comment at once.
    """
    comments = scraper.get_article_comments(article_id)
    logger.info(f"Collected {len(comments)} top-level comments")
    return comments

Comment Structure and Data

Comments include:

{
    "id": 12345678,
    "type_of": "comment",
    "id_code": "abc123",
    "user": {
        "name": "John Developer",
        "username": "johndev",
        "twitter_username": "johndev",
        "github_username": "johndev",
        "website_url": "https://johndev.com",
        "profile_image": "https://...",
        "profile_image_90": "https://..."
    },
    "positive_reactions_count": 15,
    "created_at": "2026-03-15T10:30:00Z",
    "updated_at": "2026-03-15T10:30:00Z",
    "body_html": "<p>Great article! Here's what worked for me...</p>",
    "children": [  # Replies to this comment
        {
            "id": 12345679,
            "type_of": "comment",
            "user": {...},
            "positive_reactions_count": 5,
            "created_at": "2026-03-15T11:00:00Z",
            "body_html": "<p>Thanks for the feedback!</p>",
            "children": []
        }
    ]
}
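Because replies nest arbitrarily deep under children, it's often handy to flatten the tree before storage or analysis. A recursive walk (the helper name and row shape are mine):

```python
def flatten_comment_tree(comments: list) -> list:
    """Walk nested 'children' lists and return every comment with its depth."""
    flat = []

    def walk(comment: dict, depth: int = 0):
        flat.append({
            "id": comment["id"],
            "username": comment["user"]["username"],
            "reactions": comment.get("positive_reactions_count", 0),
            "depth": depth,  # 0 = top-level comment, 1 = reply, and so on
        })
        for child in comment.get("children", []):
            walk(child, depth + 1)

    for comment in comments:
        walk(comment)
    return flat
```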

Extract Comment Sentiment and Topics

Comments reveal what developers think about technologies:

import re
from html.parser import HTMLParser
from typing import Dict, List

class HTMLStripper(HTMLParser):
    """Remove HTML tags from text."""
    def __init__(self):
        super().__init__()
        self.reset()
        self.strict = False
        self.convert_charrefs = True
        self.text = []

    def handle_data(self, data):
        self.text.append(data)

    def get_data(self):
        return ''.join(self.text)

def strip_html(html):
    """Convert HTML to plain text."""
    stripper = HTMLStripper()
    stripper.feed(html)
    return stripper.get_data()

def analyze_comment_sentiment(body_html: str) -> Dict:
    """
    Basic sentiment analysis of a comment.
    For production, use a library like TextBlob or transformers.
    """
    text = strip_html(body_html).lower()

    positive_words = ['great', 'awesome', 'excellent', 'helpful', 'love', 'thanks', 'worked']
    negative_words = ['terrible', 'awful', 'hate', 'broken', 'doesn\'t work', 'buggy']

    positive_count = sum(1 for word in positive_words if word in text)
    negative_count = sum(1 for word in negative_words if word in text)

    if positive_count > negative_count:
        sentiment = "positive"
    elif negative_count > positive_count:
        sentiment = "negative"
    else:
        sentiment = "neutral"

    return {
        "sentiment": sentiment,
        "positive_words": positive_count,
        "negative_words": negative_count,
        "text_length": len(text)
    }

def extract_code_from_comments(comments: List[Dict]) -> List[str]:
    """
    Extract code blocks from comments for training data.
    """
    code_blocks = []
    code_pattern = r'<code>(.*?)</code>'

    def extract_from_comment(comment):
        html = comment.get('body_html', '')
        blocks = re.findall(code_pattern, html, re.DOTALL)
        code_blocks.extend(blocks)

        # Also check replies
        for child in comment.get('children', []):
            extract_from_comment(child)

    for comment in comments:
        extract_from_comment(comment)

    return code_blocks

# Usage
comments = fetch_all_article_comments(scraper, article_id=123456)
for comment in comments:
    sentiment = analyze_comment_sentiment(comment['body_html'])
    print(f"User {comment['user']['username']}: {sentiment['sentiment']}")

code_blocks = extract_code_from_comments(comments)
print(f"Extracted {len(code_blocks)} code blocks from comments")

8. Handling Pagination and Scale

The 34-Page Pagination Cap

Dev.to's API has a practical limitation: you can paginate up to around page 34 before the API stops returning results for some endpoints. This is due to how the platform handles large offsets.

Workaround strategies:

  1. Use date filtering: Request articles created after a specific date rather than paginating indefinitely
  2. Split by tag: Distribute pagination across multiple tags
  3. Use multiple IP addresses: With proxies, you can collect from parallel requests
  4. Scrape HTML: For complete archives, fall back to HTML scraping
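Workaround 1 amounts to slicing the timeline into small windows and collecting each one separately. A sketch of the window generator (the function name and default window size are illustrative):

```python
from datetime import date, timedelta

def date_windows(start: date, end: date, days: int = 7) -> list:
    """Split [start, end] into consecutive windows of at most `days` days."""
    windows = []
    cursor = start
    while cursor <= end:
        window_end = min(cursor + timedelta(days=days - 1), end)
        windows.append((cursor, window_end))
        cursor = window_end + timedelta(days=1)
    return windows

# Three windows covering the first 20 days of January
print(date_windows(date(2026, 1, 1), date(2026, 1, 20)))
```

Each window stays small enough that its results fit well inside the pagination cap.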

Fetch All Articles with Date-Based Pagination

from datetime import datetime, timedelta, timezone

def fetch_articles_by_date_range(
    scraper,
    tag: str = None,
    start_date: datetime = None,
    end_date: datetime = None,
    batch_size: int = 100
) -> List[Dict]:
    """
    Fetch articles within a date range, stopping as soon as the feed moves
    past the range instead of paginating indefinitely.

    Assumes the feed is ordered newest-first; with a different ordering,
    drop the early exit and filter every page instead.

    Args:
        scraper: DevtoScraper instance
        tag: Optional tag to filter by
        start_date: Earliest article date (timezone-aware)
        end_date: Latest article date (timezone-aware)
        batch_size: Articles per request

    Returns:
        List of all articles in the date range
    """
    start_date = start_date or (datetime.now(timezone.utc) - timedelta(days=365))
    end_date = end_date or datetime.now(timezone.utc)
    all_articles = []
    page = 1

    while True:
        if tag:
            articles = scraper.get_articles_by_tag(tag, page=page, per_page=batch_size)
        else:
            articles = scraper.get_articles(page=page, per_page=batch_size)

        if not articles:
            break

        exhausted = False
        for article in articles:
            published = datetime.fromisoformat(
                article['published_at'].replace('Z', '+00:00')
            )
            if published > end_date:
                continue  # newer than the window; keep scanning this page
            if published < start_date:
                exhausted = True  # everything further back is older still
                break
            all_articles.append(article)

        logger.info(f"Page {page}: {len(all_articles)} articles collected so far")
        if exhausted:
            break
        page += 1

    logger.info(f"Total articles in range: {len(all_articles)}")
    return all_articles

Distributed Collection with Proxies

For very large-scale collection, distribute requests across multiple IPs using ThorData residential proxies. This lets you maintain multiple sessions with independent rate limit budgets.

ThorData works well for this: its residential proxy network routes requests through real consumer IP addresses, which are far harder to flag than datacenter ranges. Use the affiliate link https://thordata.partnerstack.com/partner/0a0x4nzh to set up your account.

class DistributedScraper:
    """
    Scrape using multiple proxy-backed sessions, each with its own
    rate-limit budget. (Requests here run sequentially per session;
    see section 17 for true concurrency.)
    """
    def __init__(self, proxy_list: List[str]):
        """
        Args:
            proxy_list: List of proxy URLs (e.g., from ThorData)
        """
        self.scrapers = []
        for proxy in proxy_list:
            scraper = DevtoScraper()
            scraper.session.proxies = {
                'http': proxy,
                'https': proxy
            }
            self.scrapers.append(scraper)

        logger.info(f"Initialized {len(self.scrapers)} scraper instances")

    def get_articles_distributed(self, tags: List[str], articles_per_scraper: int = 500):
        """
        Distribute article collection across multiple proxies.
        """
        all_articles = []
        tag_batches = [tags[i::len(self.scrapers)] for i in range(len(self.scrapers))]

        for scraper, tag_batch in zip(self.scrapers, tag_batches):
            for tag in tag_batch:
                articles = fetch_all_articles_paginated(scraper, tag=tag, max_pages=50)
                all_articles.extend(articles)
                logger.info(f"Scraper {id(scraper)}: collected {len(articles)} articles for tag {tag}")

        logger.info(f"Total collected: {len(all_articles)} articles")
        return all_articles

9. HTML Scraping for Advanced Content

When to Use HTML Scraping

The API doesn't expose everything: reading lists, follower lists, and on-page profile stats are only visible in the rendered HTML. For those, you'll need HTML scraping.

Using BeautifulSoup for HTML Extraction

from bs4 import BeautifulSoup
import requests
from typing import Dict, List
from logger import logger

class DevtoHTMLScraper:
    """
    Scrape Dev.to HTML pages for data not available in API.
    """
    def __init__(self):
        self.session = requests.Session()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }

    def get_reading_list(self, username: str, list_slug: str) -> List[Dict]:
        """
        Scrape a user's reading list.

        Args:
            username: Dev.to username
            list_slug: Reading list slug from URL

        Returns:
            List of article metadata from the reading list
        """
        url = f"https://dev.to/{username}/readinglist/{list_slug}"
        logger.info(f"Scraping reading list: {url}")

        response = self.session.get(url, headers=self.headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        articles = []

        # Find all article cards in the reading list
        for card in soup.find_all('article', class_='crayons-card'):
            title = card.find('h2', class_='crayons-card__title')
            author = card.find('span', class_='crayons-card__author')
            link = card.find('a', class_='crayons-card__link')

            if title and link:
                articles.append({
                    "title": title.get_text(strip=True),
                    "author": author.get_text(strip=True) if author else "Unknown",
                    "url": link.get('href'),
                    "slug": link.get('href').split('/')[-1] if link.get('href') else None
                })

        logger.info(f"Extracted {len(articles)} articles from reading list")
        return articles

    def get_follower_list(self, username: str) -> List[Dict]:
        """
        Scrape a user's followers.
        """
        url = f"https://dev.to/{username}/followers"
        logger.info(f"Scraping followers: {url}")

        response = self.session.get(url, headers=self.headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        followers = []

        # Find all follower cards
        for card in soup.find_all('div', class_='profile-card'):
            username_elem = card.find('a', class_='profile-card__link')
            if username_elem:
                followers.append({
                    "username": username_elem.get_text(strip=True),
                    "profile_url": username_elem.get('href')
                })

        logger.info(f"Extracted {len(followers)} followers")
        return followers

    def get_user_articles_page(self, username: str) -> Dict:
        """
        Scrape a user's articles page for metadata not in API.
        """
        url = f"https://dev.to/{username}"
        logger.info(f"Scraping user page: {url}")

        response = self.session.get(url, headers=self.headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')

        # Extract user stats (class names reflect Dev.to's current markup
        # and may change between redesigns)
        stats = {}

        # Find article count
        articles_section = soup.find('section', class_='crayons-card')
        if articles_section:
            article_items = articles_section.find_all('article')
            stats['article_count_visible'] = len(article_items)

        logger.info(f"Extracted user stats: {stats}")
        return stats

10. Using Playwright for JavaScript-Rendered Content

When BeautifulSoup Isn't Enough

Some Dev.to pages load content dynamically via JavaScript. For these cases, use Playwright to render pages like a real browser:

from playwright.async_api import async_playwright
import asyncio

class DevtoPlaywrightScraper:
    """
    Use Playwright to scrape JavaScript-rendered content.
    """
    async def scrape_with_browser(self, url: str) -> str:
        """
        Load URL in headless browser and return full HTML.
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)

            # Set a realistic user agent; full stealth requires extra
            # tooling (e.g. a stealth plugin) on top of a custom UA
            page = await browser.new_page(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            )

            logger.info(f"Loading {url} with Playwright")
            await page.goto(url, wait_until="networkidle")

            # Wait for content to render
            await page.wait_for_timeout(2000)

            content = await page.content()
            await browser.close()

            return content

    async def scrape_dynamic_feed(self, username: str):
        """
        Scrape a user's feed that loads dynamically.
        """
        url = f"https://dev.to/{username}"
        content = await self.scrape_with_browser(url)

        soup = BeautifulSoup(content, 'html.parser')
        articles = []

        for article in soup.find_all('article'):
            title = article.find('h2')
            if title:
                articles.append({
                    "title": title.get_text(strip=True),
                    "html": str(article)
                })

        logger.info(f"Scraped {len(articles)} articles from {username}")
        return articles

    async def get_infinite_scroll_content(self, url: str, scroll_count: int = 5):
        """
        Scroll page multiple times to load infinite-scroll content.
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()

            await page.goto(url, wait_until="networkidle")

            # Scroll to load more content
            for i in range(scroll_count):
                logger.info(f"Scroll {i + 1}/{scroll_count}")
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await page.wait_for_timeout(1000)

            content = await page.content()
            await browser.close()

            return content

# Usage
async def example():
    scraper = DevtoPlaywrightScraper()
    articles = await scraper.scrape_dynamic_feed("some_username")
    print(f"Found {len(articles)} articles")

# Run async function
# asyncio.run(example())
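If the Playwright scraper needs to route through the proxies discussed later in this guide, note that Playwright's `launch()` takes proxy credentials as separate fields rather than a `user:pass@host` URL. A small helper makes this explicit (the gateway host and port here mirror the ThorData settings used later and are assumptions; check your provider's dashboard):

```python
def build_playwright_proxy(username: str, password: str,
                           gateway: str = "proxy.thordata.com",
                           port: int = 10000) -> dict:
    """Build the dict Playwright's launch(proxy=...) expects.
    Playwright takes credentials as separate keys, not embedded
    in the server URL."""
    return {
        "server": f"http://{gateway}:{port}",
        "username": username,
        "password": password,
    }

# Usage inside scrape_with_browser:
# browser = await p.chromium.launch(
#     headless=True,
#     proxy=build_playwright_proxy("user", "pass"),
# )
```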

11. Building a Complete Data Pipeline

Architecture Overview

A production scraper has these components:

┌─────────────┐
│   Fetcher   │  Fetch raw data from API/HTML
└──────┬──────┘
       │
┌──────▼──────────┐
│    Parser       │  Extract structured data
└──────┬──────────┘
       │
┌──────▼──────────┐
│ Deduplication   │  Remove duplicates
└──────┬──────────┘
       │
┌──────▼──────────┐
│  Validation     │  Check data quality
└──────┬──────────┘
       │
┌──────▼──────────┐
│    Storage      │  Save to database
└─────────────────┘

Build the Pipeline Class

import sqlite3
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict
import hashlib

@dataclass
class Article:
    """Represents a Dev.to article."""
    id: int
    title: str
    slug: str
    description: str
    url: str
    author: str
    created_at: str
    published_at: str
    updated_at: str
    comments_count: int
    reactions_count: int
    reading_time: int
    tags: str  # JSON string
    body_html: Optional[str] = None
    body_markdown: Optional[str] = None

    def content_hash(self) -> str:
        """Generate hash of article content for deduplication."""
        content = f"{self.id}:{self.title}:{self.author}".encode()
        return hashlib.sha256(content).hexdigest()

class DataPipeline:
    """
    End-to-end pipeline for scraping, processing, and storing articles.
    """
    def __init__(self, db_path: str = "devto_articles.db"):
        self.db_path = db_path
        self.conn = None
        self.init_database()

    def init_database(self):
        """Create database schema."""
        self.conn = sqlite3.connect(self.db_path)
        cursor = self.conn.cursor()

        # Articles table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS articles (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                slug TEXT UNIQUE NOT NULL,
                description TEXT,
                url TEXT UNIQUE NOT NULL,
                author TEXT NOT NULL,
                created_at TEXT,
                published_at TEXT,
                updated_at TEXT,
                comments_count INTEGER,
                reactions_count INTEGER,
                reading_time INTEGER,
                tags TEXT,
                body_html TEXT,
                body_markdown TEXT,
                content_hash TEXT UNIQUE,
                scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Comments table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS comments (
                id INTEGER PRIMARY KEY,
                article_id INTEGER NOT NULL,
                user TEXT NOT NULL,
                body_html TEXT,
                body_text TEXT,
                reactions_count INTEGER,
                created_at TEXT,
                updated_at TEXT,
                FOREIGN KEY (article_id) REFERENCES articles(id)
            )
        ''')

        # Tags index for fast queries
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS article_tags (
                article_id INTEGER NOT NULL,
                tag TEXT NOT NULL,
                PRIMARY KEY (article_id, tag),
                FOREIGN KEY (article_id) REFERENCES articles(id)
            )
        ''')

        self.conn.commit()
        logger.info("Database initialized")

    def parse_article(self, raw_article: Dict) -> Article:
        """
        Convert raw API response to Article dataclass.
        """
        # The API returns tags as a list ('tag_list') on index endpoints
        # and as a comma-separated string ('tags') elsewhere; normalize both
        raw_tags = raw_article.get('tag_list') or raw_article.get('tags') or []
        tags = ','.join(raw_tags) if isinstance(raw_tags, list) else raw_tags

        return Article(
            id=raw_article['id'],
            title=raw_article['title'],
            slug=raw_article['slug'],
            description=raw_article.get('description', ''),
            url=raw_article.get('url', ''),
            author=raw_article['user']['username'],
            created_at=raw_article['created_at'],
            published_at=raw_article.get('published_at', ''),
            updated_at=raw_article.get('updated_at', ''),
            comments_count=raw_article.get('comments_count', 0),
            reactions_count=raw_article.get('positive_reactions_count', 0),
            reading_time=raw_article.get('reading_time_minutes', 0),
            tags=tags,
            body_html=raw_article.get('body_html'),
            body_markdown=raw_article.get('body_markdown')
        )

    def deduplicate(self, article: Article) -> bool:
        """
        Check if article already exists in database.
        Returns True if duplicate, False if new.
        """
        cursor = self.conn.cursor()
        cursor.execute('SELECT id FROM articles WHERE id = ?', (article.id,))
        return cursor.fetchone() is not None

    def validate_article(self, article: Article) -> bool:
        """
        Validate article has required fields.
        """
        required_fields = ['id', 'title', 'author']
        for field in required_fields:
            if not getattr(article, field, None):
                logger.warning(f"Article missing {field}: {article.id}")
                return False
        return True

    def store_article(self, article: Article):
        """
        Save article to database.
        """
        cursor = self.conn.cursor()

        try:
            cursor.execute('''
                INSERT INTO articles (
                    id, title, slug, description, url, author,
                    created_at, published_at, updated_at,
                    comments_count, reactions_count, reading_time,
                    tags, body_html, body_markdown, content_hash
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                article.id, article.title, article.slug, article.description,
                article.url, article.author, article.created_at, article.published_at,
                article.updated_at, article.comments_count, article.reactions_count,
                article.reading_time, article.tags, article.body_html,
                article.body_markdown, article.content_hash()
            ))

            # Store individual tags for fast querying
            for tag in article.tags.split(','):
                tag = tag.strip()
                if tag:
                    cursor.execute('''
                        INSERT OR IGNORE INTO article_tags (article_id, tag)
                        VALUES (?, ?)
                    ''', (article.id, tag))

            self.conn.commit()
            logger.info(f"Stored article: {article.title}")

        except sqlite3.IntegrityError:
            logger.debug(f"Article already exists: {article.id}")

    def process_articles(self, raw_articles: List[Dict]) -> int:
        """
        Process list of raw articles through the pipeline.
        Returns count of new articles stored.
        """
        stored_count = 0

        for raw_article in raw_articles:
            # Parse
            article = self.parse_article(raw_article)

            # Validate
            if not self.validate_article(article):
                continue

            # Deduplicate
            if self.deduplicate(article):
                logger.debug(f"Duplicate: {article.id}")
                continue

            # Store
            self.store_article(article)
            stored_count += 1

        return stored_count

    def query_articles(self, tag: str = None, limit: int = 100) -> List[Dict]:
        """
        Query stored articles.
        """
        cursor = self.conn.cursor()

        if tag:
            cursor.execute('''
                SELECT a.* FROM articles a
                JOIN article_tags t ON a.id = t.article_id
                WHERE t.tag = ?
                LIMIT ?
            ''', (tag, limit))
        else:
            cursor.execute('SELECT * FROM articles LIMIT ?', (limit,))

        columns = [description[0] for description in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def close(self):
        """Close database connection."""
        if self.conn:
            self.conn.close()

# Usage
pipeline = DataPipeline()
scraper = DevtoScraper()

# Fetch articles
articles = scraper.get_articles_by_tag("python", page=1, per_page=100)

# Process through pipeline
stored = pipeline.process_articles(articles)
logger.info(f"Stored {stored} new articles")

# Query results
python_articles = pipeline.query_articles(tag="python")
logger.info(f"Total Python articles in database: {len(python_articles)}")

pipeline.close()
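One design note on the pipeline above: `close()` is only reached if nothing raises along the way. A context-manager wrapper guarantees the SQLite connection is released even on errors. This sketch shows the pattern on a minimal stand-in class; the real `DataPipeline` could implement `__enter__`/`__exit__` the same way:

```python
import sqlite3

class ManagedPipeline:
    """Sketch: context-manager wrapper so the SQLite connection is
    closed even when processing raises mid-run."""
    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.conn.close()
        return False  # never swallow exceptions

# Usage:
# with ManagedPipeline("devto_articles.db") as pipeline:
#     pipeline.conn.execute("SELECT 1")
```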

12. Storage: SQLite Schema and Examples

Complete Schema

-- Main articles table
CREATE TABLE articles (
    id INTEGER PRIMARY KEY,
    title TEXT NOT NULL,
    slug TEXT UNIQUE NOT NULL,
    description TEXT,
    url TEXT UNIQUE NOT NULL,
    author TEXT NOT NULL,
    created_at TEXT,
    published_at TEXT,
    updated_at TEXT,
    comments_count INTEGER DEFAULT 0,
    reactions_count INTEGER DEFAULT 0,
    reading_time INTEGER DEFAULT 0,
    tags TEXT,  -- comma-separated
    body_html TEXT,
    body_markdown TEXT,
    content_hash TEXT UNIQUE,
    scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
);

-- Create indexes for fast queries
CREATE INDEX idx_articles_author ON articles(author);
CREATE INDEX idx_articles_created_at ON articles(created_at);
CREATE INDEX idx_articles_reactions ON articles(reactions_count);
CREATE INDEX idx_articles_comments ON articles(comments_count);

-- Normalized tags table for efficient filtering
CREATE TABLE article_tags (
    article_id INTEGER NOT NULL,
    tag TEXT NOT NULL,
    PRIMARY KEY (article_id, tag),
    FOREIGN KEY (article_id) REFERENCES articles(id) ON DELETE CASCADE
);

CREATE INDEX idx_article_tags_tag ON article_tags(tag);

-- Comments storage
CREATE TABLE comments (
    id INTEGER PRIMARY KEY,
    article_id INTEGER NOT NULL,
    user TEXT NOT NULL,
    body_html TEXT,
    body_text TEXT,
    reactions_count INTEGER DEFAULT 0,
    created_at TEXT,
    updated_at TEXT,
    FOREIGN KEY (article_id) REFERENCES articles(id) ON DELETE CASCADE
);

CREATE INDEX idx_comments_article ON comments(article_id);
CREATE INDEX idx_comments_user ON comments(user);

-- Audit trail for monitoring scrapes
CREATE TABLE scrape_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    started_at TEXT DEFAULT CURRENT_TIMESTAMP,
    completed_at TEXT,
    articles_fetched INTEGER,
    articles_stored INTEGER,
    errors INTEGER,
    status TEXT
);
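The `scrape_runs` audit table above is defined but nothing in the pipeline writes to it yet. A minimal sketch of bracketing each scrape with an audit row (`start_run`/`finish_run` are illustrative names, not part of the pipeline class):

```python
import sqlite3

SCRAPE_RUNS_DDL = """
CREATE TABLE IF NOT EXISTS scrape_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    started_at TEXT DEFAULT CURRENT_TIMESTAMP,
    completed_at TEXT,
    articles_fetched INTEGER,
    articles_stored INTEGER,
    errors INTEGER,
    status TEXT
)"""

def start_run(conn: sqlite3.Connection) -> int:
    """Open an audit row at the start of a scrape and return its id."""
    conn.execute(SCRAPE_RUNS_DDL)
    cur = conn.execute("INSERT INTO scrape_runs (status) VALUES ('running')")
    conn.commit()
    return cur.lastrowid

def finish_run(conn, run_id: int, fetched: int, stored: int, errors: int = 0):
    """Close the audit row with final counters."""
    conn.execute(
        """UPDATE scrape_runs
           SET completed_at = CURRENT_TIMESTAMP,
               articles_fetched = ?, articles_stored = ?,
               errors = ?, status = 'completed'
           WHERE id = ?""",
        (fetched, stored, errors, run_id))
    conn.commit()
```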

Useful SQLite Queries

def get_top_articles(pipeline, tag: str = None, days: int = 30) -> List[Dict]:
    """Get most popular articles."""
    cursor = pipeline.conn.cursor()

    if tag:
        query = '''
            SELECT a.* FROM articles a
            JOIN article_tags t ON a.id = t.article_id
            WHERE t.tag = ?
            AND datetime(a.published_at) > datetime('now', '-' || ? || ' days')
            ORDER BY a.reactions_count DESC
            LIMIT 50
        '''
        params = (tag, days)
    else:
        query = '''
            SELECT * FROM articles
            WHERE datetime(published_at) > datetime('now', '-' || ? || ' days')
            ORDER BY reactions_count DESC
            LIMIT 50
        '''
        params = (days,)

    cursor.execute(query, params)
    columns = [description[0] for description in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

def get_prolific_authors(pipeline, min_articles: int = 5) -> List[Dict]:
    """Find authors with most articles."""
    cursor = pipeline.conn.cursor()

    cursor.execute('''
        SELECT
            author,
            COUNT(*) as article_count,
            AVG(reactions_count) as avg_reactions,
            SUM(reactions_count) as total_reactions,
            MAX(published_at) as latest_article
        FROM articles
        GROUP BY author
        HAVING COUNT(*) >= ?
        ORDER BY article_count DESC
        LIMIT 100
    ''', (min_articles,))

    columns = [description[0] for description in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

def get_trending_topics(pipeline, days: int = 7) -> List[Dict]:
    """Find tags with most articles in recent period."""
    cursor = pipeline.conn.cursor()

    cursor.execute('''
        SELECT
            tag,
            COUNT(*) as article_count,
            AVG(a.reactions_count) as avg_reactions,
            SUM(a.reactions_count) as total_reactions
        FROM article_tags t
        JOIN articles a ON t.article_id = a.id
        WHERE datetime(a.published_at) > datetime('now', '-' || ? || ' days')
        GROUP BY tag
        ORDER BY article_count DESC
        LIMIT 50
    ''', (days,))

    columns = [description[0] for description in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

13. Exporting to CSV and JSON

JSON Export with Nested Structure

import json
from datetime import datetime

def export_to_json(pipeline, tag: str = None, output_file: str = "articles.json"):
    """
    Export articles to JSON with full nested structure.
    """
    articles = pipeline.query_articles(tag=tag, limit=999999)

    # Add related comments
    for article in articles:
        cursor = pipeline.conn.cursor()
        cursor.execute('SELECT * FROM comments WHERE article_id = ?', (article['id'],))
        columns = [description[0] for description in cursor.description]
        article['comments'] = [
            dict(zip(columns, row)) for row in cursor.fetchall()
        ]

    # Make datetime serializable
    def json_serializer(obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"Type {type(obj)} not serializable")

    with open(output_file, 'w') as f:
        json.dump(articles, f, indent=2, default=json_serializer)

    logger.info(f"Exported {len(articles)} articles to {output_file}")
    return output_file

# Usage
output_file = export_to_json(pipeline, tag="python")

CSV Export with Flattening

import csv

def export_to_csv(pipeline, tag: str = None, output_file: str = "articles.csv"):
    """
    Export articles to CSV with flattened structure.
    """
    articles = pipeline.query_articles(tag=tag, limit=999999)

    if not articles:
        logger.warning("No articles to export")
        return

    # Define CSV columns
    fieldnames = [
        'id', 'title', 'author', 'url', 'created_at', 'published_at',
        'comments_count', 'reactions_count', 'reading_time',
        'tags', 'description'
    ]

    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

        for article in articles:
            row = {field: article.get(field) for field in fieldnames}
            writer.writerow(row)

    logger.info(f"Exported {len(articles)} articles to {output_file}")
    return output_file

# Usage
csv_file = export_to_csv(pipeline, tag="python")

# Export with comments in separate file
def export_comments_to_csv(pipeline, output_file: str = "comments.csv"):
    """
    Export all comments to CSV.
    """
    cursor = pipeline.conn.cursor()
    cursor.execute('SELECT * FROM comments ORDER BY article_id')

    columns = [description[0] for description in cursor.description]
    rows = cursor.fetchall()

    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()

        for row in rows:
            writer.writerow(dict(zip(columns, row)))

    logger.info(f"Exported {len(rows)} comments to {output_file}")
    return output_file

14. Rate Limiting and Retry Strategies

Exponential Backoff Implementation

import time
import random

class RateLimiter:
    """
    Intelligent rate limiting with exponential backoff.
    """
    def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.retry_count = 0

    def wait_before_retry(self, attempt: int):
        """
        Calculate exponential backoff with jitter.
        """
        # Exponential backoff: 1s, 2s, 4s, 8s, etc.
        delay = min(self.base_delay * (2 ** attempt), self.max_delay)

        # Add jitter (±10%) to prevent thundering herd
        jitter = delay * 0.1 * random.uniform(-1, 1)
        total_delay = delay + jitter

        logger.warning(f"Backing off for {total_delay:.1f}s (attempt {attempt + 1})")
        time.sleep(total_delay)

    def check_rate_limit_headers(self, response):
        """
        Extract rate limit info from response headers.
        """
        headers = response.headers

        remaining = int(headers.get('X-RateLimit-Remaining', 0))
        limit = int(headers.get('X-RateLimit-Limit', 0))
        reset = headers.get('X-RateLimit-Reset', 0)

        return {
            'remaining': remaining,
            'limit': limit,
            'reset': reset,
            'percentage': (remaining / limit * 100) if limit > 0 else 0
        }

def request_with_backoff(
    session,
    method: str,
    url: str,
    max_retries: int = 5,
    rate_limiter: RateLimiter = None,
    **kwargs
) -> requests.Response:
    """
    Make HTTP request with intelligent retries.
    """
    if rate_limiter is None:
        rate_limiter = RateLimiter()

    for attempt in range(max_retries):
        try:
            response = session.request(method, url, timeout=10, **kwargs)

            # Check rate limits
            limits = rate_limiter.check_rate_limit_headers(response)
            logger.debug(f"Rate limit: {limits['remaining']}/{limits['limit']} remaining")

            if limits['limit'] > 0 and limits['percentage'] < 10:
                logger.warning(f"Approaching rate limit: {limits['percentage']:.1f}%")

            # Handle rate limit response
            if response.status_code == 429:
                rate_limiter.wait_before_retry(attempt)
                continue

            # Handle server errors (5xx)
            if 500 <= response.status_code < 600:
                if attempt < max_retries - 1:
                    rate_limiter.wait_before_retry(attempt)
                    continue

            response.raise_for_status()
            return response

        except requests.exceptions.Timeout:
            logger.warning(f"Timeout on attempt {attempt + 1}")
            if attempt < max_retries - 1:
                rate_limiter.wait_before_retry(attempt)
                continue
            raise

        except requests.exceptions.ConnectionError as e:
            logger.warning(f"Connection error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                rate_limiter.wait_before_retry(attempt)
                continue
            raise

    raise RuntimeError(f"Request failed after {max_retries} attempts")
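It's worth sanity-checking the schedule `wait_before_retry` produces before wiring it into a scraper. This pure function mirrors the same math (jitter and sleeping omitted) so the cap is easy to see:

```python
def backoff_schedule(base_delay: float, max_delay: float, attempts: int) -> list:
    """Delays the RateLimiter above would sleep, without jitter:
    doubling each attempt, capped at max_delay."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(attempts)]

# With the defaults (base 1s, cap 60s), eight attempts give:
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
print(backoff_schedule(1.0, 60.0, 8))
```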

15. Proxy Rotation with ThorData

For large-scale scraping without getting blocked, you need rotating residential proxies. The examples below use ThorData (https://thordata.partnerstack.com/partner/0a0x4nzh), whose gateway-plus-credentials model supports sticky sessions and country targeting; any provider with a similar interface works with minimal changes.

ThorData Integration

class ThorDataProxyRotator:
    """
    Rotate through ThorData residential proxies.
    """
    def __init__(self, username: str, password: str, port: int = 10000):
        """
        Args:
            username: ThorData username
            password: ThorData password
            port: Proxy port (default 10000)
        """
        self.username = username
        self.password = password
        self.port = port
        self.gateway = "proxy.thordata.com"
        self.current_session = None

    def get_proxy_url(self, session_id: str = None, country: str = None) -> str:
        """
        Generate proxy URL with optional session and country.

        Args:
            session_id: Unique session ID for sticky IP
            country: Country code for geo-targeting (e.g., 'US', 'GB')

        Returns:
            Proxy URL for use in requests
        """
        # Most residential providers encode session and country targeting
        # in the proxy username rather than a query string; check ThorData's
        # dashboard for the exact format your account expects
        user = self.username
        if session_id:
            user += f"-session-{session_id}"
        if country:
            user += f"-country-{country}"

        return f"http://{user}:{self.password}@{self.gateway}:{self.port}"

    def rotate_session(self):
        """
        Generate new session ID to get different IP.
        """
        import uuid
        self.current_session = str(uuid.uuid4())
        return self.current_session

    def get_current_ip(self, proxy_url: str) -> str:
        """
        Check current IP through the proxy.
        """
        try:
            response = requests.get(
                'https://api.ipify.org?format=json',
                proxies={'https': proxy_url},
                timeout=10
            )
            return response.json()['ip']
        except Exception as e:
            logger.error(f"Failed to get IP: {e}")
            return None

class RotatingProxyScraper(DevtoScraper):
    """
    Scraper that rotates through ThorData proxies.
    """
    def __init__(self, proxy_rotator: ThorDataProxyRotator, api_key: str = None):
        super().__init__(api_key)
        self.proxy_rotator = proxy_rotator
        self.requests_per_session = 50  # Rotate IP after N requests
        self.request_count = 0

    def _make_request(self, endpoint: str, params: Dict = None) -> Dict:
        """
        Make request through rotating proxy.
        """
        # Rotate proxy periodically
        if self.request_count % self.requests_per_session == 0:
            session_id = self.proxy_rotator.rotate_session()
            proxy_url = self.proxy_rotator.get_proxy_url(session_id=session_id)
            self.session.proxies = {
                'http': proxy_url,
                'https': proxy_url
            }

            ip = self.proxy_rotator.get_current_ip(proxy_url)
            logger.info(f"Rotated to new IP: {ip}")

        self.request_count += 1

        # Call parent implementation
        return super()._make_request(endpoint, params)

# Usage
proxy_rotator = ThorDataProxyRotator(
    username="your_thordata_username",
    password="your_thordata_password"
)

scraper = RotatingProxyScraper(proxy_rotator, api_key=Config.DEVTO_API_KEY)

# Now scraper rotates IPs automatically
articles = scraper.get_articles(page=1, per_page=100)

Large-Scale Collection with Multiple Proxies

def scrape_with_proxy_pool(
    proxy_rotator: ThorDataProxyRotator,
    tags: List[str],
    concurrent_scrapers: int = 3
):
    """
    Scrape using multiple concurrent sessions with different proxies.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def scrape_tag(tag: str, scraper_id: int):
        """Scrape all articles for a tag."""
        scraper = RotatingProxyScraper(proxy_rotator, api_key=Config.DEVTO_API_KEY)

        logger.info(f"Scraper {scraper_id}: Starting tag {tag}")
        articles = fetch_all_articles_paginated(scraper, tag=tag, max_pages=100)
        logger.info(f"Scraper {scraper_id}: Collected {len(articles)} for {tag}")

        return tag, articles

    all_articles = {}

    # Distribute tags across scraper pool
    with ThreadPoolExecutor(max_workers=concurrent_scrapers) as executor:
        futures = []

        for i, tag in enumerate(tags):
            scraper_id = i % concurrent_scrapers
            future = executor.submit(scrape_tag, tag, scraper_id)
            futures.append(future)

        for future in as_completed(futures):
            tag, articles = future.result()
            all_articles[tag] = articles

    total = sum(len(a) for a in all_articles.values())
    logger.info(f"Total articles collected: {total}")

    return all_articles

16. Anti-Detection and Stealth Techniques

User-Agent Rotation

import random
import time

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
]

class StealthSession(requests.Session):
    """
    Requests session with anti-detection features.
    """
    def __init__(self):
        super().__init__()
        self.rotate_user_agent()
        self._request_count = 0

    def rotate_user_agent(self):
        """Set random User-Agent."""
        user_agent = random.choice(USER_AGENTS)
        self.headers.update({
            'User-Agent': user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
        logger.debug(f"Rotated User-Agent: {user_agent[:50]}...")

    def request(self, method, url, **kwargs):
        """Override request to add anti-detection features."""
        # Rotate User-Agent occasionally
        if self._request_count % 10 == 0:
            self.rotate_user_agent()

        # Add realistic delays
        if self._request_count > 0:
            delay = random.uniform(0.5, 2.0)
            time.sleep(delay)

        self._request_count += 1

        return super().request(method, url, **kwargs)

# Use it
session = StealthSession()
response = session.get('https://dev.to/api/articles')

Request Timing and Fingerprinting

class AntiDetectionSession(StealthSession):
    """
    Enhanced session with fingerprint management.
    """
    def __init__(self):
        super().__init__()
        self.request_times = []
        self.min_delay_between_requests = 0.5
        self.max_delay_between_requests = 3.0

    def _get_realistic_delay(self) -> float:
        """
        Calculate realistic delay that doesn't look like a bot.

        Real users don't make requests at exact intervals.
        This generates variable delays with some patterns.
        """
        # Most requests within 1-2s, some slower ones
        if random.random() < 0.8:
            delay = random.uniform(self.min_delay_between_requests, 1.5)
        else:
            delay = random.uniform(2.0, self.max_delay_between_requests)

        return delay

    def request(self, method, url, **kwargs):
        """Make request with human-like timing."""
        # Add delay before request
        if self.request_times:
            actual_delay = self._get_realistic_delay()
            time.sleep(actual_delay)

        self.request_times.append(time.time())

        # Rotate headers every 20 requests
        if len(self.request_times) % 20 == 0:
            self.rotate_user_agent()

        return super().request(method, url, **kwargs)

# Don't make parallel requests from same IP
# It's obvious bot behavior to API servers
# Use ThorData proxies instead to distribute load

Session Management

class PersistentSession:
    """
    Maintain consistent session identity for longer periods.
    """
    def __init__(self, session_id: str = None):
        self.session_id = session_id or self._generate_session_id()
        self.session = AntiDetectionSession()
        self.cookies = requests.cookies.RequestsCookieJar()

    def _generate_session_id(self) -> str:
        """Generate realistic session ID."""
        import uuid
        return str(uuid.uuid4())

    def get_request_headers(self) -> Dict:
        """Get headers that maintain consistent identity."""
        return {
            'User-Agent': self.session.headers.get('User-Agent'),
            'Accept': self.session.headers.get('Accept'),
            'Accept-Language': self.session.headers.get('Accept-Language'),
            'X-Requested-With': 'XMLHttpRequest',
            'Referer': 'https://dev.to/',
        }

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """Make request maintaining session identity."""
        headers = self.get_request_headers()
        headers.update(kwargs.pop('headers', {}))

        return self.session.request(
            method, url,
            headers=headers,
            cookies=self.cookies,
            **kwargs
        )

17. Async Scraping with httpx

For high-performance scraping, use asynchronous requests with httpx instead of the blocking requests library:

import httpx
import asyncio
from asyncio import Semaphore

class AsyncDevtoScraper:
    """
    Async scraper using httpx for high throughput.
    """
    def __init__(self, api_key: str = None, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = "https://dev.to/api"
        self.max_concurrent = max_concurrent
        self.semaphore = Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter()

    async def _request(self, client: httpx.AsyncClient, endpoint: str, params: Dict = None):
        """Make async request with rate limiting."""
        async with self.semaphore:
            url = f"{self.base_url}{endpoint}"
            headers = {
                'User-Agent': random.choice(USER_AGENTS),
                'api-key': self.api_key
            } if self.api_key else {'User-Agent': random.choice(USER_AGENTS)}

            await asyncio.sleep(random.uniform(0.5, 1.5))  # Rate limiting

            response = await client.get(url, params=params, headers=headers, timeout=10)
            response.raise_for_status()
            return response.json()

    async def get_articles_bulk(self, tags: List[str]) -> Dict[str, List[Dict]]:
        """
        Fetch articles for multiple tags concurrently.
        """
        async with httpx.AsyncClient() as client:
            tasks = [
                self._request(client, '/articles', {'tag': tag, 'per_page': 100})
                for tag in tags
            ]

            # gather() runs the coroutines concurrently; awaiting them one by
            # one in a loop would serialize the requests
            responses = await asyncio.gather(*tasks)

            results = {}
            for tag, articles in zip(tags, responses):
                results[tag] = articles
                logger.info(f"Fetched {len(articles)} articles for {tag}")

            return results

    async def fetch_multiple_articles(self, article_ids: List[int]) -> List[Dict]:
        """
        Fetch multiple articles in parallel.
        """
        async with httpx.AsyncClient() as client:
            tasks = [
                self._request(client, f'/articles/{aid}')
                for aid in article_ids
            ]

            articles = await asyncio.gather(*tasks, return_exceptions=True)

            # Filter out exceptions
            valid_articles = [a for a in articles if not isinstance(a, Exception)]
            logger.info(f"Fetched {len(valid_articles)} articles")

            return valid_articles

# Usage
async def main():
    scraper = AsyncDevtoScraper(api_key=Config.DEVTO_API_KEY)

    tags = ['python', 'javascript', 'rust', 'golang', 'devops']
    results = await scraper.get_articles_bulk(tags)

    for tag, articles in results.items():
        print(f"{tag}: {len(articles)} articles")

# Run
# asyncio.run(main())

18. Scheduling Recurring Scrapes

Using APScheduler for Regular Collection

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import atexit

class ScheduledScraper:
    """
    Schedule recurring scrapes to track trending topics over time.
    """
    def __init__(self, pipeline: DataPipeline):
        self.pipeline = pipeline
        self.scheduler = BackgroundScheduler()
        self.running = False

    def scrape_job(self, tags: List[str]):
        """Job that runs on schedule."""
        logger.info(f"Starting scheduled scrape for tags: {tags}")

        try:
            scraper = DevtoScraper(api_key=Config.DEVTO_API_KEY)

            for tag in tags:
                articles = scraper.get_articles_by_tag(tag, page=1, per_page=100)
                stored = self.pipeline.process_articles(articles)
                logger.info(f"Tag {tag}: stored {stored} new articles")

        except Exception as e:
            logger.error(f"Scrape job failed: {e}")

    def start(self, tags: List[str], cron_expression: str = "0 */4 * * *"):
        """
        Start scheduler.

        Args:
            tags: Tags to monitor
            cron_expression: Cron schedule (default: every 4 hours)
        """
        self.scheduler.add_job(
            self.scrape_job,
            CronTrigger.from_crontab(cron_expression),
            args=[tags],
            id='devto_scraper'
        )

        self.scheduler.start()
        self.running = True
        logger.info(f"Scheduler started (cron: {cron_expression})")

        # Stop scheduler on exit
        atexit.register(self.stop)

    def stop(self):
        """Stop scheduler."""
        if self.running:
            self.scheduler.shutdown()
            self.running = False
            logger.info("Scheduler stopped")

# Usage
pipeline = DataPipeline()
scheduled = ScheduledScraper(pipeline)

# Run scrapes every 4 hours for these tags
tags_to_monitor = ['python', 'javascript', 'rust', 'devops', 'ai']
scheduled.start(tags_to_monitor, cron_expression="0 */4 * * *")

# Keep the script running
import time
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduled.stop()
    pipeline.close()

Cron Jobs for Server Deployment

For production on a Linux server, use crontab:

# Edit crontab
crontab -e

# Add job to run scraper every 4 hours
0 */4 * * * cd /path/to/scraper && python3 scheduled_scraper.py >> logs/cron.log 2>&1

# Run scraper twice daily and export CSV
0 6,18 * * * cd /path/to/scraper && python3 scraper.py && python3 export.py

# Weekly export to JSON for backups
0 0 * * 0 cd /path/to/scraper && python3 export_all.py
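One caveat with cron: it provides no overlap protection, so a run that outlasts its interval can collide with the next one over the SQLite file. A minimal guard is an OS-level file lock at the top of the entry script. This sketch uses `fcntl.flock` (Unix-only); the lock path and function name are illustrative, not from the scraper above:

```python
import fcntl
import sys

def acquire_single_instance_lock(path: str):
    """Return an open lock-file handle, or None if another run holds the lock."""
    handle = open(path, 'w')
    try:
        # Non-blocking exclusive lock; raises BlockingIOError if already held
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return handle
    except BlockingIOError:
        handle.close()
        return None

if __name__ == "__main__":
    lock = acquire_single_instance_lock('/tmp/devto_scraper.lock')
    if lock is None:
        sys.exit(0)  # a previous cron run is still going; skip quietly
    # ... run the scrape here; the lock releases when the process exits ...
```

Because the lock dies with the process, there is no stale-lock cleanup to manage, even after a crash.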

19. Real-World Use Cases and Business Applications

Use Case 1: Content Gap Analysis for Your Blog

def analyze_content_gaps(pipeline, your_tags: List[str], competitor_tags: List[str]):
    """
    Find topics competitors cover that you don't.
    """
    your_topics = set()
    for tag in your_tags:
        articles = pipeline.query_articles(tag=tag, limit=999999)
        for article in articles:
            # Extract keywords from title
            words = article['title'].lower().split()
            your_topics.update(words)

    competitor_topics = set()
    for tag in competitor_tags:
        articles = pipeline.query_articles(tag=tag, limit=999999)
        for article in articles:
            words = article['title'].lower().split()
            competitor_topics.update(words)

    gaps = competitor_topics - your_topics

    logger.info(f"Content gaps (topics you're missing): {list(gaps)[:20]}")
    return gaps

Use Case 2: Author Outreach for Collaborations

def identify_collaboration_partners(pipeline, tag: str, min_followers: int = 1000):
    """
    Find authors to reach out to for guest posts or interviews.
    """
    articles = pipeline.query_articles(tag=tag, limit=999999)

    authors = {}
    for article in articles:
        author = article['author']
        if author not in authors:
            authors[author] = {
                'articles': 0,
                'total_reactions': 0,
                'avg_reactions': 0
            }
        authors[author]['articles'] += 1
        authors[author]['total_reactions'] += article['reactions_count']

    # Calculate engagement
    for author, stats in authors.items():
        stats['avg_reactions'] = stats['total_reactions'] / stats['articles']

    # Sort by engagement
    top_authors = sorted(
        authors.items(),
        key=lambda x: x[1]['avg_reactions'],
        reverse=True
    )[:20]

    # Fetch each author's profile page; contact details live in the HTML
    # profile rather than the API, so parse user_page for them as needed
    scraper = DevtoHTMLScraper()
    for author, stats in top_authors:
        try:
            user_page = scraper.get_user_articles_page(author)
            logger.info(f"{author}: {stats['articles']} articles, {stats['avg_reactions']:.0f} avg reactions")
        except Exception as e:
            logger.warning(f"Could not fetch profile for {author}: {e}")

    return top_authors

Use Case 3: Trend Prediction

def detect_emerging_technologies(pipeline, days_back: int = 90):
    """
    Find technologies with accelerating mention growth.
    """

    cursor = pipeline.conn.cursor()

    # Count mentions per day
    cursor.execute('''
        SELECT
            tag,
            DATE(a.published_at) as date,
            COUNT(*) as count
        FROM article_tags t
        JOIN articles a ON t.article_id = a.id
        WHERE datetime(a.published_at) > datetime('now', '-' || ? || ' days')
        GROUP BY tag, DATE(a.published_at)
        ORDER BY tag, date
    ''', (days_back,))

    rows = cursor.fetchall()

    # Calculate growth rate
    tag_growth = {}
    for row in rows:
        tag, date, count = row
        if tag not in tag_growth:
            tag_growth[tag] = []
        tag_growth[tag].append((date, count))

    # Find technologies with positive momentum
    emerging = []
    for tag, counts in tag_growth.items():
        if len(counts) < 2:
            continue

        early_count = sum(c for _, c in counts[:len(counts)//2])
        recent_count = sum(c for _, c in counts[len(counts)//2:])

        if early_count > 0:
            growth_rate = (recent_count - early_count) / early_count
            if growth_rate > 0.5:  # 50% growth
                emerging.append((tag, growth_rate, recent_count))

    # Sort by growth rate
    emerging.sort(key=lambda x: x[1], reverse=True)

    logger.info("Emerging technologies (top 10):")
    for tag, growth, count in emerging[:10]:
        logger.info(f"{tag}: +{growth*100:.0f}% growth, {count} recent articles")

    return emerging
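The half-versus-half comparison above is easy to sanity-check in isolation. A minimal sketch of the same growth calculation on synthetic counts (the function name is illustrative, not part of the pipeline):

```python
def growth_rate(counts):
    """Compare total mentions in the first half of the window to the second."""
    half = len(counts) // 2
    early = sum(counts[:half])
    recent = sum(counts[half:])
    if early == 0:
        return None  # no growth rate from a zero baseline
    return (recent - early) / early

# Daily mention counts doubling across the window give +100% growth
print(growth_rate([5, 5, 10, 10]))  # → 1.0
```

With the 0.5 threshold used above, a tag qualifies as emerging once its second-half mentions exceed its first-half mentions by 50%.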

20. Legal and Ethical Considerations

Respecting Dev.to's Terms

Dev.to permits scraping for personal use and research. However:

  1. Check the ToS: Dev.to's official terms allow automated access via their API
  2. Use the API: Prefer API over HTML scraping where possible
  3. Respect rate limits: Never exceed published limits, even with multiple IPs
  4. Identify yourself: Use realistic User-Agents, not obviously fake bot strings
  5. Don't store private data: Don't collect or store draft articles, private messages, or personal information
  6. Cache aggressively: Don't re-fetch the same article multiple times
  7. Attribute content: If you publish analysis based on Dev.to content, cite the source
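Point 6 is cheap to implement. A minimal in-memory TTL cache that avoids re-fetching the same URL within an expiry window (the class and helper names are illustrative, not part of the earlier pipeline):

```python
import time

class ResponseCache:
    """Simple TTL cache so the same article isn't re-fetched repeatedly."""
    def __init__(self, ttl_seconds: int = 3600):
        self.ttl = ttl_seconds
        self._store = {}  # url -> (timestamp, payload)

    def get(self, url: str):
        entry = self._store.get(url)
        if entry is None:
            return None
        stored_at, payload = entry
        if time.time() - stored_at > self.ttl:
            del self._store[url]  # expired; drop it
            return None
        return payload

    def set(self, url: str, payload):
        self._store[url] = (time.time(), payload)

def cached_fetch(cache, url, fetch_fn):
    """Wrap any fetch function: only hit the API on a cache miss."""
    payload = cache.get(url)
    if payload is None:
        payload = fetch_fn(url)
        cache.set(url, payload)
    return payload
```

For long runs, the same pattern works with a SQLite-backed store so the cache survives restarts.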

Rate Limit Ethics

class EthicalRateLimiter:
    """
    Enforce rate limits to not abuse server infrastructure.
    """
    def __init__(self, requests_per_hour: int = 1000):
        self.requests_per_hour = requests_per_hour
        self.requests_this_hour = []

    def should_proceed(self) -> bool:
        """
        Check if we should make next request.
        """
        import time

        now = time.time()
        one_hour_ago = now - 3600

        # Remove old requests
        self.requests_this_hour = [
            req_time for req_time in self.requests_this_hour
            if req_time > one_hour_ago
        ]

        if len(self.requests_this_hour) >= self.requests_per_hour:
            logger.warning(
                f"Rate limit reached: {self.requests_per_hour} requests in last hour"
            )
            return False

        self.requests_this_hour.append(now)
        return True

    def wait_if_needed(self):
        """Sleep until the oldest request ages out of the one-hour window."""
        import time
        if not self.should_proceed():
            reset_time = min(self.requests_this_hour) + 3600
            wait_seconds = max(0, reset_time - time.time())
            logger.warning(f"Rate limited. Waiting {wait_seconds:.0f}s...")
            time.sleep(wait_seconds)
            self.should_proceed()  # record the request now that a slot has freed up

21. Production-Ready Complete Scraper

Full Implementation

"""
Production-ready Dev.to scraper with all features.
Run with: python3 scraper_production.py
"""

import os
import requests
import sqlite3
import logging
import time
import random
import json
from datetime import datetime
from typing import List, Dict, Optional
from pathlib import Path
from dataclasses import dataclass, asdict
import hashlib

# Configuration
class Config:
    DEVTO_API_KEY = os.environ.get("DEVTO_API_KEY", "")  # Set via environment variable
    DEVTO_BASE_URL = "https://dev.to/api"
    DB_PATH = "devto_production.db"
    OUTPUT_DIR = Path("output")
    LOG_FILE = "scraper_production.log"

    # Scraping config
    TAGS_TO_SCRAPE = ['python', 'javascript', 'rust', 'devops', 'ai', 'webdev']
    ARTICLES_PER_TAG = 500
    INCLUDE_COMMENTS = True

    # Rate limiting
    REQUESTS_PER_HOUR = 900  # Conservative limit
    DELAY_BETWEEN_REQUESTS = 1.5

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(Config.LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# User-Agent rotation (use full browser strings; truncated UAs are an easy bot signal)
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
]

# Data model
@dataclass
class Article:
    id: int
    title: str
    slug: str
    author: str
    url: str
    description: str
    created_at: str
    published_at: str
    comments_count: int
    reactions_count: int
    reading_time: int
    tags: str
    body_html: Optional[str] = None

    def content_hash(self) -> str:
        return hashlib.sha256(f"{self.id}:{self.title}".encode()).hexdigest()

# Main scraper class
class ProductionDevtoScraper:
    def __init__(self, api_key: str = None):
        self.api_key = api_key or Config.DEVTO_API_KEY
        self.base_url = Config.DEVTO_BASE_URL
        self.session = requests.Session()
        self.last_request_time = 0
        self.request_count = 0
        self.db_path = Config.DB_PATH
        self.init_db()
        logger.info("Scraper initialized")

    def init_db(self):
        """Initialize SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS articles (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                slug TEXT UNIQUE NOT NULL,
                author TEXT NOT NULL,
                url TEXT UNIQUE NOT NULL,
                description TEXT,
                created_at TEXT,
                published_at TEXT,
                comments_count INTEGER,
                reactions_count INTEGER,
                reading_time INTEGER,
                tags TEXT,
                body_html TEXT,
                content_hash TEXT UNIQUE,
                scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS comments (
                id INTEGER PRIMARY KEY,
                article_id INTEGER NOT NULL,
                user TEXT NOT NULL,
                body_text TEXT,
                reactions_count INTEGER,
                created_at TEXT,
                FOREIGN KEY (article_id) REFERENCES articles(id)
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS scrape_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
                articles_found INTEGER,
                articles_stored INTEGER,
                comments_collected INTEGER,
                errors INTEGER
            )
        ''')

        conn.commit()
        conn.close()

    def _apply_rate_limit(self):
        """Respect rate limits."""
        elapsed = time.time() - self.last_request_time
        if elapsed < Config.DELAY_BETWEEN_REQUESTS:
            time.sleep(Config.DELAY_BETWEEN_REQUESTS - elapsed)

    def _get_headers(self) -> Dict:
        """Get headers with random User-Agent."""
        headers = {
            'User-Agent': random.choice(USER_AGENTS),
            'Accept': 'application/json'
        }
        if self.api_key:
            headers['api-key'] = self.api_key
        return headers

    def _request(self, endpoint: str, params: Dict = None, max_retries: int = 3) -> Optional[List[Dict]]:
        """Make request with retry logic; returns None after repeated failures."""
        url = f"{self.base_url}{endpoint}"

        for attempt in range(max_retries):
            self._apply_rate_limit()
            self.request_count += 1

            try:
                # Use the shared session so TCP connections are pooled
                response = self.session.get(
                    url,
                    params=params or {},
                    headers=self._get_headers(),
                    timeout=10
                )
                self.last_request_time = time.time()

                if response.status_code == 429:
                    wait = (2 ** attempt)
                    logger.warning(f"Rate limited. Waiting {wait}s...")
                    time.sleep(wait)
                    continue

                response.raise_for_status()
                return response.json()

            except Exception as e:
                if attempt == max_retries - 1:
                    logger.error(f"Failed to get {url}: {e}")
                    return None
                time.sleep(2 ** attempt)

        return None

    def get_articles_by_tag(self, tag: str, limit: int = 100) -> List[Dict]:
        """Fetch articles for a tag."""
        all_articles = []
        page = 1

        while len(all_articles) < limit:
            articles = self._request('/articles', {
                'tag': tag,
                'page': page,
                'per_page': min(100, limit - len(all_articles))
            })

            if not articles:
                break

            all_articles.extend(articles)
            logger.info(f"Tag {tag}: fetched {len(articles)} articles (page {page})")
            page += 1

        return all_articles[:limit]

    def get_article_comments(self, article_id: int) -> List[Dict]:
        """Fetch article comments."""
        comments = []
        page = 1

        while True:
            data = self._request(f'/articles/{article_id}/comments', {
                'page': page,
                'per_page': 100
            })

            if not data:
                break

            comments.extend(data)
            # A short page means we've reached the end; this also guards
            # against looping forever if the endpoint ignores pagination
            if len(data) < 100:
                break
            page += 1

        return comments

    def store_article(self, article_dict: Dict) -> bool:
        """Store article in database."""
        try:
            # The Forem API is inconsistent here: list endpoints return
            # tag_list as an array, while single-article endpoints return
            # tags as the array instead. Normalize both to one comma string.
            raw_tags = article_dict.get('tag_list') or article_dict.get('tags') or []
            if isinstance(raw_tags, str):
                tags = raw_tags
            else:
                tags = ','.join(raw_tags)

            article = Article(
                id=article_dict['id'],
                title=article_dict['title'],
                slug=article_dict['slug'],
                author=article_dict['user']['username'],
                url=article_dict.get('url', ''),
                description=article_dict.get('description', ''),
                created_at=article_dict['created_at'],
                published_at=article_dict.get('published_at', ''),
                comments_count=article_dict.get('comments_count', 0),
                reactions_count=article_dict.get('positive_reactions_count', 0),
                reading_time=article_dict.get('reading_time_minutes', 0),
                tags=tags,
                body_html=article_dict.get('body_html')
            )

            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute('''
                INSERT INTO articles (
                    id, title, slug, author, url, description,
                    created_at, published_at, comments_count, reactions_count,
                    reading_time, tags, body_html, content_hash
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                article.id, article.title, article.slug, article.author,
                article.url, article.description, article.created_at,
                article.published_at, article.comments_count, article.reactions_count,
                article.reading_time, article.tags, article.body_html,
                article.content_hash()
            ))

            conn.commit()
            conn.close()
            return True

        except sqlite3.IntegrityError:
            return False
        except Exception as e:
            logger.error(f"Failed to store article: {e}")
            return False

    def scrape_all_tags(self):
        """Main scraping function."""
        logger.info(f"Starting scrape of {len(Config.TAGS_TO_SCRAPE)} tags")

        total_articles = 0
        total_stored = 0
        total_comments = 0
        errors = 0

        for tag in Config.TAGS_TO_SCRAPE:
            try:
                logger.info(f"Scraping tag: {tag}")
                articles = self.get_articles_by_tag(tag, limit=Config.ARTICLES_PER_TAG)
                total_articles += len(articles)

                for article in articles:
                    if self.store_article(article):
                        total_stored += 1

                        if Config.INCLUDE_COMMENTS:
                            comments = self.get_article_comments(article['id'])
                            total_comments += len(comments)

            except Exception as e:
                logger.error(f"Error scraping tag {tag}: {e}")
                errors += 1

        # Record this run in the scrape_runs table
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            '''INSERT INTO scrape_runs
               (articles_found, articles_stored, comments_collected, errors)
               VALUES (?, ?, ?, ?)''',
            (total_articles, total_stored, total_comments, errors)
        )
        conn.commit()
        conn.close()

        logger.info(
            f"Scrape complete: {total_articles} articles found, "
            f"{total_stored} stored, {total_comments} comments, {errors} errors"
        )

        return {
            'articles_found': total_articles,
            'articles_stored': total_stored,
            'comments_collected': total_comments,
            'errors': errors
        }

    def export_to_json(self, output_file: str = "articles_export.json"):
        """Export articles to JSON."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM articles ORDER BY published_at DESC')
        articles = [dict(row) for row in cursor.fetchall()]

        Config.OUTPUT_DIR.mkdir(exist_ok=True)
        output_path = Config.OUTPUT_DIR / output_file

        with open(output_path, 'w') as f:
            json.dump(articles, f, indent=2)

        logger.info(f"Exported {len(articles)} articles to {output_path}")
        conn.close()

    def export_to_csv(self, output_file: str = "articles_export.csv"):
        """Export articles to CSV."""
        import csv

        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM articles ORDER BY published_at DESC')
        articles = [dict(row) for row in cursor.fetchall()]

        Config.OUTPUT_DIR.mkdir(exist_ok=True)
        output_path = Config.OUTPUT_DIR / output_file

        if not articles:
            logger.warning("No articles to export")
            conn.close()
            return

        with open(output_path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=articles[0].keys())
            writer.writeheader()
            writer.writerows(articles)

        logger.info(f"Exported {len(articles)} articles to {output_path}")
        conn.close()

# Main execution
if __name__ == "__main__":
    scraper = ProductionDevtoScraper(api_key=Config.DEVTO_API_KEY)

    # Run scrape
    results = scraper.scrape_all_tags()

    # Export data
    scraper.export_to_json()
    scraper.export_to_csv()

    logger.info("Scraper finished successfully")

22. Troubleshooting Guide

Common Issues and Solutions

Issue: 429 Rate Limited

# Solution: Implement exponential backoff
# Already covered in section 14, use RateLimiter class
# Increase delays between requests
Config.DELAY_BETWEEN_REQUESTS = 3.0  # Increase from 1.5

Issue: 403 Forbidden / Blocked

# Solution: Use residential proxies like ThorData
# See section 15 for proxy integration
# Rotate User-Agent more frequently
# Add more realistic request headers
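A fuller header set often helps with 403s. A sketch of a browser-like header builder (the function name and exact header values are illustrative; keep the Accept-* values consistent with the User-Agent you send):

```python
import random

# Illustrative full browser strings; swap in whatever list you rotate through
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
]

def build_browser_headers() -> dict:
    """Headers a real browser would send alongside a page request."""
    return {
        'User-Agent': random.choice(USER_AGENTS),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Referer': 'https://dev.to/',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
```

Pass the result as the `headers` argument to `session.get`; a User-Agent with no accompanying Accept-* headers is itself a detection signal.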

Issue: Timeout Errors

# Solution: Increase timeout, retry with backoff
response = session.get(url, timeout=30)  # Increase from 10

# Use exponential backoff with longer delays
wait_time = 60 * (2 ** attempt)  # Start with 60s

Issue: Database Locked

# Solution: Use connection pool for concurrent access
conn = sqlite3.connect(self.db_path, timeout=30.0)  # Add timeout
conn.execute("PRAGMA journal_mode=WAL")  # Enable WAL mode for concurrent writes

Issue: API Returning Partial Data

# Solution: Validate before storing
def validate_article(article: Dict) -> bool:
    required = ['id', 'title', 'user', 'slug']
    return all(key in article for key in required)

# Then, inside your scraping loop:
if not validate_article(article):
    logger.warning(f"Invalid article: {article.get('id')}")
    continue

Issue: Memory Usage Growing

# Solution: Process in batches instead of loading all
def process_in_batches(articles: List[Dict], batch_size: int = 100):
    for i in range(0, len(articles), batch_size):
        batch = articles[i:i + batch_size]
        yield batch

Conclusion

You now have everything needed to build a production-grade Dev.to scraper. The key takeaways:

  1. Use the API first: Dev.to's REST API is reliable and well-documented
  2. Implement rate limiting: Respect platform limits with exponential backoff
  3. Scale with proxies: ThorData residential proxies enable large-scale collection
  4. Store efficiently: SQLite with proper indexing handles millions of articles
  5. Monitor and log: Always know what your scraper is doing
  6. Be ethical: Respect ToS, don't hammer servers, attribute sources

For questions or advanced use cases, refer back to the relevant sections. The code examples are production-ready and tested.

Happy scraping!