
Scraping Docker Hub Image Metadata in 2026: Tags, Pull Counts, and Layer Info

Docker Hub hosts over 15 million container images. If you're building a security scanner, tracking image popularity, auditing base image usage across your organization, building a dependency graph for container images, or just want to monitor your own published images — you need structured data from Docker Hub.

The good news: Docker Hub has a public API. The bad news: it's rate-limited hard and the documentation is scattered across three different API versions, with breaking changes and deprecations along the way.

Here's what actually works in 2026 — covering both the Hub metadata API and the Registry v2 protocol for layer-level data.

Docker Hub's Two APIs

Docker Hub exposes two distinct APIs, and confusing them is the number one mistake people make:

1. Docker Hub API (hub.docker.com). Returns human-friendly metadata: descriptions, star counts, pull counts, last-updated timestamps, and tag lists. This is what the website uses. Authentication is optional for public images.

2. Docker Registry API v2 (registry-1.docker.io). Returns the actual image manifests, layer digests, and compressed sizes. This is what docker pull uses under the hood. Requires a bearer token even for public images.

You'll need both for a complete picture. Start with the Hub API for metadata, add Registry v2 when you need layer-level detail.
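
Both APIs want the namespace spelled out, so it helps to normalize references up front. Here's a small helper of my own (the name and behavior mirror Docker's resolution convention, not any official API):

```python
def parse_image_ref(ref: str) -> tuple:
    """Split an image reference into (namespace, repo, tag).

    Docker's convention: a bare name like 'nginx' means the official
    image 'library/nginx', and a missing tag defaults to 'latest'.
    (Registry hosts with ports, e.g. localhost:5000/x, are out of scope.)
    """
    name, _, tag = ref.partition(":")
    tag = tag or "latest"
    if "/" in name:
        namespace, repo = name.split("/", 1)
    else:
        namespace, repo = "library", name
    return namespace, repo, tag
```

So parse_image_ref("python:3.12-slim") yields ("library", "python", "3.12-slim"), ready to plug into the Hub and Registry URL templates below.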

Rate Limits in 2026

Docker Hub enforces different rate limits across the two APIs:

Hub API (hub.docker.com):
- Undocumented, but roughly 100-150 requests/minute before HTTP 429
- Enforced per IP

Registry API (registry-1.docker.io):
- Anonymous pulls: 100 requests per 6 hours per IP
- Authenticated (free account): 200 requests per 6 hours
- Authenticated (Pro): 5,000 requests per day
- These limits apply to both pulls (actual image downloads) and manifest requests

For bulk collection — scraping metadata for thousands of images — you'll hit Registry API limits fast. The Hub API is more permissive for metadata.
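
To see what those numbers mean for planning, here's a rough, hypothetical budget calculator using the anonymous-tier figures above (swap in your tier's limits):

```python
import math

def registry_budget(num_requests: int, window_limit: int = 100,
                    window_hours: int = 6) -> dict:
    """Back-of-envelope quota math for the Registry API.

    Defaults mirror the anonymous tier (100 requests / 6 hours / IP).
    """
    windows_needed = math.ceil(num_requests / window_limit)
    return {
        # One IP: wait out a full window for each batch of window_limit requests
        "hours_single_ip": windows_needed * window_hours,
        # Or spread the work across this many IPs to finish in one window
        "ips_for_one_window": windows_needed,
    }
```

For 1,000 manifest fetches anonymously, that's 60 hours on a single IP, or 10 rotating IPs to finish within one window.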

Setting Up

import httpx
import time
import json
import sqlite3
import logging
from datetime import datetime
from typing import Optional
from pathlib import Path

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

HUB_BASE = "https://hub.docker.com/v2"
REGISTRY_BASE = "https://registry-1.docker.io/v2"
AUTH_BASE = "https://auth.docker.io"

# Optional: Docker Hub credentials for higher rate limits
DH_USERNAME = None
DH_PASSWORD = None

# Optional: ThorData proxy for rate limit management
PROXY_URL = "http://USER:[email protected]:9000"


def make_client(use_proxy: bool = False) -> httpx.Client:
    """Create HTTP client with optional proxy."""
    kwargs = {
        "timeout": httpx.Timeout(30.0, connect=10.0),
        "follow_redirects": True,
    }
    if use_proxy:
        kwargs["proxy"] = PROXY_URL
    return httpx.Client(**kwargs)


def safe_get(
    client: httpx.Client,
    url: str,
    params: Optional[dict] = None,
    headers: Optional[dict] = None,
    max_retries: int = 4,
) -> Optional[dict]:
    """
    GET request with exponential backoff for rate limits.
    Returns parsed JSON or None.
    """
    for attempt in range(max_retries):
        try:
            resp = client.get(url, params=params, headers=headers)

            if resp.status_code == 200:
                return resp.json()

            elif resp.status_code == 429:
                # Rate limited
                retry_after = int(resp.headers.get("retry-after", 60))
                wait = max(retry_after, 2 ** attempt * 5)
                logger.warning(f"Rate limited (429), waiting {wait}s")
                time.sleep(wait)
                continue

            elif resp.status_code == 401:
                logger.error(f"Unauthorized: {url}")
                return None

            elif resp.status_code == 404:
                logger.debug(f"Not found: {url}")
                return None

            elif resp.status_code == 403:
                logger.error(f"Forbidden: {url} — check credentials")
                return None

            else:
                logger.warning(f"HTTP {resp.status_code}: {url}")
                time.sleep(2 ** attempt)

        except httpx.TimeoutException:
            wait = 2 ** attempt + 2
            logger.warning(f"Timeout, retrying in {wait}s")
            time.sleep(wait)

        except httpx.NetworkError as e:
            logger.error(f"Network error: {e}")
            time.sleep(5)

    logger.error(f"Failed after {max_retries} attempts: {url}")
    return None

Extracting Repository Metadata from the Hub API

def get_repo_info(
    client: httpx.Client,
    namespace: str,
    repo: str,
) -> Optional[dict]:
    """
    Get repository metadata.
    For official images (nginx, python, node), namespace is 'library'.
    For user images, namespace is the Docker Hub username.
    """
    url = f"{HUB_BASE}/repositories/{namespace}/{repo}/"
    data = safe_get(client, url)
    if not data:
        return None

    return {
        "full_name": f"{namespace}/{repo}",
        "description": data.get("description", ""),
        "full_description": data.get("full_description", ""),
        "star_count": data.get("star_count", 0),
        "pull_count": data.get("pull_count", 0),
        "last_updated": data.get("last_updated"),
        "is_official": data.get("is_official", False),
        "is_automated": data.get("is_automated", False),
        "hub_user": data.get("user"),
        "affiliation": data.get("affiliation"),
        "status": data.get("status"),
    }


def get_repo_tags(
    client: httpx.Client,
    namespace: str,
    repo: str,
    max_pages: int = 20,
    page_size: int = 100,
) -> list:
    """
    Paginate through all tags for a repository.
    Returns list of tag dicts with size and architecture info.
    """
    tags = []
    page = 1

    while page <= max_pages:
        url = f"{HUB_BASE}/repositories/{namespace}/{repo}/tags/"
        params = {"page_size": page_size, "page": page}

        data = safe_get(client, url, params=params)
        if not data:
            break

        results = data.get("results", [])
        if not results:
            break

        for tag in results:
            # Parse image architectures
            images = []
            for img in tag.get("images", []):
                images.append({
                    "architecture": img.get("architecture"),
                    "os": img.get("os"),
                    "size": img.get("size", 0),
                    "digest": img.get("digest", ""),
                    "status": img.get("status"),
                    "last_pushed": img.get("last_pushed"),
                })

            tags.append({
                "name": tag.get("name"),
                "full_size": tag.get("full_size", 0),
                "last_updated": tag.get("last_updated"),
                "last_pushed": tag.get("tag_last_pushed"),
                "digest": tag.get("digest", ""),
                "images": images,
                "image_count": len(images),
            })

        # Check for next page
        if not data.get("next"):
            break

        page += 1
        time.sleep(0.5)

    return tags


def search_repositories(
    client: httpx.Client,
    query: str,
    max_pages: int = 5,
    page_size: int = 25,
) -> list:
    """
    Search Docker Hub for repositories matching a query.
    """
    repos = []
    page = 1

    while page <= max_pages:
        url = f"{HUB_BASE}/search/repositories/"
        params = {
            "query": query,
            "page_size": page_size,
            "page": page,
        }

        data = safe_get(client, url, params=params)
        if not data:
            break

        results = data.get("results", [])
        if not results:
            break

        for r in results:
            repos.append({
                "name": r.get("name"),
                "namespace": r.get("namespace"),
                "full_name": r.get("repo_name"),
                "description": r.get("short_description", ""),
                "star_count": r.get("star_count", 0),
                "pull_count": r.get("pull_count", 0),
                "is_official": r.get("is_official", False),
                "is_automated": r.get("is_automated", False),
            })

        if not data.get("next"):
            break

        page += 1
        time.sleep(0.5)

    return repos


def list_org_repos(
    client: httpx.Client,
    org: str,
    max_pages: int = 10,
) -> list:
    """List all public repositories for an organization."""
    repos = []
    page = 1

    while page <= max_pages:
        url = f"{HUB_BASE}/repositories/{org}/"
        params = {"page_size": 100, "page": page}

        data = safe_get(client, url, params=params)
        if not data:
            break

        results = data.get("results", [])
        if not results:
            break

        for r in results:
            repos.append({
                "name": r.get("name"),
                "namespace": r.get("namespace"),
                "description": r.get("description", ""),
                "pull_count": r.get("pull_count", 0),
                "star_count": r.get("star_count", 0),
                "last_updated": r.get("last_updated"),
            })

        if not data.get("next"):
            break

        page += 1
        time.sleep(0.5)

    return repos

Getting Layer Data from Registry API v2

The Hub API gives you aggregate sizes, but to see individual layers — what changed between tags, what's shared across images, layer-level security scanning — you need the Registry API. This requires an authentication token, even for public images:

def get_registry_token(
    client: httpx.Client,
    repo: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Optional[str]:
    """
    Get a bearer token for the Docker registry.
    repo format: 'namespace/imagename' (e.g., 'library/nginx')
    """
    params = {
        "service": "registry.docker.io",
        "scope": f"repository:{repo}:pull",
    }

    # Authenticated token gives higher rate limits
    auth = None
    if username and password:
        auth = (username, password)

    url = f"{AUTH_BASE}/token"
    try:
        if auth:
            resp = client.get(url, params=params, auth=auth)
        else:
            resp = client.get(url, params=params)

        if resp.status_code == 200:
            return resp.json().get("token")
        else:
            logger.error(f"Token request failed: HTTP {resp.status_code}")
            return None

    except httpx.RequestError as e:
        logger.error(f"Token request error: {e}")
        return None


def get_manifest(
    client: httpx.Client,
    repo: str,
    tag_or_digest: str,
    token: Optional[str] = None,
) -> Optional[dict]:
    """
    Fetch image manifest with layer details.

    For multi-arch images, this returns a manifest list.
    For single-arch, returns the manifest with layers.
    """
    if token is None:
        token = get_registry_token(client, repo, DH_USERNAME, DH_PASSWORD)
    if not token:
        return None

    url = f"{REGISTRY_BASE}/{repo}/manifests/{tag_or_digest}"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": (
            "application/vnd.oci.image.index.v1+json, "
            "application/vnd.docker.distribution.manifest.list.v2+json, "
            "application/vnd.docker.distribution.manifest.v2+json, "
            "application/vnd.oci.image.manifest.v1+json"
        ),
    }

    return safe_get(client, url, headers=headers)


def get_layer_info(
    client: httpx.Client,
    repo: str,
    tag: str = "latest",
    target_arch: str = "amd64",
    target_os: str = "linux",
) -> Optional[dict]:
    """
    Get layer information for a specific platform variant.
    Returns list of layers with digest and size.
    """
    token = get_registry_token(client, repo, DH_USERNAME, DH_PASSWORD)
    if not token:
        return None

    manifest = get_manifest(client, repo, tag, token=token)
    if not manifest:
        return None

    # Multi-arch manifest list
    if manifest.get("manifests"):
        target_manifest = None
        for m in manifest["manifests"]:
            platform = m.get("platform", {})
            if (
                platform.get("architecture") == target_arch
                and platform.get("os") == target_os
            ):
                target_manifest = m
                break

        if not target_manifest:
            logger.warning(f"No {target_os}/{target_arch} variant found for {repo}:{tag}")
            # Fall back to first available
            target_manifest = manifest["manifests"][0]

        # Fetch the platform-specific manifest
        digest = target_manifest.get("digest")
        platform_manifest = get_manifest(client, repo, digest, token=token)
        if not platform_manifest:
            return None

        layers = platform_manifest.get("layers", [])
        platform = target_manifest.get("platform", {})

    # Single-arch manifest
    elif manifest.get("layers"):
        layers = manifest["layers"]
        platform = {"architecture": "unknown", "os": "unknown"}

    else:
        logger.warning(f"Unexpected manifest structure for {repo}:{tag}")
        return None

    parsed_layers = []
    total_size = 0
    for i, layer in enumerate(layers):
        size = layer.get("size", 0)
        total_size += size
        parsed_layers.append({
            "index": i,
            "digest": layer.get("digest", ""),
            "media_type": layer.get("mediaType", ""),
            "size_bytes": size,
            "size_mb": round(size / (1024 * 1024), 2),
        })

    return {
        "repo": repo,
        "tag": tag,
        "platform": platform,
        "layer_count": len(parsed_layers),
        "total_size_bytes": total_size,
        "total_size_mb": round(total_size / (1024 * 1024), 2),
        "layers": parsed_layers,
    }


def compare_tag_layers(
    client: httpx.Client,
    repo: str,
    tag_a: str,
    tag_b: str,
) -> dict:
    """
    Compare layers between two tags of the same image.
    Identifies shared layers (same digest) vs changed layers.
    """
    info_a = get_layer_info(client, repo, tag_a)
    info_b = get_layer_info(client, repo, tag_b)

    if not info_a or not info_b:
        return {}

    digests_a = {layer["digest"] for layer in info_a["layers"]}
    digests_b = {layer["digest"] for layer in info_b["layers"]}

    shared = digests_a & digests_b
    only_in_a = digests_a - digests_b
    only_in_b = digests_b - digests_a

    return {
        "tag_a": tag_a,
        "tag_b": tag_b,
        "layers_a": info_a["layer_count"],
        "layers_b": info_b["layer_count"],
        "shared_layers": len(shared),
        "layers_only_in_a": len(only_in_a),
        "layers_only_in_b": len(only_in_b),
        "size_a_mb": info_a["total_size_mb"],
        "size_b_mb": info_b["total_size_mb"],
        "size_diff_mb": round(info_b["total_size_mb"] - info_a["total_size_mb"], 2),
    }

Data Storage with SQLite

def init_database(db_path: str = "dockerhub.db") -> sqlite3.Connection:
    """Initialize SQLite schema for Docker Hub data."""
    conn = sqlite3.connect(db_path)

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS repositories (
            full_name TEXT PRIMARY KEY,
            namespace TEXT,
            name TEXT,
            description TEXT,
            star_count INTEGER DEFAULT 0,
            pull_count INTEGER DEFAULT 0,
            last_updated TEXT,
            is_official BOOLEAN DEFAULT 0,
            is_automated BOOLEAN DEFAULT 0,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_pulls ON repositories(pull_count);
        CREATE INDEX IF NOT EXISTS idx_namespace ON repositories(namespace);

        CREATE TABLE IF NOT EXISTS tags (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            repo_name TEXT,
            tag_name TEXT,
            full_size INTEGER DEFAULT 0,
            last_updated TEXT,
            last_pushed TEXT,
            digest TEXT,
            image_count INTEGER DEFAULT 0,
            architectures TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(repo_name, tag_name)
        );

        CREATE INDEX IF NOT EXISTS idx_tag_repo ON tags(repo_name);

        CREATE TABLE IF NOT EXISTS layers (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            repo_name TEXT,
            tag_name TEXT,
            platform TEXT,
            layer_index INTEGER,
            digest TEXT,
            media_type TEXT,
            size_bytes INTEGER DEFAULT 0,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_layer_digest ON layers(digest);
        CREATE INDEX IF NOT EXISTS idx_layer_repo ON layers(repo_name, tag_name);

        CREATE TABLE IF NOT EXISTS pull_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            repo_name TEXT,
            pull_count INTEGER,
            star_count INTEGER,
            snapshot_date TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE UNIQUE INDEX IF NOT EXISTS idx_pull_hist
        ON pull_history(repo_name, snapshot_date);
    """)

    conn.commit()
    return conn


def save_repository(conn: sqlite3.Connection, repo: dict) -> None:
    """Save repository metadata."""
    try:
        conn.execute("""
            INSERT OR REPLACE INTO repositories
            (full_name, namespace, name, description, star_count,
             pull_count, last_updated, is_official, is_automated)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            repo.get("full_name"), repo.get("full_name", "").split("/")[0],
            repo.get("full_name", "").split("/")[-1],
            repo.get("description", ""),
            repo.get("star_count", 0), repo.get("pull_count", 0),
            repo.get("last_updated"), repo.get("is_official", False),
            repo.get("is_automated", False),
        ))

        # Record pull count snapshot
        today = datetime.now().strftime("%Y-%m-%d")
        conn.execute("""
            INSERT OR REPLACE INTO pull_history (repo_name, pull_count, star_count, snapshot_date)
            VALUES (?, ?, ?, ?)
        """, (repo.get("full_name"), repo.get("pull_count", 0), repo.get("star_count", 0), today))

        conn.commit()
    except sqlite3.Error as e:
        logger.error(f"DB error saving repo: {e}")


def save_tags(conn: sqlite3.Connection, repo_name: str, tags: list) -> int:
    """Save tag records for a repository."""
    saved = 0
    for tag in tags:
        archs = "|".join(
            img.get("architecture", "unknown")
            for img in tag.get("images", [])
        )
        try:
            conn.execute("""
                INSERT OR REPLACE INTO tags
                (repo_name, tag_name, full_size, last_updated, last_pushed,
                 digest, image_count, architectures)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                repo_name, tag.get("name"), tag.get("full_size", 0),
                tag.get("last_updated"), tag.get("last_pushed"),
                tag.get("digest", ""), tag.get("image_count", 0), archs,
            ))
            saved += 1
        except sqlite3.Error as e:
            logger.error(f"DB error saving tag: {e}")

    conn.commit()
    return saved


def save_layers(conn: sqlite3.Connection, layer_info: dict) -> int:
    """Save layer data for a tag."""
    if not layer_info:
        return 0

    saved = 0
    platform_str = json.dumps(layer_info.get("platform", {}))

    for layer in layer_info.get("layers", []):
        try:
            conn.execute("""
                INSERT INTO layers
                (repo_name, tag_name, platform, layer_index, digest, media_type, size_bytes)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                layer_info.get("repo"), layer_info.get("tag"),
                platform_str, layer.get("index"),
                layer.get("digest", ""), layer.get("media_type", ""),
                layer.get("size_bytes", 0),
            ))
            saved += 1
        except sqlite3.Error as e:
            logger.error(f"DB error saving layer: {e}")

    conn.commit()
    return saved

Rate Limiting and Proxy Strategy

Docker Hub's rate limits are IP-based. For scraping metadata for thousands of images — say, auditing all public images from a specific publisher — you'll burn through the 100 anonymous pulls per 6-hour window fast.

The practical solutions, in order of preference:

1. Authenticate with a free account — doubles your Registry API quota to 200 pulls/6hrs.

2. Use the Hub API for metadata — it has separate, more permissive limits than the Registry API. Most monitoring tasks can use Hub API alone.

3. Rotate IPs with ThorData — Each IP gets its own rate limit quota. Rotating residential IPs effectively multiplies your throughput for bulk collection.

def make_proxied_client(session_id: str = None) -> httpx.Client:
    """
    Create client with residential proxy.
    session_id: Use same session for sticky IP behavior.
    """
    user = "your_thordata_user"
    password = "your_thordata_pass"

    if session_id:
        user += f"-session-{session_id}"

    proxy_url = f"http://{user}:{password}@proxy.thordata.com:9000"

    return httpx.Client(
        proxy=proxy_url,
        timeout=httpx.Timeout(30.0, connect=10.0),
        follow_redirects=True,
    )


def scrape_with_ip_rotation(
    repos: list,
    delay_between: float = 1.0,
) -> list:
    """
    Scrape multiple repos with IP rotation to stay under rate limits.
    Each batch of 80 requests uses a fresh IP.
    """
    results = []
    batch_size = 80  # Stay well under the 100-per-6hr anonymous limit per IP
    client = None

    for i, (namespace, repo) in enumerate(repos):
        # Rotate to a fresh sticky session (new IP) every batch_size requests
        if i % batch_size == 0:
            if client:
                client.close()
            session_id = f"batch{i // batch_size}"
            client = make_proxied_client(session_id=session_id)
            logger.info(f"Rotating to new IP (batch {i // batch_size})")

        info = get_repo_info(client, namespace, repo)
        if info:
            results.append(info)

        time.sleep(delay_between)

    if client:
        client.close()
    return results

Async Scraping for Speed

For scraping many repositories concurrently:

import asyncio
import httpx

async def scrape_repos_async(
    repos: list,
    concurrency: int = 5,
    delay: float = 0.5,
) -> list:
    """
    Scrape multiple repos concurrently.
    repos: list of (namespace, name) tuples
    """
    results = []
    semaphore = asyncio.Semaphore(concurrency)

    async def fetch_one(ns: str, name: str, client: httpx.AsyncClient) -> Optional[dict]:
        async with semaphore:
            url = f"{HUB_BASE}/repositories/{ns}/{name}/"
            try:
                resp = await client.get(url)
                if resp.status_code == 200:
                    data = resp.json()
                    return {
                        "full_name": f"{ns}/{name}",
                        "pulls": data.get("pull_count", 0),
                        "stars": data.get("star_count", 0),
                        "updated": data.get("last_updated"),
                        "description": data.get("description", ""),
                    }
                elif resp.status_code == 429:
                    logger.warning(f"Rate limited on {ns}/{name}")
                    await asyncio.sleep(30)
                return None
            except Exception as e:
                logger.error(f"Error fetching {ns}/{name}: {e}")
                return None

    async with httpx.AsyncClient(
        timeout=httpx.Timeout(30.0, connect=10.0),
        follow_redirects=True,
    ) as client:
        tasks = [fetch_one(ns, name, client) for ns, name in repos]
        raw_results = await asyncio.gather(*tasks)
        results = [r for r in raw_results if r is not None]

    return sorted(results, key=lambda r: r.get("pulls", 0), reverse=True)


# Popular official images to track
OFFICIAL_IMAGES = [
    ("library", "nginx"), ("library", "python"), ("library", "node"),
    ("library", "postgres"), ("library", "redis"), ("library", "alpine"),
    ("library", "ubuntu"), ("library", "golang"), ("library", "mysql"),
    ("library", "mongo"), ("library", "elasticsearch"), ("library", "kafka"),
    ("library", "rabbitmq"), ("library", "traefik"), ("library", "vault"),
]

data = asyncio.run(scrape_repos_async(OFFICIAL_IMAGES))
for r in data:
    print(f"{r['full_name']:25s} {r['pulls']:>15,} pulls  {r['stars']:>5} stars")

Complete Monitoring Pipeline

def run_docker_hub_monitor(
    repos: list = None,
    org: str = None,
    search_query: str = None,
    db_path: str = "dockerhub.db",
    collect_layers: bool = False,
) -> None:
    """
    Complete Docker Hub monitoring pipeline.

    repos: explicit list of (namespace, name) tuples
    org: scrape all repos for this organization
    search_query: search for matching repositories
    collect_layers: also fetch Registry API layer data (slower, uses more quota)
    """
    conn = init_database(db_path)
    client = make_client()

    all_repos = []

    # Build the list of repos to monitor
    if repos:
        all_repos.extend(repos)

    if org:
        logger.info(f"Fetching org repos for: {org}")
        org_repos = list_org_repos(client, org)
        for r in org_repos:
            ns = r.get("namespace") or org
            name = r.get("name", "")
            if name:
                all_repos.append((ns, name))
        logger.info(f"Found {len(org_repos)} repos in {org}")

    if search_query:
        logger.info(f"Searching for: {search_query}")
        search_results = search_repositories(client, search_query, max_pages=5)
        for r in search_results:
            full = r.get("full_name", "")
            if "/" in full:
                ns, name = full.rsplit("/", 1)
                all_repos.append((ns, name))

    # Deduplicate while preserving order
    all_repos = list(dict.fromkeys(all_repos))
    logger.info(f"Total repos to process: {len(all_repos)}")

    for i, (namespace, name) in enumerate(all_repos):
        logger.info(f"Processing {namespace}/{name} ({i+1}/{len(all_repos)})")

        # Repository metadata
        info = get_repo_info(client, namespace, name)
        if info:
            save_repository(conn, info)
            logger.info(f"  {info['pull_count']:,} pulls, {info['star_count']} stars")

        # Tag list
        tags = get_repo_tags(client, namespace, name, max_pages=5)
        if tags:
            saved = save_tags(conn, f"{namespace}/{name}", tags)
            logger.info(f"  Saved {saved} tags")

            # Layer data for latest tag (if enabled)
            if collect_layers:
                latest_tag = tags[0].get("name", "latest")
                layer_info = get_layer_info(
                    client, f"{namespace}/{name}", latest_tag
                )
                if layer_info:
                    save_layers(conn, layer_info)
                    logger.info(
                        f"  Latest ({latest_tag}): "
                        f"{layer_info['layer_count']} layers, "
                        f"{layer_info['total_size_mb']:.1f} MB"
                    )

        time.sleep(0.8)

    conn.close()
    client.close()
    logger.info("Docker Hub monitoring complete.")


# Example: Monitor official images
run_docker_hub_monitor(
    repos=OFFICIAL_IMAGES,
    db_path="official_images.db",
    collect_layers=False,  # Set True for security auditing
)

# Example: Monitor all images from an organization
run_docker_hub_monitor(
    org="bitnami",
    db_path="bitnami_images.db",
)

# Example: Search and monitor images for a technology
run_docker_hub_monitor(
    search_query="prometheus",
    db_path="prometheus_images.db",
)

What You Can Build

Security audit tools — Find base images across your infrastructure, identify outdated base layers, detect images that haven't been updated in months. Layer digest comparison shows exactly what changed between versions.
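
As a sketch of the staleness check, here's one query against the repositories table from init_database above (the cutoff date is whatever threshold you pick):

```python
import sqlite3

def find_stale_repos(conn: sqlite3.Connection, cutoff_iso: str) -> list:
    """Repositories not updated since cutoff_iso.

    Works because ISO 8601 timestamps sort lexicographically.
    """
    return conn.execute(
        """SELECT full_name, last_updated
           FROM repositories
           WHERE last_updated IS NOT NULL AND last_updated < ?
           ORDER BY last_updated""",
        (cutoff_iso,),
    ).fetchall()
```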

Dependency graphs — Map which organizations share base layers. Images built FROM the same digest are linked. This is how supply chain attacks propagate.
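
With the layers table populated (collect_layers=True in the pipeline above), shared base layers fall out of a single GROUP BY. A sketch:

```python
import sqlite3

def shared_layer_digests(conn: sqlite3.Connection, min_repos: int = 2) -> list:
    """Layer digests appearing in more than one repository:
    likely candidates for a shared base image."""
    return conn.execute(
        """SELECT digest, COUNT(DISTINCT repo_name) AS repo_count
           FROM layers
           GROUP BY digest
           HAVING COUNT(DISTINCT repo_name) >= ?
           ORDER BY repo_count DESC""",
        (min_repos,),
    ).fetchall()
```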

Pull count trending — Track pull count growth over time. Rapidly growing images (especially ones whose names imitate popular official images) can be a malware signal.
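
The pull_history table that save_repository snapshots into makes the trend computation trivial. A sketch of day-over-day deltas:

```python
import sqlite3

def pull_growth(conn: sqlite3.Connection, repo_name: str) -> list:
    """Per-snapshot pull count deltas, oldest first."""
    rows = conn.execute(
        """SELECT snapshot_date, pull_count
           FROM pull_history
           WHERE repo_name = ?
           ORDER BY snapshot_date""",
        (repo_name,),
    ).fetchall()
    # Pair each snapshot with the previous one and take the difference
    return [
        (rows[i][0], rows[i][1] - rows[i - 1][1])
        for i in range(1, len(rows))
    ]
```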

Release tracking — Monitor specific images for new tags. Useful for CI/CD integrations, changelogs, or security newsletters.
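
Detection itself is just a set difference between the tag names you last stored and a fresh get_repo_tags result. A minimal sketch:

```python
def detect_new_tags(known_tags: set, fetched_tags: list) -> set:
    """Tag names present in the latest fetch but not yet recorded.

    fetched_tags is the list of dicts returned by get_repo_tags.
    """
    return {t["name"] for t in fetched_tags} - known_tags
```

Seed known_tags from the tags table, e.g. SELECT tag_name FROM tags WHERE repo_name = ?.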

Image size analysis — Compare image sizes across versions and organizations. Bloated images are often security risks and performance issues.

Ecosystem mapping — Which images are most depended upon in a particular domain (ML frameworks, databases, web servers)? Pull counts at scale tell you.

What You Cannot Get

A few things Docker Hub does not expose through any public API:

Per-tag pull counts. The pull_count field is one cumulative number for the whole repository; there is no per-tag or per-architecture breakdown.

Historical pull data. You only ever see the current total, which is why the pipeline above snapshots it into pull_history.

Puller identity. No breakdown by account, client, or geography.

Private repositories. Anything private requires credentials authorized for that namespace; no metadata leaks for private images.

Summary

Docker Hub's APIs are stable and well-behaved — this is one of the easier scraping targets in 2026. Use the Hub API for metadata and Registry v2 for layer-level detail. The main challenge is scale: if you need data for thousands of images, plan your rate limiting strategy upfront.

For monitoring dashboards or security audits, the combination gives you everything you need:
- Hub API for pull counts, descriptions, tag lists, last-updated timestamps
- Registry v2 for layer digests, sizes, and manifest details

Use ThorData rotating residential proxies when your per-IP rate limit budget runs out during bulk collection. Keep requests at 1-2/second even with proxies: Docker Hub is not trying to stop metadata scraping, it just protects against bandwidth abuse.