← Back to blog

Scraping Trustpilot Reviews at Scale (2026)

Scraping Trustpilot Reviews at Scale (2026)

Trustpilot is one of the few major review platforms that still exposes a usable public JSON API without requiring authentication for most operations. That makes it a practical target for competitive intelligence, sentiment analysis, and brand monitoring pipelines. This guide covers the complete workflow: locating business unit IDs, querying the reviews endpoint, handling pagination, filtering by star rating, detecting suspicious reviews, storing in SQLite, and scaling with proxies.

What You Can Extract

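Each Trustpilot review object includes the review ID, title, text, star rating, language, published and experience dates, a verified flag, the company reply (if any), reviewer details (display name, country, review count), the like count, and any attached images.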

The verified flag matters for analysis. Verified reviews come from customers the company invited via email upload — these have higher credibility than open submissions. Unverified reviews are publicly submitted by anyone.

Setting Up Your Environment

pip install httpx requests beautifulsoup4 curl-cffi

(sqlite3 ships with the Python standard library, so it does not need to be installed.)

For proxy support and Akamai bypass:

pip install curl-cffi

Finding Business Unit IDs

Every Trustpilot company profile has a businessUnitId embedded in the Next.js page data. It's a 24-character hex string that identifies the company uniquely across Trustpilot's API. You extract it from the __NEXT_DATA__ script block:

import httpx
import json
import re
import time
import random
import sqlite3
from datetime import datetime
from curl_cffi import requests as cffi_requests

# ThorData residential proxy — required for Akamai Bot Manager bypass at scale
# https://thordata.partnerstack.com/partner/0a0x4nzq (or [Oxylabs](https://oxylabs.go2cloud.org/aff_c?offer_id=7&aff_id=2066&url_id=174))
# Replace USERNAME and PASSWORD with your own proxy credentials.
PROXY = "http://USERNAME:PASSWORD@gate.thordata.com:7777"

def _extract_business_unit_id(html: str) -> str | None:
    """Return the first business unit ID found in a company page's HTML, or None."""
    # Method 1: __NEXT_DATA__ JSON block (most reliable)
    match = re.search(
        r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
        html,
        re.DOTALL,
    )
    if match:
        try:
            data = json.loads(match.group(1))
            buid = (
                data.get("props", {})
                .get("pageProps", {})
                .get("businessUnit", {})
                .get("id")
            )
            if buid:
                return buid
        except (json.JSONDecodeError, AttributeError):
            # Malformed JSON or a null level in the chain: fall through to
            # the regex-based methods below.
            pass

    # Method 2: Regex scan for business unit ID pattern
    match = re.search(r'"businessUnitId"\s*:\s*"([a-f0-9]{24})"', html)
    if match:
        return match.group(1)

    # Method 3: Look in Apollo state.
    # NOTE(review): this matches the first 24-hex "id" anywhere in the page,
    # which is not guaranteed to be the business unit — treat as last resort.
    match = re.search(r'"id"\s*:\s*"([a-f0-9]{24})"', html)
    if match:
        return match.group(1)

    return None


def get_business_unit_id(company_slug: str, proxy: str = None) -> str | None:
    """
    Extract Trustpilot business unit ID from a company URL slug.

    Loads https://www.trustpilot.com/review/<company_slug> through a
    Chrome-impersonating curl-cffi session and searches the returned HTML
    for the ID using three strategies, tried in order.

    company_slug examples:
      - "amazon.com" → reviews at trustpilot.com/review/amazon.com
      - "netflix.com"
      - "airbnb.com"
      - "booking.com"

    Args:
        company_slug: The path segment after /review/ on trustpilot.com.
        proxy: Optional proxy URL used for both http and https traffic.

    Returns:
        The business unit ID, or None if the page did not return HTTP 200,
        no ID could be found, or any exception occurred (the reason is
        printed rather than raised).
    """
    url = f"https://www.trustpilot.com/review/{company_slug}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
    }

    try:
        # The context manager closes the session (and its connections) on
        # every exit path, including early returns and exceptions.
        with cffi_requests.Session(impersonate="chrome124") as session:
            if proxy:
                session.proxies = {"http": proxy, "https": proxy}

            resp = session.get(url, headers=headers, timeout=20)
            if resp.status_code != 200:
                print(f"Failed to load {url}: HTTP {resp.status_code}")
                return None
            html = resp.text

        buid = _extract_business_unit_id(html)
        if buid is None:
            print(f"Could not find business unit ID for {company_slug}")
        return buid

    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

def get_company_info(company_slug: str, proxy: str = None) -> dict:
    """
    Get company info including business unit ID and basic stats.

    Loads the company's Trustpilot review page and reads the `businessUnit`
    object out of the embedded __NEXT_DATA__ JSON.

    Args:
        company_slug: The path segment after /review/ on trustpilot.com.
        proxy: Optional proxy URL used for both http and https traffic.

    Returns:
        On success, a dict with the keys business_unit_id, display_name,
        name, website, category, trust_score, stars, number_of_reviews,
        review_distribution, verified, claimed and logo_url (individual
        values may be None). On any failure, a dict with a single "error"
        key describing what went wrong; this function does not raise for
        request or parse failures.
    """
    url = f"https://www.trustpilot.com/review/{company_slug}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        "Accept-Language": "en-US,en;q=0.9",
    }

    try:
        # Context manager so the session is closed on every exit path.
        with cffi_requests.Session(impersonate="chrome124") as session:
            if proxy:
                session.proxies = {"http": proxy, "https": proxy}

            resp = session.get(url, headers=headers, timeout=20)
            if resp.status_code != 200:
                return {"error": f"HTTP {resp.status_code}"}
            html = resp.text
    except Exception as e:
        # Timeouts, proxy failures, etc. are reported through the same
        # error-dict contract as HTTP and parse failures, so one bad company
        # does not abort a multi-company run.
        return {"error": f"Request failed: {e}"}

    match = re.search(
        r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>',
        html, re.DOTALL
    )
    if not match:
        return {"error": "No __NEXT_DATA__ found"}

    try:
        data = json.loads(match.group(1))
        page_props = data.get("props", {}).get("pageProps", {})
        bu = page_props.get("businessUnit", {})

        categories = bu.get("categories")
        logo = bu.get("logo")

        return {
            "business_unit_id": bu.get("id"),
            "display_name": bu.get("displayName"),
            "name": bu.get("name"),
            "website": bu.get("websiteUrl"),
            # Only the first listed category is kept.
            "category": categories[0].get("categoryId") if categories else None,
            "trust_score": bu.get("trustScore"),
            "stars": bu.get("stars"),
            "number_of_reviews": bu.get("numberOfReviews"),
            "review_distribution": bu.get("reviewDistribution", {}),
            "verified": bu.get("verified", False),
            "claimed": bu.get("claimed", False),
            "logo_url": logo.get("url") if isinstance(logo, dict) else None,
        }
    except (json.JSONDecodeError, AttributeError, TypeError) as e:
        return {"error": f"Parse error: {e}"}

# Look up several companies
companies_to_scrape = ["amazon.com", "netflix.com", "airbnb.com", "uber.com", "doordash.com"]
for slug in companies_to_scrape:
    info = get_company_info(slug, proxy=PROXY)
    if "error" not in info:
        # The key can be present with a None value, and the ":," format spec
        # rejects None, so coerce to 0 before formatting.
        review_count = info.get("number_of_reviews") or 0
        print(f"{info.get('display_name')}: {info.get('trust_score')}/5.0 "
              f"({review_count:,} reviews) — ID: {info.get('business_unit_id')}")
    # Randomized pause between company page loads.
    time.sleep(random.uniform(2, 4))

Querying the Reviews API

With a business unit ID in hand, hit the public reviews endpoint. This endpoint returns JSON without requiring authentication under normal traffic patterns:

API_BASE = "https://www.trustpilot.com/api/v1/business-units"

def fetch_reviews_page(
    business_unit_id: str,
    page: int = 1,
    stars: int = None,
    language: str = "en",
    proxy: str = None,
) -> dict:
    """
    Fetch one page of reviews from Trustpilot's public API.

    Makes up to five attempts. Rate limiting (429), blocks (403), unexpected
    statuses, proxy errors, timeouts and other exceptions are retried after a
    pause; 404 returns immediately.

    Args:
        business_unit_id: Business unit ID placed in the endpoint URL.
        page: 1-based page number.
        stars: Filter by star rating (1-5). None returns all stars.
        language: ISO 639-1 language code ("en", "de", "fr", etc.)
        proxy: Optional proxy URL used for both http and https traffic.

    Returns:
        The decoded JSON body on HTTP 200, otherwise an empty dict (404, or
        all attempts exhausted).
    """
    url = f"{API_BASE}/{business_unit_id}/reviews"
    params = {
        "page": page,
        "perPage": 20,  # Maximum allowed by API
        "language": language,
        "orderBy": "publishedDate",
        "order": "desc",
    }
    if stars is not None:
        params["stars"] = stars

    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Accept": "application/json",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://www.trustpilot.com/review/",
    }

    # NOTE(review): newer httpx releases replace the `proxies=` argument with
    # `proxy=` / `mounts=` — confirm against the installed httpx version.
    proxies = None
    if proxy:
        proxies = {"http://": proxy, "https://": proxy}

    for attempt in range(5):
        try:
            # A fresh client is created for every attempt.
            with httpx.Client(proxies=proxies, timeout=25) as client:
                resp = client.get(url, params=params, headers=headers)

            if resp.status_code == 200:
                return resp.json()

            if resp.status_code == 429:
                # Respect Retry-After header when it is a plain number of
                # seconds; the header may also be an HTTP date or malformed,
                # in which case fall back to exponential backoff.
                try:
                    retry_after = max(0, int(resp.headers.get("Retry-After")))
                except (TypeError, ValueError):
                    retry_after = 2 ** attempt
                print(f"Rate limited on page {page}. Waiting {retry_after}s...")
                time.sleep(retry_after)
                continue

            if resp.status_code == 404:
                print(f"Business unit {business_unit_id} not found")
                return {}

            if resp.status_code == 403:
                print(f"403 on page {page} — IP may be blocked. Try a different proxy.")
                time.sleep(30)
                continue

            print(f"Unexpected status {resp.status_code} on page {page}")
            time.sleep(2 ** attempt)

        except httpx.ProxyError:
            print(f"Proxy error on attempt {attempt+1}")
            time.sleep(5)
        except httpx.TimeoutException:
            print(f"Timeout on page {page}, attempt {attempt+1}")
            time.sleep(10 * (attempt + 1))
        except Exception as e:
            # Includes a 200 response whose body is not valid JSON.
            print(f"Error on page {page}: {e}")
            if attempt == 4:
                # Last attempt: skip the pointless final sleep.
                break
            time.sleep(5)

    return {}

def parse_review(review: dict) -> dict:
    """
    Normalize a raw Trustpilot review object into a flat dict.

    Nested objects ("consumer", "dates", "labels", "images") are treated as
    empty when they are missing *or* explicitly null, so a sparse review
    yields None/False/[] fields instead of raising.

    Args:
        review: One element of the "reviews" list from the reviews endpoint.

    Returns:
        A dict with the keys id, title, text, rating, language,
        date_published, date_experience, verified, has_reply, reply_text,
        reply_date, reviewer_name, reviewer_country, reviewer_review_count,
        reviewer_is_verified, like_count, images and update_notice.
    """
    # `dict.get(key, {})` only covers a missing key; `or {}` also covers a
    # key that is present with a null value.
    consumer = review.get("consumer") or {}
    dates = review.get("dates") or {}
    labels = review.get("labels") or {}
    verified_label = labels.get("verified") or {}
    images = review.get("images") or []
    company_reply = review.get("companyReply")

    return {
        "id": review.get("id"),
        "title": review.get("title", ""),
        "text": review.get("text", ""),
        "rating": review.get("rating"),
        "language": review.get("language"),
        "date_published": dates.get("publishedDate"),
        "date_experience": dates.get("experiencedDate"),
        "verified": verified_label.get("isVerified", False),
        "has_reply": company_reply is not None,
        "reply_text": company_reply.get("text") if company_reply else None,
        "reply_date": company_reply.get("publishedDate") if company_reply else None,
        "reviewer_name": consumer.get("displayName"),
        "reviewer_country": consumer.get("countryCode"),
        "reviewer_review_count": consumer.get("numberOfReviews"),
        "reviewer_is_verified": consumer.get("isVerified", False),
        "like_count": review.get("likeCount", 0),
        # Non-dict entries in the images list are skipped.
        "images": [img.get("url") for img in images if isinstance(img, dict)],
        "update_notice": review.get("updateNotice"),
    }

Paginating Through All Reviews

The API caps at 20 reviews per page and shows approximately 100 pages per filter before the pagination stops. Run separate passes by star rating to maximize coverage:

def scrape_all_reviews(
    business_unit_id: str,
    max_pages: int = 50,
    language: str = "en",
    proxy: str = None,
    delay: float = 1.2,
) -> list[dict]:
    """
    Walk the unfiltered review listing page by page and collect every review.

    Pages are fetched with no star filter, starting at page 1, until
    max_pages is reached, a page comes back empty, or the response carries
    no pagination.nextPage value. Reviews without an ID and reviews whose ID
    was already seen are dropped. Between pages the function sleeps for a
    random interval between 0.8x and 1.4x `delay` seconds.

    Returns the normalized review dicts in the order they were first seen.
    """
    collected: list[dict] = []
    known_ids: set = set()
    page = 1

    while page <= max_pages:
        payload = fetch_reviews_page(business_unit_id, page, language=language, proxy=proxy)
        raw_reviews = payload.get("reviews", [])

        # An empty page means there is nothing further to fetch.
        if not raw_reviews:
            print(f"No more reviews at page {page}. Total: {len(collected)}")
            break

        for parsed in map(parse_review, raw_reviews):
            review_id = parsed["id"]
            if not review_id or review_id in known_ids:
                continue
            known_ids.add(review_id)
            collected.append(parsed)

        print(f"Page {page}: +{len(raw_reviews)} reviews (total: {len(collected)})")

        has_next = payload.get("pagination", {}).get("nextPage")
        if not has_next:
            print(f"Reached end of pagination at page {page}")
            break

        # Jittered pause before requesting the next page.
        time.sleep(random.uniform(delay * 0.8, delay * 1.4))
        page += 1

    return collected

def scrape_reviews_by_star(
    business_unit_id: str,
    stars: int,
    max_pages: int = 100,
    proxy: str = None,
) -> list[dict]:
    """
    Gather the reviews of one star rating for a business unit.

    Requests pages 1..max_pages with the given `stars` filter and stops
    early on an empty page or when the response has no pagination.nextPage.
    Each review is normalized with parse_review; entries without an ID or
    with an already-seen ID are ignored. Sleeps 1.0-1.8 s between pages.
    """
    # Insertion-ordered dict doubles as the "seen" set and the result list.
    by_id: dict = {}

    for page_number in range(1, max_pages + 1):
        payload = fetch_reviews_page(
            business_unit_id, page_number, stars=stars, proxy=proxy
        )
        raw_reviews = payload.get("reviews", [])
        if not raw_reviews:
            break

        for raw_review in raw_reviews:
            parsed = parse_review(raw_review)
            review_id = parsed["id"]
            if review_id and review_id not in by_id:
                by_id[review_id] = parsed

        pagination = payload.get("pagination", {})
        if not pagination.get("nextPage"):
            break

        time.sleep(random.uniform(1.0, 1.8))

    return list(by_id.values())

def scrape_complete_review_set(
    business_unit_id: str,
    max_per_star: int = 100,
    proxy: str = None,
) -> list[dict]:
    """
    Scrape up to max_per_star pages per star rating.
    Running separate passes per star gives up to 5 * 100 * 20 = 10,000 reviews.

    Args:
        business_unit_id: Business unit ID to scrape.
        max_per_star: Maximum number of pages fetched for each star rating
            (passed to scrape_reviews_by_star as max_pages).
        proxy: Optional proxy URL forwarded to every request.

    Returns:
        The reviews of all five passes concatenated, 1★ first and 5★ last.
    """
    all_reviews = []
    last_star = 5

    for stars in range(1, last_star + 1):
        print(f"\nScraping {stars}★ reviews...")
        star_reviews = scrape_reviews_by_star(
            business_unit_id, stars=stars, max_pages=max_per_star, proxy=proxy
        )
        all_reviews.extend(star_reviews)
        print(f"  Got {len(star_reviews)} {stars}★ reviews")

        # Pause between star rating batches; nothing follows the final pass,
        # so sleeping there would only delay the return.
        if stars < last_star:
            time.sleep(random.uniform(10, 20))

    return all_reviews

Fake Review Detection

Trustpilot has its own moderation layer, but coordinated review campaigns still make it through. Multiple weak signals combine into reliable detection:

from collections import Counter
import statistics

# Stock praise phrases; only counted when the whole review is short.
_GENERIC_PRAISE_PHRASES = (
    "great service", "highly recommend", "five stars",
    "best ever", "amazing service", "excellent service",
    "fantastic", "wonderful experience", "very happy",
    "would recommend", "great experience", "outstanding",
)

# Reviews-per-day at which a single review picks up the burst signal.
_BURST_SCORE_THRESHOLD = 8
# Reviews-per-day at which a day is listed under "burst_dates" in the report.
_BURST_REPORT_THRESHOLD = 10


def _published_day(review: dict) -> str:
    """Return the first 10 characters of the publish date, or "" if absent."""
    published = review.get("date_published")
    return published[:10] if isinstance(published, str) else ""


def _score_review(r: dict, date_counts: Counter) -> float:
    """Score a single review for suspiciousness (0.0=clean, 1.0=very suspicious)."""
    score = 0.0
    text = ((r.get("title") or "") + " " + (r.get("text") or "")).strip()

    # Signal 1: First-time reviewer
    if (r.get("reviewer_review_count") or 0) <= 1:
        score += 0.35

    # Signal 2: Very short text
    if len(text) < 40:
        score += 0.25

    # Signal 3: Generic praise phrases (common in fake 5-star reviews)
    if len(text) < 100 and any(p in text.lower() for p in _GENERIC_PRAISE_PHRASES):
        score += 0.15

    # Signal 4: Burst posting pattern
    date_str = _published_day(r)
    if date_str and date_counts.get(date_str, 0) >= _BURST_SCORE_THRESHOLD:
        score += 0.20

    # Signal 5: Unverified 5-star
    if not r.get("verified") and r.get("rating") == 5:
        score += 0.10

    # Signal 6: No country info (bots often omit this)
    if not r.get("reviewer_country"):
        score += 0.05

    # Signal 7: Experience date more than a year away from the publish date
    pub_date = r.get("date_published") or ""
    exp_date = r.get("date_experience") or ""
    if pub_date and exp_date:
        try:
            # Keep only "YYYY-MM-DDTHH:MM:SS" so fractional seconds and
            # timezone suffixes do not trip fromisoformat.
            pub_dt = datetime.fromisoformat(pub_date[:19])
            exp_dt = datetime.fromisoformat(exp_date[:19])
            if abs((pub_dt - exp_dt).days) > 365:
                score += 0.10
        except (TypeError, ValueError):
            # Unparseable or non-string dates simply contribute nothing.
            pass

    return min(round(score, 2), 1.0)


def analyze_review_authenticity(reviews: list[dict]) -> dict:
    """
    Analyze a set of reviews for authenticity signals.

    Every review gets a "fake_score" from seven additive heuristics
    (first-time reviewer, very short text, generic praise, same-day burst,
    unverified 5-star, missing country, large experience/publish date gap).
    Reviews scoring >= 0.5 are counted as high risk.

    Args:
        reviews: Normalized review dicts (as produced by parse_review).
            Missing or None fields are tolerated.

    Returns:
        {"scored_reviews": [...], "analysis": {...}} where scored_reviews are
        copies of the input dicts with "fake_score" added, and analysis holds
        total_reviews, high_risk_count, high_risk_pct, avg_fake_score,
        median_fake_score, novice_reviewer_pct, five_star_high_risk,
        one_star_high_risk, burst_dates and verdict. For empty input the
        analysis dict is empty.
    """
    if not reviews:
        return {"scored_reviews": [], "analysis": {}}

    # Reviews per publish day. Undated reviews are left out so they cannot
    # show up as a bogus "" burst date.
    date_counts = Counter(
        day for day in (_published_day(r) for r in reviews) if day
    )

    # Reviewers with at most two reviews overall
    novice_reviewers = sum(
        1 for r in reviews
        if (r.get("reviewer_review_count") or 0) <= 2
    )

    # Score all reviews
    scored = [{**r, "fake_score": _score_review(r, date_counts)} for r in reviews]

    # Aggregate analysis
    all_scores = [r["fake_score"] for r in scored]
    high_risk = [r for r in scored if r["fake_score"] >= 0.5]
    five_star_high_risk = [r for r in high_risk if r.get("rating") == 5]
    one_star_high_risk = [r for r in high_risk if r.get("rating") == 1]

    # Date burst analysis
    burst_dates = {
        date: count
        for date, count in date_counts.items()
        if count >= _BURST_REPORT_THRESHOLD
    }

    total = len(reviews)  # non-zero: empty input returned above
    high_risk_ratio = len(high_risk) / total
    if high_risk_ratio > 0.3:
        verdict = "HIGH RISK — likely coordinated campaign"
    elif high_risk_ratio > 0.1:
        verdict = "MODERATE RISK — some suspicious patterns"
    else:
        verdict = "LOW RISK — mostly authentic-looking reviews"

    analysis = {
        "total_reviews": total,
        "high_risk_count": len(high_risk),
        "high_risk_pct": round(high_risk_ratio * 100, 1),
        "avg_fake_score": round(statistics.mean(all_scores), 3),
        "median_fake_score": round(statistics.median(all_scores), 3),
        "novice_reviewer_pct": round(novice_reviewers / total * 100, 1),
        "five_star_high_risk": len(five_star_high_risk),
        "one_star_high_risk": len(one_star_high_risk),
        "burst_dates": burst_dates,
        "verdict": verdict,
    }

    return {"scored_reviews": scored, "analysis": analysis}

def print_authenticity_report(analysis_result: dict, company_name: str = ""):
    """
    Print a formatted authenticity analysis report.

    Args:
        analysis_result: The dict returned by analyze_review_authenticity;
            only its "analysis" entry is read.
        company_name: Label shown in the report header.

    When the analysis is empty (which is what analyze_review_authenticity
    returns for an empty review list) only the header and a short notice are
    printed instead of failing on the missing statistics.
    """
    a = analysis_result.get("analysis") or {}
    print(f"\n{'='*60}")
    print(f"Trustpilot Review Authenticity Report: {company_name}")
    print(f"{'='*60}")

    if not a:
        print("No reviews were analyzed.")
        return

    print(f"Total reviews analyzed:  {a['total_reviews']:,}")
    print(f"High-risk reviews:       {a['high_risk_count']:,} ({a['high_risk_pct']}%)")
    print(f"Average fake score:      {a['avg_fake_score']:.3f}")
    print(f"Novice reviewers:        {a['novice_reviewer_pct']}%")
    print(f"Suspicious 5★ reviews:  {a['five_star_high_risk']:,}")
    print(f"Suspicious 1★ reviews:  {a['one_star_high_risk']:,}")
    print(f"Verdict: {a['verdict']}")

    if a["burst_dates"]:
        print("\nSuspicious burst dates (10+ reviews/day):")
        # Five busiest days, largest count first.
        for date, count in sorted(a["burst_dates"].items(), key=lambda x: -x[1])[:5]:
            print(f"  {date}: {count} reviews")

SQLite Storage and Querying

def setup_trustpilot_database(db_path: str) -> sqlite3.Connection:
    """
    Open (or create) the SQLite database at db_path and ensure the schema exists.

    Switches the connection to WAL journaling with synchronous=NORMAL, creates
    the `companies` and `reviews` tables plus five indexes on `reviews` if
    they are not already present, commits, and returns the open connection.
    """
    db = sqlite3.connect(db_path)

    for pragma in ("journal_mode=WAL", "synchronous=NORMAL"):
        db.execute(f"PRAGMA {pragma}")

    # One row per company, keyed by its business unit ID.
    companies_ddl = """
        CREATE TABLE IF NOT EXISTS companies (
            business_unit_id TEXT PRIMARY KEY,
            slug TEXT,
            display_name TEXT,
            trust_score REAL,
            stars REAL,
            number_of_reviews INTEGER,
            website TEXT,
            category TEXT,
            claimed INTEGER,
            verified INTEGER,
            scraped_at TEXT
        )
    """

    # One row per review, keyed by the review ID.
    reviews_ddl = """
        CREATE TABLE IF NOT EXISTS reviews (
            id TEXT PRIMARY KEY,
            business_unit_id TEXT,
            title TEXT,
            text TEXT,
            rating INTEGER,
            language TEXT,
            date_published TEXT,
            date_experience TEXT,
            verified INTEGER DEFAULT 0,
            has_reply INTEGER DEFAULT 0,
            reply_text TEXT,
            reply_date TEXT,
            reviewer_name TEXT,
            reviewer_country TEXT,
            reviewer_review_count INTEGER,
            reviewer_is_verified INTEGER DEFAULT 0,
            like_count INTEGER DEFAULT 0,
            fake_score REAL DEFAULT 0.0,
            scraped_at TEXT,
            FOREIGN KEY (business_unit_id) REFERENCES companies(business_unit_id)
        )
    """

    db.execute(companies_ddl)
    db.execute(reviews_ddl)

    # Index-name suffix -> indexed column on `reviews`.
    review_indexes = {
        "buid": "business_unit_id",
        "rating": "rating",
        "date": "date_published",
        "fake": "fake_score",
        "verified": "verified",
    }
    for suffix, column in review_indexes.items():
        db.execute(f"CREATE INDEX IF NOT EXISTS idx_reviews_{suffix} ON reviews({column})")

    db.commit()
    return db

def save_company(conn: sqlite3.Connection, slug: str, info: dict):
    """
    Save company info to database.

    Inserts one row into `companies`, replacing any existing row with the
    same business_unit_id. The statement is not committed here; the caller
    decides when to commit.

    Args:
        conn: Open connection to a database that has the `companies` table.
        slug: The Trustpilot URL slug the info was fetched for.
        info: Company info dict (as returned by get_company_info). Missing
            keys are stored as NULL; missing or None claimed/verified flags
            are stored as 0.
    """
    conn.execute("""
        INSERT OR REPLACE INTO companies
        (business_unit_id, slug, display_name, trust_score, stars,
         number_of_reviews, website, category, claimed, verified, scraped_at)
        VALUES (?,?,?,?,?,?,?,?,?,?,?)
    """, (
        info.get("business_unit_id"),
        slug,
        info.get("display_name"),
        info.get("trust_score"),
        info.get("stars"),
        info.get("number_of_reviews"),
        info.get("website"),
        info.get("category"),
        # bool() first: a flag that is present but None would make int() raise.
        int(bool(info.get("claimed"))),
        int(bool(info.get("verified"))),
        datetime.now().isoformat(),
    ))

def save_reviews(conn: sqlite3.Connection, reviews: list[dict], business_unit_id: str) -> int:
    """
    Save reviews to database in a batch.

    Each review is inserted with INSERT OR IGNORE, so a review whose ID is
    already stored is left untouched. All inserts are committed together at
    the end.

    Args:
        conn: Open connection to a database that has the `reviews` table.
        reviews: Normalized (optionally scored) review dicts. Every dict must
            have an "id" key; other missing keys are stored as NULL or their
            column default equivalents.
        business_unit_id: ID written to every row's business_unit_id column.

    Returns:
        The number of rows actually inserted (ignored duplicates are not
        counted).
    """
    now = datetime.now().isoformat()
    saved = 0

    for r in reviews:
        try:
            cursor = conn.execute("""
                INSERT OR IGNORE INTO reviews
                (id, business_unit_id, title, text, rating, language,
                 date_published, date_experience, verified, has_reply,
                 reply_text, reply_date, reviewer_name, reviewer_country,
                 reviewer_review_count, reviewer_is_verified, like_count,
                 fake_score, scraped_at)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """, (
                r["id"], business_unit_id,
                r.get("title"), r.get("text"),
                r.get("rating"), r.get("language"),
                r.get("date_published"), r.get("date_experience"),
                # bool() first: a flag that is present but None would make
                # int() raise.
                int(bool(r.get("verified"))),
                int(bool(r.get("has_reply"))),
                r.get("reply_text"), r.get("reply_date"),
                r.get("reviewer_name"), r.get("reviewer_country"),
                r.get("reviewer_review_count"),
                int(bool(r.get("reviewer_is_verified"))),
                r.get("like_count", 0),
                r.get("fake_score", 0.0),
                now,
            ))
            # OR IGNORE skips duplicates silently (no exception), so the
            # cursor's rowcount is the only way to tell insert from skip.
            if cursor.rowcount > 0:
                saved += 1
        except sqlite3.IntegrityError:
            pass  # Constraint violation not covered by OR IGNORE, skip

    conn.commit()
    return saved

def query_review_stats(conn: sqlite3.Connection, business_unit_id: str) -> None:
    """
    Print a per-rating breakdown and an overall summary for one company.

    For each star rating stored for business_unit_id (highest first) a line
    is printed with the review count, the average fake score, the number of
    verified reviews and the verified percentage. A final line shows the
    total review count and the overall average fake score.
    """
    per_rating_sql = """
        SELECT
            rating,
            COUNT(*) as n,
            ROUND(AVG(fake_score), 3) as avg_fake,
            SUM(CASE WHEN verified = 1 THEN 1 ELSE 0 END) as verified_count,
            ROUND(100.0 * SUM(CASE WHEN verified = 1 THEN 1 ELSE 0 END) / COUNT(*), 1) as verified_pct
        FROM reviews
        WHERE business_unit_id = ?
        GROUP BY rating
        ORDER BY rating DESC
    """
    overall_sql = "SELECT COUNT(*), ROUND(AVG(fake_score), 3) FROM reviews WHERE business_unit_id = ?"
    bind = (business_unit_id,)

    breakdown = conn.execute(per_rating_sql, bind).fetchall()

    # Table header followed by a rule.
    print(f"\n{'Stars':<8} {'Count':<8} {'Avg Fake':>10} {'Verified':>10} {'Verified%':>10}")
    print("-" * 50)

    for rating, count, avg_fake, verified_count, verified_pct in breakdown:
        print(f"{rating:<8} {count:<8} {avg_fake:>10.3f} {verified_count:>10} {verified_pct:>9.1f}%")

    review_total, overall_avg = conn.execute(overall_sql, bind).fetchone()
    print(f"\nTotal: {review_total:,} reviews | Overall avg fake score: {overall_avg}")

def get_suspicious_reviews(
    conn: sqlite3.Connection,
    business_unit_id: str,
    threshold: float = 0.5,
    limit: int = 20,
) -> list[dict]:
    """
    Return the highest-scoring suspicious reviews for one company.

    Selects reviews of business_unit_id whose fake_score is at least
    `threshold`, ordered by fake_score (highest first) and then by publish
    date (newest first), capped at `limit` rows. Each row is returned as a
    dict keyed by column name.
    """
    field_names = (
        "id", "title", "text", "rating", "date_published",
        "reviewer_name", "reviewer_review_count", "verified", "fake_score",
    )
    query = """
        SELECT id, title, text, rating, date_published, reviewer_name,
               reviewer_review_count, verified, fake_score
        FROM reviews
        WHERE business_unit_id = ? AND fake_score >= ?
        ORDER BY fake_score DESC, date_published DESC
        LIMIT ?
    """

    suspicious = []
    for record in conn.execute(query, (business_unit_id, threshold, limit)):
        # Pair each selected value with its column name, in SELECT order.
        suspicious.append({name: value for name, value in zip(field_names, record)})
    return suspicious

Full Pipeline: Multi-Company Competitive Analysis

def run_competitive_analysis(
    companies: dict,  # {slug: display_name}
    db_path: str = "trustpilot_competitive.db",
    max_per_star: int = 50,
    proxy: str = None,
) -> dict:
    """
    Full pipeline: scrape reviews for multiple companies, score for authenticity,
    store in SQLite, and return comparative stats.

    companies example:
        {
            "amazon.com": "Amazon",
            "ebay.com": "eBay",
            "walmart.com": "Walmart",
        }

    Args:
        companies: Mapping of Trustpilot slug to the name shown in reports.
        db_path: SQLite file the companies and reviews are written to.
        max_per_star: Maximum pages scraped per star rating per company.
        proxy: Optional proxy URL forwarded to every request.

    Returns:
        A dict keyed by slug. Companies whose info could not be fetched map
        to {"error": "failed"}; the others map to a dict with
        business_unit_id, trust_score, total_reviews_scraped and
        authenticity_analysis (empty when no reviews were collected).
    """
    conn = setup_trustpilot_database(db_path)
    results = {}
    pending = list(companies.items())

    # try/finally so the database connection is closed even if scraping one
    # of the companies raises.
    try:
        for position, (slug, display_name) in enumerate(pending):
            print(f"\n{'='*60}")
            print(f"Processing: {display_name} ({slug})")
            print(f"{'='*60}")

            # Get company info and business unit ID
            info = get_company_info(slug, proxy=proxy)
            if "error" in info or not info.get("business_unit_id"):
                print(f"  Could not get company info: {info.get('error', 'No ID found')}")
                results[slug] = {"error": "failed"}
                continue

            buid = info["business_unit_id"]
            print(f"  Business Unit ID: {buid}")
            # `or 0`: the count may be present but None, which ":," rejects.
            print(f"  Trust Score: {info.get('trust_score')}/5.0 | "
                  f"Reviews: {(info.get('number_of_reviews') or 0):,}")

            save_company(conn, slug, info)

            # Scrape reviews per star rating
            all_reviews = scrape_complete_review_set(
                buid,
                max_per_star=max_per_star,
                proxy=proxy,
            )
            print(f"\n  Total reviews collected: {len(all_reviews)}")

            # Score for authenticity
            analysis = analyze_review_authenticity(all_reviews)
            scored_reviews = analysis["scored_reviews"]

            # Save to database
            saved_count = save_reviews(conn, scored_reviews, buid)
            print(f"  Saved {saved_count} reviews to database")

            # Print authenticity report. The analysis is empty when nothing
            # was scraped, and the report needs the statistics to be there.
            if analysis["analysis"]:
                print_authenticity_report(analysis, display_name)
            else:
                print("  No reviews collected; skipping authenticity report")

            results[slug] = {
                "business_unit_id": buid,
                "trust_score": info.get("trust_score"),
                "total_reviews_scraped": len(all_reviews),
                "authenticity_analysis": analysis["analysis"],
            }

            # Pause between companies (not needed after the last one)
            if position < len(pending) - 1:
                time.sleep(random.uniform(30, 60))

        # Print comparative summary
        print(f"\n{'='*60}")
        print("COMPARATIVE SUMMARY")
        print(f"{'='*60}")
        print(f"{'Company':<25} {'Score':>6} {'Reviews':>8} {'High Risk%':>10}")
        print("-" * 55)
        for slug, result in results.items():
            if "error" in result:
                continue
            a = result["authenticity_analysis"]
            trust_score = result["trust_score"]
            # A missing trust score cannot take a float format spec.
            if isinstance(trust_score, (int, float)):
                score_cell = f"{trust_score:>6.1f}"
            else:
                score_cell = f"{'n/a':>6}"
            print(
                f"{companies[slug]:<25} "
                f"{score_cell} "
                f"{result['total_reviews_scraped']:>8,} "
                f"{a.get('high_risk_pct', 0.0):>9.1f}%"
            )
    finally:
        conn.close()

    return results

# Run the full pipeline for three e-commerce companies.
# Keys are Trustpilot slugs, values are the names used in the printed reports.
companies = dict([
    ("amazon.com", "Amazon"),
    ("ebay.com", "eBay"),
    ("etsy.com", "Etsy"),
])

# 25 pages per star rating, stored in a dedicated database file.
results = run_competitive_analysis(
    companies=companies,
    proxy=PROXY,
    max_per_star=25,
    db_path="ecommerce_review_analysis.db",
)

Scaling with Proxies

Trustpilot's infrastructure sits behind Akamai Bot Manager. At the single-IP level:

- Initial page visits trigger a JavaScript challenge (handled by curl-cffi)
- The /api/v1/business-units endpoint is more permissive but still tracks IP patterns
- Sustained traffic from a datacenter IP triggers 403 responses after 30-50 requests

ThorData's rotating residential proxy pool addresses this:

def get_rotating_proxy(session_id: str = None) -> str:
    """
    Build a ThorData proxy URL.

    With a session_id, "-session-<session_id>" is appended to the credentials
    for sticky routing (same IP per session). Without one (None or empty),
    the plain credentials are used for rotation (new IP each request).
    """
    credentials = "http://USERNAME:PASSWORD"
    gateway = "gate.thordata.com:7777"

    # Empty suffix when no session was requested.
    session_suffix = f"-session-{session_id}" if session_id else ""
    return f"{credentials}{session_suffix}@{gateway}"

# For scraping one company: sticky session (same IP throughout)
company_session = str(random.randint(10000, 99999))
proxy = get_rotating_proxy(company_session)

buid = get_business_unit_id("booking.com", proxy=proxy)
# get_business_unit_id returns None when the lookup fails; scraping with a
# None ID would only request a non-existent endpoint, so skip it.
reviews = scrape_all_reviews(buid, max_pages=100, proxy=proxy) if buid else []

# For multi-company scraping: rotate between companies
for slug in ["amazon.com", "ebay.com", "walmart.com"]:
    # Fresh session per company
    session = str(random.randint(10000, 99999))
    proxy = get_rotating_proxy(session)
    # ... scrape company

Sentiment Analysis on Review Text

With a dataset of review text, you can run basic sentiment analysis without the OpenAI API:

_POSITIVE_WORDS = frozenset({
    "excellent", "amazing", "great", "good", "fantastic", "wonderful",
    "love", "perfect", "best", "awesome", "outstanding", "helpful",
    "fast", "reliable", "recommend", "satisfied", "happy", "pleased",
})

_NEGATIVE_WORDS = frozenset({
    "terrible", "awful", "horrible", "bad", "worst", "poor", "waste",
    "scam", "fraud", "never", "disappointed", "useless", "broken",
    "refused", "ignored", "slow", "late", "rude", "dishonest", "lie",
})

_INTENSIFIERS = frozenset({"very", "extremely", "absolutely", "completely", "totally"})

# A word is a run of letters, optionally with inner apostrophes ("don't").
_WORD_PATTERN = r"[a-z]+(?:'[a-z]+)*"


def simple_sentiment_analysis(text: str) -> dict:
    """
    Basic rule-based sentiment analysis for review text.
    Returns polarity scores without requiring external APIs.

    The text is lower-cased and split into words with punctuation stripped,
    so "Great!" and "terrible." count like "great" and "terrible". Each
    positive or negative word adds 1 to its tally, or 2 when the word
    directly before it is an intensifier ("very", "extremely", ...).

    Args:
        text: Review text; None or empty text is treated as neutral.

    Returns:
        A dict with:
          - "sentiment": "positive" (score > 0.2), "negative" (score < -0.2)
            or "neutral"
          - "score": (positive - negative) / (positive + negative), rounded
            to 3 places; 0.0 when no sentiment words were found
          - "positive_signals" / "negative_signals": the weighted tallies
    """
    words = re.findall(_WORD_PATTERN, (text or "").lower())

    pos_count = 0
    neg_count = 0

    for i, word in enumerate(words):
        multiplier = 2 if (i > 0 and words[i - 1] in _INTENSIFIERS) else 1
        if word in _POSITIVE_WORDS:
            pos_count += multiplier
        elif word in _NEGATIVE_WORDS:
            neg_count += multiplier

    total = pos_count + neg_count
    if total == 0:
        sentiment = "neutral"
        score = 0.0
    else:
        score = (pos_count - neg_count) / total
        if score > 0.2:
            sentiment = "positive"
        elif score < -0.2:
            sentiment = "negative"
        else:
            sentiment = "neutral"

    return {
        "sentiment": sentiment,
        "score": round(score, 3),
        "positive_signals": pos_count,
        "negative_signals": neg_count,
    }

def batch_sentiment_analysis(conn: sqlite3.Connection, business_unit_id: str):
    """
    Run sentiment analysis on all reviews and update database.

    Adds the `sentiment` and `sentiment_score` columns to `reviews` if either
    is missing, then scores every review of business_unit_id whose sentiment
    is still NULL (title and text combined) and writes the result back.
    Prints how many reviews were updated.

    Args:
        conn: Open connection to a database that has the `reviews` table.
        business_unit_id: Only this company's reviews are processed.
    """
    # Check each column on its own instead of relying on ALTER TABLE failing:
    # that way a table that has only one of the two columns is still
    # completed, and unrelated database errors are not silently swallowed.
    existing_columns = {row[1] for row in conn.execute("PRAGMA table_info(reviews)")}
    for column, sql_type in (("sentiment", "TEXT"), ("sentiment_score", "REAL")):
        if column not in existing_columns:
            # Column name and type come from the fixed tuple above, never
            # from user input.
            conn.execute(f"ALTER TABLE reviews ADD COLUMN {column} {sql_type}")
    conn.commit()

    reviews = conn.execute(
        "SELECT id, title, text FROM reviews WHERE business_unit_id = ? AND sentiment IS NULL",
        (business_unit_id,)
    ).fetchall()

    updates = []
    for review_id, title, text in reviews:
        combined = f"{title or ''} {text or ''}"
        result = simple_sentiment_analysis(combined)
        updates.append((result["sentiment"], result["score"], review_id))

    conn.executemany(
        "UPDATE reviews SET sentiment = ?, sentiment_score = ? WHERE id = ?",
        updates,
    )
    conn.commit()
    print(f"Updated sentiment for {len(updates)} reviews")

Trustpilot reviews are publicly visible, but bulk automated scraping violates their Terms of Service. For commercial applications at scale, Trustpilot offers a licensed API with official rate limits. The techniques here are appropriate for research, personal projects, and competitive analysis where you are not republishing the content.

Do not:

- Republish raw review content at scale
- Use scraped data to manipulate or deceive consumers
- Attempt to remove or flag legitimate competitor reviews based on scraped data
- Redistribute the data commercially without a Trustpilot data license

Key Takeaways