Sales Inventory Analysis

L3
ModelContextProtocolPlaywrightShopping Admin

Analyze sales patterns and inventory levels to optimize stock management, identify slow-moving items, predict demand trends, and generate restocking recommendations.

Created by Fanqing Meng
2025-08-17
Data ExtractionComparative AnalysisInventory Management

Model Ranking

Click on the dots to view the trajectory of each task run
Model
Run Results
Pass@4
Pass^4
Avg Time
Avg Turns
Input Tokens
Output Tokens
Total Tokens
Claude
claude-4-1-opus
0
/1
--
166.6s
9.0
426,336
1,046
427,382
Claude
claude-4-sonnet
0
/4
140.9s
9.0
429,762
1,134
430,896
DeepSeek
deepseek-chat
0
/4
158.2s
10.8
150,338
1,196
151,534
Gemini
gemini-2-5-pro
0
/4
100.2s
9.5
2,313,768
1,644
2,315,413
OpenAI
gpt-5
0
/4
150.1s
10.0
546,918
4,202
551,121
Grok
grok-4
0
/4
24.1s
0.5
-
-
-
MoonshotAI
k2
0
/4
138.1s
9.0
96,202
527
96,729
OpenAI
o3
0
/4
128.7s
10.5
531,636
1,618
533,254
Qwen
qwen-3-coder
0
/4
116.3s
11.5
545,833
679
546,512

Task State

WebArena
view WebArena environment setup for this task

Instruction



Verify

*.py
Python
import asyncio
import sys
import re
import os
import json
from pathlib import Path


def get_model_response():
    """
    Get the model's response from the MCP_MESSAGES environment variable.
    Returns the last assistant message text.
    """
    messages_path = os.getenv("MCP_MESSAGES")
    print(f"MCP_MESSAGES: {messages_path}")
    if not messages_path:
        print("Warning: MCP_MESSAGES environment variable not set", file=sys.stderr)
        return None

    try:
        with open(messages_path, "r") as f:
            messages = json.load(f)

        # Find the last assistant message with type='message', status='completed'
        for message in reversed(messages):
            if (
                message.get("role") == "assistant"
                and message.get("status") == "completed"
                and message.get("type") == "message"
            ):
                content = message.get("content", [])
                for item in content:
                    # Check for both 'text' and 'output_text' types
                    if item.get("type") in ["text", "output_text"]:
                        return item.get("text", "")

        print("Warning: No assistant response found in messages", file=sys.stderr)
        return None
    except Exception as e:
        print(f"Error reading messages file: {str(e)}", file=sys.stderr)
        return None


def parse_answer_format(text):
    """
    Parse the <answer>...</answer> format from the agent's output.
    Returns a dictionary with the parsed values.
    """
    if not text:
        print("ERROR: No text provided to parse", file=sys.stderr)
        return None

    # Look for <answer>...</answer> pattern
    match = re.search(r"<answer>(.*?)</answer>", text, re.IGNORECASE | re.DOTALL)
    if not match:
        print("ERROR: No <answer>...</answer> tags found in the response", file=sys.stderr)
        print("Response text preview (first 200 chars):", text[:200], file=sys.stderr)
        return None

    answer_content = match.group(1).strip()
    print(f"Found answer content with {len(answer_content)} characters", file=sys.stderr)

    # Parse each line
    result = {}
    lines = answer_content.split("\n")
    
    # Expected keys for this task
    expected_keys = [
        "SpriteProducts", "Quantity100Products", "WS12Info", "PendingOrders",
        "GraceOrderID", "HighestOrderInfo", "CheapProduct", "OvernightDufflePrice",
        "HollisterPosition", "CostelloCustomers", "SarahMillerInfo", 
        "PaidInvoices", "Invoice002BillTo"
    ]

    if len(lines) != 13:
        print(f"ERROR: Expected 13 lines in answer, got {len(lines)}", file=sys.stderr)
        print(f"Lines found: {lines}", file=sys.stderr)
        return None

    for i, line in enumerate(lines, 1):
        if "|" not in line:
            print(f"ERROR: Line {i} does not contain pipe separator '|': '{line}'", file=sys.stderr)
            return None
        
        parts = line.split("|", 1)
        if len(parts) != 2:
            print(f"ERROR: Line {i} could not be split into key|value: '{line}'", file=sys.stderr)
            return None
            
        key, value = parts
        result[key.strip()] = value.strip()
    
    # Check if all expected keys are present
    missing_keys = set(expected_keys) - set(result.keys())
    if missing_keys:
        print(f"ERROR: Missing expected keys: {missing_keys}", file=sys.stderr)
        print(f"Keys found: {list(result.keys())}", file=sys.stderr)
        return None
    
    # Check for unexpected keys
    extra_keys = set(result.keys()) - set(expected_keys)
    if extra_keys:
        print(f"WARNING: Unexpected keys found: {extra_keys}", file=sys.stderr)

    return result


def load_expected_answer(label_path):
    """
    Load the expected answer from label.txt file.
    Returns a dictionary with the expected values.
    """
    try:
        with open(label_path, "r") as f:
            lines = f.read().strip().split("\n")

        expected = {}
        for line in lines:
            if "|" in line:
                key, value = line.split("|", 1)
                expected[key.strip()] = value.strip()

        return expected
    except Exception as e:
        print(f"Error reading label file: {str(e)}", file=sys.stderr)
        return None


def compare_answers(model_answer, expected_answer):
    """
    Compare the model's answer with the expected answer.
    Returns True if all key information matches, False otherwise.
    """
    if not model_answer or not expected_answer:
        return False

    # Check each expected key
    mismatches = []
    for key, expected_value in expected_answer.items():
        model_value = model_answer.get(key, "")

        # Special handling for different types of values
        if key == "WS12Info":
            # Check if product name and price match (format: name:price)
            if ":" in expected_value and ":" in model_value:
                expected_name, expected_price = expected_value.rsplit(":", 1)
                model_name, model_price = model_value.rsplit(":", 1)
                # Normalize price format
                expected_price_clean = expected_price.replace("$", "").replace(",", "")
                model_price_clean = model_price.replace("$", "").replace(",", "")
                if (
                    expected_name != model_name
                    or expected_price_clean != model_price_clean
                ):
                    mismatches.append(
                        f"{key}: expected '{expected_value}', got '{model_value}'"
                    )
            else:
                if expected_value != model_value:
                    mismatches.append(
                        f"{key}: expected '{expected_value}', got '{model_value}'"
                    )

        elif key == "GraceOrderID":
            # Order ID should start with "000" and match exactly
            if not model_value.startswith("000"):
                mismatches.append(
                    f"{key}: expected to start with '000', got '{model_value}'"
                )
            elif model_value != expected_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "HighestOrderInfo":
            # Check format customer:amount
            if ":" in expected_value and ":" in model_value:
                expected_customer, expected_amount = expected_value.rsplit(":", 1)
                model_customer, model_amount = model_value.rsplit(":", 1)
                # Normalize amount format
                expected_amount_clean = expected_amount.replace("$", "").replace(
                    ",", ""
                )
                model_amount_clean = model_amount.replace("$", "").replace(",", "")
                if (
                    expected_customer != model_customer
                    or expected_amount_clean != model_amount_clean
                ):
                    mismatches.append(
                        f"{key}: expected '{expected_value}', got '{model_value}'"
                    )
            else:
                if expected_value != model_value:
                    mismatches.append(
                        f"{key}: expected '{expected_value}', got '{model_value}'"
                    )

        elif key == "Position2Product":
            # Check if product name and quantity match
            if ":" in expected_value and ":" in model_value:
                expected_name, expected_qty = expected_value.rsplit(":", 1)
                model_name, model_qty = model_value.rsplit(":", 1)
                if expected_name != model_name or expected_qty != model_qty:
                    mismatches.append(
                        f"{key}: expected '{expected_value}', got '{model_value}'"
                    )
            else:
                if expected_value != model_value:
                    mismatches.append(
                        f"{key}: expected '{expected_value}', got '{model_value}'"
                    )

        elif key == "OvernightDufflePrice":
            # Normalize price format
            expected_clean = expected_value.replace("$", "").replace(",", "")
            model_clean = model_value.replace("$", "").replace(",", "")
            if expected_clean != model_clean:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "HollisterPosition":
            # Position format (1st, 2nd, 3rd, etc.)
            if model_value.lower() != expected_value.lower():
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        elif key == "SarahMillerInfo":
            # Format: group:date
            if ":" in expected_value and ":" in model_value:
                expected_group, expected_date = expected_value.split(":", 1)
                model_group, model_date = model_value.split(":", 1)
                # Allow some flexibility in date format
                if expected_group != model_group:
                    mismatches.append(
                        f"{key}: expected group '{expected_group}', got '{model_group}'"
                    )
                # For date, check if key parts match
                if not (expected_date in model_date or model_date in expected_date):
                    mismatches.append(
                        f"{key}: expected date '{expected_date}', got '{model_date}'"
                    )
            else:
                if expected_value != model_value:
                    mismatches.append(
                        f"{key}: expected '{expected_value}', got '{model_value}'"
                    )

        elif key == "Invoice002BillTo":
            # Name should match exactly
            if model_value != expected_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

        else:
            # Exact match for count fields and other numeric values
            if model_value != expected_value:
                mismatches.append(
                    f"{key}: expected '{expected_value}', got '{model_value}'"
                )

    if mismatches:
        print("\n=== Answer Comparison Mismatches ===", file=sys.stderr)
        for mismatch in mismatches:
            print(f"✗ {mismatch}", file=sys.stderr)
        return False

    print("\n=== Answer Comparison ===", file=sys.stderr)
    print("✓ All key information matches the expected answer", file=sys.stderr)
    return True


async def verify() -> bool:
    """
    Verifies that the sales and inventory analysis task has been completed correctly.
    First checks the model's answer against the expected label,
    then optionally verifies the actual state in the Magento Admin.
    """
    print("\n" + "="*60, file=sys.stderr)
    print("Starting verification of Task 5", file=sys.stderr)
    print("="*60, file=sys.stderr)
    
    # Get the label file path
    label_path = Path(__file__).parent / "label.txt"

    # Load expected answer
    print("\n--- Loading Expected Answer ---", file=sys.stderr)
    expected_answer = load_expected_answer(label_path)
    if not expected_answer:
        print("FATAL ERROR: Could not load expected answer from label.txt", file=sys.stderr)
        return False
    print(f"Successfully loaded {len(expected_answer)} expected values", file=sys.stderr)

    # Get model's response from MCP_MESSAGES
    print("\n--- Loading Model Response ---", file=sys.stderr)
    model_response = get_model_response()
    if not model_response:
        print("FATAL ERROR: No model response found in MCP_MESSAGES", file=sys.stderr)
        return False
    
    print(f"Found model response ({len(model_response)} characters)", file=sys.stderr)
    
    print("\n--- Parsing Answer Format ---", file=sys.stderr)
    model_answer = parse_answer_format(model_response)
    
    if not model_answer:
        print("\nFATAL ERROR: Could not parse answer format from model response", file=sys.stderr)
        print("Verification FAILED", file=sys.stderr)
        return False
    
    print("\n=== Model Answer Successfully Parsed ===", file=sys.stderr)
    for key, value in model_answer.items():
        print(f"  {key}: {value}", file=sys.stderr)

    # Compare answers
    print("\n--- Comparing Answers ---", file=sys.stderr)
    answer_match = compare_answers(model_answer, expected_answer)
    
    if not answer_match:
        print("\n" + "="*60, file=sys.stderr)
        print("VERIFICATION FAILED: Model answer does not match expected answer", file=sys.stderr)
        print("="*60, file=sys.stderr)
        return False
    
    print("\n" + "="*60, file=sys.stderr)
    print("✓ VERIFICATION PASSED: Model answer matches expected answer", file=sys.stderr)
    print("="*60, file=sys.stderr)
    return True


def main():
    """
    Executes the verification process and exits with a status code.
    """
    result = asyncio.run(verify())
    sys.exit(0 if result else 1)


if __name__ == "__main__":
    main()