Marketing Customer Analysis

L3
ModelContextProtocol, Playwright, Shopping Admin

Analyze customer behavior patterns using admin analytics, segment user demographics, track purchase histories, evaluate campaign effectiveness, and generate comprehensive marketing intelligence reports.

Created by Fanqing Meng
2025-08-17
Data Extraction, Comparative Analysis, Content Submission

Model Ranking

| Provider | Model | Run Results | Pass@4 / Pass^4 | Avg Time | Avg Turns | Input Tokens | Output Tokens | Total Tokens |
|---|---|---|---|---|---|---|---|---|
| Claude | claude-sonnet-4-high | 4/4 | | 204.3s | 22.3 | 1,139,907 | 3,836 | 1,143,743 |
| Claude | claude-sonnet-4-low | 4/4 | | 195.4s | 22.0 | 1,137,276 | 3,695 | 1,140,970 |
| OpenAI | gpt-5-low | 4/4 | | 415.4s | 27.8 | 1,306,204 | 14,219 | 1,320,423 |
| OpenAI | gpt-5-medium | 4/4 | | 385.3s | 21.5 | 845,115 | 15,783 | 860,898 |
| OpenAI | gpt-5-mini-high | 4/4 | | 191.3s | 25.0 | 846,710 | 13,709 | 860,418 |
| Grok | grok-4 | 4/4 | | 194.1s | 19.8 | 856,131 | 4,338 | 860,469 |
| Grok | grok-code-fast-1 | 4/4 | | 92.5s | 23.3 | 1,013,627 | 4,440 | 1,018,067 |
| Claude | claude-sonnet-4 | 3/4 | | 369.2s | 27.0 | 1,660,816 | 3,963 | 1,664,780 |
| MoonshotAI | kimi-k2-0711 | 3/4 | | 325.5s | 27.8 | 1,397,929 | 1,864 | 1,399,794 |
| MoonshotAI | kimi-k2-0905 | 3/4 | | 258.8s | 24.3 | 1,095,860 | 1,898 | 1,097,758 |
| Qwen | qwen-3-coder-plus | 3/4 | | 185.1s | 21.5 | 1,084,258 | 1,938 | 1,086,196 |
| DeepSeek | deepseek-chat | 2/4 | | 425.2s | 28.3 | 1,513,122 | 1,993 | 1,515,115 |
| Z.ai | glm-4-5 | 2/4 | | 145.8s | 20.5 | 730,169 | 2,079 | 732,247 |
| OpenAI | gpt-5-mini-medium | 2/4 | | 141.8s | 25.5 | 787,941 | 6,298 | 794,239 |
| Claude | claude-opus-4-1 | 1/1 | -- | 522.9s | 28.0 | 1,620,249 | 3,647 | 1,623,896 |
| OpenAI | gpt-5-high | 1/4 | | 703.3s | 21.5 | 853,283 | 27,161 | 880,444 |
| OpenAI | o4-mini | 1/4 | | 471.6s | 19.0 | 558,339 | 24,406 | 582,745 |
| Qwen | qwen-3-max | 1/4 | | 869.2s | 78.0 | 8,071,234 | 2,362 | 8,073,596 |
| Gemini | gemini-2-5-flash | 0/4 | | 167.0s | 45.3 | 1,323,233 | 8,711 | 1,331,944 |
| Gemini | gemini-2-5-pro | 0/4 | | 116.7s | 16.8 | 981,059 | 2,334 | 983,392 |
| OpenAI | gpt-4-1 | 0/4 | | 75.2s | 15.3 | 472,715 | 805 | 473,519 |
| OpenAI | gpt-4-1-mini | 0/4 | | 60.4s | 18.8 | 132,719 | 2,094 | 134,813 |
| OpenAI | gpt-4-1-nano | 0/4 | | 33.9s | 10.8 | 178,147 | 478 | 178,625 |
| OpenAI | gpt-5-mini-low | 0/4 | | 65.5s | 11.8 | 226,661 | 2,047 | 228,708 |
| OpenAI | gpt-5-nano-high | 0/4 | | 331.3s | 42.8 | 2,619,375 | 37,141 | 2,656,516 |
| OpenAI | gpt-5-nano-low | 0/4 | | 204.8s | 27.8 | 1,363,178 | 14,600 | 1,377,779 |
| OpenAI | gpt-5-nano-medium | 0/4 | | 166.2s | 21.5 | 998,679 | 11,723 | 1,010,402 |
| OpenAI | gpt-oss-120b | 0/4 | | 27.1s | 7.0 | 47,187 | 859 | 48,046 |
| OpenAI | o3 | 0/4 | | 232.3s | 24.0 | 751,441 | 10,555 | 761,997 |
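
The Pass@4 and Pass^4 headers in the ranking are not defined on this page. A minimal sketch of one common reading, which is an assumption here: pass@k holds when at least one of k runs succeeds, and pass^k holds only when all k runs succeed. The helper names `pass_at_k` and `pass_hat_k` are hypothetical; the run counts are the passed/total pairs listed in the ranking.

```python
def pass_at_k(passed: int, total: int) -> bool:
    """Assumed definition: at least one of the runs succeeded."""
    return total > 0 and passed >= 1


def pass_hat_k(passed: int, total: int) -> bool:
    """Assumed definition: every one of the runs succeeded."""
    return total > 0 and passed == total


# (model, passed runs, total runs) as listed in the ranking.
runs = [
    ("claude-sonnet-4-high", 4, 4),
    ("claude-sonnet-4", 3, 4),
    ("gpt-4-1", 0, 4),
]

for model, passed, total in runs:
    print(model, pass_at_k(passed, total), pass_hat_k(passed, total))
```

Under this reading, a model with 3/4 passing runs satisfies pass@4 but not pass^4.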

Task State

WebArena

Instruction

Perform a comprehensive marketing and customer analysis workflow in the Magento Admin panel to understand search behavior patterns and promotional effectiveness.

Task Requirements:

  1. First, we need to access the system to begin our comprehensive analysis: if you need to log in, use the username 'admin' and the password 'admin1234'

  2. Let's start by analyzing customer search behavior to understand what customers are looking for: Go to Search Terms in Reports and analyze the search data:

    • Identify the TOP 2 search terms with the highest number of hits (record exact terms and hit counts)
    • Find a search term that has 0 results but still has search hits (record exact term and hit count)
    • Count the total number of search terms displayed in the report
  3. Next, we'll examine our promotional strategies to understand current marketing efforts: Navigate to Cart Price Rules and identify:

    • Find ALL rules that contain a coupon code
    • Record the exact coupon codes and the complete rule names for each
    • Count how many active rules exist in total
  4. Now let's analyze our email marketing reach and subscriber engagement: Go to Newsletter Subscribers:

    • Apply filter to show only 'Subscribed' status
    • Count the total number of subscribed users shown after the filter is applied
    • Verify whether these TWO emails appear in the subscribed list:
  5. To support our analysis, we need to create test customer profiles for different segments: Create TWO new customers with the following details:

    Customer 1:

    Customer 2:

  6. Finally, let's review overall business performance metrics from the main dashboard: Go to Dashboard and identify:

    • The names and sales quantities of the products that are both the best-selling and most expensive
    • The total revenue displayed on the dashboard
  7. Compile all your findings and, at the end, output them in exactly the following format:

Plaintext
<answer>
Top2SearchTerms|term1:hits1,term2:hits2
ZeroResultTerm|term:hits
TotalSearchTerms|count
CouponCodes|code1:rulename1,code2:rulename2
ActiveRulesCount|count
SubscribedCount|count
EmailVerification|john.smith.xyz@gmail.com:yes/no,admin@magento.com:yes/no
TopProduct|name:quantity
TotalRevenue|amount
</answer>

Example Output:

Plaintext
<answer>
Top2SearchTerms|term1:XX,term2:XX
ZeroResultTerm|term:XX
TotalSearchTerms|XX
CouponCodes|CODE:Rule Name Here
ActiveRulesCount|X
SubscribedCount|XX
EmailVerification|john.smith.xyz@gmail.com:yes/no,admin@magento.com:yes/no
TopProduct|Product Name:XX
TotalRevenue|$XX.XX
</answer>
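
As a rough illustration of the format above, the nine data lines can be assembled from a dictionary of findings. This is only a sketch: `ANSWER_KEYS`, `format_answer`, and `findings` are hypothetical names, and the values are the placeholders from the example output, not data read from the admin panel.

```python
# Order of the nine data lines, copied from the answer template.
ANSWER_KEYS = [
    "Top2SearchTerms",
    "ZeroResultTerm",
    "TotalSearchTerms",
    "CouponCodes",
    "ActiveRulesCount",
    "SubscribedCount",
    "EmailVerification",
    "TopProduct",
    "TotalRevenue",
]


def format_answer(findings: dict) -> str:
    """Render findings as `key|value` lines wrapped in <answer> tags."""
    missing = [key for key in ANSWER_KEYS if key not in findings]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    body = "\n".join(f"{key}|{findings[key]}" for key in ANSWER_KEYS)
    return f"<answer>\n{body}\n</answer>"


# Placeholder values taken from the example output; replace with real findings.
findings = {
    "Top2SearchTerms": "term1:XX,term2:XX",
    "ZeroResultTerm": "term:XX",
    "TotalSearchTerms": "XX",
    "CouponCodes": "CODE:Rule Name Here",
    "ActiveRulesCount": "X",
    "SubscribedCount": "XX",
    "EmailVerification": "john.smith.xyz@gmail.com:yes/no,admin@magento.com:yes/no",
    "TopProduct": "Product Name:XX",
    "TotalRevenue": "$XX.XX",
}

print(format_answer(findings))
```

Building the block from a fixed key list keeps the line count at nine and avoids blank lines inside the tags.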

Success Criteria:

  • Successfully logged into Magento Admin
  • Navigated to Search Terms Report and identified top 2 terms
  • Found a search term with 0 results that still has hits
  • Counted total search terms in report
  • Located all Cart Price Rules with coupon codes
  • Extracted exact coupon codes and rule names
  • Counted active rules
  • Filtered Newsletter Subscribers by 'Subscribed' status
  • Counted total subscribed users
  • Verified presence of two specific email addresses
  • Created two new customers successfully
  • Found top bestselling product from dashboard
  • Identified total revenue from dashboard
  • Output answer in exact format with 9 data lines
  • Answer wrapped in <answer> tags
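
A small sketch can show why the exact format matters. It assumes a checker that, like the verification script in this document, compares the Top2SearchTerms value as a set of comma-separated `term:hits` strings. The function name `same_term_set` and the sample values are hypothetical.

```python
def same_term_set(expected: str, actual: str) -> bool:
    """Compare two comma-separated `term:hits` lists while ignoring order."""
    return set(expected.split(",")) == set(actual.split(","))


# Order does not matter for a set comparison.
print(same_term_set("alpha:10,beta:7", "beta:7,alpha:10"))

# A space after the comma turns "beta:7" into " beta:7", so the sets differ.
print(same_term_set("alpha:10,beta:7", "alpha:10, beta:7"))
```

The first call prints `True` and the second prints `False`, so swapping the two terms is harmless, while extra whitespace inside a value is not.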


Verify

*.py
Python
import asyncio
import sys
import re
import os
import json
from pathlib import Path
from playwright.async_api import (
    async_playwright,
    TimeoutError as PlaywrightTimeoutError,
)

# Read base_url from the environment (shopping_admin injects http://localhost:7780/admin); fall back to the local default
BASE_URL = os.getenv("WEBARENA_BASE_URL", "http://localhost:7780/admin").rstrip("/")


def get_model_response():
    """
    Get the model's response from the JSON messages file whose path is stored
    in the MCP_MESSAGES environment variable.
    Returns the text of the last completed assistant message.
    """
    messages_path = os.getenv("MCP_MESSAGES")
    print(f"MCP_MESSAGES: {messages_path}")
    if not messages_path:
        print("Warning: MCP_MESSAGES environment variable not set", file=sys.stderr)
        return None

    try:
        with open(messages_path, "r") as f:
            messages = json.load(f)

        # Find the last assistant message
        for message in reversed(messages):
            if (
                message.get("role") == "assistant"
                and message.get("status") == "completed"
            ):
                content = message.get("content", [])
                for item in content:
                    if item.get("type") == "output_text":
                        return item.get("text", "")

        print("Warning: No assistant response found in messages", file=sys.stderr)
        return None
    except Exception as e:
        print(f"Error reading messages file: {str(e)}", file=sys.stderr)
        return None


def parse_answer_format(text):
    """
    Parse the new multi-line <answer>xxx</answer> format from the agent's output.
    Returns a dictionary with the parsed values.
    """
    if not text:
        return None

    # Look for <answer>...</answer> pattern
    match = re.search(r"<answer>(.*?)</answer>", text, re.IGNORECASE | re.DOTALL)
    if not match:
        return None

    answer_content = match.group(1).strip()

    # Parse each line
    result = {}
    lines = answer_content.split("\n")

    if len(lines) != 9:
        print(f"Error: Expected 9 lines in answer, got {len(lines)}", file=sys.stderr)
        return None

    for line in lines:
        if "|" in line:
            key, value = line.split("|", 1)
            result[key.strip()] = value.strip()

    return result


def load_expected_answer(label_path):
    """
    Load the expected answer from label.txt file.
    Returns a dictionary with the expected values.
    """
    try:
        with open(label_path, "r") as f:
            lines = f.read().strip().split("\n")

        expected = {}
        for line in lines:
            if "|" in line:
                key, value = line.split("|", 1)
                expected[key.strip()] = value.strip()

        return expected
    except Exception as e:
        print(f"Error reading label file: {str(e)}", file=sys.stderr)
        return None


def compare_answers(model_answer, expected_answer):
    """
    Compare the model's answer with the expected answer.
    Returns True if all key information matches, False otherwise.
    """
    if not model_answer or not expected_answer:
        return False

    # Check each expected key
    mismatches = []
    for key, expected_value in expected_answer.items():
        model_value = model_answer.get(key, "")

        # Special handling for different types of values
        if key == "Top2SearchTerms":
            # Check if both search terms are present with correct counts
            expected_terms = expected_value.split(",")
            model_terms = model_value.split(",")
            if set(expected_terms) != set(model_terms):
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "EmailVerification":
            # Check email verification status
            expected_emails = dict(
                item.split(":") for item in expected_value.split(",")
            )
            model_emails = dict(
                item.split(":") for item in model_value.split(",") if ":" in item
            )
            if expected_emails != model_emails:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "CouponCodes":
            # Check if coupon code and rule name are present
            if "H20" not in model_value or "Luma water bottle" not in model_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "TopProduct":
            # Check if product name and quantity match
            if expected_value != model_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        else:
            # Exact match for other fields
            if model_value != expected_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

    if mismatches:
        print("\n=== Answer Comparison Mismatches ===", file=sys.stderr)
        for mismatch in mismatches:
            print(f"✗ {mismatch}", file=sys.stderr)
        return False

    print("\n=== Answer Comparison ===", file=sys.stderr)
    print("✓ All key information matches the expected answer", file=sys.stderr)
    return True


async def verify() -> bool:
    """
    Verifies that the marketing analysis task has been completed correctly.
    First checks the model's answer against the expected label (skipped if no
    parsable answer is found), then checks in the Magento Admin that both
    required customers were created.
    """
    # Get the label file path
    label_path = Path(__file__).parent / "label.txt"

    # Load expected answer
    expected_answer = load_expected_answer(label_path)
    if not expected_answer:
        print("Error: Could not load expected answer from label.txt", file=sys.stderr)
        return False

    # Get model's response from MCP_MESSAGES
    model_response = get_model_response()
    if model_response:
        print("Found model response, parsing answer format...", file=sys.stderr)
        model_answer = parse_answer_format(model_response)

        if model_answer:
            print("\n=== Model Answer Parsed ===", file=sys.stderr)
            for key, value in model_answer.items():
                print(f"{key}: {value}", file=sys.stderr)

            # Compare answers
            answer_match = compare_answers(model_answer, expected_answer)
            if not answer_match:
                print("\nModel answer does not match expected answer", file=sys.stderr)
                return False
            print("\n✓ Model answer matches expected answer", file=sys.stderr)
        else:
            print(
                "Warning: Could not parse answer format from model response",
                file=sys.stderr,
            )
            print("Will proceed with browser verification only", file=sys.stderr)
    else:
        print(
            "No model response found, proceeding with browser verification",
            file=sys.stderr,
        )

    # Browser verification - only check customer creation (the critical task requirement)
    print("\n=== Starting Browser Verification ===", file=sys.stderr)
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context()
        page = await context.new_page()

        try:
            # Navigate to Magento Admin
            print("Navigating to Magento Admin...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/", wait_until="networkidle"
            )

            # Check if already logged in, if not, login
            if "dashboard" not in page.url.lower():
                print("Logging into Magento Admin...", file=sys.stderr)
                await page.fill('input[name="login[username]"]', "admin")
                await page.fill('input[name="login[password]"]', "admin1234")
                await page.click('button:has-text("Sign in")')
                await page.wait_for_load_state("networkidle")

                if "dashboard" not in page.url.lower():
                    print("Error: Login failed", file=sys.stderr)
                    return False

            print("Successfully logged into Magento Admin", file=sys.stderr)

            # Verify Customer Creation (the only critical check for task completion)
            print("Verifying Customer Creation...", file=sys.stderr)
            await page.goto(
                f"{BASE_URL}/customer/index/",
                wait_until="networkidle",
            )

            # Wait for the customer grid to load
            try:
                await page.wait_for_selector("table", timeout=15000)
            except PlaywrightTimeoutError:
                print("Table not found, trying to proceed anyway...", file=sys.stderr)

            # Define customer requirements
            customer1_requirements = {
                "email": "marketdata1.analysis@magento.com",
                "first_name": "Marketing1",
                "last_name": "Analy",
                "group": "General",
                "website": "Main Website"
            }
            
            customer2_requirements = {
                "email": "analytics1.report@magento.com", 
                "first_name": "Analytics1",
                "last_name": "Report",
                "group": "Wholesale",
                "website": "Main Website"
            }

            async def check_customer_exists(customer_requirements):
                """Check if a customer exists by looking for their details in the customer grid"""
                email = customer_requirements["email"]
                first_name = customer_requirements["first_name"]
                last_name = customer_requirements["last_name"]
                group = customer_requirements["group"]
                
                # First check if email exists in current page without searching
                email_found = await page.locator(f"*:has-text('{email}')").count() > 0
                
                if not email_found:
                    # Try searching for the customer
                    try:
                        search_box = page.locator('input[placeholder*="Search by keyword"]').first
                        await search_box.clear()
                        await search_box.fill(email)
                        await page.keyboard.press("Enter")
                        await page.wait_for_load_state("networkidle")
                        await page.wait_for_timeout(2000)
                        
                        # Check again after search
                        email_found = await page.locator(f"*:has-text('{email}')").count() > 0
                    except Exception:
                        # Searching is best-effort; keep email_found as it was
                        pass
                
                if not email_found:
                    return False, f"Email {email} not found"
                
                # More precise validation: find the row containing this customer's email
                # Then check if the required fields are in the same row or nearby context
                try:
                    # Find the specific row containing this email
                    email_cell = page.locator(f"td:has-text('{email}')").first
                    if await email_cell.count() == 0:
                        # Fall back to broader search
                        email_cell = page.locator(f"*:has-text('{email}')").first
                    
                    # Get the parent row or container
                    row = email_cell.locator("xpath=ancestor::tr[1]")
                    if await row.count() == 0:
                        # Fall back to getting nearby content
                        row = email_cell.locator("xpath=..")
                    
                    # Get the text content of the row/container
                    row_text = await row.text_content() if await row.count() > 0 else ""
                    
                    # If we can't get a specific row, fall back to broader validation
                    if not row_text or len(row_text.strip()) < 10:
                        # Search in nearby cells or elements
                        # One "xpath=" prefix covers the whole union expression
                        nearby_elements = page.locator(f"*:has-text('{email}')").locator("xpath=../following-sibling::* | ../preceding-sibling::*")
                        nearby_count = await nearby_elements.count()
                        nearby_text = ""
                        for i in range(min(nearby_count, 5)):  # Check up to 5 nearby elements
                            element_text = await nearby_elements.nth(i).text_content()
                            if element_text:
                                nearby_text += element_text + " "
                        row_text = row_text + " " + nearby_text
                    
                    # Check if required fields are present in the row/context
                    required_fields = [first_name, last_name, group]
                    found_fields = [email]  # Email is already confirmed
                    missing_fields = []
                    
                    for field in required_fields:
                        if field in row_text:
                            found_fields.append(field)
                        else:
                            missing_fields.append(field)
                    
                    if missing_fields:
                        return False, f"Customer found but missing fields in row context: {', '.join(missing_fields)}. Row text: {row_text[:100]}..."
                    
                    return True, f"Customer verified with all required fields: {', '.join(found_fields)}"
                    
                except Exception as e:
                    # Fall back to original simple validation
                    page_content = await page.content()
                    required_fields = [first_name, last_name, group, email]
                    found_fields = []
                    missing_fields = []
                    
                    for field in required_fields:
                        if field in page_content:
                            found_fields.append(field)
                        else:
                            missing_fields.append(field)
                    
                    if missing_fields:
                        return False, f"Customer found but missing fields (fallback): {', '.join(missing_fields)}"
                    
                    return True, f"Customer verified with all required fields (fallback): {', '.join(found_fields)}"

            # Check both customers
            customer1_exists, customer1_msg = await check_customer_exists(customer1_requirements)
            customer2_exists, customer2_msg = await check_customer_exists(customer2_requirements)

            print(
                f"Customer 1 (marketdata1.analysis@magento.com): {'Found' if customer1_exists else 'Not Found'} - {customer1_msg}",
                file=sys.stderr,
            )
            print(
                f"Customer 2 (analytics1.report@magento.com): {'Found' if customer2_exists else 'Not Found'} - {customer2_msg}",
                file=sys.stderr,
            )

            if not (customer1_exists and customer2_exists):
                print("Error: Required customers were not found in the system", file=sys.stderr)
                return False

            print("✓ Both required customers found in the system", file=sys.stderr)
            return True

        except PlaywrightTimeoutError as e:
            print(f"Error: Timeout occurred - {str(e)}", file=sys.stderr)
            return False
        except Exception as e:
            print(f"Error: Unexpected error - {str(e)}", file=sys.stderr)
            return False
        finally:
            await browser.close()


def main():
    """
    Executes the verification process and exits with a status code.
    """
    result = asyncio.run(verify())
    sys.exit(0 if result else 1)


if __name__ == "__main__":
    main()
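
To exercise the answer-reading part of the script locally, a messages file of the right shape is needed. The sketch below infers that shape from `get_model_response` alone; the real harness may write additional fields that are not shown here. The names `messages` and `last_assistant_text` and the answer text are hypothetical.

```python
import json
import os
import tempfile

# Hypothetical transcript containing only the fields the verifier inspects.
messages = [
    {
        "role": "assistant",
        "status": "completed",
        "content": [
            {"type": "output_text", "text": "<answer>\nTotalSearchTerms|XX\n</answer>"}
        ],
    },
]

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as handle:
    json.dump(messages, handle)

# The verifier finds the transcript through this environment variable.
os.environ["MCP_MESSAGES"] = handle.name


def last_assistant_text(path):
    """Reduced version of the lookup performed by get_model_response."""
    with open(path, "r") as f:
        for message in reversed(json.load(f)):
            if message.get("role") == "assistant" and message.get("status") == "completed":
                for item in message.get("content", []):
                    if item.get("type") == "output_text":
                        return item.get("text", "")
    return None


print(last_assistant_text(os.environ["MCP_MESSAGES"]))
```

A transcript like this one would still fail `parse_answer_format`, because it carries one data line instead of nine. It is only meant to show which keys the lookup depends on.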