Products Sales Analysis

L3
ModelContextProtocol · Playwright · Shopping Admin

Generate comprehensive sales performance reports by extracting product metrics, analyzing revenue trends, identifying top performers, evaluating inventory turnover, and creating actionable insights.

Created by Fanqing Meng
2025-08-17
Data Extraction · Comparative Analysis · Content Submission

Model Ranking

Click on the dots to view the trajectory of each task run
Model
Run Results
Pass@4
Pass^4
Avg Time
Avg Turns
Input Tokens
Output Tokens
Total Tokens
Claude
claude-4-1-opus
0
/1
--
115.0s
8.0
71,720
1,140
72,860
Claude
claude-4-sonnet
0
/4
138.7s
8.5
291,568
1,153
292,721
DeepSeek
deepseek-chat
0
/4
175.9s
13.3
152,628
1,218
153,846
Gemini
gemini-2-5-pro
0
/4
132.8s
8.8
2,269,537
1,962
2,271,499
OpenAI
gpt-5
0
/4
134.3s
9.0
367,260
3,804
371,064
Grok
grok-4
0
/4
22.1s
0.8
-
-
-
MoonshotAI
k2
0
/4
130.2s
8.5
72,917
493
73,410
OpenAI
o3
0
/4
104.1s
9.0
293,430
1,997
295,427
Qwen
qwen-3-coder
0
/4
91.4s
9.3
321,643
388
322,030

Task State

WebArena
view WebArena environment setup for this task

Instruction



Verify

*.py
Python
import asyncio
import sys
import re
import os
import json
from pathlib import Path


def get_model_response():
    """
    Load the conversation transcript and return the model's final answer text.

    The MCP_MESSAGES environment variable holds the path of a JSON file that
    is expected to contain a list of message dicts. Messages are scanned from
    newest to oldest; for each message with role "assistant" and status
    "completed", the first content item of type "output_text" is returned.
    A completed assistant message without such an item does not stop the
    search; older messages are examined next.

    Returns:
        The "text" of the matching item (an empty string if the item has no
        "text" key), or None when the variable is unset, the file cannot be
        read or parsed, or no matching item exists.
    """
    messages_path = os.getenv("MCP_MESSAGES")
    print(f"MCP_MESSAGES: {messages_path}")
    if not messages_path:
        print("Warning: MCP_MESSAGES environment variable not set", file=sys.stderr)
        return None

    try:
        # Decode as UTF-8 explicitly: relying on the locale's default encoding
        # garbles (or fails on) non-ASCII answer text on some platforms.
        with open(messages_path, "r", encoding="utf-8") as f:
            messages = json.load(f)

        # Walk backwards so the most recent completed assistant message wins.
        for message in reversed(messages):
            if (
                message.get("role") == "assistant"
                and message.get("status") == "completed"
            ):
                content = message.get("content", [])
                for item in content:
                    if item.get("type") == "output_text":
                        return item.get("text", "")

        print("Warning: No assistant response found in messages", file=sys.stderr)
        return None
    except Exception as e:
        # Best-effort boundary: any read/parse/shape problem is reported and
        # treated as "no response" so the caller can fail the verification.
        print(f"Error reading messages file: {str(e)}", file=sys.stderr)
        return None


def parse_answer_format(text):
    """
    Extract the key|value pairs enclosed in <answer>...</answer> tags.

    The first tag pair found (case-insensitive, may span lines) is used. Its
    body must hold exactly 10 non-blank lines, each of the form "key|value"
    with a non-empty key and value; only the first "|" on a line separates
    the two. All ten required keys must be present.

    Returns:
        Dict mapping each key to its value (both whitespace-stripped), or
        None if any of the checks above fails. Failures are reported on
        stderr.
    """
    if not text:
        print("Error: No text provided to parse", file=sys.stderr)
        return None

    tagged = re.search(r"<answer>(.*?)</answer>", text, re.IGNORECASE | re.DOTALL)
    if tagged is None:
        print("Error: No <answer>...</answer> tags found in response", file=sys.stderr)
        return None

    body = tagged.group(1).strip()
    if not body:
        print("Error: Empty answer content", file=sys.stderr)
        return None

    # Keep only non-blank lines, trimmed of surrounding whitespace.
    entries = []
    for raw in body.split("\n"):
        trimmed = raw.strip()
        if trimmed:
            entries.append(trimmed)

    if len(entries) != 10:
        print(f"Error: Expected 10 lines in answer, got {len(entries)}", file=sys.stderr)
        print(f"Lines found: {entries}", file=sys.stderr)
        return None

    required = (
        "YogaProducts",
        "WH11Price",
        "ZeroQuantityProducts",
        "LowestProduct",
        "QuestLumaflexQuantity",
        "DashboardRevenue",
        "SarahMillerEmail",
        "TotalCustomers",
        "PendingOrders",
        "GraceNguyenOrderID",
    )

    parsed = {}
    for entry in entries:
        # partition splits on the first "|" only, so values may contain "|".
        name, separator, payload = entry.partition("|")
        if not separator:
            print(f"Error: Line missing '|' separator: {entry}", file=sys.stderr)
            return None

        name = name.strip()
        payload = payload.strip()
        if not (name and payload):
            print(f"Error: Empty key or value in line: {entry}", file=sys.stderr)
            return None

        parsed[name] = payload

    # A duplicated or unknown key necessarily leaves a required one missing.
    absent = set(required) - set(parsed)
    if absent:
        print(f"Error: Missing required keys: {absent}", file=sys.stderr)
        return None

    return parsed


def load_expected_answer(label_path):
    """
    Load the expected answer from the label file.

    Every line containing a "|" contributes one entry: the text before the
    first "|" is the key and the remainder is the value, both stripped of
    surrounding whitespace. Lines without a "|" are ignored.

    Args:
        label_path: Location of the label file (anything ``open`` accepts).

    Returns:
        Dict of expected values (possibly empty), or None if the file cannot
        be read; the error is reported on stderr.
    """
    try:
        # Decode as UTF-8 explicitly so labels containing non-ASCII text are
        # read identically regardless of the platform's default encoding.
        with open(label_path, "r", encoding="utf-8") as f:
            lines = f.read().strip().split("\n")

        expected = {}
        for line in lines:
            key, separator, value = line.partition("|")
            if separator:
                expected[key.strip()] = value.strip()

        return expected
    except Exception as e:
        # Best-effort boundary: the caller treats None as "label unavailable".
        print(f"Error reading label file: {str(e)}", file=sys.stderr)
        return None


def compare_answers(model_answer, expected_answer):
    """
    Check the model's answer against every entry of the expected answer.

    Comparison rules per key:
      * "LowestProduct": the value is split at its last ":" into name and
        quantity, and both parts must be identical.
      * "WH11Price" / "DashboardRevenue": "$" and "," are removed from both
        sides before comparing the remaining text.
      * "SarahMillerEmail": compared case-insensitively.
      * anything else: exact string equality.

    A key absent from the model's answer is treated as an empty string. Keys
    present only in the model's answer are ignored.

    Returns:
        True if every expected entry matches; False otherwise, or when
        either argument is empty/None. Results are reported on stderr.
    """
    if not (model_answer and expected_answer):
        return False

    def _bare_amount(amount):
        # Drop currency symbol and thousands separators.
        return amount.replace("$", "").replace(",", "")

    money_fields = ("WH11Price", "DashboardRevenue")
    problems = []

    for field, wanted in expected_answer.items():
        given = model_answer.get(field, "")

        if field == "LowestProduct":
            # Comparing the rsplit results covers both the "name:qty" case
            # and the fallback where a side has no ":" at all.
            same = wanted.rsplit(":", 1) == given.rsplit(":", 1)
        elif field in money_fields:
            same = _bare_amount(wanted) == _bare_amount(given)
        elif field == "SarahMillerEmail":
            same = wanted.lower() == given.lower()
        else:
            same = wanted == given

        if not same:
            problems.append(f"{field}: expected '{wanted}', got '{given}'")

    if not problems:
        print("\n=== Answer Comparison ===", file=sys.stderr)
        print("✓ All key information matches the expected answer", file=sys.stderr)
        return True

    print("\n=== Answer Comparison Mismatches ===", file=sys.stderr)
    for problem in problems:
        print(f"✗ {problem}", file=sys.stderr)
    return False


async def verify() -> bool:
    """
    Check the model's final answer against the expected label.

    Steps, each of which aborts with False on failure:
      1. load the expected answer from label.txt next to this file;
      2. fetch the model's response via get_model_response();
      3. parse its <answer> block with parse_answer_format();
      4. compare the parsed values with compare_answers().

    Progress and failure details are written to stderr.

    Returns:
        True only when all four steps succeed.
    """
    label_file = Path(__file__).parent.joinpath("label.txt")

    expected_answer = load_expected_answer(label_file)
    if not expected_answer:
        print("Error: Could not load expected answer from label.txt", file=sys.stderr)
        return False

    model_response = get_model_response()
    if not model_response:
        print("No model response found", file=sys.stderr)
        return False

    print("Found model response, parsing answer format...", file=sys.stderr)
    model_answer = parse_answer_format(model_response)
    if not model_answer:
        print(
            "Warning: Could not parse answer format from model response",
            file=sys.stderr,
        )
        return False

    # Echo what was parsed so a failed run can be diagnosed from the log.
    print("\n=== Model Answer Parsed ===", file=sys.stderr)
    for name, parsed_value in model_answer.items():
        print(f"{name}: {parsed_value}", file=sys.stderr)

    if compare_answers(model_answer, expected_answer):
        print("\n✓ Model answer matches expected answer", file=sys.stderr)
        return True

    print("\nModel answer does not match expected answer", file=sys.stderr)
    return False


def main():
    """
    Run the asynchronous verification and exit with its outcome.

    The process exit status is 0 when verify() returns a truthy result and
    1 otherwise.
    """
    passed = asyncio.run(verify())
    exit_status = 0 if passed else 1
    sys.exit(exit_status)


# Run the verification only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()