
How to Scrape Houzz Interior Design Data in 2026 (Playwright Guide)

Houzz is one of the richest publicly accessible datasets for interior design: millions of annotated project photos, product listings with prices, contractor profiles, and curated ideabooks. If you're building a design recommendation engine, a competitive pricing tool, a lead-generation system for home-improvement contractors, or a visual AI dataset, Houzz is the source to crack.

It is also one of the more technically hostile sites to scrape. This guide explains why, how to set up a working scraper in 2026, and how to build a complete SQLite-backed data pipeline.

Why Houzz Requires Browser Automation

Houzz is a heavily client-rendered React SPA. Nearly all meaningful content — photo grids, product cards, professional listings — is injected into the DOM after JavaScript execution. Send a plain HTTP request to https://www.houzz.com/photos/living-room-ideas and you receive an empty shell with a few thousand bytes of bootstrap HTML. The actual data arrives through a series of authenticated GraphQL calls that are only triggered once the JS bundle has initialized.
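You can sanity-check the empty-shell behavior on whatever HTML a plain HTTP client returns. A minimal heuristic sketch — the size threshold and marker strings are assumptions that may drift as Houzz updates its frontend:

```python
def looks_like_bootstrap_shell(html: str) -> bool:
    """Heuristic: pre-hydration Houzz HTML is small and contains no photo cards."""
    too_small = len(html) < 50_000  # assumption: hydrated pages are far larger
    no_cards = "photo-card" not in html and "hz-photo-card" not in html
    return too_small and no_cards
```

If this returns True for a fetched page, the content you want was never rendered and browser automation is required.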

Beyond the rendering challenge, Houzz deploys Imperva (formerly Incapsula) for bot detection at the network edge. Among other signals, Imperva collects:

- TLS handshake fingerprints (cipher suites and extension ordering)
- HTTP/2 connection settings and header ordering
- Browser properties gathered by JavaScript once the page loads

A standard Python requests session fails Imperva's challenge page before it can even negotiate a session cookie. Playwright running a real Chromium instance passes TLS and HTTP/2 checks automatically because the fingerprint matches a real browser.

Additional hurdles:

- Canvas and WebGL fingerprinting — scripts probe the GPU renderer string and canvas pixel output
- Lazy-loaded content — photos only enter the DOM as the user scrolls
- Rate limiting per IP — after ~200-300 requests from a datacenter IP, Houzz returns 429s or redirect loops to the Imperva challenge page
- Session-bound requests — GraphQL queries include session tokens that expire
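It helps to detect the challenge page explicitly so the scraper can back off and rotate instead of parsing garbage. A sketch — the marker strings are assumptions based on common Incapsula block pages and may change:

```python
CHALLENGE_MARKERS = (
    "_Incapsula_Resource",                          # challenge script resource path
    "Request unsuccessful. Incapsula incident ID",  # classic block-page text
)

def is_imperva_challenge(html: str, status: int) -> bool:
    """Return True when a response looks like an Imperva interstitial."""
    if status in (403, 429):
        return True
    return any(marker in html for marker in CHALLENGE_MARKERS)
```

Call this on every navigation result; on a hit, rotate the proxy session and retry rather than continuing to parse.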

Setting Up the Environment

pip install playwright playwright-stealth httpx
playwright install chromium

Note that sqlite3 ships with the Python standard library and does not need to be installed via pip.

The playwright-stealth patch overrides several headless detection vectors: it masks navigator.webdriver, randomizes canvas noise, and spoofs common browser properties that differ between headed and headless Chromium.

import asyncio
import json
import re
import sqlite3
import random
import time
from datetime import datetime
from pathlib import Path
from playwright.async_api import async_playwright, Page, BrowserContext
from playwright_stealth import stealth_async

Proxy Configuration for Playwright

Datacenter proxies fail Imperva consistently. Residential proxies — addresses belonging to real ISP subscribers — pass the network-layer check because the IP reputation is clean and the ASN is not associated with hosting.

ThorData provides rotating residential proxies that work well with Playwright's built-in proxy routing. Configure the proxy at the browser level so every request, including the initial TLS handshake and all subsequent XHRs, routes through the same residential exit node for session consistency.

THORDATA_USER = "YOUR_USERNAME"
THORDATA_PASS = "YOUR_PASSWORD"
THORDATA_HOST = "gate.thordata.net"
THORDATA_PORT = "PORT"

def get_proxy_config(country: str = "US", state: str = None) -> dict:
    """Build ThorData proxy config for Playwright."""
    username = THORDATA_USER
    if country:
        username += f"-country-{country}"
    if state:
        username += f"-state-{state}"

    return {
        "server": f"http://{THORDATA_HOST}:{THORDATA_PORT}",
        "username": username,
        "password": THORDATA_PASS,
    }


async def make_browser(playwright, proxy_config: dict = None, headless: bool = True):
    """Launch a stealthy Chromium browser with optional proxy."""
    launch_kwargs = {
        "headless": headless,
        "args": [
            "--disable-blink-features=AutomationControlled",
            "--no-sandbox",
            "--disable-setuid-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu-sandbox",
        ],
    }
    if proxy_config:
        launch_kwargs["proxy"] = proxy_config

    browser = await playwright.chromium.launch(**launch_kwargs)
    return browser


async def make_context(browser, apply_stealth: bool = True) -> BrowserContext:
    """Create a browser context with realistic fingerprint."""
    context = await browser.new_context(
        viewport={"width": 1440, "height": 900},
        user_agent=(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        locale="en-US",
        timezone_id="America/New_York",
        color_scheme="light",
        device_scale_factor=1,
        java_script_enabled=True,
        accept_downloads=False,
    )

    if apply_stealth:
        # playwright-stealth's stealth_async() patches a Page, not a context,
        # so apply it to every page this context opens
        context.on("page", lambda page: asyncio.ensure_future(stealth_async(page)))

    return context

Intercepting GraphQL Responses

The most efficient approach is to intercept Houzz's internal GraphQL calls rather than parse the rendered DOM:

async def intercept_graphql(page: Page, target_operations: list[str] = None) -> list[dict]:
    """
    Intercept GraphQL responses from Houzz's internal API.

    target_operations: list of GraphQL operation names to capture,
                       or None to capture all
    """
    captured = []

    async def handle_response(response):
        if not ("houzz.com/api" in response.url or "/graphql" in response.url):
            return
        if response.status != 200:
            return

        try:
            body = await response.json()
        except Exception:
            return

        # Filter by operation name if specified
        if target_operations:
            op_name = ""
            try:
                payload = response.request.post_data_json
                if payload:
                    op_name = payload.get("operationName", "")
            except Exception:
                pass  # non-JSON POST body
            if op_name and op_name not in target_operations:
                return

        captured.append({
            "url": response.url,
            "data": body,
            "timestamp": datetime.now().isoformat(),
        })

    page.on("response", handle_response)
    return captured


class HouzzScraper:
    """Stateful Houzz scraper with session management."""

    def __init__(self, proxy_config: dict = None, db_path: str = "houzz_data.db"):
        self.proxy_config = proxy_config
        self.db_path = db_path
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.request_count = 0
        self.session_limit = 80  # Rotate proxy session after this many requests

    async def __aenter__(self):
        self.playwright = await async_playwright().start()
        self.browser = await make_browser(self.playwright, self.proxy_config)
        self.context = await make_context(self.browser)
        self.page = await self.context.new_page()
        return self

    async def __aexit__(self, *args):
        if self.page:
            await self.page.close()
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()

    async def rotate_session(self):
        """Close current context and open a fresh one with new proxy session."""
        if self.context:
            await self.context.close()
        self.context = await make_context(self.browser)
        self.page = await self.context.new_page()
        self.request_count = 0
        print("  Rotated browser session")

    async def goto(self, url: str, wait_until: str = "networkidle"):
        """Navigate with rotation check."""
        if self.request_count >= self.session_limit:
            await self.rotate_session()

        await self.page.goto(url, wait_until=wait_until, timeout=45000)
        self.request_count += 1

Extracting Photo Galleries

async def scrape_photo_grid(page: Page, category_url: str,
                              max_photos: int = 60) -> list[dict]:
    """
    Scrape photo grid from a Houzz category page.

    category_url: e.g., 'https://www.houzz.com/photos/living-room-ideas'
    """
    captured_responses = await intercept_graphql(page)

    await page.goto(category_url, wait_until="networkidle")
    await page.wait_for_timeout(2000)

    # Scroll to trigger lazy loading
    await scroll_to_load(page, target_count=max_photos)

    photos = []

    # First try: extract from intercepted GraphQL responses
    for resp_data in captured_responses:
        try:
            data = resp_data["data"]
            # Houzz GraphQL responses typically have data.photosConnection or similar
            photo_edges = (
                data.get("data", {}).get("photosConnection", {}).get("edges", [])
                or data.get("data", {}).get("photos", {}).get("results", [])
            )
            for edge in photo_edges:
                node = edge.get("node", edge)
                photos.append({
                    "id": node.get("id", ""),
                    "title": node.get("title", ""),
                    "caption": node.get("caption", ""),
                    "image_url": node.get("mediumImageUrl") or node.get("imageUrl", ""),
                    "image_hd_url": node.get("largeImageUrl", ""),
                    "project_url": node.get("url", ""),
                    "room_type": node.get("roomType", ""),
                    "style": node.get("style", {}).get("name", ""),
                    "likes": node.get("savesCount", 0),
                    "professional_name": node.get("professional", {}).get("displayName", ""),
                })
        except Exception:
            continue

    # Fallback: DOM parsing
    if not photos:
        photos = await _parse_photo_cards(page)

    return photos[:max_photos]


async def _parse_photo_cards(page: Page) -> list[dict]:
    """Parse photo cards from the DOM."""
    photos = []

    cards = await page.query_selector_all(
        '[data-component="photo-card"], '
        '[class*="hz-photo-card"], '
        'li[class*="photo"]'
    )

    for card in cards:
        img = await card.query_selector("img")
        link = await card.query_selector("a[href]")
        src = await img.get_attribute("src") if img else None
        # Get HD version if available
        srcset = await img.get_attribute("srcset") if img else None
        href = await link.get_attribute("href") if link else None

        if src or href:
            photos.append({
                "image_url": src,
                "image_srcset": srcset,
                "project_url": href,
            })

    return photos


async def scroll_to_load(page: Page, target_count: int = 60,
                           max_stalls: int = 3) -> None:
    """Scroll page to trigger lazy loading until target photo count is reached."""
    previous_count = 0
    stall_count = 0

    while stall_count < max_stalls:
        # Count current photo cards
        current_count = await page.locator(
            '[data-component="photo-card"], [class*="hz-photo-card"], li[class*="photo"]'
        ).count()

        if current_count >= target_count:
            break

        if current_count == previous_count:
            stall_count += 1
        else:
            stall_count = 0

        previous_count = current_count

        # Scroll with natural variation
        scroll_distance = int(900 + random.random() * 600)
        await page.evaluate(f"window.scrollBy(0, {scroll_distance})")

        # Variable delay — faster when making progress, slower on stalls
        delay = 1200 + (stall_count * 500) + int(random.random() * 400)
        await page.wait_for_timeout(delay)

Scraping Product Listings

async def scrape_product_page(page: Page, product_url: str) -> dict:
    """Scrape a Houzz product listing page."""
    captured = await intercept_graphql(page)

    await page.goto(product_url, wait_until="domcontentloaded", timeout=30000)

    try:
        await page.wait_for_selector(
            '[data-component="product-info"], [class*="product-main"]',
            timeout=10000
        )
    except Exception:
        pass  # Continue even if selector not found

    product = {
        "url": product_url,
        "name": "",
        "brand": "",
        "price": "",
        "price_original": "",
        "sale": False,
        "rating": None,
        "review_count": 0,
        "description": "",
        "sku": "",
        "category": "",
        "shipping": "",
        "availability": "",
        "images": [],
    }

    # Try structured data first
    ld_data = await page.evaluate("""
        () => {
            const scripts = document.querySelectorAll('script[type="application/ld+json"]');
            for (const script of scripts) {
                try {
                    const data = JSON.parse(script.textContent);
                    // JSON-LD may be a single object or an array of objects
                    const items = Array.isArray(data) ? data : [data];
                    const product = items.find(d => d && d['@type'] === 'Product');
                    if (product) return product;
                } catch(e) {}
            }
            return null;
        }
    """)

    if ld_data:
        offers = ld_data.get("offers", {})
        if isinstance(offers, list):  # JSON-LD allows a list of offers
            offers = offers[0] if offers else {}
        brand = ld_data.get("brand", "")
        if isinstance(brand, dict):  # brand may be a string or a {"name": ...} object
            brand = brand.get("name", "")
        product.update({
            "name": ld_data.get("name", ""),
            "brand": brand,
            "description": ld_data.get("description", ""),
            "sku": ld_data.get("sku", ""),
            "price": offers.get("price", ""),
            "availability": offers.get("availability", ""),
        })
        aggregate = ld_data.get("aggregateRating", {})
        if aggregate:
            product["rating"] = aggregate.get("ratingValue")
            product["review_count"] = aggregate.get("reviewCount", 0)

    # Also try DOM selectors
    name_el = await page.query_selector('[data-testid="product-name"], h1[class*="product"]')
    if name_el and not product["name"]:
        product["name"] = (await name_el.inner_text()).strip()

    price_el = await page.query_selector('[data-testid="product-price"], [class*="price-value"]')
    if price_el and not product["price"]:
        product["price"] = (await price_el.inner_text()).strip()

    # Collect product images
    img_urls = await page.evaluate("""
        () => {
            const imgs = document.querySelectorAll('[class*="product-image"] img, [data-testid="product-images"] img');
            return [...new Set([...imgs].map(img => img.src || img.dataset.src).filter(Boolean))];
        }
    """)
    product["images"] = img_urls[:10]

    return product


async def scrape_product_category(page: Page, category_url: str,
                                    max_products: int = 50) -> list[dict]:
    """Scrape product listings from a category page."""
    captured = await intercept_graphql(page)

    await page.goto(category_url, wait_until="networkidle")
    await scroll_to_load(page, target_count=max_products)

    # Try intercepted GraphQL data first
    products = []
    for resp_data in captured:
        try:
            data = resp_data["data"]
            product_edges = (
                data.get("data", {}).get("productsConnection", {}).get("edges", [])
                or data.get("data", {}).get("products", {}).get("results", [])
            )
            for edge in product_edges:
                node = edge.get("node", edge)
                products.append({
                    "id": node.get("id", ""),
                    "name": node.get("name", ""),
                    "brand": node.get("brand", {}).get("name", ""),
                    "price": node.get("price", {}).get("displayPrice", ""),
                    "original_price": node.get("price", {}).get("originalPrice", ""),
                    "rating": node.get("aggregateRating", {}).get("ratingValue"),
                    "review_count": node.get("aggregateRating", {}).get("reviewCount", 0),
                    "url": node.get("url", ""),
                    "image_url": node.get("imageUrl", ""),
                    "category": node.get("category", {}).get("name", ""),
                })
        except Exception:
            continue

    # DOM fallback
    if not products:
        product_cards = await page.query_selector_all(
            '[data-component="product-card"], [class*="hz-product"], '
            'li[data-product-id]'
        )
        for card in product_cards:
            name_el = await card.query_selector("h3, [class*='product-name']")
            price_el = await card.query_selector("[class*='price']")
            link_el = await card.query_selector("a[href]")

            if name_el:
                products.append({
                    "name": (await name_el.inner_text()).strip(),
                    "price": (await price_el.inner_text()).strip() if price_el else "",
                    "url": await link_el.get_attribute("href") if link_el else "",
                })

    return products[:max_products]

Professional Profile Scraping

async def scrape_professional_profile(page: Page, pro_url: str) -> dict:
    """Scrape a Houzz professional profile."""
    await page.goto(pro_url, wait_until="domcontentloaded")

    try:
        await page.wait_for_selector('[class*="pro-profile"]', timeout=8000)
    except Exception:
        pass

    # Extract structured data
    profile_data = await page.evaluate("""
        () => {
            const result = {};

            // JSON-LD structured data
            const scripts = document.querySelectorAll('script[type="application/ld+json"]');
            for (const script of scripts) {
                try {
                    const data = JSON.parse(script.textContent);
                    if (['LocalBusiness', 'HomeAndConstructionBusiness', 'Organization'].includes(data['@type'])) {
                        result.name = data.name;
                        result.description = data.description;
                        result.address = data.address;
                        result.phone = data.telephone;
                        result.url = data.url;
                        result.rating = data.aggregateRating?.ratingValue;
                        result.review_count = data.aggregateRating?.reviewCount;
                        result.price_range = data.priceRange;
                        break;
                    }
                } catch(e) {}
            }

            // Additional stats from DOM
            const statsEls = document.querySelectorAll('[class*="pro-stat"], [class*="stats-value"]');
            statsEls.forEach(el => {
                const parent = el.closest('[class*="stat-item"]');
                if (parent) {
                    const label = parent.querySelector('[class*="stat-label"]')?.innerText?.trim();
                    const value = el.innerText?.trim();
                    if (label && value) result[label.toLowerCase().replace(/\s+/g, '_')] = value;
                }
            });

            return result;
        }
    """)

    # Scrape services/specialties
    services = await page.evaluate("""
        () => {
            const els = document.querySelectorAll('[class*="service-tag"], [class*="specialty"], [class*="category-tag"]');
            return [...new Set([...els].map(el => el.innerText.trim()).filter(Boolean))];
        }
    """)

    # Badge information (licensed, insured, etc.)
    badges = await page.evaluate("""
        () => {
            const els = document.querySelectorAll('[class*="badge"], [class*="credential"], [class*="verified"]');
            return [...els].map(el => el.innerText.trim()).filter(b => b.length > 0 && b.length < 60);
        }
    """)

    # Photos count
    photos_count = await page.evaluate("""
        () => {
            const el = document.querySelector('[class*="photo-count"], [class*="photos-tab"]');
            return el ? el.innerText.trim() : '0';
        }
    """)

    profile_data.update({
        "url": pro_url,
        "services": services[:20],
        "badges": list(set(badges))[:10],
        "photos_count": photos_count,
    })

    return profile_data


async def search_professionals(page: Page, query: str,
                                 location: str = None,
                                 category: str = None,
                                 max_results: int = 30) -> list[dict]:
    """Search for professionals on Houzz."""
    from urllib.parse import quote_plus  # proper URL-encoding for multi-word queries

    base_url = "https://www.houzz.com/professionals"
    params = []
    if query:
        params.append(f"q={quote_plus(query)}")
    if location:
        params.append(f"location={quote_plus(location)}")
    if category:
        base_url += f"/{category}"

    url = base_url + ("?" + "&".join(params) if params else "")
    await page.goto(url, wait_until="networkidle")

    pros = await page.evaluate("""
        (maxResults) => {
            const cards = document.querySelectorAll('[data-component="pro-card"], [class*="pro-result"]');
            return [...cards].slice(0, maxResults).map(card => ({
                name: card.querySelector('[class*="pro-name"], h3')?.innerText?.trim() || '',
                location: card.querySelector('[class*="location"]')?.innerText?.trim() || '',
                rating: card.querySelector('[class*="rating-value"]')?.innerText?.trim() || '',
                review_count: card.querySelector('[class*="review-count"]')?.innerText?.trim() || '',
                category: card.querySelector('[class*="category"]')?.innerText?.trim() || '',
                url: card.querySelector('a[href]')?.href || '',
            }));
        }
    """, max_results)

    return pros

SQLite Storage

def init_houzz_db(db_path: str = "houzz_data.db") -> sqlite3.Connection:
    """Initialize SQLite database for Houzz data."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")

    conn.execute("""
        CREATE TABLE IF NOT EXISTS photos (
            id TEXT PRIMARY KEY,
            title TEXT,
            caption TEXT,
            image_url TEXT,
            image_hd_url TEXT,
            project_url TEXT,
            room_type TEXT,
            style TEXT,
            likes INTEGER DEFAULT 0,
            professional_name TEXT,
            category_scraped TEXT,
            raw_data TEXT,  -- Full JSON for re-parsing
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id TEXT,
            name TEXT NOT NULL,
            brand TEXT,
            price TEXT,
            original_price TEXT,
            sale INTEGER DEFAULT 0,
            rating REAL,
            review_count INTEGER DEFAULT 0,
            description TEXT,
            sku TEXT,
            category TEXT,
            url TEXT,
            image_url TEXT,
            availability TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id, scraped_at)
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS professionals (
            url TEXT PRIMARY KEY,
            name TEXT,
            description TEXT,
            address TEXT,  -- JSON
            phone TEXT,
            rating REAL,
            review_count INTEGER DEFAULT 0,
            price_range TEXT,
            services TEXT,  -- JSON array
            badges TEXT,    -- JSON array
            photos_count TEXT,
            raw_data TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS ideabook_items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ideabook_url TEXT NOT NULL,
            item_title TEXT,
            item_url TEXT,
            item_type TEXT,  -- 'photo', 'product'
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    conn.execute("CREATE INDEX IF NOT EXISTS idx_photos_room ON photos(room_type)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_photos_style ON photos(style)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_photos_likes ON photos(likes)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_products_category ON products(category)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_pros_rating ON professionals(rating)")

    conn.commit()
    return conn


def save_photos(conn: sqlite3.Connection, photos: list[dict],
                 category: str = "") -> int:
    """Bulk save photos to database."""
    saved = 0
    for photo in photos:
        photo_id = photo.get("id") or str(hash(photo.get("image_url", "")))
        try:
            cur = conn.execute(
                """INSERT OR IGNORE INTO photos
                   (id, title, caption, image_url, image_hd_url, project_url,
                    room_type, style, likes, professional_name, category_scraped, raw_data)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    photo_id, photo.get("title"), photo.get("caption"),
                    photo.get("image_url"), photo.get("image_hd_url"),
                    photo.get("project_url"), photo.get("room_type"),
                    photo.get("style"), photo.get("likes", 0),
                    photo.get("professional_name"), category,
                    json.dumps(photo),
                )
            )
            # rowcount is 0 when INSERT OR IGNORE skips a duplicate
            saved += cur.rowcount
        except sqlite3.Error:
            continue
    conn.commit()
    return saved


def save_product(conn: sqlite3.Connection, product: dict) -> None:
    """Save a product record."""
    product_id = product.get("id") or str(hash(product.get("url", "")))
    conn.execute(
        """INSERT OR REPLACE INTO products
           (id, name, brand, price, original_price, rating, review_count,
            description, sku, category, url, image_url, availability)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            product_id, product.get("name"), product.get("brand"),
            product.get("price"), product.get("price_original"),
            product.get("rating"), product.get("review_count", 0),
            product.get("description"), product.get("sku"),
            product.get("category"), product.get("url"),
            product.get("images", [None])[0] if product.get("images") else None,
            product.get("availability"),
        )
    )
    conn.commit()
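
Because the `products` table keys on `(id, scraped_at)`, repeated crawls accumulate a price history per product rather than overwriting it. A hypothetical helper (not part of the pipeline above) to pull that history back out:

```python
import sqlite3

def price_history(conn: sqlite3.Connection, product_id: str) -> list[tuple]:
    """Return (scraped_at, price) rows for one product, oldest first."""
    return conn.execute(
        """SELECT scraped_at, price FROM products
           WHERE id = ? ORDER BY scraped_at ASC""",
        (product_id,),
    ).fetchall()
```

This is the basis for tracking sales and discount cycles across repeated crawls of the same category.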

Full Scraping Pipeline

ROOM_CATEGORIES = [
    ("living-room", "https://www.houzz.com/photos/living-room-ideas"),
    ("bedroom", "https://www.houzz.com/photos/bedroom-ideas"),
    ("kitchen", "https://www.houzz.com/photos/kitchen-ideas"),
    ("bathroom", "https://www.houzz.com/photos/bathroom-ideas"),
    ("dining-room", "https://www.houzz.com/photos/dining-room-ideas"),
    ("home-office", "https://www.houzz.com/photos/home-office-ideas"),
]

PRODUCT_CATEGORIES = [
    ("sofas", "https://www.houzz.com/products/sofas-catid-140715"),
    ("beds", "https://www.houzz.com/products/beds-catid-140818"),
    ("dining-tables", "https://www.houzz.com/products/dining-tables-catid-140796"),
]


async def run_houzz_pipeline(
    db_path: str = "houzz_data.db",
    proxy_config: dict = None,
    photos_per_category: int = 60,
    products_per_category: int = 40,
) -> dict:
    """Full Houzz scraping pipeline for photos and products."""
    conn = init_houzz_db(db_path)
    stats = {"photos": 0, "products": 0, "professionals": 0, "errors": 0}

    async with async_playwright() as p:
        browser = await make_browser(p, proxy_config)
        context = await make_context(browser)
        page = await context.new_page()

        # Phase 1: Scrape photo galleries
        print("\n=== Photo Galleries ===")
        for room_type, url in ROOM_CATEGORIES:
            print(f"  {room_type}...")
            try:
                photos = await scrape_photo_grid(page, url, photos_per_category)
                saved = save_photos(conn, photos, room_type)
                stats["photos"] += saved
                print(f"  Saved {saved}/{len(photos)} photos")
            except Exception as e:
                print(f"  Error: {e}")
                stats["errors"] += 1

            # Delay between categories
            await asyncio.sleep(random.uniform(8, 15))

        # Phase 2: Scrape products
        print("\n=== Products ===")
        for cat_name, url in PRODUCT_CATEGORIES:
            print(f"  {cat_name}...")
            try:
                products = await scrape_product_category(page, url, products_per_category)
                for product in products:
                    save_product(conn, product)
                stats["products"] += len(products)
                print(f"  Saved {len(products)} products")
            except Exception as e:
                print(f"  Error: {e}")
                stats["errors"] += 1

            await asyncio.sleep(random.uniform(8, 15))

        await browser.close()

    conn.close()

    print(f"\n=== Pipeline Complete ===")
    for key, val in stats.items():
        print(f"  {key}: {val}")

    return stats


def analyze_style_distribution(db_path: str) -> list:
    """Count photos by style to understand popular interior styles."""
    conn = sqlite3.connect(db_path)
    rows = conn.execute(
        """SELECT style, COUNT(*) as count, AVG(likes) as avg_likes
           FROM photos
           WHERE style != '' AND style IS NOT NULL
           GROUP BY style
           ORDER BY count DESC""",
    ).fetchall()
    conn.close()
    return rows


def price_range_by_category(db_path: str) -> list:
    """Analyze product pricing by category."""
    conn = sqlite3.connect(db_path)
    rows = conn.execute(
        """SELECT category,
                  COUNT(*) as product_count,
                  AVG(CAST(REPLACE(REPLACE(price, '$', ''), ',', '') AS REAL)) as avg_price
           FROM products
           WHERE price != '' AND price IS NOT NULL AND category != ''
           GROUP BY category
           ORDER BY product_count DESC""",
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    proxy = get_proxy_config(country="US", state="NY")
    asyncio.run(run_houzz_pipeline(
        db_path="houzz_data.db",
        proxy_config=proxy,
        photos_per_category=80,
        products_per_category=50,
    ))

Rate Limits and Politeness

Even with residential proxies, aggressive crawling damages IP reputation over time. Practical throttle guidelines:

Operation               Safe rate        Notes
Photo category pages    1 req / 10-15 s  Scroll takes time anyway
Product pages           1 req / 8-12 s   DOM-heavy, wait for content
Professional profiles   1 req / 6-10 s   Lighter pages
Professional search     1 req / 12-20 s  Most likely to trigger a captcha
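These guidelines translate directly into a randomized delay picker. A sketch — the operation keys and the fallback range are assumptions, not a Houzz-sanctioned contract:

```python
import random

# Safe request intervals in seconds, mirroring the table above
THROTTLE = {
    "photo_category": (10, 15),
    "product_page": (8, 12),
    "pro_profile": (6, 10),
    "pro_search": (12, 20),
}

def polite_delay(operation: str) -> float:
    """Pick a randomized delay for the given operation type."""
    low, high = THROTTLE.get(operation, (10, 15))  # conservative fallback
    return random.uniform(low, high)
```

Call `await asyncio.sleep(polite_delay("product_page"))` between navigations instead of a fixed constant, so request timing never forms a detectable pattern.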

Rotate proxy sessions every 50-80 page loads so rate-limit counters never accumulate against a single residential exit IP.

Store raw HTML or intercepted JSON to disk before parsing, so a schema change in Houzz's frontend does not require a re-crawl of the entire dataset. Houzz updates their React bundle regularly.
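A minimal sketch of that raw-first pattern, assuming a local `raw_cache/` directory:

```python
import hashlib
import json
import time
from pathlib import Path

RAW_DIR = Path("raw_cache")  # assumption: relative cache directory

def cache_raw(payload: dict, label: str) -> Path:
    """Persist an intercepted JSON payload before any parsing touches it."""
    RAW_DIR.mkdir(exist_ok=True)
    digest = hashlib.sha1(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()[:12]
    path = RAW_DIR / f"{label}-{int(time.time())}-{digest}.json"
    path.write_text(json.dumps(payload, indent=2))
    return path
```

When a selector or GraphQL field changes, you re-run the parser over the cached files instead of re-crawling.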

Summary

Scraping Houzz in 2026 requires Playwright (not requests), stealth patches to mask headless Chromium signals, and residential proxies from ThorData to pass Imperva's network-layer checks. The core workflow is:

  1. Launch a stealthy browser context through a rotating residential proxy
  2. Navigate to the target URL and wait for the SPA to fully render
  3. Intercept GraphQL responses for clean structured data (preferred over DOM parsing)
  4. Simulate realistic scroll behavior to trigger lazy loading
  5. Store results to SQLite with proper indexing for analysis queries

With this stack in place, all four major Houzz data types — photos, products, professional profiles, and project collections — are accessible through the public interface.