
Scraping Discogs for Vinyl Record Prices and Release Data

Discogs is the largest database of music releases in the world -- over 16 million entries cataloged by contributors. For vinyl collectors, resellers, and data analysts, it is the primary source for pricing data, release variations, and market trends.

Discogs offers an official API, which is the right starting point. But the API has gaps: marketplace pricing history, sold listings, and some search filters are only available on the web interface. This post covers both approaches -- the API for structured data and web scraping for the gaps.

Why Discogs Data Matters

The vinyl market has exploded since 2020. New pressings, colored variants, and limited editions command premiums that can swing wildly from week to week. A first pressing of a classic jazz album might list for $40 today and sell for $120 next month. Data-driven collectors and resellers use Discogs to track listing prices across pressings, compare variants by country and year, and time their buying and selling.

The Discogs API handles most of this. Sold price history requires web scraping. Together they give you complete market visibility.

The Discogs API

Register an application at discogs.com/settings/developers to get a personal access token. The API is well documented and returns JSON.

Rate limit: 60 requests per minute with authentication, 25 without. The API returns X-Discogs-Ratelimit-Remaining headers so you can track your budget.

# discogs_api.py
import httpx
import time

class DiscogsClient:
    BASE_URL = "https://api.discogs.com"

    def __init__(self, token: str):
        self.client = httpx.Client(
            base_url=self.BASE_URL,
            headers={
                "Authorization": f"Discogs token={token}",
                "User-Agent": "VinylPriceTracker/1.0",
            },
            timeout=15,
        )
        self.remaining = 60

    def _request(self, path: str, params: dict | None = None) -> dict:
        if self.remaining < 5:
            time.sleep(2)
        resp = self.client.get(path, params=params)
        self.remaining = int(resp.headers.get("X-Discogs-Ratelimit-Remaining", 60))
        resp.raise_for_status()
        return resp.json()

    def search_releases(self, query: str, format_: str = "Vinyl", page: int = 1) -> dict:
        return self._request("/database/search", {
            "q": query,
            "format": format_,
            "type": "release",
            "page": page,
            "per_page": 50,
        })

    def get_release(self, release_id: int) -> dict:
        return self._request(f"/releases/{release_id}")

    def get_master(self, master_id: int) -> dict:
        return self._request(f"/masters/{master_id}")

    def get_master_versions(self, master_id: int, page: int = 1) -> dict:
        return self._request(f"/masters/{master_id}/versions", {
            "page": page,
            "per_page": 100,
        })

    def get_label(self, label_id: int) -> dict:
        return self._request(f"/labels/{label_id}")

    def get_label_releases(self, label_id: int, page: int = 1) -> dict:
        return self._request(f"/labels/{label_id}/releases", {
            "page": page,
            "per_page": 100,
        })

    def get_artist(self, artist_id: int) -> dict:
        return self._request(f"/artists/{artist_id}")

    def get_artist_releases(self, artist_id: int, page: int = 1) -> dict:
        return self._request(f"/artists/{artist_id}/releases", {
            "page": page,
            "per_page": 100,
            "sort": "year",
            "sort_order": "asc",
        })

    def get_marketplace_stats(self, release_id: int) -> dict:
        return self._request(f"/marketplace/stats/{release_id}")

    def get_marketplace_listings(self, release_id: int, page: int = 1) -> dict:
        return self._request("/marketplace/search", {
            "release_id": release_id,
            "page": page,
            "per_page": 50,
            "sort": "price",
            "sort_order": "asc",
        })

Fetching Marketplace Prices via API

The marketplace stats endpoint gives you the current lowest price, the number of active listings, and whether the release is blocked from sale on the marketplace:

token = "YOUR_DISCOGS_TOKEN"
client = DiscogsClient(token)

# Search for a specific pressing
results = client.search_releases("Dark Side of the Moon")
release = results["results"][0]
print(f"{release['title']} (ID: {release['id']})")

# Get marketplace stats
stats = client.get_marketplace_stats(release["id"])
# lowest_price is null when there are no listings, so guard the chained .get
print(f"Lowest price: {(stats.get('lowest_price') or {}).get('value', 'N/A')}")
print(f"For sale: {stats.get('num_for_sale', 0)} listings")
print(f"Blocked: {stats.get('blocked_from_sale', False)}")

Exploring Master Releases and Pressings

Discogs differentiates between a Master Release (the canonical recording) and individual Releases (specific pressings). A master might have hundreds of pressing variants from different countries and decades. This is critical for collectors who care about original pressings versus reissues:

def analyze_pressings(client: DiscogsClient, master_id: int) -> list[dict]:
    """Get all pressings of a master release with marketplace data."""
    pressings = []
    page = 1

    while True:
        data = client.get_master_versions(master_id, page=page)
        versions = data.get("versions", [])

        if not versions:
            break

        for v in versions:
            # Only get marketplace data for vinyl
            if "Vinyl" in v.get("format", ""):
                try:
                    stats = client.get_marketplace_stats(v["id"])
                    pressings.append({
                        "release_id": v["id"],
                        "title": v.get("title"),
                        "country": v.get("country", "Unknown"),
                        "year": v.get("year"),
                        "label": v.get("label"),
                        "format": v.get("format"),
                        "lowest_price": (stats.get("lowest_price") or {}).get("value"),
                        "for_sale": stats.get("num_for_sale", 0),
                    })
                    time.sleep(1)
                except Exception as e:
                    print(f"  Error for release {v['id']}: {e}")

        if page >= data.get("pagination", {}).get("pages", 1):
            break
        page += 1

    return sorted(pressings, key=lambda x: x["year"] or 9999)


# Example: "Kind of Blue" by Miles Davis -- master ID 37636
pressings = analyze_pressings(client, 37636)
for p in pressings[:10]:
    price = f"${p['lowest_price']:.2f}" if p['lowest_price'] else "N/A"
    print(f"{p['year']} | {p['country']} | {price} | {p['for_sale']} for sale")

Scraping Sold Listing History

The API does not expose historical sold prices. That data is only visible on the web at /sell/history/RELEASE_ID. To get price trends, you need to scrape the HTML:

# discogs_sold_history.py
import httpx
from selectolax.parser import HTMLParser

def scrape_sold_history(release_id: int, proxy_url: str | None = None) -> list[dict]:
    """Scrape sold listing history from Discogs web interface."""
    url = f"https://www.discogs.com/sell/history/{release_id}"

    transport = httpx.HTTPTransport(proxy=proxy_url) if proxy_url else None
    client = httpx.Client(transport=transport, timeout=15)

    try:
        resp = client.get(url, headers={
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/125.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
        })
        resp.raise_for_status()
    finally:
        client.close()

    tree = HTMLParser(resp.text)
    sales = []

    rows = tree.css("table.table_block tbody tr")
    for row in rows:
        cells = row.css("td")
        if len(cells) < 4:
            continue
        sales.append({
            "date": cells[0].text(strip=True),
            "condition": cells[1].text(strip=True),
            "price": cells[2].text(strip=True),
            "buyer_country": cells[3].text(strip=True),
        })

    return sales


# Usage
PROXY = "http://user:[email protected]:9000"
history = scrape_sold_history(249504, proxy_url=PROXY)
for sale in history[:5]:
    print(f"{sale['date']}: {sale['price']} ({sale['condition']})")

Install with pip install httpx selectolax.

Building an Artist Catalog Scraper

To build a complete discography with pricing data, paginate through an artist's releases and fetch marketplace stats for each:

# artist_catalog.py
import json

def build_artist_catalog(client: DiscogsClient, artist_id: int) -> list[dict]:
    """Build complete vinyl catalog for an artist with prices."""
    catalog = []
    page = 1

    while True:
        data = client.get_artist_releases(artist_id, page=page)
        releases = data.get("releases", [])

        if not releases:
            break

        for release in releases:
            if "Vinyl" in release.get("format", ""):
                try:
                    stats = client.get_marketplace_stats(release["id"])
                    catalog.append({
                        "title": release["title"],
                        "year": release.get("year"),
                        "release_id": release["id"],
                        "label": release.get("label", ""),
                        "country": release.get("country", ""),
                        "format": release.get("format", ""),
                        "lowest_price": (stats.get("lowest_price") or {}).get("value"),
                        "num_for_sale": stats.get("num_for_sale", 0),
                    })
                    time.sleep(1)  # respect rate limits
                except Exception as e:
                    print(f"  Skipping {release['id']}: {e}")

        if page >= data.get("pagination", {}).get("pages", 1):
            break
        page += 1

    return catalog


# Example: get all vinyl releases by Radiohead (artist ID: 3840)
catalog = build_artist_catalog(client, 3840)
with open("radiohead_vinyl.json", "w") as f:
    json.dump(catalog, f, indent=2)
print(f"Found {len(catalog)} vinyl releases")

Label Catalog Scraping

For collectors focused on specific labels (Blue Note, Impulse!, Sub Pop), scraping a label's full catalog is more useful than searching by artist:

def scrape_label_catalog(client: DiscogsClient, label_id: int) -> list[dict]:
    """Get all vinyl releases from a specific label."""
    catalog = []
    page = 1

    while True:
        data = client.get_label_releases(label_id, page=page)
        releases = data.get("releases", [])

        if not releases:
            break

        for release in releases:
            stats = client.get_marketplace_stats(release["id"])
            catalog.append({
                "release_id": release["id"],
                "title": release.get("title"),
                "artist": release.get("artist"),
                "year": release.get("year"),
                "format": release.get("format", ""),
                "catno": release.get("catno", ""),
                "lowest_price": (stats.get("lowest_price") or {}).get("value"),
                "num_for_sale": stats.get("num_for_sale", 0),
            })
            time.sleep(1)

        pagination = data.get("pagination", {})
        if page >= pagination.get("pages", 1):
            break
        page += 1

    return catalog


# Blue Note Records (label ID: 2)
blue_note = scrape_label_catalog(client, 2)
print(f"Blue Note catalog: {len(blue_note)} releases")

Anti-Bot Protections and How to Handle Them

Discogs uses Cloudflare for bot protection on their web interface. The API is more permissive but strictly rate-limited. Here is what to watch for:

API requests: Stay under 60/minute. The client above tracks this automatically. If you hit the limit, the API returns 429 and you need to back off for 60 seconds.
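That back-off can be wrapped in a small stdlib-only retry helper. This is a sketch -- the function name and signature are ours, not part of any Discogs client -- and it works with any callable, so you can wrap API calls without coupling the retry logic to httpx:

```python
import time

def call_with_backoff(func, *, retries: int = 3, wait: float = 60.0,
                      is_rate_limit=lambda exc: False):
    """Call func(); when it raises a rate-limit error (as judged by the
    is_rate_limit predicate), sleep `wait` seconds and try again, up to
    `retries` times. Any other exception propagates immediately."""
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception as exc:
            if attempt == retries or not is_rate_limit(exc):
                raise
            time.sleep(wait)
```

With httpx you would pass something like `is_rate_limit=lambda e: isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 429` and call it as `call_with_backoff(lambda: client.get_marketplace_stats(rid))`.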

Web scraping: Cloudflare will challenge requests that look automated. At a minimum you need browser-like headers and realistic pacing, and ideally residential proxies:

def create_scraping_session(proxy_url: str) -> httpx.Client:
    """Create a session with browser-like headers for Discogs scraping."""
    transport = httpx.HTTPTransport(proxy=proxy_url)
    return httpx.Client(
        transport=transport,
        timeout=15,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/125.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
        },
        follow_redirects=True,
    )
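On top of browser-like headers, randomized pacing between page fetches helps: a fixed time.sleep(1) between requests is itself a fingerprint. A small helper (the function names here are ours) that draws a jittered delay:

```python
import random
import time

def jittered_delay(base: float = 2.0, jitter: float = 1.5) -> float:
    """Return a randomized wait between `base` and `base + jitter` seconds."""
    return base + random.uniform(0.0, jitter)

def polite_sleep(base: float = 2.0, jitter: float = 1.5) -> None:
    """Sleep for a jittered interval so request timing looks less robotic."""
    time.sleep(jittered_delay(base, jitter))
```

Call polite_sleep() between page fetches instead of a fixed sleep.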

Pagination Strategy for Large Datasets

When scraping thousands of releases, you need a robust pagination loop that handles API errors without losing progress:

import sqlite3
from datetime import datetime

def scrape_with_checkpoint(client: DiscogsClient, artist_id: int, db_path: str = "discogs.db"):
    """Scrape artist catalog with checkpoint/resume support."""
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS releases (
            release_id INTEGER PRIMARY KEY,
            artist_id INTEGER,
            title TEXT,
            year INTEGER,
            label TEXT,
            country TEXT,
            format TEXT,
            lowest_price REAL,
            num_for_sale INTEGER,
            scraped_at TEXT
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS scrape_progress (
            artist_id INTEGER PRIMARY KEY,
            last_page INTEGER,
            total_pages INTEGER,
            updated_at TEXT
        )
    """)
    conn.commit()

    # Check if we have a checkpoint
    progress = conn.execute(
        "SELECT last_page, total_pages FROM scrape_progress WHERE artist_id = ?",
        (artist_id,)
    ).fetchone()

    start_page = (progress[0] + 1) if progress else 1
    page = start_page

    while True:
        try:
            data = client.get_artist_releases(artist_id, page=page)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                print("Rate limited. Sleeping 60s...")
                time.sleep(60)
                continue
            raise

        releases = data.get("releases", [])
        total_pages = data.get("pagination", {}).get("pages", 1)

        for release in releases:
            if "Vinyl" not in release.get("format", ""):
                continue
            try:
                stats = client.get_marketplace_stats(release["id"])
                conn.execute(
                    "INSERT OR REPLACE INTO releases VALUES (?,?,?,?,?,?,?,?,?,?)",
                    (
                        release["id"], artist_id, release["title"],
                        release.get("year"), release.get("label"),
                        release.get("country"), release.get("format"),
                        (stats.get("lowest_price") or {}).get("value"),
                        stats.get("num_for_sale", 0),
                        datetime.utcnow().isoformat(),
                    )
                )
                time.sleep(1)
            except Exception as e:
                print(f"  Release {release['id']} error: {e}")

        # Save checkpoint
        conn.execute(
            "INSERT OR REPLACE INTO scrape_progress VALUES (?,?,?,?)",
            (artist_id, page, total_pages, datetime.utcnow().isoformat())
        )
        conn.commit()

        print(f"Page {page}/{total_pages} done")

        if page >= total_pages:
            break
        page += 1

    conn.close()

Price Tracking Over Time

To monitor vinyl prices, run the catalog scraper on a schedule and store results in SQLite:

# price_tracker.py
import sqlite3
from datetime import date

def init_db(db_path: str = "vinyl_prices.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS prices (
            release_id INTEGER,
            date TEXT,
            lowest_price REAL,
            num_for_sale INTEGER,
            PRIMARY KEY (release_id, date)
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS releases (
            release_id INTEGER PRIMARY KEY,
            title TEXT,
            artist TEXT,
            year INTEGER,
            label TEXT,
            country TEXT
        )
    """)
    conn.commit()
    return conn

def record_prices(conn: sqlite3.Connection, catalog: list[dict]):
    today = date.today().isoformat()
    for item in catalog:
        if item["lowest_price"] is not None:
            conn.execute(
                "INSERT OR REPLACE INTO prices VALUES (?, ?, ?, ?)",
                (item["release_id"], today, item["lowest_price"], item["num_for_sale"]),
            )
    conn.commit()

def get_price_trend(conn: sqlite3.Connection, release_id: int) -> list[dict]:
    rows = conn.execute(
        "SELECT date, lowest_price, num_for_sale FROM prices WHERE release_id = ? ORDER BY date",
        (release_id,),
    ).fetchall()
    return [{"date": r[0], "price": r[1], "listings": r[2]} for r in rows]

def find_price_drops(conn: sqlite3.Connection, drop_pct: float = 20.0) -> list[dict]:
    """Find releases whose price dropped significantly between two snapshots."""
    rows = conn.execute("""
        SELECT p1.release_id, r.title, p1.date as old_date, p1.lowest_price as old_price,
               p2.date as new_date, p2.lowest_price as new_price,
               round((p1.lowest_price - p2.lowest_price) / p1.lowest_price * 100, 1) as drop_pct
        FROM prices p1
        JOIN prices p2 ON p1.release_id = p2.release_id
        JOIN releases r ON p1.release_id = r.release_id
        WHERE p1.date < p2.date
          AND p1.lowest_price > 0
          AND (p1.lowest_price - p2.lowest_price) / p1.lowest_price * 100 >= ?
        ORDER BY drop_pct DESC
    """, (drop_pct,)).fetchall()

    return [
        {"release_id": r[0], "title": r[1], "old_date": r[2], "old_price": r[3],
         "new_date": r[4], "new_price": r[5], "drop_pct": r[6]}
        for r in rows
    ]

With a week or two of price data, you can calculate appreciation rates and identify hot genres:


def calculate_appreciation(conn: sqlite3.Connection) -> list[dict]:
    """Calculate price appreciation rate per release."""
    rows = conn.execute("""
        SELECT p.release_id, r.title, r.artist, r.year,
               MIN(p.lowest_price) as min_price,
               MAX(p.lowest_price) as max_price,
               COUNT(*) as data_points
        FROM prices p
        JOIN releases r ON p.release_id = r.release_id
        GROUP BY p.release_id
        HAVING data_points >= 3 AND min_price > 0
        ORDER BY (max_price - min_price) / min_price DESC
        LIMIT 50
    """).fetchall()

    return [
        {
            "release_id": r[0], "title": r[1], "artist": r[2], "year": r[3],
            "min_price": r[4], "max_price": r[5],
            "appreciation_pct": round((r[5] - r[4]) / r[4] * 100, 1)
        }
        for r in rows
    ]

Scraping Discogs Want Lists for Demand Signals

The number of users who have added a release to their want list is a leading indicator of demand. Releases with growing want lists often see price increases:

def get_want_list_count(client: DiscogsClient, release_id: int) -> int:
    """Get the number of users who want this release."""
    data = client.get_release(release_id)
    stats = data.get("community", {})
    return stats.get("want", 0)

def get_have_want_ratio(client: DiscogsClient, release_id: int) -> dict:
    """High want/have ratio = scarce and desirable."""
    data = client.get_release(release_id)
    community = data.get("community", {})
    have = community.get("have", 1)
    want = community.get("want", 0)

    return {
        "have": have,
        "want": want,
        "ratio": round(want / have, 2) if have > 0 else 0,
        "title": data.get("title"),
    }

Complete Pipeline: From Search to SQLite

Putting it all together -- a pipeline that searches for releases matching criteria, fetches their data, tracks prices over time, and identifies opportunities:

def run_pipeline(
    token: str,
    search_queries: list[str],
    db_path: str = "vinyl_market.db",
    proxy_url: str | None = None,
):
    client = DiscogsClient(token)
    conn = init_db(db_path)

    all_release_ids = set()

    for query in search_queries:
        print(f"Searching: {query}")
        page = 1
        while True:
            results = client.search_releases(query, page=page)
            for r in results.get("results", []):
                all_release_ids.add(r["id"])
            if page >= results.get("pagination", {}).get("pages", 1):
                break
            page += 1
            time.sleep(1)

    print(f"Tracking {len(all_release_ids)} releases")

    catalog = []
    for release_id in all_release_ids:
        try:
            stats = client.get_marketplace_stats(release_id)
            release_data = client.get_release(release_id)
            catalog.append({
                "release_id": release_id,
                "title": release_data.get("title"),
                "lowest_price": (stats.get("lowest_price") or {}).get("value"),
                "num_for_sale": stats.get("num_for_sale", 0),
            })
            time.sleep(1)
        except Exception as e:
            print(f"  Error for {release_id}: {e}")

    record_prices(conn, catalog)

    drops = find_price_drops(conn, drop_pct=15.0)
    print("\nPrice drops (>=15%):")
    for d in drops[:10]:
        print(f"  {d['title']}: ${d['old_price']} -> ${d['new_price']} ({d['drop_pct']}% drop)")

    conn.close()


# Run daily with cron
run_pipeline(
    token="YOUR_TOKEN",
    search_queries=["first pressing jazz", "audiophile 180g", "original press soul"],
    proxy_url="http://user:[email protected]:9000",
)

Handling Rate Limits Gracefully

At 60 requests per minute, you need to be careful when combining API and web scraping:

import time
from functools import wraps

def rate_limited(max_per_minute: int):
    """Decorator to rate-limit a function."""
    min_interval = 60.0 / max_per_minute
    last_called = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            wait = min_interval - elapsed
            if wait > 0:
                time.sleep(wait)
            result = func(*args, **kwargs)
            last_called[0] = time.time()
            return result
        return wrapper
    return decorator


@rate_limited(50)  # Stay under the 60/min limit
def safe_get_stats(client, release_id):
    return client.get_marketplace_stats(release_id)

Use Cases and What to Build With This Data

Discogs data supports several practical applications:

Price alert system: Monitor specific releases and send notifications when the lowest price drops below a threshold. Collectors use this to buy at market lows.

Pressing comparison tool: For any master release, show all pressing variants sorted by price. A Japanese pressing might be $200 while a UK pressing of the same master is $45.
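Sketching that with the output of analyze_pressings from earlier (assuming its dict shape), the comparison is just a null-safe sort:

```python
def cheapest_pressings(pressings: list[dict], n: int = 5) -> list[dict]:
    """Sort pressing variants by lowest listed price, dropping variants
    that currently have no listings."""
    priced = [p for p in pressings if p.get("lowest_price") is not None]
    return sorted(priced, key=lambda p: p["lowest_price"])[:n]
```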

Dealer inventory valuation: Record stores and estate sale buyers can quickly estimate collection value by cross-referencing with Discogs marketplace data.
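A rough floor valuation is just the sum of current lowest listed prices -- a deliberately naive sketch that ignores condition grading, which dominates real-world value:

```python
def estimate_collection_value(items: list[dict]) -> float:
    """Sum the current lowest listed price of each item, skipping
    releases with no active listings."""
    return round(sum(i["lowest_price"] for i in items
                     if i.get("lowest_price") is not None), 2)
```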

Genre trend analysis: Track average prices by genre and decade over time. Identifying which sub-genres are trending before they get mainstream attention is where serious resellers make money.

Want list demand prediction: Releases with high want/have ratios and low current supply tend to appreciate. Building a scoring model on this data can surface buying opportunities.
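A toy version of such a score, combining the want/have ratio with current supply -- the weighting here is entirely illustrative, not a calibrated model:

```python
def scarcity_score(want: int, have: int, num_for_sale: int) -> float:
    """Demand proxy: want/have ratio discounted by how many copies are
    currently listed. Higher = scarcer and more desired."""
    ratio = want / have if have else float(want)
    return round(ratio / (1 + num_for_sale), 3)
```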

Run this daily with a cron job and you will have a solid dataset for spotting underpriced records and tracking market trends across pressings. At 60 requests per minute, the API budget comfortably covers several hundred releases per run.