How to Scrape Dev.to Articles with Python (2026 Guide)
Table of Contents
- Introduction
- What is Dev.to and Why Scrape It?
- Understanding the Dev.to/Forem API Architecture
- Setting Up Your Python Environment
- Basic Article Scraping
- Scraping User Profiles and Their Articles
- Collecting Comments and Reactions
- Handling Pagination and Scale
- HTML Scraping for Advanced Content
- Using Playwright for JavaScript-Rendered Content
- Building a Complete Data Pipeline
- Storage: SQLite Schema and Examples
- Exporting to CSV and JSON
- Rate Limiting and Retry Strategies
- Proxy Rotation with ThorData
- Anti-Detection and Stealth Techniques
- Async Scraping with httpx
- Scheduling Recurring Scrapes
- Real-World Use Cases and Business Applications
- Legal and Ethical Considerations
- Production-Ready Complete Scraper
- Troubleshooting Guide
1. Introduction
In 2026, Dev.to remains one of the most valuable sources of technical content on the internet. With hundreds of thousands of developers sharing articles, code snippets, and insights daily, the platform generates a constant stream of data about what matters in software development.
Whether you're building a machine learning model to predict trending topics, conducting competitive intelligence analysis, researching content gaps in your niche, or building training datasets for NLP applications, scraping Dev.to programmatically gives you access to structured, real-time data that would take months to collect manually.
This guide walks you through every aspect of scraping Dev.to efficiently, ethically, and at scale. We'll cover the official API (which is more reliable than you'd expect), HTML scraping for content not exposed via API, proxy rotation strategies, and anti-detection techniques that let you collect large datasets without triggering rate limits or getting blocked.
By the end, you'll have a production-ready scraper that can collect thousands of articles, comments, and reactions while respecting the platform's infrastructure and terms of service.
2. What is Dev.to and Why Scrape It?
What is Dev.to?
Dev.to is a community platform built on Forem, an open-source publishing engine. It's the largest community of software developers on the web, with millions of monthly active users. The platform is characterized by:
- High-quality technical content: Articles range from beginner tutorials to advanced system design discussions
- Real-time trend data: Trending topics emerge hours before appearing on traditional tech news sites
- Community engagement metrics: Comments, reactions, and discussions show real developer sentiment
- User profiles: Author credibility, follower counts, and publication history are publicly available
- Cross-platform integration: Articles often include code examples, external links, and rich media
Unlike corporate tech news sites, Dev.to content reflects what working developers actually care about. This makes it invaluable for research and intelligence gathering.
Why Scrape Dev.to?
Content Gap Analysis: Analyze 10,000 articles in your niche to find topics you're not covering. Identify patterns in what performs well (word count, formatting, keywords) and adapt your content strategy accordingly.
Trend Detection: Monitor emerging technologies before they hit mainstream tech news. Track topic velocity: is Rust adoption accelerating or plateauing? Are developers abandoning certain frameworks? Real-time trend data is worth significant money to some buyers.
Competitive Intelligence: Track what your competitors are publishing, how quickly their content gets traction, and what their audience responds to. Build a database of competitors' articles and analyze their publishing schedules and topic choices.
Author Outreach: Identify subject matter experts in your target market by finding the most-followed authors discussing specific technologies. Use follower counts and article performance to prioritize outreach lists.
Training Data for NLP/ML Models: Build datasets of real technical articles, comments, and discussions to train custom classifiers. Use comment data to identify sentiment around specific tools or languages. Create embeddings for semantic search.
Audience Research: Understand which topics, technologies, and writing styles resonate with your target audience. Track how engagement evolves with article length, code examples, and publishing time.
Content Syndication: Curate the best weekly articles for newsletters or Slack channels. Automate discovery of high-quality content in specific tags.
Job Market Intelligence: Analyze which technologies and skills are trending based on what developers are learning and discussing. Use this to guide training programs or hiring strategies.
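To make the trend-detection idea above concrete, here is a minimal sketch that buckets articles into ISO weeks per tag, so you can see whether a topic's weekly volume is rising or falling. It assumes each article dict carries `published_at` (ISO 8601) and `tags` (a list of strings), the shape the API returns as shown later in this guide; the sample data is hypothetical.

```python
from collections import defaultdict
from datetime import datetime

def tag_velocity(articles):
    """Count articles per (tag, ISO week) to gauge topic momentum.

    Each article is a dict with 'published_at' (ISO 8601 string) and
    'tags' (list of tag strings), matching the Dev.to API shape.
    """
    counts = defaultdict(int)
    for article in articles:
        ts = datetime.fromisoformat(article["published_at"].replace("Z", "+00:00"))
        year, week, _ = ts.isocalendar()
        for tag in article["tags"]:
            counts[(tag, f"{year}-W{week:02d}")] += 1
    return dict(counts)

# Hypothetical sample data for illustration
sample = [
    {"published_at": "2026-01-05T10:00:00Z", "tags": ["rust"]},
    {"published_at": "2026-01-06T10:00:00Z", "tags": ["rust", "webdev"]},
    {"published_at": "2026-01-13T10:00:00Z", "tags": ["rust"]},
]
print(tag_velocity(sample))
```

Comparing consecutive weeks for a tag gives you the "is Rust accelerating or plateauing?" signal directly from scraped data.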
What You'll Learn in This Guide
This is not a theoretical guide. We'll build real, working code that:
- Fetches articles from the official API with proper authentication
- Scrapes paginated results without hitting rate limits
- Extracts comments, reactions, and engagement metrics
- Handles errors gracefully with exponential backoff
- Rotates proxies and User-Agents to avoid detection
- Stores data in SQLite for efficient querying
- Exports to CSV and JSON for analysis
- Scales to collect thousands of articles in minutes
- Respects the platform's infrastructure
3. Understanding the Dev.to/Forem API Architecture
Official API Endpoints
Dev.to exposes a well-documented REST API with the following key endpoints:
GET /api/articles # List all articles
GET /api/articles/:id # Get single article
GET /api/articles/:id/comments # Get comments on article
GET /api/users/:username # Get user profile
GET /api/users/:username/articles # Get user's articles
GET /api/tags/:name # Tag metadata
GET /api/articles/search # Search articles
Unlike many sites, Dev.to doesn't require authentication for most endpoints. However, authenticated requests get higher rate limits and access to private/draft articles if you own them.
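As a quick sanity check before building anything larger, a minimal request looks like this. Dev.to reads the key from an `api-key` request header; the User-Agent string here is an illustrative placeholder, not a requirement.

```python
import os
import requests

def build_headers(api_key):
    """Build request headers; Dev.to reads the key from an `api-key` header."""
    headers = {"User-Agent": "Mozilla/5.0 (compatible; devto-research)"}  # placeholder UA
    if api_key:
        headers["api-key"] = api_key
    return headers

if __name__ == "__main__":
    # Works unauthenticated too, just with lower rate limits
    headers = build_headers(os.getenv("DEVTO_API_KEY"))
    resp = requests.get(
        "https://dev.to/api/articles",
        headers=headers,
        params={"per_page": 5},
        timeout=10,
    )
    resp.raise_for_status()
    for article in resp.json():
        print(article["title"])
```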
Rate Limiting
Dev.to implements rate limiting based on IP address and API key:
- Unauthenticated: 10 requests per 10 seconds per IP
- Authenticated: 1,000 requests per hour per API key
For production scraping at scale, you'll want to:
1. Use an API key (free from your Dev.to settings)
2. Implement exponential backoff when hitting limits
3. Use residential proxies to distribute requests across multiple IPs
4. Respect the X-RateLimit-* headers returned by the API
Response Format
All API responses are JSON with a consistent structure:
[
  {
    "id": 1234567,
    "title": "Getting Started with Rust",
    "description": "A practical guide to Rust...",
    "slug": "getting-started-with-rust-abcd",
    "path": "/user/getting-started-with-rust-abcd",
    "url": "https://dev.to/user/getting-started-with-rust-abcd",
    "comments_count": 42,
    "positive_reactions_count": 256,
    "created_at": "2026-01-15T10:30:00Z",
    "published_at": "2026-01-15T10:30:00Z",
    "last_comment_at": "2026-03-20T15:45:00Z",
    "user": {
      "name": "Jane Developer",
      "username": "janedev",
      "twitter_username": "janedev",
      "github_username": "janedev",
      "website_url": "https://janedev.com",
      "profile_image": "https://...",
      "profile_image_90": "https://..."
    },
    "tags": ["rust", "beginners", "webdev"],
    "reading_time_minutes": 8
  }
]
Key fields to understand:
- slug: The URL-safe identifier for the article
- path: The relative URL on Dev.to
- tags: Array of tag strings (not objects)
- positive_reactions_count: Likes/hearts
- comments_count: Total comment threads
- reading_time_minutes: Estimated read duration
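These fields combine into simple engagement signals. A minimal sketch, using the sample values from the response above:

```python
def engagement_summary(article):
    """Derive simple engagement signals from the API fields above."""
    reactions = article.get("positive_reactions_count", 0)
    comments = article.get("comments_count", 0)
    minutes = max(article.get("reading_time_minutes", 1), 1)
    return {
        # Reactions normalized by article length
        "reactions_per_minute": reactions / minutes,
        # How often a reaction converts into a comment
        "comment_ratio": comments / reactions if reactions else 0.0,
    }

sample = {
    "positive_reactions_count": 256,
    "comments_count": 42,
    "reading_time_minutes": 8,
}
print(engagement_summary(sample))  # reactions_per_minute = 32.0
```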
4. Setting Up Your Python Environment
Install Dependencies
Use Python 3.14 with uv for dependency management. Note that sqlite3 ships with Python's standard library, so it is not installed via pip; beautifulsoup4 is included here because we use it for HTML scraping later:
uv venv --python 3.14
source .venv/bin/activate
uv pip install requests httpx playwright beautifulsoup4 python-dateutil aiohttp tqdm
playwright install chromium
Create Configuration File
Centralize configuration in a single module. Secrets like the API key are read from environment variables so they never land in version control:
# config.py
import os
from pathlib import Path

class Config:
    # Dev.to API
    DEVTO_API_KEY = os.getenv("DEVTO_API_KEY", "")
    DEVTO_BASE_URL = "https://dev.to/api"

    # Rate limiting
    RATE_LIMIT_DELAY = 1.5  # seconds between requests (unauthenticated)
    AUTH_RATE_LIMIT_DELAY = 0.05  # seconds between requests (authenticated)
    MAX_RETRIES = 5
    RETRY_BASE_DELAY = 1  # seconds, multiplies exponentially

    # Data storage
    DB_PATH = Path("devto_articles.db")
    OUTPUT_DIR = Path("output")

    # Proxy settings (optional)
    USE_PROXIES = False
    PROXY_LIST = []

    # Logging
    LOG_LEVEL = "INFO"
    LOG_FILE = "scraper.log"

# Create output directory
Config.OUTPUT_DIR.mkdir(exist_ok=True)
Initialize Logging
Proper logging is essential for debugging and monitoring long-running scrapes:
# logger.py
import logging

from config import Config

def setup_logging():
    """Configure logging to file and console."""
    logger = logging.getLogger("devto_scraper")
    logger.setLevel(Config.LOG_LEVEL)

    # File handler
    file_handler = logging.FileHandler(Config.LOG_FILE)
    file_handler.setLevel(logging.DEBUG)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(Config.LOG_LEVEL)

    # Formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

logger = setup_logging()
5. Basic Article Scraping
Fetch Articles from All Articles Feed
The simplest approach is to fetch articles from the main feed, which includes articles from across the platform:
# scraper_basic.py
import time
from typing import List, Dict, Optional

import requests

from config import Config
from logger import logger

class DevtoScraper:
    def __init__(self, api_key: Optional[str] = None):
        """Initialize scraper with optional API key for higher rate limits."""
        self.api_key = api_key or Config.DEVTO_API_KEY
        self.base_url = Config.DEVTO_BASE_URL
        self.session = requests.Session()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        }
        if self.api_key:
            self.headers["api-key"] = self.api_key
        self.last_request_time = 0

    def _apply_rate_limit(self):
        """Respect rate limits between requests."""
        delay = Config.AUTH_RATE_LIMIT_DELAY if self.api_key else Config.RATE_LIMIT_DELAY
        elapsed = time.time() - self.last_request_time
        if elapsed < delay:
            time.sleep(delay - elapsed)

    def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """Make HTTP request with rate limiting and retry logic."""
        url = f"{self.base_url}{endpoint}"
        params = params or {}
        for attempt in range(Config.MAX_RETRIES):
            self._apply_rate_limit()
            try:
                response = self.session.get(url, headers=self.headers, params=params, timeout=10)
                self.last_request_time = time.time()

                # Log rate limit info
                if "X-RateLimit-Remaining" in response.headers:
                    remaining = response.headers.get("X-RateLimit-Remaining")
                    logger.debug(f"Rate limit remaining: {remaining}")

                if response.status_code == 429:
                    # Rate limited - back off exponentially
                    wait_time = Config.RETRY_BASE_DELAY * (2 ** attempt)
                    logger.warning(
                        f"Rate limited. Waiting {wait_time}s before retry "
                        f"{attempt + 1}/{Config.MAX_RETRIES}"
                    )
                    time.sleep(wait_time)
                    continue

                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                if attempt == Config.MAX_RETRIES - 1:
                    logger.error(f"Request failed after {Config.MAX_RETRIES} attempts: {e}")
                    raise
                wait_time = Config.RETRY_BASE_DELAY * (2 ** attempt)
                logger.warning(f"Request failed: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
        raise Exception(f"Failed to get {url} after {Config.MAX_RETRIES} attempts")

    def get_articles(self, page: int = 1, per_page: int = 30, state: str = "published") -> List[Dict]:
        """
        Fetch articles from the main feed.

        Args:
            page: Page number (1-indexed)
            per_page: Articles per page (max 1000)
            state: 'published', 'all', or 'fresh'

        Returns:
            List of article dictionaries
        """
        params = {
            "page": page,
            "per_page": min(per_page, 1000),
            "state": state,
            "order_by": "created_at"
        }
        logger.info(f"Fetching articles: page {page}, per_page {per_page}")
        articles = self._make_request("/articles", params)
        logger.info(f"Retrieved {len(articles)} articles")
        return articles

    def get_articles_by_tag(self, tag: str, page: int = 1, per_page: int = 30) -> List[Dict]:
        """
        Fetch articles filtered by tag.

        Args:
            tag: Tag name (e.g., 'python', 'javascript')
            page: Page number (1-indexed)
            per_page: Articles per page

        Returns:
            List of article dictionaries
        """
        params = {
            "tag": tag,
            "page": page,
            "per_page": min(per_page, 1000),
            "order_by": "created_at"
        }
        logger.info(f"Fetching articles with tag '{tag}': page {page}")
        articles = self._make_request("/articles", params)
        logger.info(f"Retrieved {len(articles)} articles with tag '{tag}'")
        return articles

    def get_article(self, article_id: int) -> Dict:
        """
        Fetch a single article by ID.

        Args:
            article_id: The article's numeric ID

        Returns:
            Article dictionary with full content
        """
        logger.info(f"Fetching article {article_id}")
        article = self._make_request(f"/articles/{article_id}")
        return article

    def get_article_by_slug(self, username: str, slug: str) -> Dict:
        """
        Fetch article by username and slug (alternative to ID).

        Args:
            username: Article author's username
            slug: Article slug (URL-safe identifier)

        Returns:
            Article dictionary
        """
        logger.info(f"Fetching article {username}/{slug}")
        article = self._make_request(f"/articles/{username}/{slug}")
        return article

# Example usage
if __name__ == "__main__":
    scraper = DevtoScraper(api_key=Config.DEVTO_API_KEY)

    # Get first page of articles
    articles = scraper.get_articles(page=1, per_page=30)
    for article in articles:
        print(f"{article['title']} by {article['user']['username']}")

    # Get Python articles
    python_articles = scraper.get_articles_by_tag("python", page=1, per_page=50)
    print(f"\nFound {len(python_articles)} Python articles")

    # Get a single article's details
    if articles:
        article_id = articles[0]['id']
        full_article = scraper.get_article(article_id)
        print(f"\nFull article content:\n{full_article.get('body_html', '')[:500]}")
Understanding Pagination
Dev.to's API supports standard pagination parameters:
- page: 1-indexed page number
- per_page: Articles per page (1-1000, default 30)
The API returns an empty array when you exceed the maximum page number, so you can iterate until you get an empty response:
def fetch_all_articles_paginated(scraper, tag: str = None, max_pages: int = None):
    """
    Fetch all articles from a tag, handling pagination automatically.
    """
    all_articles = []
    page = 1
    while True:
        if max_pages and page > max_pages:
            break
        if tag:
            articles = scraper.get_articles_by_tag(tag, page=page, per_page=100)
        else:
            articles = scraper.get_articles(page=page, per_page=100)
        if not articles:
            logger.info(f"Reached end of pagination at page {page}")
            break
        all_articles.extend(articles)
        logger.info(f"Collected {len(all_articles)} total articles")
        page += 1
    return all_articles

# Fetch all Python articles
python_articles = fetch_all_articles_paginated(scraper, tag="python", max_pages=50)
print(f"Total Python articles: {len(python_articles)}")
6. Scraping User Profiles and Their Articles
Fetch User Profile Data
User profiles contain valuable metadata like follower counts, join date, and social links:
# In DevtoScraper class
def get_user(self, username: str) -> Dict:
    """
    Fetch user profile by username.

    Args:
        username: Dev.to username

    Returns:
        User profile dictionary
    """
    logger.info(f"Fetching user profile: {username}")
    user = self._make_request(f"/users/{username}")
    return user

def get_user_articles(self, username: str, page: int = 1, per_page: int = 30) -> List[Dict]:
    """
    Fetch all articles by a specific user.

    Args:
        username: Dev.to username
        page: Page number (1-indexed)
        per_page: Articles per page

    Returns:
        List of user's articles
    """
    logger.info(f"Fetching articles for user {username}: page {page}")
    articles = self._make_request(f"/users/{username}/articles", {
        "page": page,
        "per_page": min(per_page, 1000)
    })
    return articles
Build Author Profiles Database
For competitive intelligence or author outreach, build a database of author profiles:
import json

def build_author_database(scraper, tag: str, output_file: str = "authors.json"):
    """
    Extract all unique authors from a tag feed and fetch their profiles.
    """
    all_articles = fetch_all_articles_paginated(scraper, tag=tag, max_pages=100)

    # Extract unique authors
    authors = {}
    for article in all_articles:
        username = article['user']['username']
        if username not in authors:
            authors[username] = None
    logger.info(f"Found {len(authors)} unique authors")

    # Fetch full profiles
    author_profiles = {}
    for i, username in enumerate(authors.keys(), 1):
        try:
            profile = scraper.get_user(username)
            author_profiles[username] = {
                "name": profile.get("name"),
                "username": profile.get("username"),
                "bio": profile.get("bio"),
                "joined": profile.get("created_at"),
                "location": profile.get("location"),
                "website": profile.get("website_url"),
                "twitter": profile.get("twitter_username"),
                "github": profile.get("github_username"),
                "image_url": profile.get("profile_image")
            }
            logger.info(f"Fetched profile {i}/{len(authors)}: {username}")
        except Exception as e:
            logger.error(f"Failed to fetch profile for {username}: {e}")

    # Save to file
    with open(output_file, 'w') as f:
        json.dump(author_profiles, f, indent=2)
    logger.info(f"Saved {len(author_profiles)} author profiles to {output_file}")
    return author_profiles
Example: Top Authors in Your Niche
To find influencers in a specific technology, analyze article performance by author:
def find_top_authors(scraper, tag: str, min_reactions: int = 100):
    """
    Find the most engaging authors in a tag.
    """
    all_articles = fetch_all_articles_paginated(scraper, tag=tag, max_pages=50)

    author_stats = {}
    for article in all_articles:
        username = article['user']['username']
        reactions = article['positive_reactions_count']
        if username not in author_stats:
            author_stats[username] = {
                "articles": 0,
                "total_reactions": 0,
                "avg_reactions": 0,
                "top_article": None,
                "top_reactions": 0
            }
        author_stats[username]["articles"] += 1
        author_stats[username]["total_reactions"] += reactions
        if reactions > author_stats[username]["top_reactions"]:
            author_stats[username]["top_article"] = article['title']
            author_stats[username]["top_reactions"] = reactions

    # Calculate averages
    for username, stats in author_stats.items():
        stats["avg_reactions"] = stats["total_reactions"] / stats["articles"]

    # Sort by average reactions
    top_authors = sorted(
        author_stats.items(),
        key=lambda x: x[1]["avg_reactions"],
        reverse=True
    )

    # Filter by minimum average reactions
    filtered = [
        (username, stats) for username, stats in top_authors
        if stats["avg_reactions"] >= min_reactions
    ]
    logger.info(f"Found {len(filtered)} authors with avg {min_reactions}+ reactions")
    for username, stats in filtered[:20]:
        logger.info(
            f"{username}: {stats['articles']} articles, "
            f"avg {stats['avg_reactions']:.0f} reactions"
        )
    return dict(filtered)
7. Collecting Comments and Reactions
Fetch Article Comments
Comments provide sentiment analysis, questions, and engagement data:
# In DevtoScraper class
def get_article_comments(self, article_id: int, page: int = 1, per_page: int = 30) -> List[Dict]:
    """
    Fetch comments on a specific article.

    Args:
        article_id: The article's numeric ID
        page: Page number (1-indexed)
        per_page: Comments per page

    Returns:
        List of comment dictionaries
    """
    logger.info(f"Fetching comments for article {article_id}: page {page}")
    params = {
        "page": page,
        "per_page": min(per_page, 1000),
    }
    comments = self._make_request(f"/articles/{article_id}/comments", params)
    return comments

def fetch_all_article_comments(scraper, article_id: int) -> List[Dict]:
    """
    Fetch all comments for an article, handling pagination.
    """
    all_comments = []
    page = 1
    while True:
        comments = scraper.get_article_comments(article_id, page=page, per_page=100)
        if not comments:
            logger.info(f"Reached end of comments at page {page}")
            break
        all_comments.extend(comments)
        logger.info(f"Collected {len(all_comments)} total comments")
        page += 1
    return all_comments
Comment Structure and Data
Comments include:
{
  "id": 12345678,
  "type_of": "comment",
  "id_code": "abc123",
  "user": {
    "name": "John Developer",
    "username": "johndev",
    "twitter_username": "johndev",
    "github_username": "johndev",
    "website_url": "https://johndev.com",
    "profile_image": "https://...",
    "profile_image_90": "https://..."
  },
  "positive_reactions_count": 15,
  "created_at": "2026-03-15T10:30:00Z",
  "updated_at": "2026-03-15T10:30:00Z",
  "body_html": "<p>Great article! Here's what worked for me...</p>",
  "children": [  # Replies to this comment
    {
      "id": 12345679,
      "type_of": "comment",
      "user": {...},
      "positive_reactions_count": 5,
      "created_at": "2026-03-15T11:00:00Z",
      "body_html": "<p>Thanks for the feedback!</p>",
      "children": []
    }
  ]
}
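Because replies nest arbitrarily deep inside `children`, most analyses start by flattening the tree. A small sketch, with a hypothetical two-level thread:

```python
def flatten_comments(comments):
    """Yield every comment in a thread, including nested replies, depth-first."""
    for comment in comments:
        yield comment
        yield from flatten_comments(comment.get("children", []))

# Hypothetical thread: comment 1 has replies 2 and 3; 3 has reply 4
thread = [
    {"id": 1, "children": [
        {"id": 2, "children": []},
        {"id": 3, "children": [{"id": 4, "children": []}]},
    ]},
]
print([c["id"] for c in flatten_comments(thread)])  # [1, 2, 3, 4]
```

Flattening first means the sentiment and code-extraction helpers below can treat every comment uniformly, regardless of nesting depth.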
Extract Comment Sentiment and Topics
Comments reveal what developers think about technologies:
import re
from html.parser import HTMLParser
from typing import Dict, List

class HTMLStripper(HTMLParser):
    """Remove HTML tags from text."""
    def __init__(self):
        super().__init__()
        self.reset()
        self.strict = False
        self.convert_charrefs = True
        self.text = []

    def handle_data(self, data):
        self.text.append(data)

    def get_data(self):
        return ''.join(self.text)

def strip_html(html):
    """Convert HTML to plain text."""
    stripper = HTMLStripper()
    stripper.feed(html)
    return stripper.get_data()

def analyze_comment_sentiment(body_html: str) -> Dict:
    """
    Basic sentiment analysis of a comment.
    For production, use a library like TextBlob or transformers.
    """
    text = strip_html(body_html).lower()
    positive_words = ['great', 'awesome', 'excellent', 'helpful', 'love', 'thanks', 'worked']
    negative_words = ['terrible', 'awful', 'hate', 'broken', "doesn't work", 'buggy']

    positive_count = sum(1 for word in positive_words if word in text)
    negative_count = sum(1 for word in negative_words if word in text)

    if positive_count > negative_count:
        sentiment = "positive"
    elif negative_count > positive_count:
        sentiment = "negative"
    else:
        sentiment = "neutral"

    return {
        "sentiment": sentiment,
        "positive_words": positive_count,
        "negative_words": negative_count,
        "text_length": len(text)
    }

def extract_code_from_comments(comments: List[Dict]) -> List[str]:
    """
    Extract code blocks from comments for training data.
    """
    code_blocks = []
    code_pattern = r'<code>(.*?)</code>'

    def extract_from_comment(comment):
        html = comment.get('body_html', '')
        blocks = re.findall(code_pattern, html, re.DOTALL)
        code_blocks.extend(blocks)
        # Also check replies
        for child in comment.get('children', []):
            extract_from_comment(child)

    for comment in comments:
        extract_from_comment(comment)
    return code_blocks

# Usage
comments = fetch_all_article_comments(scraper, article_id=123456)
for comment in comments:
    sentiment = analyze_comment_sentiment(comment['body_html'])
    print(f"User {comment['user']['username']}: {sentiment['sentiment']}")

code_blocks = extract_code_from_comments(comments)
print(f"Extracted {len(code_blocks)} code blocks from comments")
8. Handling Pagination and Scale
The 34-Page Pagination Cap
Dev.to's API has a practical limitation: you can paginate up to around page 34 before the API stops returning results for some endpoints. This is due to how the platform handles large offsets.
Workaround strategies:
- Use date filtering: Request articles created after a specific date rather than paginating indefinitely
- Split by tag: Distribute pagination across multiple tags
- Use multiple IP addresses: With proxies, you can run parallel collectors, each with its own rate-limit budget
- Scrape HTML: For complete archives, fall back to HTML scraping
Fetch All Articles with Date-Based Pagination
from datetime import datetime, timedelta, timezone

def _published(article: Dict) -> datetime:
    """Parse an article's published_at timestamp into an aware datetime."""
    return datetime.fromisoformat(article['published_at'].replace('Z', '+00:00'))

def fetch_articles_by_date_range(
    scraper,
    tag: str = None,
    start_date: datetime = None,
    end_date: datetime = None,
    batch_size: int = 100
) -> List[Dict]:
    """
    Fetch articles in a date window as a workaround for pagination limits.

    Pages through the feed once, keeps only articles inside the window,
    and stops as soon as a whole page falls before start_date. This
    assumes the feed is ordered newest-first; verify that assumption
    against your own responses. Pass timezone-aware datetimes.

    Args:
        scraper: DevtoScraper instance
        tag: Optional tag to filter by
        start_date: Earliest article date (default: one year ago)
        end_date: Latest article date (default: now)
        batch_size: Articles per request

    Returns:
        List of all articles in the date range
    """
    start_date = start_date or (datetime.now(timezone.utc) - timedelta(days=365))
    end_date = end_date or datetime.now(timezone.utc)
    all_articles = []
    page = 1
    while True:
        if tag:
            articles = scraper.get_articles_by_tag(tag, page=page, per_page=batch_size)
        else:
            articles = scraper.get_articles(page=page, per_page=batch_size)
        if not articles:
            break

        # Keep only articles inside the window
        in_range = [a for a in articles if start_date <= _published(a) <= end_date]
        all_articles.extend(in_range)
        logger.info(f"Page {page}: kept {len(in_range)} of {len(articles)} articles")

        # Stop once the entire page is older than the window
        if all(_published(a) < start_date for a in articles):
            break
        page += 1

    logger.info(f"Total articles in range: {len(all_articles)}")
    return all_articles
Distributed Collection with Proxies
For very large-scale collection, distribute requests across multiple IPs using ThorData residential proxies. This lets you maintain multiple sessions with independent rate limit budgets.
ThorData's residential network routes traffic through real consumer IP addresses, which are far harder to flag than datacenter IPs. You can set up an account via the affiliate link https://thordata.partnerstack.com/partner/0a0x4nzh.
class DistributedScraper:
    """
    Scrape using multiple proxies to parallelize collection.
    """
    def __init__(self, proxy_list: List[str]):
        """
        Args:
            proxy_list: List of proxy URLs (e.g., from ThorData)
        """
        self.scrapers = []
        for proxy in proxy_list:
            scraper = DevtoScraper()
            scraper.session.proxies = {
                'http': proxy,
                'https': proxy
            }
            self.scrapers.append(scraper)
        logger.info(f"Initialized {len(self.scrapers)} scraper instances")

    def get_articles_distributed(self, tags: List[str], articles_per_scraper: int = 500):
        """
        Distribute article collection across multiple proxies.
        """
        all_articles = []
        # Round-robin assignment of tags to scraper instances
        tag_batches = [tags[i::len(self.scrapers)] for i in range(len(self.scrapers))]
        for scraper, tag_batch in zip(self.scrapers, tag_batches):
            for tag in tag_batch:
                articles = fetch_all_articles_paginated(scraper, tag=tag, max_pages=50)
                all_articles.extend(articles)
                logger.info(f"Scraper {id(scraper)}: collected {len(articles)} articles for tag {tag}")
        logger.info(f"Total collected: {len(all_articles)} articles")
        return all_articles
9. HTML Scraping for Advanced Content
When to Use HTML Scraping
The API doesn't expose everything. For complete data collection, you'll need HTML scraping:
- Reading lists: Curated article collections
- Following/Followers: User network graphs
- User analytics: View counts and comment totals shown on profile pages
- Internal metadata: Scheduling info, draft status
- Historical data: Older articles with API availability issues
Using BeautifulSoup for HTML Extraction
from typing import Dict, List

import requests
from bs4 import BeautifulSoup

class DevtoHTMLScraper:
    """
    Scrape Dev.to HTML pages for data not available in API.
    """
    def __init__(self):
        self.session = requests.Session()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }

    def get_reading_list(self, username: str, list_slug: str) -> List[Dict]:
        """
        Scrape a user's reading list.

        Args:
            username: Dev.to username
            list_slug: Reading list slug from URL

        Returns:
            List of article metadata from the reading list
        """
        url = f"https://dev.to/{username}/readinglist/{list_slug}"
        logger.info(f"Scraping reading list: {url}")
        response = self.session.get(url, headers=self.headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        articles = []
        # Find all article cards in the reading list
        for card in soup.find_all('article', class_='crayons-card'):
            title = card.find('h2', class_='crayons-card__title')
            author = card.find('span', class_='crayons-card__author')
            link = card.find('a', class_='crayons-card__link')
            if title and link:
                articles.append({
                    "title": title.get_text(strip=True),
                    "author": author.get_text(strip=True) if author else "Unknown",
                    "url": link.get('href'),
                    "slug": link.get('href').split('/')[-1] if link.get('href') else None
                })
        logger.info(f"Extracted {len(articles)} articles from reading list")
        return articles

    def get_follower_list(self, username: str) -> List[Dict]:
        """
        Scrape a user's followers.
        """
        url = f"https://dev.to/{username}/followers"
        logger.info(f"Scraping followers: {url}")
        response = self.session.get(url, headers=self.headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        followers = []
        # Find all follower cards
        for card in soup.find_all('div', class_='profile-card'):
            username_elem = card.find('a', class_='profile-card__link')
            if username_elem:
                followers.append({
                    "username": username_elem.get_text(strip=True),
                    "profile_url": username_elem.get('href')
                })
        logger.info(f"Extracted {len(followers)} followers")
        return followers

    def get_user_articles_page(self, username: str) -> Dict:
        """
        Scrape a user's articles page for metadata not in API.
        """
        url = f"https://dev.to/{username}"
        logger.info(f"Scraping user page: {url}")
        response = self.session.get(url, headers=self.headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        # Extract user stats
        stats = {}
        # Count the articles visible on the profile page
        articles_section = soup.find('section', class_='crayons-card')
        if articles_section:
            article_items = articles_section.find_all('article')
            stats['article_count_visible'] = len(article_items)
        logger.info(f"Extracted user stats: {stats}")
        return stats
10. Using Playwright for JavaScript-Rendered Content
When BeautifulSoup Isn't Enough
Some Dev.to pages load content dynamically via JavaScript. For these cases, use Playwright to render pages like a real browser:
import asyncio

from bs4 import BeautifulSoup
from playwright.async_api import async_playwright

class DevtoPlaywrightScraper:
    """
    Use Playwright to scrape JavaScript-rendered content.
    """
    async def scrape_with_browser(self, url: str) -> str:
        """
        Load URL in headless browser and return full HTML.
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            # Set a realistic User-Agent so the session looks less like automation
            page = await browser.new_page(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            )
            logger.info(f"Loading {url} with Playwright")
            await page.goto(url, wait_until="networkidle")
            # Wait for content to render
            await page.wait_for_timeout(2000)
            content = await page.content()
            await browser.close()
            return content

    async def scrape_dynamic_feed(self, username: str):
        """
        Scrape a user's feed that loads dynamically.
        """
        url = f"https://dev.to/{username}"
        content = await self.scrape_with_browser(url)
        soup = BeautifulSoup(content, 'html.parser')
        articles = []
        for article in soup.find_all('article'):
            title = article.find('h2')
            if title:
                articles.append({
                    "title": title.get_text(strip=True),
                    "html": str(article)
                })
        logger.info(f"Scraped {len(articles)} articles from {username}")
        return articles

    async def get_infinite_scroll_content(self, url: str, scroll_count: int = 5):
        """
        Scroll page multiple times to load infinite-scroll content.
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()
            await page.goto(url, wait_until="networkidle")
            # Scroll to load more content
            for i in range(scroll_count):
                logger.info(f"Scroll {i + 1}/{scroll_count}")
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await page.wait_for_timeout(1000)
            content = await page.content()
            await browser.close()
            return content

# Usage
async def example():
    scraper = DevtoPlaywrightScraper()
    articles = await scraper.scrape_dynamic_feed("some_username")
    print(f"Found {len(articles)} articles")

# Run async function
# asyncio.run(example())
11. Building a Complete Data Pipeline
Architecture Overview
A production scraper has these components:
┌─────────────┐
│ Fetcher │ Fetch raw data from API/HTML
└──────┬──────┘
│
┌──────▼──────────┐
│ Parser │ Extract structured data
└──────┬──────────┘
│
┌──────▼──────────┐
│ Deduplication │ Remove duplicates
└──────┬──────────┘
│
┌──────▼──────────┐
│ Validation │ Check data quality
└──────┬──────────┘
│
┌──────▼──────────┐
│ Storage │ Save to database
└─────────────────┘
Build the Pipeline Class
import sqlite3
from dataclasses import dataclass, asdict
from typing import Optional
import hashlib
@dataclass
class Article:
"""Represents a Dev.to article."""
id: int
title: str
slug: str
description: str
url: str
author: str
created_at: str
published_at: str
updated_at: str
comments_count: int
reactions_count: int
reading_time: int
tags: str # JSON string
body_html: Optional[str] = None
body_markdown: Optional[str] = None
def content_hash(self) -> str:
"""Generate hash of article content for deduplication."""
content = f"{self.id}:{self.title}:{self.author}".encode()
return hashlib.sha256(content).hexdigest()
class DataPipeline:
"""
End-to-end pipeline for scraping, processing, and storing articles.
"""
def __init__(self, db_path: str = "devto_articles.db"):
self.db_path = db_path
self.conn = None
self.init_database()
def init_database(self):
"""Create database schema."""
self.conn = sqlite3.connect(self.db_path)
cursor = self.conn.cursor()
# Articles table
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
slug TEXT UNIQUE NOT NULL,
description TEXT,
url TEXT UNIQUE NOT NULL,
author TEXT NOT NULL,
created_at TEXT,
published_at TEXT,
updated_at TEXT,
comments_count INTEGER,
reactions_count INTEGER,
reading_time INTEGER,
tags TEXT,
body_html TEXT,
body_markdown TEXT,
content_hash TEXT UNIQUE,
scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Comments table
cursor.execute('''
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY,
article_id INTEGER NOT NULL,
user TEXT NOT NULL,
body_html TEXT,
body_text TEXT,
reactions_count INTEGER,
created_at TEXT,
updated_at TEXT,
FOREIGN KEY (article_id) REFERENCES articles(id)
)
''')
# Tags index for fast queries
cursor.execute('''
CREATE TABLE IF NOT EXISTS article_tags (
article_id INTEGER NOT NULL,
tag TEXT NOT NULL,
PRIMARY KEY (article_id, tag),
FOREIGN KEY (article_id) REFERENCES articles(id)
)
''')
self.conn.commit()
logger.info("Database initialized")
def parse_article(self, raw_article: Dict) -> Article:
"""
Convert raw API response to Article dataclass.
"""
return Article(
id=raw_article['id'],
title=raw_article['title'],
slug=raw_article['slug'],
description=raw_article.get('description', ''),
url=raw_article.get('url', ''),
author=raw_article['user']['username'],
created_at=raw_article['created_at'],
published_at=raw_article.get('published_at', ''),
updated_at=raw_article.get('updated_at', ''),
comments_count=raw_article.get('comments_count', 0),
reactions_count=raw_article.get('positive_reactions_count', 0),
reading_time=raw_article.get('reading_time_minutes', 0),
            # The articles list endpoint returns tags as a list under 'tag_list';
            # joining the 'tags' string field directly would split it per character
            tags=','.join(raw_article['tag_list']) if isinstance(raw_article.get('tag_list'), list) else str(raw_article.get('tags', '')),
body_html=raw_article.get('body_html'),
body_markdown=raw_article.get('body_markdown')
)
def deduplicate(self, article: Article) -> bool:
"""
Check if article already exists in database.
Returns True if duplicate, False if new.
"""
cursor = self.conn.cursor()
cursor.execute('SELECT id FROM articles WHERE id = ?', (article.id,))
return cursor.fetchone() is not None
def validate_article(self, article: Article) -> bool:
"""
Validate article has required fields.
"""
required_fields = ['id', 'title', 'author']
for field in required_fields:
if not getattr(article, field, None):
logger.warning(f"Article missing {field}: {article.id}")
return False
return True
def store_article(self, article: Article):
"""
Save article to database.
"""
cursor = self.conn.cursor()
try:
cursor.execute('''
INSERT INTO articles (
id, title, slug, description, url, author,
created_at, published_at, updated_at,
comments_count, reactions_count, reading_time,
tags, body_html, body_markdown, content_hash
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
article.id, article.title, article.slug, article.description,
article.url, article.author, article.created_at, article.published_at,
article.updated_at, article.comments_count, article.reactions_count,
article.reading_time, article.tags, article.body_html,
article.body_markdown, article.content_hash()
))
# Store individual tags for fast querying
for tag in article.tags.split(','):
tag = tag.strip()
if tag:
cursor.execute('''
INSERT OR IGNORE INTO article_tags (article_id, tag)
VALUES (?, ?)
''', (article.id, tag))
self.conn.commit()
logger.info(f"Stored article: {article.title}")
except sqlite3.IntegrityError as e:
logger.debug(f"Article already exists: {article.id}")
def process_articles(self, raw_articles: List[Dict]) -> int:
"""
Process list of raw articles through the pipeline.
Returns count of new articles stored.
"""
stored_count = 0
for raw_article in raw_articles:
# Parse
article = self.parse_article(raw_article)
# Validate
if not self.validate_article(article):
continue
# Deduplicate
if self.deduplicate(article):
logger.debug(f"Duplicate: {article.id}")
continue
# Store
self.store_article(article)
stored_count += 1
return stored_count
def query_articles(self, tag: str = None, limit: int = 100) -> List[Dict]:
"""
Query stored articles.
"""
cursor = self.conn.cursor()
if tag:
cursor.execute('''
SELECT a.* FROM articles a
JOIN article_tags t ON a.id = t.article_id
WHERE t.tag = ?
LIMIT ?
''', (tag, limit))
else:
cursor.execute('SELECT * FROM articles LIMIT ?', (limit,))
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def close(self):
"""Close database connection."""
if self.conn:
self.conn.close()
# Usage
pipeline = DataPipeline()
scraper = DevtoScraper()
# Fetch articles
articles = scraper.get_articles_by_tag("python", page=1, per_page=100)
# Process through pipeline
stored = pipeline.process_articles(articles)
logger.info(f"Stored {stored} new articles")
# Query results
python_articles = pipeline.query_articles(tag="python")
logger.info(f"Total Python articles in database: {len(python_articles)}")
pipeline.close()
12. Storage: SQLite Schema and Examples
Complete Schema
-- Main articles table
CREATE TABLE articles (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
slug TEXT UNIQUE NOT NULL,
description TEXT,
url TEXT UNIQUE NOT NULL,
author TEXT NOT NULL,
created_at TEXT,
published_at TEXT,
updated_at TEXT,
comments_count INTEGER DEFAULT 0,
reactions_count INTEGER DEFAULT 0,
reading_time INTEGER DEFAULT 0,
tags TEXT, -- comma-separated
body_html TEXT,
body_markdown TEXT,
content_hash TEXT UNIQUE,
scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
);
-- Create indexes for fast queries
CREATE INDEX idx_articles_author ON articles(author);
CREATE INDEX idx_articles_created_at ON articles(created_at);
CREATE INDEX idx_articles_reactions ON articles(reactions_count);
CREATE INDEX idx_articles_comments ON articles(comments_count);
-- Normalized tags table for efficient filtering
CREATE TABLE article_tags (
article_id INTEGER NOT NULL,
tag TEXT NOT NULL,
PRIMARY KEY (article_id, tag),
FOREIGN KEY (article_id) REFERENCES articles(id) ON DELETE CASCADE
);
CREATE INDEX idx_article_tags_tag ON article_tags(tag);
-- Comments storage
CREATE TABLE comments (
id INTEGER PRIMARY KEY,
article_id INTEGER NOT NULL,
user TEXT NOT NULL,
body_html TEXT,
body_text TEXT,
reactions_count INTEGER DEFAULT 0,
created_at TEXT,
updated_at TEXT,
FOREIGN KEY (article_id) REFERENCES articles(id) ON DELETE CASCADE
);
CREATE INDEX idx_comments_article ON comments(article_id);
CREATE INDEX idx_comments_user ON comments(user);
-- Audit trail for monitoring scrapes
CREATE TABLE scrape_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at TEXT DEFAULT CURRENT_TIMESTAMP,
completed_at TEXT,
articles_fetched INTEGER,
articles_stored INTEGER,
errors INTEGER,
status TEXT
);
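Before running long scrapes against this schema, it helps to tune the connection itself: SQLite leaves both write-ahead logging and foreign-key enforcement off by default. A minimal sketch (the pragmas are standard SQLite; the filename matches the pipeline above):

```python
import sqlite3

def open_connection(db_path: str = "devto_articles.db") -> sqlite3.Connection:
    """Open a connection tuned for a long-running scraper."""
    conn = sqlite3.connect(db_path)
    # WAL lets a reader (e.g. an export script) run while the scraper writes
    conn.execute("PRAGMA journal_mode=WAL")
    # SQLite ignores FOREIGN KEY clauses unless this is enabled per connection
    conn.execute("PRAGMA foreign_keys=ON")
    return conn
```

Without `foreign_keys=ON`, the `ON DELETE CASCADE` clauses in the schema are silently ignored.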
Useful SQLite Queries
def get_top_articles(pipeline, tag: str = None, days: int = 30) -> List[Dict]:
"""Get most popular articles."""
cursor = pipeline.conn.cursor()
if tag:
query = '''
SELECT a.* FROM articles a
JOIN article_tags t ON a.id = t.article_id
WHERE t.tag = ?
AND datetime(a.published_at) > datetime('now', '-' || ? || ' days')
ORDER BY a.reactions_count DESC
LIMIT 50
'''
params = (tag, days)
else:
query = '''
SELECT * FROM articles
WHERE datetime(published_at) > datetime('now', '-' || ? || ' days')
ORDER BY reactions_count DESC
LIMIT 50
'''
params = (days,)
cursor.execute(query, params)
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def get_prolific_authors(pipeline, min_articles: int = 5) -> List[Dict]:
"""Find authors with most articles."""
cursor = pipeline.conn.cursor()
cursor.execute('''
SELECT
author,
COUNT(*) as article_count,
AVG(reactions_count) as avg_reactions,
SUM(reactions_count) as total_reactions,
MAX(published_at) as latest_article
FROM articles
GROUP BY author
HAVING COUNT(*) >= ?
ORDER BY article_count DESC
LIMIT 100
''', (min_articles,))
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def get_trending_topics(pipeline, days: int = 7) -> List[Dict]:
"""Find tags with most articles in recent period."""
cursor = pipeline.conn.cursor()
cursor.execute('''
SELECT
tag,
COUNT(*) as article_count,
AVG(a.reactions_count) as avg_reactions,
SUM(a.reactions_count) as total_reactions
FROM article_tags t
JOIN articles a ON t.article_id = a.id
WHERE datetime(a.published_at) > datetime('now', '-' || ? || ' days')
GROUP BY tag
ORDER BY article_count DESC
LIMIT 50
''', (days,))
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
13. Exporting to CSV and JSON
JSON Export with Nested Structure
import json
from datetime import datetime
def export_to_json(pipeline, tag: str = None, output_file: str = "articles.json"):
"""
Export articles to JSON with full nested structure.
"""
articles = pipeline.query_articles(tag=tag, limit=999999)
# Add related comments
for article in articles:
cursor = pipeline.conn.cursor()
cursor.execute('SELECT * FROM comments WHERE article_id = ?', (article['id'],))
columns = [description[0] for description in cursor.description]
article['comments'] = [
dict(zip(columns, row)) for row in cursor.fetchall()
]
# Make datetime serializable
def json_serializer(obj):
if isinstance(obj, (datetime,)):
return obj.isoformat()
raise TypeError(f"Type {type(obj)} not serializable")
with open(output_file, 'w') as f:
json.dump(articles, f, indent=2, default=json_serializer)
logger.info(f"Exported {len(articles)} articles to {output_file}")
return output_file
# Usage
output_file = export_to_json(pipeline, tag="python")
CSV Export with Flattening
import csv
def export_to_csv(pipeline, tag: str = None, output_file: str = "articles.csv"):
"""
Export articles to CSV with flattened structure.
"""
articles = pipeline.query_articles(tag=tag, limit=999999)
if not articles:
logger.warning("No articles to export")
return
# Define CSV columns
fieldnames = [
'id', 'title', 'author', 'url', 'created_at', 'published_at',
'comments_count', 'reactions_count', 'reading_time',
'tags', 'description'
]
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for article in articles:
row = {field: article.get(field) for field in fieldnames}
writer.writerow(row)
logger.info(f"Exported {len(articles)} articles to {output_file}")
return output_file
# Usage
csv_file = export_to_csv(pipeline, tag="python")
# Export with comments in separate file
def export_comments_to_csv(pipeline, output_file: str = "comments.csv"):
"""
Export all comments to CSV.
"""
cursor = pipeline.conn.cursor()
cursor.execute('SELECT * FROM comments ORDER BY article_id')
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=columns)
writer.writeheader()
for row in rows:
writer.writerow(dict(zip(columns, row)))
logger.info(f"Exported {len(rows)} comments to {output_file}")
return output_file
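For large datasets, newline-delimited JSON (one object per line) is easier to stream, append to, and load into tools like `jq` or BigQuery than a single JSON array. A sketch that works on the dict rows returned by `pipeline.query_articles`:

```python
import json
from typing import Dict, Iterable

def export_to_ndjson(articles: Iterable[Dict], output_file: str = "articles.ndjson") -> int:
    """Write one JSON object per line; returns the number of records written."""
    count = 0
    with open(output_file, "w", encoding="utf-8") as f:
        for article in articles:
            # default=str keeps datetime-like values serializable
            f.write(json.dumps(article, ensure_ascii=False, default=str) + "\n")
            count += 1
    return count
```

Because each record is an independent line, a later scrape can append to the same file without re-serializing everything.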
14. Rate Limiting and Retry Strategies
Exponential Backoff Implementation
import time
import random
class RateLimiter:
"""
Intelligent rate limiting with exponential backoff.
"""
def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0):
self.base_delay = base_delay
self.max_delay = max_delay
self.retry_count = 0
def wait_before_retry(self, attempt: int):
"""
Calculate exponential backoff with jitter.
"""
# Exponential backoff: 1s, 2s, 4s, 8s, etc.
delay = min(self.base_delay * (2 ** attempt), self.max_delay)
# Add jitter (±10%) to prevent thundering herd
jitter = delay * 0.1 * random.uniform(-1, 1)
total_delay = delay + jitter
logger.warning(f"Backing off for {total_delay:.1f}s (attempt {attempt + 1})")
time.sleep(total_delay)
    def check_rate_limit_headers(self, response):
        """
        Extract rate limit info from response headers.
        These headers are not guaranteed on every endpoint; zeros apply when absent.
        """
        headers = response.headers
        remaining = int(headers.get('X-RateLimit-Remaining', 0))
        limit = int(headers.get('X-RateLimit-Limit', 0))
        reset = int(headers.get('X-RateLimit-Reset', 0))
return {
'remaining': remaining,
'limit': limit,
'reset': reset,
'percentage': (remaining / limit * 100) if limit > 0 else 0
}
def request_with_backoff(
session,
method: str,
url: str,
max_retries: int = 5,
rate_limiter: RateLimiter = None,
**kwargs
) -> requests.Response:
"""
Make HTTP request with intelligent retries.
"""
if rate_limiter is None:
rate_limiter = RateLimiter()
for attempt in range(max_retries):
try:
            kwargs.setdefault('timeout', 10)
            response = session.request(method, url, **kwargs)
# Check rate limits
limits = rate_limiter.check_rate_limit_headers(response)
logger.debug(f"Rate limit: {limits['remaining']}/{limits['limit']} remaining")
            if limits['limit'] and limits['percentage'] < 10:
                logger.warning(f"Approaching rate limit: {limits['percentage']:.1f}%")
# Handle rate limit response
if response.status_code == 429:
rate_limiter.wait_before_retry(attempt)
continue
# Handle server errors (5xx)
if 500 <= response.status_code < 600:
if attempt < max_retries - 1:
rate_limiter.wait_before_retry(attempt)
continue
response.raise_for_status()
return response
except requests.exceptions.Timeout:
logger.warning(f"Timeout on attempt {attempt + 1}")
if attempt < max_retries - 1:
rate_limiter.wait_before_retry(attempt)
continue
raise
except requests.exceptions.ConnectionError as e:
logger.warning(f"Connection error on attempt {attempt + 1}: {e}")
if attempt < max_retries - 1:
rate_limiter.wait_before_retry(attempt)
continue
raise
raise Exception(f"Request failed after {max_retries} attempts")
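The 429 handler above backs off blindly; when a server sends a `Retry-After` header it is better to honor it. Per the HTTP spec the value can be either delta-seconds or an HTTP date, so a small stdlib-only parser helps (a sketch; plug its result into `wait_before_retry` in place of the computed delay):

```python
import time
from email.utils import parsedate_to_datetime

def parse_retry_after(value, default: float = 5.0) -> float:
    """Return seconds to wait from a Retry-After header value.

    Accepts delta-seconds ("120") or an HTTP-date
    ("Wed, 21 Oct 2026 07:28:00 GMT"); falls back to `default`.
    """
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        dt = parsedate_to_datetime(value)
        # Dates in the past mean "retry now"
        return max(0.0, dt.timestamp() - time.time())
    except (TypeError, ValueError):
        return default
```

Usage: `time.sleep(parse_retry_after(response.headers.get("Retry-After")))` when a 429 arrives.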
15. Proxy Rotation with ThorData
For large-scale scraping without getting blocked, you need rotating residential proxies. ThorData (https://thordata.partnerstack.com/partner/0a0x4nzh) is the ideal choice because:
- Real residential IPs: Actual device IPs, not datacenter proxies that Dev.to easily blocks
- 99.9% uptime: Reliable for long-running scrapes
- City-level targeting: Rotate through different geographic regions
- Unlimited bandwidth: No surprises mid-scrape
- API for dynamic rotation: Automatically get new IPs without code changes
ThorData Integration
class ThorDataProxyRotator:
"""
Rotate through ThorData residential proxies.
"""
def __init__(self, username: str, password: str, port: int = 10000):
"""
Args:
username: ThorData username
password: ThorData password
port: Proxy port (default 10000)
"""
self.username = username
self.password = password
self.port = port
self.gateway = "proxy.thordata.com"
self.current_session = None
def get_proxy_url(self, session_id: str = None, country: str = None) -> str:
"""
Generate proxy URL with optional session and country.
Args:
session_id: Unique session ID for sticky IP
country: Country code for geo-targeting (e.g., 'US', 'GB')
Returns:
Proxy URL for use in requests
"""
        # Residential proxy providers typically encode session and geo targeting
        # in the proxy username (query strings are not valid in a proxy URL).
        # The parameter names below are illustrative - confirm the exact
        # format in ThorData's documentation.
        user = self.username
        if session_id:
            user += f"-session-{session_id}"
        if country:
            user += f"-country-{country}"
        return f"http://{user}:{self.password}@{self.gateway}:{self.port}"
def rotate_session(self):
"""
Generate new session ID to get different IP.
"""
import uuid
self.current_session = str(uuid.uuid4())
return self.current_session
def get_current_ip(self, proxy_url: str) -> str:
"""
Check current IP through the proxy.
"""
try:
response = requests.get(
'https://api.ipify.org?format=json',
proxies={'https': proxy_url},
timeout=10
)
return response.json()['ip']
except Exception as e:
logger.error(f"Failed to get IP: {e}")
return None
class RotatingProxyScraper(DevtoScraper):
"""
Scraper that rotates through ThorData proxies.
"""
def __init__(self, proxy_rotator: ThorDataProxyRotator, api_key: str = None):
super().__init__(api_key)
self.proxy_rotator = proxy_rotator
self.requests_per_session = 50 # Rotate IP after N requests
self.request_count = 0
def _make_request(self, endpoint: str, params: Dict = None) -> Dict:
"""
Make request through rotating proxy.
"""
# Rotate proxy periodically
if self.request_count % self.requests_per_session == 0:
session_id = self.proxy_rotator.rotate_session()
proxy_url = self.proxy_rotator.get_proxy_url(session_id=session_id)
self.session.proxies = {
'http': proxy_url,
'https': proxy_url
}
ip = self.proxy_rotator.get_current_ip(proxy_url)
logger.info(f"Rotated to new IP: {ip}")
self.request_count += 1
# Call parent implementation
return super()._make_request(endpoint, params)
# Usage
proxy_rotator = ThorDataProxyRotator(
username="your_thordata_username",
password="your_thordata_password"
)
scraper = RotatingProxyScraper(proxy_rotator, api_key=Config.DEVTO_API_KEY)
# Now scraper rotates IPs automatically
articles = scraper.get_articles(page=1, per_page=100)
Large-Scale Collection with Multiple Proxies
def scrape_with_proxy_pool(
proxy_rotator: ThorDataProxyRotator,
tags: List[str],
concurrent_scrapers: int = 3
):
"""
Scrape using multiple concurrent sessions with different proxies.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
def scrape_tag(tag: str, scraper_id: int):
"""Scrape all articles for a tag."""
scraper = RotatingProxyScraper(proxy_rotator, api_key=Config.DEVTO_API_KEY)
logger.info(f"Scraper {scraper_id}: Starting tag {tag}")
articles = fetch_all_articles_paginated(scraper, tag=tag, max_pages=100)
logger.info(f"Scraper {scraper_id}: Collected {len(articles)} for {tag}")
return tag, articles
all_articles = {}
# Distribute tags across scraper pool
with ThreadPoolExecutor(max_workers=concurrent_scrapers) as executor:
futures = []
for i, tag in enumerate(tags):
scraper_id = i % concurrent_scrapers
future = executor.submit(scrape_tag, tag, scraper_id)
futures.append(future)
for future in as_completed(futures):
tag, articles = future.result()
all_articles[tag] = articles
total = sum(len(a) for a in all_articles.values())
logger.info(f"Total articles collected: {total}")
return all_articles
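One caveat with the thread pool above: the `DataPipeline` shown earlier holds a single `sqlite3` connection, and by default SQLite connections refuse to be used from a thread other than the one that created them. Either have workers return parsed data and do all writes from the main thread, or share one connection with `check_same_thread=False` and serialize access with a lock. A sketch of the second approach:

```python
import sqlite3
import threading

class ThreadSafeDB:
    """Share one SQLite connection across scraper threads via a lock."""
    def __init__(self, db_path: str):
        # check_same_thread=False permits cross-thread use; the lock then
        # ensures statements never interleave
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.lock = threading.Lock()

    def run(self, sql: str, params: tuple = ()) -> list:
        """Execute one statement under the lock and return any rows."""
        with self.lock:
            cur = self.conn.execute(sql, params)
            rows = cur.fetchall()
            self.conn.commit()
            return rows
```

The first approach (writes only from the main thread) is usually simpler and avoids lock contention entirely.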
16. Anti-Detection and Stealth Techniques
User-Agent Rotation
import random
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
]
class StealthSession(requests.Session):
"""
Requests session with anti-detection features.
"""
def __init__(self):
super().__init__()
self.rotate_user_agent()
self._request_count = 0
def rotate_user_agent(self):
"""Set random User-Agent."""
user_agent = random.choice(USER_AGENTS)
self.headers.update({
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
})
logger.debug(f"Rotated User-Agent: {user_agent[:50]}...")
def request(self, method, url, **kwargs):
"""Override request to add anti-detection features."""
# Rotate User-Agent occasionally
if self._request_count % 10 == 0:
self.rotate_user_agent()
# Add realistic delays
if self._request_count > 0:
delay = random.uniform(0.5, 2.0)
time.sleep(delay)
self._request_count += 1
return super().request(method, url, **kwargs)
# Use it
session = StealthSession()
response = session.get('https://dev.to/api/articles')
Request Timing and Fingerprinting
class AntiDetectionSession(StealthSession):
"""
Enhanced session with fingerprint management.
"""
def __init__(self):
super().__init__()
self.request_times = []
self.min_delay_between_requests = 0.5
self.max_delay_between_requests = 3.0
def _get_realistic_delay(self) -> float:
"""
Calculate realistic delay that doesn't look like a bot.
Real users don't make requests at exact intervals.
This generates variable delays with some patterns.
"""
# Most requests within 1-2s, some slower ones
if random.random() < 0.8:
delay = random.uniform(self.min_delay_between_requests, 1.5)
else:
delay = random.uniform(2.0, self.max_delay_between_requests)
return delay
def request(self, method, url, **kwargs):
"""Make request with human-like timing."""
# Add delay before request
if self.request_times:
actual_delay = self._get_realistic_delay()
time.sleep(actual_delay)
self.request_times.append(time.time())
# Rotate headers every 20 requests
if len(self.request_times) % 20 == 0:
self.rotate_user_agent()
return super().request(method, url, **kwargs)
# Don't make parallel requests from same IP
# It's obvious bot behavior to API servers
# Use ThorData proxies instead to distribute load
Session Management
class PersistentSession:
"""
Maintain consistent session identity for longer periods.
"""
def __init__(self, session_id: str = None):
self.session_id = session_id or self._generate_session_id()
self.session = AntiDetectionSession()
self.cookies = requests.cookies.RequestsCookieJar()
def _generate_session_id(self) -> str:
"""Generate realistic session ID."""
import uuid
return str(uuid.uuid4())
def get_request_headers(self) -> Dict:
"""Get headers that maintain consistent identity."""
return {
'User-Agent': self.session.headers.get('User-Agent'),
'Accept': self.session.headers.get('Accept'),
'Accept-Language': self.session.headers.get('Accept-Language'),
'X-Requested-With': 'XMLHttpRequest',
'Referer': 'https://dev.to/',
}
def request(self, method: str, url: str, **kwargs) -> requests.Response:
"""Make request maintaining session identity."""
headers = self.get_request_headers()
headers.update(kwargs.pop('headers', {}))
return self.session.request(
method, url,
headers=headers,
cookies=self.cookies,
**kwargs
)
17. Async Scraping with httpx
For high-performance scraping, use async requests with httpx instead of blocking requests:
import httpx
import asyncio
from asyncio import Semaphore
class AsyncDevtoScraper:
"""
Async scraper using httpx for high throughput.
"""
def __init__(self, api_key: str = None, max_concurrent: int = 10):
self.api_key = api_key
self.base_url = "https://dev.to/api"
self.max_concurrent = max_concurrent
self.semaphore = Semaphore(max_concurrent)
self.rate_limiter = RateLimiter()
async def _request(self, client: httpx.AsyncClient, endpoint: str, params: Dict = None):
"""Make async request with rate limiting."""
async with self.semaphore:
url = f"{self.base_url}{endpoint}"
headers = {
'User-Agent': random.choice(USER_AGENTS),
'api-key': self.api_key
} if self.api_key else {'User-Agent': random.choice(USER_AGENTS)}
await asyncio.sleep(random.uniform(0.5, 1.5)) # Rate limiting
response = await client.get(url, params=params, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
async def get_articles_bulk(self, tags: List[str]) -> Dict[str, List[Dict]]:
"""
Fetch articles for multiple tags concurrently.
"""
        async with httpx.AsyncClient() as client:
            tag_list = list(tags)
            # Bare coroutines awaited one at a time would run sequentially;
            # gather them so the requests actually overlap
            responses = await asyncio.gather(*[
                self._request(client, '/articles', {'tag': tag, 'per_page': 100})
                for tag in tag_list
            ])
            results = {}
            for tag, articles in zip(tag_list, responses):
                results[tag] = articles
                logger.info(f"Fetched {len(articles)} articles for {tag}")
            return results
async def fetch_multiple_articles(self, article_ids: List[int]) -> List[Dict]:
"""
Fetch multiple articles in parallel.
"""
async with httpx.AsyncClient() as client:
tasks = [
self._request(client, f'/articles/{aid}')
for aid in article_ids
]
articles = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out exceptions
valid_articles = [a for a in articles if not isinstance(a, Exception)]
logger.info(f"Fetched {len(valid_articles)} articles")
return valid_articles
# Usage
async def main():
scraper = AsyncDevtoScraper(api_key=Config.DEVTO_API_KEY)
tags = ['python', 'javascript', 'rust', 'golang', 'devops']
results = await scraper.get_articles_bulk(tags)
for tag, articles in results.items():
print(f"{tag}: {len(articles)} articles")
# Run
# asyncio.run(main())
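`fetch_multiple_articles` simply drops requests that raised; for transient errors a retry wrapper recovers most of them. A generic sketch using only `asyncio` (the exception types to retry on are passed in, since they depend on the HTTP client; with httpx you might pass `(httpx.TimeoutException, httpx.ConnectError)`):

```python
import asyncio
import random

async def retry_async(coro_factory, retries: int = 3, base_delay: float = 1.0,
                      retry_on: tuple = (Exception,)):
    """Call coro_factory() up to `retries` times with jittered exponential backoff."""
    for attempt in range(retries):
        try:
            # A fresh coroutine is created on each attempt
            return await coro_factory()
        except retry_on:
            if attempt == retries - 1:
                raise
            delay = base_delay * (2 ** attempt) * random.uniform(0.8, 1.2)
            await asyncio.sleep(delay)
```

Usage inside the scraper would look like `await retry_async(lambda: self._request(client, f'/articles/{aid}'))`.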
18. Scheduling Recurring Scrapes
Using APScheduler for Regular Collection
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import atexit
class ScheduledScraper:
"""
Schedule recurring scrapes to track trending topics over time.
"""
def __init__(self, pipeline: DataPipeline):
self.pipeline = pipeline
self.scheduler = BackgroundScheduler()
self.running = False
def scrape_job(self, tags: List[str]):
"""Job that runs on schedule."""
logger.info(f"Starting scheduled scrape for tags: {tags}")
try:
scraper = DevtoScraper(api_key=Config.DEVTO_API_KEY)
for tag in tags:
articles = scraper.get_articles_by_tag(tag, page=1, per_page=100)
stored = self.pipeline.process_articles(articles)
logger.info(f"Tag {tag}: stored {stored} new articles")
except Exception as e:
logger.error(f"Scrape job failed: {e}")
def start(self, tags: List[str], cron_expression: str = "0 */4 * * *"):
"""
Start scheduler.
Args:
tags: Tags to monitor
cron_expression: Cron schedule (default: every 4 hours)
"""
self.scheduler.add_job(
self.scrape_job,
CronTrigger.from_crontab(cron_expression),
args=[tags],
id='devto_scraper'
)
self.scheduler.start()
self.running = True
logger.info(f"Scheduler started (cron: {cron_expression})")
# Stop scheduler on exit
atexit.register(self.stop)
def stop(self):
"""Stop scheduler."""
if self.running:
self.scheduler.shutdown()
self.running = False
logger.info("Scheduler stopped")
# Usage
pipeline = DataPipeline()
scheduled = ScheduledScraper(pipeline)
# Run scrapes every 4 hours for these tags
tags_to_monitor = ['python', 'javascript', 'rust', 'devops', 'ai']
scheduled.start(tags_to_monitor, cron_expression="0 */4 * * *")
# Keep the script running
import time
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduled.stop()
pipeline.close()
Cron Jobs for Server Deployment
For production deployment on a Linux server, use crontab:
# Edit crontab
crontab -e
# Add job to run scraper every 4 hours
0 */4 * * * cd /path/to/scraper && python3 scheduled_scraper.py >> logs/cron.log 2>&1
# Run scraper twice daily and export CSV
0 6,18 * * * cd /path/to/scraper && python3 scraper.py && python3 export.py
# Weekly export to JSON for backups
0 0 * * 0 cd /path/to/scraper && python3 export_all.py
19. Real-World Use Cases and Business Applications
Use Case 1: Content Gap Analysis for Your Blog
def analyze_content_gaps(pipeline, your_tags: List[str], competitor_tags: List[str]):
"""
Find topics competitors cover that you don't.
"""
your_topics = set()
for tag in your_tags:
articles = pipeline.query_articles(tag=tag, limit=999999)
for article in articles:
# Extract keywords from title
words = article['title'].lower().split()
your_topics.update(words)
competitor_topics = set()
for tag in competitor_tags:
articles = pipeline.query_articles(tag=tag, limit=999999)
for article in articles:
words = article['title'].lower().split()
competitor_topics.update(words)
gaps = competitor_topics - your_topics
logger.info(f"Content gaps (topics you're missing): {list(gaps)[:20]}")
return gaps
Use Case 2: Author Outreach for Collaborations
def identify_collaboration_partners(pipeline, tag: str):
    """
    Find high-engagement authors to approach for guest posts or interviews.
    """
articles = pipeline.query_articles(tag=tag, limit=999999)
authors = {}
for article in articles:
author = article['author']
if author not in authors:
authors[author] = {
'articles': 0,
'total_reactions': 0,
'avg_reactions': 0
}
authors[author]['articles'] += 1
authors[author]['total_reactions'] += article['reactions_count']
# Calculate engagement
for author, stats in authors.items():
stats['avg_reactions'] = stats['total_reactions'] / stats['articles']
# Sort by engagement
top_authors = sorted(
authors.items(),
key=lambda x: x[1]['avg_reactions'],
reverse=True
)[:20]
    # Fetch each author's profile page (requires HTML scraping) for contact details
scraper = DevtoHTMLScraper()
for author, stats in top_authors:
try:
user_page = scraper.get_user_articles_page(author)
logger.info(f"{author}: {stats['articles']} articles, {stats['avg_reactions']:.0f} avg reactions")
except Exception as e:
logger.warning(f"Could not fetch profile for {author}: {e}")
return top_authors
Use Case 3: Trend Prediction
def detect_emerging_technologies(pipeline, days_back: int = 90):
"""
Find technologies with accelerating mention growth.
"""
cursor = pipeline.conn.cursor()
    # Count tag mentions per day
cursor.execute('''
SELECT
tag,
DATE(a.published_at) as date,
COUNT(*) as count
FROM article_tags t
JOIN articles a ON t.article_id = a.id
WHERE datetime(a.published_at) > datetime('now', '-' || ? || ' days')
GROUP BY tag, DATE(a.published_at)
ORDER BY tag, date
''', (days_back,))
rows = cursor.fetchall()
# Calculate growth rate
tag_growth = {}
for row in rows:
tag, date, count = row
if tag not in tag_growth:
tag_growth[tag] = []
tag_growth[tag].append((date, count))
# Find technologies with positive momentum
emerging = []
for tag, counts in tag_growth.items():
if len(counts) < 2:
continue
early_count = sum(c for _, c in counts[:len(counts)//2])
recent_count = sum(c for _, c in counts[len(counts)//2:])
if early_count > 0:
growth_rate = (recent_count - early_count) / early_count
if growth_rate > 0.5: # 50% growth
emerging.append((tag, growth_rate, recent_count))
# Sort by growth rate
emerging.sort(key=lambda x: x[1], reverse=True)
logger.info(f"Emerging technologies (top 10):")
for tag, growth, count in emerging[:10]:
logger.info(f"{tag}: +{growth*100:.0f}% growth, {count} recent articles")
return emerging
20. Legal and Ethical Considerations
Respecting Dev.to's Terms
Dev.to's public API is designed for programmatic access, but terms can change and nothing here is legal advice. Before collecting data:
- Check the ToS: review Dev.to's current Terms of Service yourself rather than relying on secondhand summaries
- Use the API: Prefer API over HTML scraping where possible
- Respect rate limits: Never exceed published limits, even with multiple IPs
- Identify yourself: Use realistic User-Agents, not obviously fake bot strings
- Don't store private data: Don't collect or store draft articles, private messages, or personal information
- Cache aggressively: Don't re-fetch the same article multiple times
- Attribute content: If you publish analysis based on Dev.to content, cite the source
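For the HTML-scraping paths (as opposed to the API), it is also worth checking `robots.txt` before fetching a page. The standard library handles the parsing; the sketch below takes the rules as a string so it works offline and lets you cache the file:

```python
from urllib.robotparser import RobotFileParser

def is_allowed(robots_txt: str, user_agent: str, url: str) -> bool:
    """Check a URL against robots.txt rules supplied as a string."""
    parser = RobotFileParser()
    # parse() takes the file's lines; can_fetch() matches the URL's path
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(user_agent, url)
```

In production you would fetch `https://dev.to/robots.txt` once per run, cache the text, and call `is_allowed` before each HTML request.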
Rate Limit Ethics
import time

class EthicalRateLimiter:
    """
    Enforce a self-imposed request budget to avoid straining server infrastructure.
    """
def __init__(self, requests_per_hour: int = 1000):
self.requests_per_hour = requests_per_hour
self.requests_this_hour = []
def should_proceed(self) -> bool:
"""
Check if we should make next request.
"""
import time
from datetime import datetime, timedelta
now = time.time()
one_hour_ago = now - 3600
# Remove old requests
self.requests_this_hour = [
req_time for req_time in self.requests_this_hour
if req_time > one_hour_ago
]
if len(self.requests_this_hour) >= self.requests_per_hour:
logger.warning(
f"Rate limit reached: {self.requests_per_hour} requests in last hour"
)
return False
self.requests_this_hour.append(now)
return True
def wait_if_needed(self):
"""Sleep if we're approaching limit."""
if not self.should_proceed():
reset_time = min(self.requests_this_hour) + 3600
wait_seconds = reset_time - time.time()
logger.warning(f"Rate limited. Waiting {wait_seconds:.0f}s...")
time.sleep(max(0, wait_seconds))
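The sliding-window logic is easiest to verify with a fake clock. This is a compact, test-friendly variant of the same idea; the `clock` parameter is added here purely for determinism and is not part of the class above:

```python
import time

class SlidingWindowLimiter:
    """Minimal sliding-window limiter, parameterized on a clock for testing."""
    def __init__(self, limit: int, window: float = 3600.0, clock=time.time):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.times = []

    def allow(self) -> bool:
        now = self.clock()
        # Keep only timestamps still inside the window
        self.times = [t for t in self.times if t > now - self.window]
        if len(self.times) >= self.limit:
            return False
        self.times.append(now)
        return True

fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=2, clock=lambda: fake_now[0])
print(limiter.allow(), limiter.allow(), limiter.allow())  # True True False
fake_now[0] = 3601.0  # an hour later, the window has cleared
print(limiter.allow())  # True
```

Injecting the clock means your tests never have to sleep for an hour to see the window reset.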
21. Production-Ready Complete Scraper
Full Implementation
"""
Production-ready Dev.to scraper with all features.
Run with: python3 scraper_production.py
"""
import requests
import sqlite3
import logging
import time
import random
import json
from datetime import datetime
from typing import List, Dict, Optional
from pathlib import Path
from dataclasses import dataclass, asdict
import hashlib
import os

# Configuration
class Config:
    DEVTO_API_KEY = os.environ.get("DEVTO_API_KEY", "")  # read from environment
DEVTO_BASE_URL = "https://dev.to/api"
DB_PATH = "devto_production.db"
OUTPUT_DIR = Path("output")
LOG_FILE = "scraper_production.log"
# Scraping config
TAGS_TO_SCRAPE = ['python', 'javascript', 'rust', 'devops', 'ai', 'webdev']
ARTICLES_PER_TAG = 500
INCLUDE_COMMENTS = True
# Rate limiting
REQUESTS_PER_HOUR = 900 # Conservative limit
DELAY_BETWEEN_REQUESTS = 1.5
# Logging setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(Config.LOG_FILE),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# User-Agent rotation
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
]
# Data model
@dataclass
class Article:
id: int
title: str
slug: str
author: str
url: str
description: str
created_at: str
published_at: str
comments_count: int
reactions_count: int
reading_time: int
tags: str
body_html: Optional[str] = None
def content_hash(self) -> str:
return hashlib.sha256(f"{self.id}:{self.title}".encode()).hexdigest()
# Main scraper class
class ProductionDevtoScraper:
def __init__(self, api_key: str = None):
self.api_key = api_key or Config.DEVTO_API_KEY
self.base_url = Config.DEVTO_BASE_URL
self.session = requests.Session()
self.last_request_time = 0
self.request_count = 0
self.db_path = Config.DB_PATH
self.init_db()
logger.info("Scraper initialized")
def init_db(self):
"""Initialize SQLite database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
slug TEXT UNIQUE NOT NULL,
author TEXT NOT NULL,
url TEXT UNIQUE NOT NULL,
description TEXT,
created_at TEXT,
published_at TEXT,
comments_count INTEGER,
reactions_count INTEGER,
reading_time INTEGER,
tags TEXT,
body_html TEXT,
content_hash TEXT UNIQUE,
scraped_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY,
article_id INTEGER NOT NULL,
user TEXT NOT NULL,
body_text TEXT,
reactions_count INTEGER,
created_at TEXT,
FOREIGN KEY (article_id) REFERENCES articles(id)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS scrape_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
articles_found INTEGER,
articles_stored INTEGER,
comments_collected INTEGER,
errors INTEGER
)
''')
conn.commit()
conn.close()
def _apply_rate_limit(self):
"""Respect rate limits."""
elapsed = time.time() - self.last_request_time
if elapsed < Config.DELAY_BETWEEN_REQUESTS:
time.sleep(Config.DELAY_BETWEEN_REQUESTS - elapsed)
def _get_headers(self) -> Dict:
"""Get headers with random User-Agent."""
headers = {
'User-Agent': random.choice(USER_AGENTS),
'Accept': 'application/json'
}
if self.api_key:
headers['api-key'] = self.api_key
return headers
    def _request(self, endpoint: str, params: Dict = None, max_retries: int = 3):
        """Make a GET request with retry logic; returns parsed JSON or None."""
        url = f"{self.base_url}{endpoint}"
        for attempt in range(max_retries):
            self._apply_rate_limit()
            self.request_count += 1
            try:
                # Use the shared session so connections are reused
                response = self.session.get(
                    url,
                    params=params or {},
                    headers=self._get_headers(),
                    timeout=10
                )
self.last_request_time = time.time()
if response.status_code == 429:
wait = (2 ** attempt)
logger.warning(f"Rate limited. Waiting {wait}s...")
time.sleep(wait)
continue
response.raise_for_status()
return response.json()
except Exception as e:
if attempt == max_retries - 1:
logger.error(f"Failed to get {url}: {e}")
return None
time.sleep(2 ** attempt)
return None
def get_articles_by_tag(self, tag: str, limit: int = 100) -> List[Dict]:
"""Fetch articles for a tag."""
all_articles = []
page = 1
while len(all_articles) < limit:
articles = self._request('/articles', {
'tag': tag,
'page': page,
'per_page': min(100, limit - len(all_articles))
})
if not articles:
break
all_articles.extend(articles)
logger.info(f"Tag {tag}: fetched {len(articles)} articles (page {page})")
page += 1
return all_articles[:limit]
    def get_article_comments(self, article_id: int) -> List[Dict]:
        """Fetch article comments.

        The Forem API exposes comments at /comments?a_id=<article_id> and
        returns the full (nested) thread in one response, so no pagination
        loop is needed here.
        """
        data = self._request('/comments', {'a_id': article_id})
        return data or []
    def store_article(self, article_dict: Dict) -> bool:
        """Store article in database."""
        try:
            # The API returns tags either as a list ('tag_list' on the list
            # endpoint) or as a comma-joined string; normalize to a string so
            # we never join the characters of a string by accident.
            raw_tags = article_dict.get('tag_list') or article_dict.get('tags') or []
            tags = ','.join(raw_tags) if isinstance(raw_tags, list) else raw_tags
            article = Article(
                id=article_dict['id'],
                title=article_dict['title'],
                slug=article_dict['slug'],
                author=article_dict['user']['username'],
                url=article_dict.get('url', ''),
                description=article_dict.get('description', ''),
                created_at=article_dict['created_at'],
                published_at=article_dict.get('published_at', ''),
                comments_count=article_dict.get('comments_count', 0),
                reactions_count=article_dict.get('positive_reactions_count', 0),
                reading_time=article_dict.get('reading_time_minutes', 0),
                tags=tags,
                body_html=article_dict.get('body_html')
            )
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO articles (
id, title, slug, author, url, description,
created_at, published_at, comments_count, reactions_count,
reading_time, tags, body_html, content_hash
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
article.id, article.title, article.slug, article.author,
article.url, article.description, article.created_at,
article.published_at, article.comments_count, article.reactions_count,
article.reading_time, article.tags, article.body_html,
article.content_hash()
))
conn.commit()
conn.close()
return True
except sqlite3.IntegrityError:
return False
except Exception as e:
logger.error(f"Failed to store article: {e}")
return False
def scrape_all_tags(self):
"""Main scraping function."""
logger.info(f"Starting scrape of {len(Config.TAGS_TO_SCRAPE)} tags")
total_articles = 0
total_stored = 0
total_comments = 0
errors = 0
for tag in Config.TAGS_TO_SCRAPE:
try:
logger.info(f"Scraping tag: {tag}")
articles = self.get_articles_by_tag(tag, limit=Config.ARTICLES_PER_TAG)
total_articles += len(articles)
for article in articles:
if self.store_article(article):
total_stored += 1
if Config.INCLUDE_COMMENTS:
comments = self.get_article_comments(article['id'])
total_comments += len(comments)
except Exception as e:
logger.error(f"Error scraping tag {tag}: {e}")
errors += 1
        logger.info(
            f"Scrape complete: {total_articles} articles found, "
            f"{total_stored} stored, {total_comments} comments, {errors} errors"
        )
        # Record this run in the scrape_runs table
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            'INSERT INTO scrape_runs (articles_found, articles_stored, '
            'comments_collected, errors) VALUES (?, ?, ?, ?)',
            (total_articles, total_stored, total_comments, errors)
        )
        conn.commit()
        conn.close()
        return {
            'articles_found': total_articles,
            'articles_stored': total_stored,
            'comments_collected': total_comments,
            'errors': errors
        }
def export_to_json(self, output_file: str = "articles_export.json"):
"""Export articles to JSON."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('SELECT * FROM articles ORDER BY published_at DESC')
articles = [dict(row) for row in cursor.fetchall()]
Config.OUTPUT_DIR.mkdir(exist_ok=True)
output_path = Config.OUTPUT_DIR / output_file
with open(output_path, 'w') as f:
json.dump(articles, f, indent=2)
logger.info(f"Exported {len(articles)} articles to {output_path}")
conn.close()
def export_to_csv(self, output_file: str = "articles_export.csv"):
"""Export articles to CSV."""
import csv
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('SELECT * FROM articles ORDER BY published_at DESC')
        articles = [dict(row) for row in cursor.fetchall()]
        conn.close()
        if not articles:
            logger.warning("No articles to export")
            return
        Config.OUTPUT_DIR.mkdir(exist_ok=True)
        output_path = Config.OUTPUT_DIR / output_file
        with open(output_path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=articles[0].keys())
            writer.writeheader()
            writer.writerows(articles)
        logger.info(f"Exported {len(articles)} articles to {output_path}")
# Main execution
if __name__ == "__main__":
scraper = ProductionDevtoScraper(api_key=Config.DEVTO_API_KEY)
# Run scrape
results = scraper.scrape_all_tags()
# Export data
scraper.export_to_json()
scraper.export_to_csv()
    logger.info(f"Scraper finished successfully: {results}")
22. Troubleshooting Guide
Common Issues and Solutions
Issue: 429 Rate Limited
# Solution: Implement exponential backoff
# Already covered in section 14, use RateLimiter class
# Increase delays between requests
Config.DELAY_BETWEEN_REQUESTS = 3.0 # Increase from 1.5
Issue: 403 Forbidden / Blocked
# Solution: Use residential proxies like ThorData
# See section 15 for proxy integration
# Rotate User-Agent more frequently
# Add more realistic request headers
Issue: Timeout Errors
# Solution: Increase timeout, retry with backoff
response = session.get(url, timeout=30) # Increase from 10
# Use exponential backoff with longer delays
wait_time = 60 * (2 ** attempt) # Start with 60s
Issue: Database Locked
# Solution: Add a busy timeout and enable WAL journaling for concurrent access
conn = sqlite3.connect(self.db_path, timeout=30.0) # Add timeout
conn.execute("PRAGMA journal_mode=WAL") # Enable WAL mode for concurrent writes
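A quick way to verify the WAL fix locally is to open two connections to the same database and read while a write transaction is still open (file and table names here are arbitrary):

```python
import os
import sqlite3
import tempfile

# Two connections to one file: WAL lets a reader proceed while a
# writer holds an open, uncommitted transaction.
path = os.path.join(tempfile.mkdtemp(), "wal_demo.db")
writer = sqlite3.connect(path, timeout=30.0)
writer.execute("PRAGMA journal_mode=WAL")
writer.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
writer.execute("INSERT INTO items (name) VALUES ('first')")
writer.commit()

reader = sqlite3.connect(path, timeout=30.0)
writer.execute("INSERT INTO items (name) VALUES ('second')")  # not yet committed
count = reader.execute("SELECT COUNT(*) FROM items").fetchone()[0]
print(count)  # reader sees only the committed snapshot: 1
writer.commit()
```

Without WAL, heavy concurrent writers tend to surface as "database is locked" errors; with WAL plus a generous `timeout`, readers keep working against the last committed snapshot.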
Issue: API Returning Partial Data
# Solution: Validate before storing
def validate_article(article: Dict) -> bool:
    required = ['id', 'title', 'user', 'slug']
    return all(key in article for key in required)

# Inside your article loop:
for article in articles:
    if not validate_article(article):
        logger.warning(f"Invalid article: {article.get('id')}")
        continue
Issue: Memory Usage Growing
# Solution: Process in batches instead of loading all
def process_in_batches(articles: List[Dict], batch_size: int = 100):
for i in range(0, len(articles), batch_size):
batch = articles[i:i + batch_size]
yield batch
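Pairing the batch generator with `executemany` keeps both memory and transaction overhead down: one commit per batch instead of one per row. A hedged sketch against a throwaway in-memory table (the two-column schema is illustrative):

```python
import sqlite3

def process_in_batches(items, batch_size=100):
    """Yield successive slices of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER PRIMARY KEY, title TEXT)")

articles = [{"id": n, "title": f"Post {n}"} for n in range(250)]
for batch in process_in_batches(articles, batch_size=100):
    # One transaction per batch instead of one per row
    conn.executemany(
        "INSERT INTO articles (id, title) VALUES (:id, :title)", batch
    )
    conn.commit()

total = conn.execute("SELECT COUNT(*) FROM articles").fetchone()[0]
print(total)  # 250
```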
Conclusion
You now have everything needed to build a production-grade Dev.to scraper. The key takeaways:
- Use the API first: Dev.to's REST API is reliable and well-documented
- Implement rate limiting: Respect platform limits with exponential backoff
- Scale with proxies: ThorData residential proxies enable large-scale collection
- Store efficiently: SQLite with proper indexing handles millions of articles
- Monitor and log: Always know what your scraper is doing
- Be ethical: Respect ToS, don't hammer servers, attribute sources
For questions or advanced use cases, refer back to the relevant sections. Treat the code examples as solid starting points, and test them against your own environment and data volumes before relying on them at scale.
Happy scraping!