Customer Segmentation Setup

L3
ModelContextProtocol · Playwright · Shopping Admin

Configure a customer segmentation system in the admin panel by defining demographic criteria, creating behavior-based segments, implementing targeting rules, and setting up automated marketing workflows.

Created by Fanqing Meng
2025-08-17
Content Submission · Inventory Management

Model Ranking

| Provider | Model | Run Results | Pass@4 / Pass^4 | Avg Time | Avg Turns | Input Tokens | Output Tokens | Total Tokens |
|---|---|---|---|---|---|---|---|---|
| Claude | claude-sonnet-4-high | 4/4 | | 138.3s | 17.5 | 579,578 | 2,707 | 582,284 |
| Claude | claude-sonnet-4-low | 4/4 | | 132.8s | 16.3 | 545,513 | 2,667 | 548,180 |
| Z.ai | glm-4-5 | 4/4 | | 129.2s | 16.5 | 497,070 | 1,854 | 498,924 |
| OpenAI | gpt-5-low | 4/4 | | 204.4s | 20.3 | 548,232 | 6,215 | 554,447 |
| OpenAI | gpt-5-medium | 4/4 | | 189.7s | 17.8 | 485,163 | 6,028 | 491,191 |
| OpenAI | gpt-5-mini-medium | 4/4 | | 103.3s | 20.8 | 640,093 | 3,066 | 643,159 |
| MoonshotAI | kimi-k2-0711 | 4/4 | | 173.2s | 19.0 | 597,540 | 1,275 | 598,815 |
| MoonshotAI | kimi-k2-0905 | 4/4 | | 155.8s | 15.5 | 424,117 | 1,363 | 425,479 |
| OpenAI | o3 | 4/4 | | 139.3s | 23.3 | 764,199 | 2,017 | 766,216 |
| Qwen | qwen-3-max | 4/4 | | 474.1s | 27.5 | 2,327,116 | 1,059 | 2,328,174 |
| Claude | claude-sonnet-4 | 3/4 | | 262.2s | 20.3 | 771,491 | 2,921 | 774,412 |
| DeepSeek | deepseek-chat | 3/4 | | 468.0s | 23.0 | 808,574 | 1,188 | 809,761 |
| OpenAI | gpt-5-high | 3/4 | | 333.4s | 16.8 | 394,407 | 10,960 | 405,367 |
| OpenAI | gpt-oss-120b | 3/4 | | 38.1s | 10.5 | 120,462 | 921 | 121,382 |
| Grok | grok-4 | 3/4 | | 122.5s | 15.3 | 426,727 | 3,219 | 429,946 |
| Grok | grok-code-fast-1 | 3/4 | | 63.9s | 14.3 | 469,290 | 2,784 | 472,074 |
| OpenAI | o4-mini | 3/4 | | 184.1s | 14.3 | 245,778 | 8,793 | 254,570 |
| Qwen | qwen-3-coder-plus | 3/4 | | 125.5s | 16.5 | 535,865 | 1,495 | 537,360 |
| Gemini | gemini-2-5-flash | 2/4 | | 166.0s | 49.3 | 1,289,120 | 7,658 | 1,296,778 |
| Gemini | gemini-2-5-pro | 2/4 | | 129.6s | 16.8 | 860,164 | 4,207 | 864,371 |
| OpenAI | gpt-5-mini-high | 2/4 | | 108.2s | 19.3 | 394,862 | 5,970 | 400,831 |
| Claude | claude-opus-4-1 | 1/1 | -- | 301.3s | 20.0 | 641,339 | 2,500 | 643,839 |
| OpenAI | gpt-4-1 | 1/4 | | 57.0s | 11.5 | 311,815 | 501 | 312,316 |
| OpenAI | gpt-4-1-mini | 0/4 | | 95.6s | 28.5 | 646,959 | 1,421 | 648,380 |
| OpenAI | gpt-4-1-nano | 0/4 | | 35.6s | 10.3 | 150,725 | 478 | 151,203 |
| OpenAI | gpt-5-mini-low | 0/4 | | 48.7s | 11.0 | 127,731 | 1,103 | 128,834 |
| OpenAI | gpt-5-nano-high | 0/4 | | 229.5s | 39.8 | 1,280,598 | 27,468 | 1,308,066 |
| OpenAI | gpt-5-nano-low | 0/4 | | 189.4s | 38.0 | 1,430,332 | 12,708 | 1,443,039 |
| OpenAI | gpt-5-nano-medium | 0/4 | | 138.1s | 23.5 | 1,107,821 | 13,025 | 1,120,846 |
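
The ranking lists Pass@4 and Pass^4 alongside the per-run results. These metrics are commonly read as "at least one of the four runs passed" and "all four runs passed" respectively; the page itself does not define them, so the sketch below rests on that assumption. The function names and the per-run boolean lists are hypothetical and are not taken from the benchmark's code.

```python
from typing import Sequence


def pass_at_k(runs: Sequence[bool]) -> bool:
    """Assumed Pass@k: True if at least one of the k runs passed."""
    return any(runs)


def pass_hat_k(runs: Sequence[bool]) -> bool:
    """Assumed Pass^k: True only if every one of the k runs passed."""
    # An empty run list counts as a failure rather than a vacuous success.
    return len(runs) > 0 and all(runs)


# Hypothetical per-run outcomes; only the pass counts mirror the ranking,
# the order of passes and failures is made up for illustration.
examples = {
    "4/4": [True, True, True, True],
    "3/4": [True, False, True, True],
    "0/4": [False, False, False, False],
}

for label, runs in examples.items():
    print(label, "pass@4:", pass_at_k(runs), "pass^4:", pass_hat_k(runs))
```

Under this reading, a model with 3/4 passing runs would satisfy Pass@4 but not Pass^4.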

Task State

WebArena

Instruction

Perform customer segmentation setup and analysis in the Magento Admin panel to establish new customer groups and manage customer profiles.

Task Requirements:

  1. Access the Magento Admin panel to begin customer segmentation setup. If you need to log in, use username 'admin' and password 'admin1234'.

  2. Establish baseline metrics for customer groups:

    • Record the exact number shown in "records found" at the top of the grid
    • This will be your initial groups count
  3. Create a specialized customer group for European premium customers:

    • Group Name: Premium Europe
    • Tax Class: Retail Customer
    • Save the group
  4. Verify the customer group creation was successful:

    • After saving, return to the Customer Groups list
    • Record the new total shown in "records found"
  5. Establish baseline metrics for the All Customers database:

    • Record the exact number shown in "records found" at the top of the grid
    • This will be your initial customers count
  6. Add a representative customer to the new premium group:

    • Create a new customer with the following details:
    • First Name: Isabella
    • Last Name: Romano
    • Email: isabella.romano@premium.eu
    • Associate to Website: Main Website
    • Group: The group you just created
    • Save the customer
  7. Verify the customer creation was successful:

    • After saving, return to the All Customers list
    • Record the new total shown in "records found"
  8. Analyze recent customer activity patterns:

    • Navigate to Dashboard
    • Look at the "Last Orders" section
    • Record the customer name in the last row of the table
  9. Compile all your findings and output them in the following exact format:

Plaintext
<answer>
InitialGroups|count
FinalGroups|count  
InitialCustomers|count
FinalCustomers|count
LastOrderCustomer|name
</answer>

Example Output:

Plaintext
<answer>
InitialGroups|XX
FinalGroups|XX
InitialCustomers|XXX
FinalCustomers|XXX
LastOrderCustomer|XXX
</answer>
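
The recording and formatting steps above can be sketched in Python. This is a minimal illustration, not part of the task: `extract_record_count` and `format_answer` are hypothetical helper names, the regular expression is the same one the verification script applies to the "records found" text, and every value in the usage example is a placeholder rather than a real result.

```python
import re

# Field order required by the answer format in the task instruction.
FIELDS = [
    "InitialGroups",
    "FinalGroups",
    "InitialCustomers",
    "FinalCustomers",
    "LastOrderCustomer",
]


def extract_record_count(text: str) -> int:
    """Pull the integer out of a grid label such as '12 records found'."""
    match = re.search(r"(\d+)\s+records found", text)
    if match is None:
        raise ValueError(f"no record count in {text!r}")
    return int(match.group(1))


def format_answer(values: dict) -> str:
    """Render the five fields as one 'Key|value' line each inside <answer> tags."""
    lines = [f"{key}|{values[key]}" for key in FIELDS]
    return "<answer>\n" + "\n".join(lines) + "\n</answer>"


# Placeholder values only; real counts come from the Magento grids.
placeholder = {
    "InitialGroups": extract_record_count("1 records found"),
    "FinalGroups": extract_record_count("2 records found"),
    "InitialCustomers": 10,
    "FinalCustomers": 11,
    "LastOrderCustomer": "Jane Doe",
}
print(format_answer(placeholder))
```

Emitting exactly five lines with no blank lines between them matters because the verifier rejects an answer block whose line count is not five.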


Verify

*.py
Python
import asyncio
import sys
import re
import os
import json
from pathlib import Path
from playwright.async_api import (
    async_playwright,
    TimeoutError as PlaywrightTimeoutError,
)

# Read base_url from the environment (shopping_admin injects http://localhost:7780/admin); fall back to the local default
BASE_URL = os.getenv("WEBARENA_BASE_URL", "http://localhost:7780/admin").rstrip("/")


def get_model_response():
    """
    Get the model's response from the JSON file whose path is given by the
    MCP_MESSAGES environment variable.
    Returns the text of the last completed assistant message, or None.
    """
    messages_path = os.getenv("MCP_MESSAGES")
    print(f"MCP_MESSAGES: {messages_path}")
    if not messages_path:
        print("Warning: MCP_MESSAGES environment variable not set", file=sys.stderr)
        return None

    try:
        with open(messages_path, "r") as f:
            messages = json.load(f)

        # Find the last assistant message
        for message in reversed(messages):
            if (
                message.get("role") == "assistant"
                and message.get("status") == "completed"
            ):
                content = message.get("content", [])
                for item in content:
                    if item.get("type") == "output_text":
                        return item.get("text", "")

        print("Warning: No assistant response found in messages", file=sys.stderr)
        return None
    except Exception as e:
        print(f"Error reading messages file: {str(e)}", file=sys.stderr)
        return None


def parse_answer_format(text):
    """
    Parse the <answer>...</answer> format from the agent's output.
    Returns a dictionary with the parsed values.
    """
    if not text:
        return None

    # Look for <answer>...</answer> pattern
    match = re.search(r"<answer>(.*?)</answer>", text, re.IGNORECASE | re.DOTALL)
    if not match:
        return None

    answer_content = match.group(1).strip()

    # Parse each line
    result = {}
    lines = answer_content.split("\n")

    if len(lines) != 5:
        print(f"Error: Expected 5 lines in answer, got {len(lines)}", file=sys.stderr)
        return None

    for line in lines:
        if "|" in line:
            key, value = line.split("|", 1)
            result[key.strip()] = value.strip()

    return result


def load_expected_answer(label_path):
    """
    Load the expected answer from label.txt file.
    Returns a dictionary with the expected values.
    """
    try:
        with open(label_path, "r") as f:
            lines = f.read().strip().split("\n")

        expected = {}
        for line in lines:
            if "|" in line:
                key, value = line.split("|", 1)
                expected[key.strip()] = value.strip()

        return expected
    except Exception as e:
        print(f"Error reading label file: {str(e)}", file=sys.stderr)
        return None


def compare_answers(model_answer, expected_answer):
    """
    Compare the model's answer with the expected answer.
    Returns True if all key information matches, False otherwise.
    """
    if not model_answer or not expected_answer:
        return False

    # Check each expected key
    mismatches = []
    for key, expected_value in expected_answer.items():
        model_value = model_answer.get(key, "")

        # Exact match for all fields
        if model_value != expected_value:
            mismatches.append(
                f"{key}: expected '{expected_value}', got '{model_value}'"
            )

    if mismatches:
        print("\n=== Answer Comparison Mismatches ===", file=sys.stderr)
        for mismatch in mismatches:
            print(f"✗ {mismatch}", file=sys.stderr)
        return False

    print("\n=== Answer Comparison ===", file=sys.stderr)
    print("✓ All key information matches the expected answer", file=sys.stderr)
    return True


async def verify() -> bool:
    """
    Verifies that the customer segmentation setup task has been completed correctly.
    First checks the model's answer against the expected label,
    then verifies the actual state in the Magento Admin.
    """
    # Get the label file path
    label_path = Path(__file__).parent / "label.txt"

    # Load expected answer
    expected_answer = load_expected_answer(label_path)
    if not expected_answer:
        print("Error: Could not load expected answer from label.txt", file=sys.stderr)
        return False

    # Get model's response from MCP_MESSAGES
    model_response = get_model_response()
    if model_response:
        print("Found model response, parsing answer format...", file=sys.stderr)
        model_answer = parse_answer_format(model_response)

        if model_answer:
            print("\n=== Model Answer Parsed ===", file=sys.stderr)
            for key, value in model_answer.items():
                print(f"{key}: {value}", file=sys.stderr)

            # Compare answers
            answer_match = compare_answers(model_answer, expected_answer)
            if not answer_match:
                print("\nModel answer does not match expected answer", file=sys.stderr)
                return False
            print("\n✓ Model answer matches expected answer", file=sys.stderr)
        else:
            print(
                "Warning: Could not parse answer format from model response",
                file=sys.stderr,
            )
            print("Will proceed with browser verification only", file=sys.stderr)
    else:
        print(
            "No model response found, proceeding with browser verification",
            file=sys.stderr,
        )

    # Browser verification for actual state
    print("\n=== Starting Browser Verification ===", file=sys.stderr)
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        page = await context.new_page()

        try:
            # Navigate to Magento Admin
            print("Navigating to Magento Admin...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/", wait_until="networkidle"
            )

            # Check if already logged in, if not, login
            if "dashboard" not in page.url.lower():
                print("Logging into Magento Admin...", file=sys.stderr)
                await page.fill('input[name="login[username]"]', "admin")
                await page.fill('input[name="login[password]"]', "admin1234")
                await page.click('button:has-text("Sign in")')
                await page.wait_for_load_state("networkidle")

                if "dashboard" not in page.url.lower():
                    print("Error: Login failed", file=sys.stderr)
                    return False

            print("Successfully logged into Magento Admin", file=sys.stderr)

            # 1. Verify Customer Groups
            print("\nVerifying Customer Groups...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/customer/group/",
                wait_until="networkidle",
            )
            await page.wait_for_timeout(2000)  # Wait for grid to load

            # Check for Premium Europe group
            premium_europe_exists = (
                await page.locator("text=Premium Europe").count() > 0
            )
            if premium_europe_exists:
                print("✓ Found 'Premium Europe' customer group", file=sys.stderr)

                # Check if it has Retail Customer tax class
                # Look for Premium Europe row and check its tax class
                premium_row = page.locator('tr:has-text("Premium Europe")')
                if await premium_row.count() > 0:
                    tax_class_text = await premium_row.locator("td").nth(2).inner_text()
                    if "Retail Customer" in tax_class_text:
                        print(
                            "✓ Premium Europe has 'Retail Customer' tax class",
                            file=sys.stderr,
                        )
                    else:
                        print(
                            f"Warning: Premium Europe tax class is '{tax_class_text}'",
                            file=sys.stderr,
                        )
            else:
                print("✗ 'Premium Europe' customer group not found", file=sys.stderr)
                return False

            # Check total groups count
            records_found = page.locator("text=records found").first
            if await records_found.count() > 0:
                count_text = await records_found.inner_text()
                print(f"Customer Groups count: {count_text}", file=sys.stderr)

                # Extract number (re is already imported at module level)
                match = re.search(r"(\d+)\s+records found", count_text)
                if match:
                    groups_count = int(match.group(1))
                    print(f"✓ Customer groups count is {groups_count}", file=sys.stderr)

            # 2. Verify Customer
            print("\nVerifying Customer Isabella Romano...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/customer/index/",
                wait_until="networkidle",
            )
            await page.wait_for_timeout(3000)  # Wait for grid to load

            # Check total customers count
            customer_records = page.locator("text=records found").first
            if await customer_records.count() > 0:
                count_text = await customer_records.inner_text()
                print(f"Customers count: {count_text}", file=sys.stderr)

                # Extract number
                match = re.search(r"(\d+)\s+records found", count_text)
                if match:
                    customers_count = int(match.group(1))
                    print(
                        f"✓ Total customers count is {customers_count}", file=sys.stderr
                    )

                    # Verify against expected answer if available
                    if expected_answer and "FinalCustomers" in expected_answer:
                        expected_final = int(expected_answer["FinalCustomers"])
                        if customers_count == expected_final:
                            print(
                                f"✓ Customer count matches expected: {customers_count}",
                                file=sys.stderr,
                            )
                        else:
                            print(
                                f"✗ Customer count mismatch: Expected {expected_final} customers, found {customers_count}",
                                file=sys.stderr,
                            )
                            return False

            # Wait for the customer grid to load properly
            await page.wait_for_timeout(5000)
            
            # Check if Isabella Romano exists - first wait for grid to load
            grid_loaded = False
            for _ in range(3):
                # Look for grid container and wait for it to populate
                grid_container = page.locator(".admin__data-grid-outer-wrap, .data-grid, table").first
                if await grid_container.count() > 0:
                    # Check if there are customer rows loaded
                    customer_rows = page.locator("td[data-column='email'], td:has-text('@')")
                    if await customer_rows.count() > 0:
                        grid_loaded = True
                        break
                await page.wait_for_timeout(2000)
            
            if not grid_loaded:
                print("✗ Customer grid failed to load properly", file=sys.stderr)
                return False
            
            # Now check if Isabella Romano exists in the loaded grid
            isabella_exists = (
                await page.locator("text=isabella.romano@premium.eu").count() > 0
            )
            
            if not isabella_exists:
                # Try searching for the customer to be more thorough
                try:
                    search_box = page.locator('input[placeholder*="Search by keyword"], input[name="search"], [data-role="search"]').first
                    if await search_box.count() > 0:
                        await search_box.clear()
                        await search_box.fill("isabella.romano@premium.eu")
                        await page.keyboard.press("Enter")
                        await page.wait_for_load_state("networkidle")
                        await page.wait_for_timeout(3000)
                        
                        # Check again after search
                        isabella_exists = (
                            await page.locator("text=isabella.romano@premium.eu").count() > 0
                        )
                        
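                        # Also check for a "no records" message. Each text= selector is
                        # queried separately because a comma does not combine text selectors.
                        no_records = (
                            await page.locator("text=We couldn't find any records.").count() > 0
                            or await page.locator("text=No records found").count() > 0
                        )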
                        if no_records:
                            print(
                                "✗ Customer 'isabella.romano@premium.eu' not found - search returned no results",
                                file=sys.stderr,
                            )
                            return False
                except Exception as e:
                    print(f"✗ Search failed: {str(e)}", file=sys.stderr)
            
            if isabella_exists:
                print(
                    "✓ Found customer with email 'isabella.romano@premium.eu'",
                    file=sys.stderr,
                )
            else:
                print(
                    "✗ Customer 'isabella.romano@premium.eu' not found",
                    file=sys.stderr,
                )
                return False

            # 3. Verify Dashboard Last Orders
            print("\nVerifying Dashboard Last Orders...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/admin/dashboard/",
                wait_until="networkidle",
            )
            await page.wait_for_timeout(2000)

            # Check for Last Orders section
            last_orders_exists = await page.locator("text=Last Orders").count() > 0
            if last_orders_exists:
                print("✓ Found 'Last Orders' section on dashboard", file=sys.stderr)

                # Find the customer in the last row of the table
                # Look for the table after "Last Orders" heading
                orders_table = (
                    page.locator("text=Last Orders")
                    .locator("..")
                    .locator("table")
                    .first
                )
                if await orders_table.count() > 0:
                    # Get the last row in tbody
                    last_row = orders_table.locator("tbody tr").last
                    if await last_row.count() > 0:
                        last_customer = await last_row.locator(
                            "td"
                        ).first.inner_text()
                        print(
                            f"✓ Last customer in Last Orders: {last_customer}",
                            file=sys.stderr,
                        )

                        # Verify against expected answer if available
                        if expected_answer and "LastOrderCustomer" in expected_answer:
                            if last_customer == expected_answer["LastOrderCustomer"]:
                                print(
                                    f"✓ Last Order Customer matches expected: {last_customer}",
                                    file=sys.stderr,
                                )
                            else:
                                print(
                                    f"✗ Last Order Customer mismatch: Expected '{expected_answer['LastOrderCustomer']}' but actual is '{last_customer}'",
                                    file=sys.stderr,
                                )
                                return False
            else:
                print(
                    "Warning: 'Last Orders' section not found on dashboard",
                    file=sys.stderr,
                )

            # Summary of verification - only print if we reach this point (all checks passed)
            print("\n=== Browser Verification Summary ===", file=sys.stderr)
            print("✓ Magento Admin login successful", file=sys.stderr)
            print(
                "✓ Customer group 'Premium Europe' exists with correct tax class",
                file=sys.stderr,
            )
            print("✓ Customer 'isabella.romano@premium.eu' found in system", file=sys.stderr)
            print("✓ Customer counts verified", file=sys.stderr)
            print("✓ Dashboard Last Orders section accessible", file=sys.stderr)

            return True

        except PlaywrightTimeoutError as e:
            print(f"Error: Timeout occurred - {str(e)}", file=sys.stderr)
            return False
        except Exception as e:
            print(f"Error: Unexpected error - {str(e)}", file=sys.stderr)
            return False
        finally:
            await browser.close()


def main():
    """
    Executes the verification process and exits with a status code.
    """
    result = asyncio.run(verify())
    sys.exit(0 if result else 1)


if __name__ == "__main__":
    main()
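

Judging from `get_model_response` above, the file referenced by `MCP_MESSAGES` is expected to hold a JSON list of messages, each with a `role`, a `status`, and a `content` list whose `output_text` items carry the text. The sketch below builds a small sample of that shape and extracts the last completed assistant text the same way. The helper name `last_assistant_text`, the `input_text` item type on the user message, and all sample values are assumptions made for illustration.

```python
import json

# Hypothetical sample of the message layout the verifier appears to expect.
sample_messages = [
    {
        "role": "user",
        "status": "completed",
        "content": [{"type": "input_text", "text": "Perform the task."}],
    },
    {
        "role": "assistant",
        "status": "completed",
        "content": [
            {
                "type": "output_text",
                "text": "<answer>\nInitialGroups|1\nFinalGroups|2\n"
                "InitialCustomers|10\nFinalCustomers|11\n"
                "LastOrderCustomer|Jane Doe\n</answer>",
            }
        ],
    },
]


def last_assistant_text(messages):
    """Return the first output_text of the most recent completed assistant message."""
    for message in reversed(messages):
        if message.get("role") == "assistant" and message.get("status") == "completed":
            for item in message.get("content", []):
                if item.get("type") == "output_text":
                    return item.get("text", "")
    return None


# Round-trip through JSON to mimic loading the file from disk.
loaded = json.loads(json.dumps(sample_messages))
print(last_assistant_text(loaded))
```

A file in this shape can be used to exercise the answer-parsing half of the verifier without running an agent; the browser checks still require a live Magento instance.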