Customer Segmentation Setup

L3
ModelContextProtocol · Playwright · Shopping Admin

Configure customer segmentation system in admin panel by defining demographic criteria, creating behavior-based segments, implementing targeting rules, and setting up automated marketing workflows.

Created by Fanqing Meng
2025-08-17
Content Submission · Inventory Management

Model Ranking

Click on the dots to view the trajectory of each task run
Model
Run Results
Pass@4
Pass^4
Avg Time
Avg Turns
Input Tokens
Output Tokens
Total Tokens
OpenAI
gpt-5
4
/4
204.4s
20.3
548,232
6,215
554,447
MoonshotAI
k2
4
/4
173.2s
19.0
597,540
1,275
598,815
OpenAI
o3
4
/4
139.3s
23.3
764,199
2,017
766,216
Claude
claude-4-sonnet
3
/4
262.2s
20.3
771,491
2,921
774,412
DeepSeek
deepseek-chat
3
/4
468.0s
23.0
808,574
1,188
809,761
Qwen
qwen-3-coder
3
/4
137.7s
20.5
713,508
1,509
715,017
Gemini
gemini-2-5-pro
2
/4
129.6s
16.8
860,164
4,207
864,371
Claude
claude-4-1-opus
1
/1
--
301.3s
20.0
641,339
2,500
643,839
Grok
grok-4
0
/4
24.1s
1.0
-
-
-

Task State

WebArena
view WebArena environment setup for this task

Instruction



Verify

*.py
Python
import asyncio
import sys
import re
import os
import json
from pathlib import Path
from playwright.async_api import (
    async_playwright,
    TimeoutError as PlaywrightTimeoutError,
)

# Read the base URL from the environment (the shopping_admin setup injects
# http://localhost:7780/admin); fall back to that local address by default.
# Trailing slashes are stripped so paths can be appended as f"{BASE_URL}/...".
BASE_URL = os.getenv("WEBARENA_BASE_URL", "http://localhost:7780/admin").rstrip("/")


def get_model_response():
    """
    Return the text of the most recent completed assistant message.

    The path of a JSON message log is taken from the ``MCP_MESSAGES``
    environment variable. The log is expected to be a list of message
    dicts; it is scanned from the end for a message whose ``role`` is
    ``"assistant"`` and whose ``status`` is ``"completed"``.

    Returns:
        The ``text`` of the first ``output_text`` content item of the newest
        matching message that has one, or ``None`` when the variable is
        unset, the file cannot be read or parsed, or no such item exists.
    """
    messages_path = os.getenv("MCP_MESSAGES")
    print(f"MCP_MESSAGES: {messages_path}")
    if not messages_path:
        print("Warning: MCP_MESSAGES environment variable not set", file=sys.stderr)
        return None

    try:
        # JSON text is UTF-8; read it explicitly as such instead of relying on
        # the locale's default encoding, which breaks on non-ASCII output.
        with open(messages_path, "r", encoding="utf-8") as f:
            messages = json.load(f)

        # Walk backwards so the newest completed assistant message wins.
        for message in reversed(messages):
            if (
                message.get("role") == "assistant"
                and message.get("status") == "completed"
            ):
                content = message.get("content", [])
                for item in content:
                    if item.get("type") == "output_text":
                        return item.get("text", "")

        print("Warning: No assistant response found in messages", file=sys.stderr)
        return None
    except Exception as e:
        # Best-effort reader: any I/O, JSON or shape problem is reported and
        # treated as "no response" so the caller can fall back.
        print(f"Error reading messages file: {str(e)}", file=sys.stderr)
        return None


def parse_answer_format(text):
    """
    Extract the key/value pairs of an <answer>...</answer> block.

    The tag match is case-insensitive and may span several lines. The
    block must contain exactly five lines; each line containing a ``|`` is
    split at the first ``|`` into a stripped key and a stripped value, and
    lines without ``|`` are ignored.

    Returns:
        A dict of the parsed pairs, or ``None`` when ``text`` is empty, has
        no answer block, or the block does not have five lines.
    """
    if not text:
        return None

    found = re.search(r"<answer>(.*?)</answer>", text, re.IGNORECASE | re.DOTALL)
    if found is None:
        return None

    rows = found.group(1).strip().split("\n")
    if len(rows) != 5:
        print(f"Error: Expected 5 lines in answer, got {len(rows)}", file=sys.stderr)
        return None

    parsed = {}
    for row in rows:
        # partition() yields an empty separator when the row has no "|".
        name, separator, raw_value = row.partition("|")
        if separator:
            parsed[name.strip()] = raw_value.strip()
    return parsed


def load_expected_answer(label_path):
    """
    Load the expected answer from a ``key|value`` label file.

    Args:
        label_path: Path of the label file (anything ``open()`` accepts).

    Returns:
        A dict mapping each stripped key to its stripped value, built from
        the lines that contain ``|`` (split at the first ``|``); lines
        without ``|`` are ignored. ``None`` if the file cannot be read.
    """
    try:
        # Read explicitly as UTF-8 so non-ASCII labels (e.g. customer names)
        # decode the same way regardless of the locale's default encoding.
        with open(label_path, "r", encoding="utf-8") as f:
            lines = f.read().strip().split("\n")

        expected = {}
        for line in lines:
            if "|" in line:
                key, value = line.split("|", 1)
                expected[key.strip()] = value.strip()

        return expected
    except Exception as e:
        # Best-effort loader: report the problem and let the caller decide.
        print(f"Error reading label file: {str(e)}", file=sys.stderr)
        return None


def compare_answers(model_answer, expected_answer):
    """
    Check the model's parsed answer against the expected answer.

    Every key of ``expected_answer`` must map to exactly the same value in
    ``model_answer`` (a missing key counts as the empty string). Extra keys
    in ``model_answer`` are not examined. The outcome is reported on stderr.

    Returns:
        True when all expected keys match; False on any mismatch or when
        either argument is empty/None.
    """
    if not (model_answer and expected_answer):
        return False

    # Collect every discrepancy first so they can all be reported together.
    problems = []
    for field, wanted in expected_answer.items():
        got = model_answer.get(field, "")
        if got == wanted:
            continue
        problems.append(f"{field}: expected '{wanted}', got '{got}'")

    if not problems:
        print("\n=== Answer Comparison ===", file=sys.stderr)
        print("✓ All key information matches the expected answer", file=sys.stderr)
        return True

    print("\n=== Answer Comparison Mismatches ===", file=sys.stderr)
    for problem in problems:
        print(f"✗ {problem}", file=sys.stderr)
    return False


async def verify() -> bool:
    """
    Verify that the customer segmentation setup task was completed correctly.

    The check runs in two stages:

    1. If a model response is available and contains a parsable answer
       block, it is compared with ``label.txt`` (located next to this file);
       any mismatch fails verification. A missing or unparsable response
       only produces a warning and the browser stage still runs.
    2. The Magento Admin is opened in headless Chromium to confirm that the
       "Premium Europe" customer group exists and that the customer
       ``isabella.romano@premium.eu`` exists. When the label provides
       ``FinalCustomers`` / ``LastOrderCustomer`` and the corresponding page
       elements are found, the customer count and the last row of the
       dashboard "Last Orders" table must match them as well.

    Returns:
        True when every mandatory check passes; False otherwise, including
        on Playwright timeouts or any other unexpected error.
    """
    # Get the label file path
    label_path = Path(__file__).parent / "label.txt"

    # Load expected answer
    expected_answer = load_expected_answer(label_path)
    if not expected_answer:
        print("Error: Could not load expected answer from label.txt", file=sys.stderr)
        return False

    # Get model's response from MCP_MESSAGES
    model_response = get_model_response()
    if model_response:
        print("Found model response, parsing answer format...", file=sys.stderr)
        model_answer = parse_answer_format(model_response)

        if model_answer:
            print("\n=== Model Answer Parsed ===", file=sys.stderr)
            for key, value in model_answer.items():
                print(f"{key}: {value}", file=sys.stderr)

            # Compare answers
            answer_match = compare_answers(model_answer, expected_answer)
            if not answer_match:
                print("\nModel answer does not match expected answer", file=sys.stderr)
                return False
            print("\n✓ Model answer matches expected answer", file=sys.stderr)
        else:
            print(
                "Warning: Could not parse answer format from model response",
                file=sys.stderr,
            )
            print("Will proceed with browser verification only", file=sys.stderr)
    else:
        print(
            "No model response found, proceeding with browser verification",
            file=sys.stderr,
        )

    # Browser verification for actual state
    print("\n=== Starting Browser Verification ===", file=sys.stderr)
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        page = await context.new_page()

        try:
            # Navigate to Magento Admin
            print("Navigating to Magento Admin...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/", wait_until="networkidle"
            )

            # Check if already logged in, if not, login
            if "dashboard" not in page.url.lower():
                print("Logging into Magento Admin...", file=sys.stderr)
                await page.fill('input[name="login[username]"]', "admin")
                await page.fill('input[name="login[password]"]', "admin1234")
                await page.click('button:has-text("Sign in")')
                await page.wait_for_load_state("networkidle")

                if "dashboard" not in page.url.lower():
                    print("Error: Login failed", file=sys.stderr)
                    return False

            print("Successfully logged into Magento Admin", file=sys.stderr)

            # 1. Verify Customer Groups
            print("\nVerifying Customer Groups...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/customer/group/",
                wait_until="networkidle",
            )
            await page.wait_for_timeout(2000)  # Wait for grid to load

            # Check for Premium Europe group
            premium_europe_exists = (
                await page.locator("text=Premium Europe").count() > 0
            )
            if premium_europe_exists:
                print("✓ Found 'Premium Europe' customer group", file=sys.stderr)

                # Check if it has Retail Customer tax class
                # Look for Premium Europe row and check its tax class.
                # A different tax class is only reported as a warning.
                premium_row = page.locator('tr:has-text("Premium Europe")')
                if await premium_row.count() > 0:
                    tax_class_text = await premium_row.locator("td").nth(2).inner_text()
                    if "Retail Customer" in tax_class_text:
                        print(
                            "✓ Premium Europe has 'Retail Customer' tax class",
                            file=sys.stderr,
                        )
                    else:
                        print(
                            f"Warning: Premium Europe tax class is '{tax_class_text}'",
                            file=sys.stderr,
                        )
            else:
                print("✗ 'Premium Europe' customer group not found", file=sys.stderr)
                return False

            # Check total groups count (informational only)
            records_found = page.locator("text=records found").first
            if await records_found.count() > 0:
                count_text = await records_found.inner_text()
                print(f"Customer Groups count: {count_text}", file=sys.stderr)

                # Extract number.
                # Use the module-level `re` here: a function-local import in
                # this branch would make `re` a local name for the whole
                # function, so the customer-count check below would raise
                # UnboundLocalError whenever this branch is skipped.
                match = re.search(r"(\d+)\s+records found", count_text)
                if match:
                    groups_count = int(match.group(1))
                    print(f"✓ Customer groups count is {groups_count}", file=sys.stderr)

            # 2. Verify Customer
            print("\nVerifying Customer Isabella Romano...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/customer/index/",
                wait_until="networkidle",
            )
            await page.wait_for_timeout(3000)  # Wait for grid to load

            # Check total customers count
            customer_records = page.locator("text=records found").first
            if await customer_records.count() > 0:
                count_text = await customer_records.inner_text()
                print(f"Customers count: {count_text}", file=sys.stderr)

                # Extract number
                match = re.search(r"(\d+)\s+records found", count_text)
                if match:
                    customers_count = int(match.group(1))
                    print(
                        f"✓ Total customers count is {customers_count}", file=sys.stderr
                    )

                    # Verify against expected answer if available
                    if expected_answer and "FinalCustomers" in expected_answer:
                        expected_final = int(expected_answer["FinalCustomers"])
                        if customers_count == expected_final:
                            print(
                                f"✓ Customer count matches expected: {customers_count}",
                                file=sys.stderr,
                            )
                        else:
                            print(
                                f"✗ Customer count mismatch: Expected {expected_final} customers, found {customers_count}",
                                file=sys.stderr,
                            )
                            return False

            # Wait for the customer grid to load properly
            await page.wait_for_timeout(5000)

            # Check if Isabella Romano exists - first wait for grid to load
            # (up to three polls, two seconds apart).
            grid_loaded = False
            for _ in range(3):
                # Look for grid container and wait for it to populate
                grid_container = page.locator(".admin__data-grid-outer-wrap, .data-grid, table").first
                if await grid_container.count() > 0:
                    # Check if there are customer rows loaded
                    customer_rows = page.locator("td[data-column='email'], td:has-text('@')")
                    if await customer_rows.count() > 0:
                        grid_loaded = True
                        break
                await page.wait_for_timeout(2000)

            if not grid_loaded:
                print("✗ Customer grid failed to load properly", file=sys.stderr)
                return False

            # Now check if Isabella Romano exists in the loaded grid
            isabella_exists = (
                await page.locator("text=isabella.romano@premium.eu").count() > 0
            )

            if not isabella_exists:
                # Try searching for the customer to be more thorough
                try:
                    search_box = page.locator('input[placeholder*="Search by keyword"], input[name="search"], [data-role="search"]').first
                    if await search_box.count() > 0:
                        await search_box.clear()
                        await search_box.fill("isabella.romano@premium.eu")
                        await page.keyboard.press("Enter")
                        await page.wait_for_load_state("networkidle")
                        await page.wait_for_timeout(3000)

                        # Check again after search
                        isabella_exists = (
                            await page.locator("text=isabella.romano@premium.eu").count() > 0
                        )

                        # Also check for a "no records" message. The text=
                        # engine treats its whole argument as one string, so
                        # the two messages must be queried separately rather
                        # than joined with a comma.
                        no_records = (
                            await page.locator("text=We couldn't find any records.").count() > 0
                            or await page.locator("text=No records found").count() > 0
                        )
                        # Only fail here when the customer is really absent,
                        # so this branch can refine the diagnostic but never
                        # override a positive match.
                        if no_records and not isabella_exists:
                            print(
                                "✗ Customer 'isabella.romano@premium.eu' not found - search returned no results",
                                file=sys.stderr,
                            )
                            return False
                except Exception as e:
                    # The search is a best-effort fallback; the final
                    # existence check below still decides the outcome.
                    print(f"✗ Search failed: {str(e)}", file=sys.stderr)

            if isabella_exists:
                print(
                    "✓ Found customer with email 'isabella.romano@premium.eu'",
                    file=sys.stderr,
                )
            else:
                print(
                    "✗ Customer 'isabella.romano@premium.eu' not found",
                    file=sys.stderr,
                )
                return False

            # 3. Verify Dashboard Last Orders
            print("\nVerifying Dashboard Last Orders...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/admin/dashboard/",
                wait_until="networkidle",
            )
            await page.wait_for_timeout(2000)

            # Check for Last Orders section
            last_orders_exists = await page.locator("text=Last Orders").count() > 0
            if last_orders_exists:
                print("✓ Found 'Last Orders' section on dashboard", file=sys.stderr)

                # Read the customer shown in the LAST row of the table that
                # sits under the parent element of the "Last Orders" heading.
                orders_table = (
                    page.locator("text=Last Orders")
                    .locator("..")
                    .locator("table")
                    .first
                )
                if await orders_table.count() > 0:
                    # Get the last row in tbody
                    last_row = orders_table.locator("tbody tr").last
                    if await last_row.count() > 0:
                        last_customer = await last_row.locator(
                            "td"
                        ).first.inner_text()
                        print(
                            f"✓ Last customer in Last Orders: {last_customer}",
                            file=sys.stderr,
                        )

                        # Verify against expected answer if available
                        if expected_answer and "LastOrderCustomer" in expected_answer:
                            if last_customer == expected_answer["LastOrderCustomer"]:
                                print(
                                    f"✓ Last Order Customer matches expected: {last_customer}",
                                    file=sys.stderr,
                                )
                            else:
                                print(
                                    f"✗ Last Order Customer mismatch: Expected '{expected_answer['LastOrderCustomer']}' but actual is '{last_customer}'",
                                    file=sys.stderr,
                                )
                                return False
            else:
                print(
                    "Warning: 'Last Orders' section not found on dashboard",
                    file=sys.stderr,
                )

            # Summary of verification - only print if we reach this point (all checks passed)
            print("\n=== Browser Verification Summary ===", file=sys.stderr)
            print("✓ Magento Admin login successful", file=sys.stderr)
            print(
                "✓ Customer group 'Premium Europe' exists with correct tax class",
                file=sys.stderr,
            )
            print("✓ Customer 'isabella.romano@premium.eu' found in system", file=sys.stderr)
            print("✓ Customer counts verified", file=sys.stderr)
            print("✓ Dashboard Last Orders section accessible", file=sys.stderr)

            return True

        except PlaywrightTimeoutError as e:
            print(f"Error: Timeout occurred - {str(e)}", file=sys.stderr)
            return False
        except Exception as e:
            print(f"Error: Unexpected error - {str(e)}", file=sys.stderr)
            return False
        finally:
            # Always release the browser, whichever path returned above.
            await browser.close()


def main():
    """
    Run the asynchronous verification and exit with its status.

    The process exits with code 0 when ``verify()`` returns a truthy
    result and with code 1 otherwise.
    """
    passed = asyncio.run(verify())
    exit_code = 0 if passed else 1
    sys.exit(exit_code)


if __name__ == "__main__":
    main()