Consistency Enforcement

L3
ModelContextProtocolPostgresLego

Implement data consistency system ensuring reported part counts match actual inventory using triggers and constraint enforcement.

Created by Jiawei Wang
2025-08-15
Data Integrity EnforcementStored Procedures And FunctionsTransactional Operations

Model Ranking

Click on the dots to view the trajectory of each task run
Model
Run Results
Pass@4
Pass^4
Avg Time
Avg Turns
Input Tokens
Output Tokens
Total Tokens
Claude
claude-4-sonnet
4
/4
379.1s
30.3
436,036
6,292
442,329
MoonshotAI
k2
4
/4
309.1s
27.3
711,787
3,540
715,326
DeepSeek
deepseek-chat
3
/4
337.1s
24.3
655,240
4,323
659,563
Gemini
gemini-2-5-pro
3
/4
100.1s
7.0
36,491
9,220
45,710
OpenAI
gpt-5
3
/4
279.4s
8.3
70,265
12,025
82,290
Qwen
qwen-3-coder
3
/4
147.3s
22.8
517,688
3,510
521,198
Grok
grok-4
2
/4
98.0s
9.8
-
-
-
Claude
claude-4-1-opus
1
/1
--
393.3s
25.0
172,724
7,067
179,791
OpenAI
o3
0
/4
30.6s
2.3
5,075
2,532
7,607

Task State

Table "lego_colors" { "id" int4 [pk, not null, increment] "name" varchar(255) [not null] "rgb" varchar(6) [not null] "is_trans" bpchar(1) [not null] } Table "lego_inventories" { "id" int4 [pk, not null, increment] "version" int4 [not null] "set_num" varchar(255) [not null] } Table "lego_inventory_parts" { "inventory_id" int4 [not null] "part_num" varchar(255) [not null] "color_id" int4 [not null] "quantity" int4 [not null] "is_spare" bool [not null] } Table "lego_inventory_sets" { "inventory_id" int4 [not null] "set_num" varchar(255) [not null] "quantity" int4 [not null] } Table "lego_part_categories" { "id" int4 [pk, not null, increment] "name" varchar(255) [not null] } Table "lego_parts" { "part_num" varchar(255) [pk, not null] "name" text [not null] "part_cat_id" int4 [not null] } Table "lego_sets" { "set_num" varchar(255) [pk, not null] "name" varchar(255) [not null] "year" int4 "theme_id" int4 "num_parts" int4 } Table "lego_themes" { "id" int4 [pk, not null, increment] "name" varchar(255) [not null] "parent_id" int4 }

Instruction



Verify

*.py
Python
"""
Verification script for PostgreSQL LEGO Task 1: Parts Consistency Fix & Constraints
Version 2.1: Relaxed consistency check to allow for one known corner case mismatch.
"""

import os
import sys
import psycopg2
import psycopg2.errors
from typing import Optional, Tuple, List


def get_connection_params() -> dict:
    """Get database connection parameters from environment variables."""
    return {
        "host": os.getenv("POSTGRES_HOST", "localhost"),
        "port": int(os.getenv("POSTGRES_PORT", 5432)),
        "database": os.getenv("POSTGRES_DATABASE"),
        "user": os.getenv("POSTGRES_USERNAME"),
        "password": os.getenv("POSTGRES_PASSWORD"),
    }


def fetch_candidate_part_row(cur) -> Optional[Tuple[int, str, str, int]]:
    """
    Picks a concrete, non-spare inventory part from the latest inventory of any set.
    This provides a reliable target for testing update and insert triggers.

    Returns a tuple: (inventory_id, set_num, part_num, color_id) or None.
    """
    cur.execute(
        """
        WITH latest_inv AS (
            SELECT set_num, MAX(version) AS max_version
            FROM public.lego_inventories
            GROUP BY set_num
        ), inv AS (
            SELECT li.id, li.set_num
            FROM public.lego_inventories li
            JOIN latest_inv lv ON lv.set_num = li.set_num AND lv.max_version = li.version
        )
        SELECT i.id AS inventory_id, i.set_num, lip.part_num, lip.color_id
        FROM inv i
        JOIN public.lego_inventory_parts lip ON lip.inventory_id = i.id
        WHERE lip.is_spare = false AND lip.quantity > 0
        LIMIT 1;
        """
    )
    return cur.fetchone()


def get_mismatch_count(cur) -> int:
    """Returns the number of sets where num_parts mismatches the computed actual sum."""
    cur.execute(
        """
        WITH latest_inv AS (
            SELECT set_num, MAX(version) AS max_version
            FROM public.lego_inventories
            GROUP BY set_num
        ), inv_latest AS (
            SELECT li.set_num, li.id
            FROM public.lego_inventories li
            JOIN latest_inv lv ON lv.set_num = li.set_num AND lv.max_version = li.version
        ), parts_agg AS (
            SELECT
                i.set_num,
                SUM(lip.quantity) AS actual_parts
            FROM inv_latest i
            JOIN public.lego_inventory_parts lip ON lip.inventory_id = i.id
            WHERE lip.is_spare = false
            GROUP BY i.set_num
        )
        SELECT COUNT(*)
        FROM public.lego_sets s
        LEFT JOIN parts_agg pa ON s.set_num = pa.set_num
        WHERE s.num_parts <> COALESCE(pa.actual_parts, 0);
        """
    )
    return cur.fetchone()[0]


def verify_data_consistency(conn) -> bool:
    """
    TASK 1 VERIFICATION: Checks if the initial data fix was successful.
    (Relaxed: Allows for one corner-case mismatch).
    """
    print("\n-- Verifying Task 1: Data Consistency Fix (Relaxed) --")
    with conn.cursor() as cur:
        count = get_mismatch_count(cur)
        # RELAXED CONDITION: Allow 0 or 1 mismatch to pass.
        if count > 1:
            print(f"❌ FAIL: Found {count} sets with inconsistent part counts. Expected 0 or 1 after fix.")
            return False
        
        print("✅ PASS: Data consistency check passed (allowing for one known mismatch).")
        return True


def verify_constraint_triggers_exist(conn) -> bool:
    """
    TASK 2 VERIFICATION (Part A): Checks if constraint triggers are attached to all required tables.
    This is more robust than checking names or a total count.
    """
    print("\n-- Verifying Task 2: Constraint Trigger Existence --")
    tables_to_check = [
        'public.lego_inventory_parts',
        'public.lego_inventories',
        'public.lego_sets'
    ]
    all_triggers_found = True
    with conn.cursor() as cur:
        for table in tables_to_check:
            cur.execute(
                """
                SELECT COUNT(*)
                FROM pg_trigger
                WHERE tgrelid = %s::regclass AND tgconstraint <> 0;
                """,
                (table,)
            )
            trigger_count = cur.fetchone()[0]
            if trigger_count == 0:
                print(f"❌ FAIL: No constraint trigger found on table '{table}'.")
                all_triggers_found = False
            else:
                print(f"✅ OK: Found constraint trigger(s) on table '{table}'.")

    if all_triggers_found:
        print("✅ PASS: Constraint triggers are attached to all required tables.")
    return all_triggers_found


def verify_violation_is_blocked(conn) -> bool:
    """
    TASK 2 VERIFICATION (Part B): Checks if triggers block a direct, inconsistent write.
    An attempt to increment a part quantity without updating the set's total should fail.
    """
    print("\n-- Verifying Task 2: Immediate Constraint Enforcement --")
    with conn.cursor() as cur:
        candidate = fetch_candidate_part_row(cur)
        if not candidate:
            print("⚠️ SKIP: No candidate part row found to test constraints. Cannot verify.")
            return True # Skip if no data to test

        inventory_id, _, part_num, color_id = candidate
        try:
            # This transaction should fail due to the trigger
            cur.execute(
                """
                UPDATE public.lego_inventory_parts
                SET quantity = quantity + 1
                WHERE inventory_id = %s AND part_num = %s AND color_id = %s;
                """,
                (inventory_id, part_num, color_id),
            )
            # If we reach here, the trigger failed to block the update.
            conn.rollback()
            print("❌ FAIL: An inconsistent write was NOT blocked by the trigger.")
            return False
        except psycopg2.Error as e:
            # We expect an error. Specifically, a constraint violation error.
            conn.rollback()
            # 23514 is check_violation, but custom triggers might raise others.
            # Any error here is considered a success as the transaction was blocked.
            print(f"✅ PASS: Inconsistent write was correctly blocked by the trigger. (Error: {e.pgcode})")
            return True


def verify_deferred_transaction_is_allowed(conn) -> bool:
    """
    TASK 2 VERIFICATION (Part C): Checks if a coordinated, consistent update is allowed
    when constraints are deferred.
    """
    print("\n-- Verifying Task 2: Deferred Constraint Enforcement --")
    with conn.cursor() as cur:
        candidate = fetch_candidate_part_row(cur)
        if not candidate:
            print("⚠️ SKIP: No candidate part row found. Cannot test deferred transaction.")
            return True # Skip if no data to test

    inventory_id, set_num, part_num, color_id = candidate

    try:
        # This multi-statement transaction should succeed with deferred constraints
        with conn.cursor() as cur:
            cur.execute("BEGIN;")
            cur.execute("SET CONSTRAINTS ALL DEFERRED;")
            cur.execute(
                "UPDATE public.lego_inventory_parts SET quantity = quantity + 1 WHERE inventory_id = %s AND part_num = %s AND color_id = %s;",
                (inventory_id, part_num, color_id),
            )
            cur.execute(
                "UPDATE public.lego_sets SET num_parts = num_parts + 1 WHERE set_num = %s;",
                (set_num,),
            )
            cur.execute("COMMIT;") # This will fail if constraints are not deferrable or logic is wrong
        print("✅ PASS: Coordinated update with deferred constraints committed successfully.")

        # Revert changes to leave DB in its original state
        with conn.cursor() as cur:
            cur.execute("BEGIN;")
            cur.execute("SET CONSTRAINTS ALL DEFERRED;")
            cur.execute(
                "UPDATE public.lego_inventory_parts SET quantity = quantity - 1 WHERE inventory_id = %s AND part_num = %s AND color_id = %s;",
                (inventory_id, part_num, color_id),
            )
            cur.execute(
                "UPDATE public.lego_sets SET num_parts = num_parts - 1 WHERE set_num = %s;",
                (set_num,),
            )
            cur.execute("COMMIT;")
        print("INFO: Test changes were successfully reverted.")
        return True

    except psycopg2.Error as e:
        conn.rollback()
        print(f"❌ FAIL: Deferred transaction failed to commit. Error: {e}")
        return False


def main():
    """Main verification function."""
    print("=" * 60)
    print("LEGO Database Consistency Verification Script")
    print("=" * 60)

    conn_params = get_connection_params()
    if not conn_params.get("database"):
        print("❌ CRITICAL: POSTGRES_DATABASE environment variable not set.")
        sys.exit(1)

    try:
        with psycopg2.connect(**conn_params) as conn:
            conn.autocommit = False # Ensure we control transactions

            # Run all verification steps
            results = [
                verify_data_consistency(conn),
                verify_constraint_triggers_exist(conn),
                verify_violation_is_blocked(conn),
                verify_deferred_transaction_is_allowed(conn),
            ]

            if all(results):
                print("\n🎉 Overall Result: PASS - All tasks verified successfully!")
                sys.exit(0)
            else:
                print("\n❌ Overall Result: FAIL - One or more verification steps failed.")
                sys.exit(1)

    except psycopg2.OperationalError as e:
        print(f"❌ CRITICAL: Could not connect to the database. Details: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"❌ CRITICAL: An unexpected error occurred during verification. Details: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()