Marketing Customer Analysis

L3
ModelContextProtocolPlaywrightShopping Admin

Analyze customer behavior patterns using admin analytics, segment user demographics, track purchase histories, evaluate campaign effectiveness, and generate comprehensive marketing intelligence reports.

Created by Fanqing Meng
2025-08-17
Data ExtractionComparative AnalysisContent Submission

Model Ranking

Click on the dots to view the trajectory of each task run
Model
Run Results
Pass@4
Pass^4
Avg Time
Avg Turns
Input Tokens
Output Tokens
Total Tokens
OpenAI
gpt-5
4
/4
415.4s
27.8
1,306,204
14,219
1,320,423
Claude
claude-4-sonnet
3
/4
369.2s
27.0
1,660,816
3,963
1,664,780
MoonshotAI
k2
3
/4
325.5s
27.8
1,397,929
1,864
1,399,794
DeepSeek
deepseek-chat
2
/4
446.6s
28.8
1,488,858
2,085
1,490,943
Claude
claude-4-1-opus
1
/1
--
522.9s
28.0
1,620,249
3,647
1,623,896
Grok
grok-4
1
/4
84.6s
7.3
-
-
-
Gemini
gemini-2-5-pro
0
/4
116.7s
16.8
981,059
2,334
983,392
OpenAI
o3
0
/4
232.3s
24.0
751,441
10,555
761,997
Qwen
qwen-3-coder
0
/4
252.2s
29.3
1,586,584
2,065
1,588,649

Task State

WebArena
view WebArena environment setup for this task

Instruction



Verify

*.py
Python
import asyncio
import sys
import re
import os
import json
from pathlib import Path
from playwright.async_api import (
    async_playwright,
    TimeoutError as PlaywrightTimeoutError,
)

# 从环境变量读取 base_url(shopping_admin 会注入 http://localhost:7780/admin),默认回退到本地
BASE_URL = os.getenv("WEBARENA_BASE_URL", "http://localhost:7780/admin").rstrip("/")


def get_model_response():
    """
    Get the model's response from the MCP_MESSAGES environment variable.
    Returns the last assistant message text.
    """
    messages_path = os.getenv("MCP_MESSAGES")
    print(f"MCP_MESSAGES: {messages_path}")
    if not messages_path:
        print("Warning: MCP_MESSAGES environment variable not set", file=sys.stderr)
        return None

    try:
        with open(messages_path, "r") as f:
            messages = json.load(f)

        # Find the last assistant message
        for message in reversed(messages):
            if (
                message.get("role") == "assistant"
                and message.get("status") == "completed"
            ):
                content = message.get("content", [])
                for item in content:
                    if item.get("type") == "output_text":
                        return item.get("text", "")

        print("Warning: No assistant response found in messages", file=sys.stderr)
        return None
    except Exception as e:
        print(f"Error reading messages file: {str(e)}", file=sys.stderr)
        return None


def parse_answer_format(text):
    """
    Parse the new multi-line <answer>xxx</answer> format from the agent's output.
    Returns a dictionary with the parsed values.
    """
    if not text:
        return None

    # Look for <answer>...</answer> pattern
    match = re.search(r"<answer>(.*?)</answer>", text, re.IGNORECASE | re.DOTALL)
    if not match:
        return None

    answer_content = match.group(1).strip()

    # Parse each line
    result = {}
    lines = answer_content.split("\n")

    if len(lines) != 9:
        print(f"Error: Expected 9 lines in answer, got {len(lines)}", file=sys.stderr)
        return None

    for line in lines:
        if "|" in line:
            key, value = line.split("|", 1)
            result[key.strip()] = value.strip()

    return result


def load_expected_answer(label_path):
    """
    Load the expected answer from label.txt file.
    Returns a dictionary with the expected values.
    """
    try:
        with open(label_path, "r") as f:
            lines = f.read().strip().split("\n")

        expected = {}
        for line in lines:
            if "|" in line:
                key, value = line.split("|", 1)
                expected[key.strip()] = value.strip()

        return expected
    except Exception as e:
        print(f"Error reading label file: {str(e)}", file=sys.stderr)
        return None


def compare_answers(model_answer, expected_answer):
    """
    Compare the model's answer with the expected answer.
    Returns True if all key information matches, False otherwise.
    """
    if not model_answer or not expected_answer:
        return False

    # Check each expected key
    mismatches = []
    for key, expected_value in expected_answer.items():
        model_value = model_answer.get(key, "")

        # Special handling for different types of values
        if key == "Top2SearchTerms":
            # Check if both search terms are present with correct counts
            expected_terms = expected_value.split(",")
            model_terms = model_value.split(",")
            if set(expected_terms) != set(model_terms):
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "EmailVerification":
            # Check email verification status
            expected_emails = dict(
                item.split(":") for item in expected_value.split(",")
            )
            model_emails = dict(
                item.split(":") for item in model_value.split(",") if ":" in item
            )
            if expected_emails != model_emails:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "CouponCodes":
            # Check if coupon code and rule name are present
            if "H20" not in model_value or "Luma water bottle" not in model_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "TopProduct":
            # Check if product name and quantity match
            if expected_value != model_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        else:
            # Exact match for other fields
            if model_value != expected_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

    if mismatches:
        print("\n=== Answer Comparison Mismatches ===", file=sys.stderr)
        for mismatch in mismatches:
            print(f"✗ {mismatch}", file=sys.stderr)
        return False

    print("\n=== Answer Comparison ===", file=sys.stderr)
    print("✓ All key information matches the expected answer", file=sys.stderr)
    return True


async def verify() -> bool:
    """
    Verifies that the marketing analysis task has been completed correctly.
    First checks the model's answer against the expected label,
    then optionally verifies the actual state in the Magento Admin.
    """
    # Get the label file path
    label_path = Path(__file__).parent / "label.txt"

    # Load expected answer
    expected_answer = load_expected_answer(label_path)
    if not expected_answer:
        print("Error: Could not load expected answer from label.txt", file=sys.stderr)
        return False

    # Get model's response from MCP_MESSAGES
    model_response = get_model_response()
    if model_response:
        print("Found model response, parsing answer format...", file=sys.stderr)
        model_answer = parse_answer_format(model_response)

        if model_answer:
            print("\n=== Model Answer Parsed ===", file=sys.stderr)
            for key, value in model_answer.items():
                print(f"{key}: {value}", file=sys.stderr)

            # Compare answers
            answer_match = compare_answers(model_answer, expected_answer)
            if not answer_match:
                print("\nModel answer does not match expected answer", file=sys.stderr)
                return False
            print("\n✓ Model answer matches expected answer", file=sys.stderr)
        else:
            print(
                "Warning: Could not parse answer format from model response",
                file=sys.stderr,
            )
            print("Will proceed with browser verification only", file=sys.stderr)
    else:
        print(
            "No model response found, proceeding with browser verification",
            file=sys.stderr,
        )

    # Browser verification - only check customer creation (the critical task requirement)
    print("\n=== Starting Browser Verification ===", file=sys.stderr)
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        page = await context.new_page()

        try:
            # Navigate to Magento Admin
            print("Navigating to Magento Admin...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/", wait_until="networkidle"
            )

            # Check if already logged in, if not, login
            if "dashboard" not in page.url.lower():
                print("Logging into Magento Admin...", file=sys.stderr)
                await page.fill('input[name="login[username]"]', "admin")
                await page.fill('input[name="login[password]"]', "admin1234")
                await page.click('button:has-text("Sign in")')
                await page.wait_for_load_state("networkidle")

                if "dashboard" not in page.url.lower():
                    print("Error: Login failed", file=sys.stderr)
                    return False

            print("Successfully logged into Magento Admin", file=sys.stderr)

            # Verify Customer Creation (the only critical check for task completion)
            print("Verifying Customer Creation...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/customer/index/",
                wait_until="networkidle",
            )

            # Wait for the customer grid to load
            try:
                await page.wait_for_selector("table", timeout=15000)
            except PlaywrightTimeoutError:
                print("Table not found, trying to proceed anyway...", file=sys.stderr)

            # Define customer requirements
            customer1_requirements = {
                "email": "marketdata1.analysis@magento.com",
                "first_name": "Marketing1",
                "last_name": "Analy",
                "group": "General",
                "website": "Main Website"
            }
            
            customer2_requirements = {
                "email": "analytics1.report@magento.com", 
                "first_name": "Analytics1",
                "last_name": "Report",
                "group": "Wholesale",
                "website": "Main Website"
            }

            async def check_customer_exists(customer_requirements):
                """Check if a customer exists by looking for their details in the customer grid"""
                email = customer_requirements["email"]
                first_name = customer_requirements["first_name"]
                last_name = customer_requirements["last_name"]
                group = customer_requirements["group"]
                
                # First check if email exists in current page without searching
                email_found = await page.locator(f"*:has-text('{email}')").count() > 0
                
                if not email_found:
                    # Try searching for the customer
                    try:
                        search_box = page.locator('input[placeholder*="Search by keyword"]').first
                        await search_box.clear()
                        await search_box.fill(email)
                        await page.keyboard.press("Enter")
                        await page.wait_for_load_state("networkidle")
                        await page.wait_for_timeout(2000)
                        
                        # Check again after search
                        email_found = await page.locator(f"*:has-text('{email}')").count() > 0
                    except:
                        pass
                
                if not email_found:
                    return False, f"Email {email} not found"
                
                # More precise validation: find the row containing this customer's email
                # Then check if the required fields are in the same row or nearby context
                try:
                    # Find the specific row containing this email
                    email_cell = page.locator(f"td:has-text('{email}')").first
                    if await email_cell.count() == 0:
                        # Fall back to broader search
                        email_cell = page.locator(f"*:has-text('{email}')").first
                    
                    # Get the parent row or container
                    row = email_cell.locator("xpath=ancestor::tr[1]")
                    if await row.count() == 0:
                        # Fall back to getting nearby content
                        row = email_cell.locator("xpath=..")
                    
                    # Get the text content of the row/container
                    row_text = await row.text_content() if await row.count() > 0 else ""
                    
                    # If we can't get a specific row, fall back to broader validation
                    if not row_text or len(row_text.strip()) < 10:
                        # Search in nearby cells or elements
                        nearby_elements = page.locator(f"*:has-text('{email}')").locator("xpath=../following-sibling::* | xpath=../preceding-sibling::*")
                        nearby_count = await nearby_elements.count()
                        nearby_text = ""
                        for i in range(min(nearby_count, 5)):  # Check up to 5 nearby elements
                            element_text = await nearby_elements.nth(i).text_content()
                            if element_text:
                                nearby_text += element_text + " "
                        row_text = row_text + " " + nearby_text
                    
                    # Check if required fields are present in the row/context
                    required_fields = [first_name, last_name, group]
                    found_fields = [email]  # Email is already confirmed
                    missing_fields = []
                    
                    for field in required_fields:
                        if field in row_text:
                            found_fields.append(field)
                        else:
                            missing_fields.append(field)
                    
                    if missing_fields:
                        return False, f"Customer found but missing fields in row context: {', '.join(missing_fields)}. Row text: {row_text[:100]}..."
                    
                    return True, f"Customer verified with all required fields: {', '.join(found_fields)}"
                    
                except Exception as e:
                    # Fall back to original simple validation
                    page_content = await page.content()
                    required_fields = [first_name, last_name, group, email]
                    found_fields = []
                    missing_fields = []
                    
                    for field in required_fields:
                        if field in page_content:
                            found_fields.append(field)
                        else:
                            missing_fields.append(field)
                    
                    if missing_fields:
                        return False, f"Customer found but missing fields (fallback): {', '.join(missing_fields)}"
                    
                    return True, f"Customer verified with all required fields (fallback): {', '.join(found_fields)}"

            # Check both customers
            customer1_exists, customer1_msg = await check_customer_exists(customer1_requirements)
            customer2_exists, customer2_msg = await check_customer_exists(customer2_requirements)

            print(
                f"Customer 1 (marketdata1.analysis@magento.com): {'Found' if customer1_exists else 'Not Found'} - {customer1_msg}",
                file=sys.stderr,
            )
            print(
                f"Customer 2 (analytics1.report@magento.com): {'Found' if customer2_exists else 'Not Found'} - {customer2_msg}",
                file=sys.stderr,
            )

            if not (customer1_exists and customer2_exists):
                print("Error: Required customers were not found in the system", file=sys.stderr)
                return False

            print("✓ Both required customers found in the system", file=sys.stderr)
            return True

        except PlaywrightTimeoutError as e:
            print(f"Error: Timeout occurred - {str(e)}", file=sys.stderr)
            return False
        except Exception as e:
            print(f"Error: Unexpected error - {str(e)}", file=sys.stderr)
            return False
        finally:
            await browser.close()


def main():
    """
    Executes the verification process and exits with a status code.
    """
    result = asyncio.run(verify())
    sys.exit(0 if result else 1)


if __name__ == "__main__":
    main()