Consistency Enforcement

L3
ModelContextProtocolPostgresLego

Implement data consistency system ensuring reported part counts match actual inventory using triggers and constraint enforcement.

Created by Jiawei Wang
2025-08-15
Data Integrity EnforcementStored Procedures And FunctionsTransactional Operations

Model Ranking

Click on the dots to view the trajectory of each task run
Model
Run Results
Pass@4
Pass^4
Avg Time
Avg Turns
Input Tokens
Output Tokens
Total Tokens
Claude
claude-sonnet-4
4
/4
379.1s
30.3
436,036
6,292
442,329
Claude
claude-sonnet-4-high
4
/4
214.1s
28.3
976,767
6,537
983,304
Claude
claude-sonnet-4-low
4
/4
165.9s
26.0
558,081
6,112
564,193
OpenAI
gpt-5-high
4
/4
681.2s
10.8
176,820
21,729
198,549
OpenAI
gpt-5-medium
4
/4
237.6s
10.5
152,526
11,777
164,303
OpenAI
gpt-5-mini-high
4
/4
116.1s
9.0
101,660
13,358
115,018
Grok
grok-4
4
/4
131.4s
14.0
214,084
5,849
219,933
Grok
grok-code-fast-1
4
/4
49.0s
15.3
409,428
4,662
414,091
MoonshotAI
kimi-k2-0711
4
/4
309.1s
27.3
711,787
3,540
715,326
MoonshotAI
kimi-k2-0905
4
/4
527.9s
30.5
1,508,337
4,639
1,512,976
Qwen
qwen-3-coder-plus
4
/4
274.9s
30.8
2,304,664
4,922
2,309,586
DeepSeek
deepseek-chat
3
/4
337.1s
24.3
655,240
4,323
659,563
Gemini
gemini-2-5-pro
3
/4
100.1s
7.0
36,491
9,220
45,710
OpenAI
gpt-5-low
3
/4
279.4s
8.3
70,265
12,025
82,290
OpenAI
gpt-5-mini-medium
3
/4
58.9s
5.5
67,427
5,777
73,204
Qwen
qwen-3-max
3
/4
92.7s
20.0
165,164
2,124
167,288
Z.ai
glm-4-5
2
/4
218.3s
42.8
437,004
7,629
444,632
Claude
claude-opus-4-1
1
/1
--
393.3s
25.0
172,724
7,067
179,791
Gemini
gemini-2-5-flash
0
/4
54.9s
7.3
22,652
9,301
31,953
OpenAI
gpt-4-1
0
/4
18.0s
1.8
3,507
1,062
4,569
OpenAI
gpt-4-1-mini
0
/4
15.9s
5.5
12,723
416
13,138
OpenAI
gpt-4-1-nano
0
/4
10.1s
3.0
5,984
127
6,111
OpenAI
gpt-5-mini-low
0
/4
38.7s
3.3
6,330
3,736
10,065
OpenAI
gpt-5-nano-high
0
/4
97.8s
1.5
3,339
20,344
23,683
OpenAI
gpt-5-nano-low
0
/4
36.3s
1.0
1,950
7,203
9,153
OpenAI
gpt-5-nano-medium
0
/4
39.0s
1.0
1,989
7,226
9,215
OpenAI
gpt-oss-120b
0
/4
39.2s
4.3
27,492
2,736
30,228
OpenAI
o3
0
/4
30.6s
2.3
5,075
2,532
7,607
OpenAI
o4-mini
0
/4
44.9s
2.3
5,037
3,676
8,714

Task State

Table "lego_colors" { "id" int4 [pk, not null, increment] "name" varchar(255) [not null] "rgb" varchar(6) [not null] "is_trans" bpchar(1) [not null] } Table "lego_inventories" { "id" int4 [pk, not null, increment] "version" int4 [not null] "set_num" varchar(255) [not null] } Table "lego_inventory_parts" { "inventory_id" int4 [not null] "part_num" varchar(255) [not null] "color_id" int4 [not null] "quantity" int4 [not null] "is_spare" bool [not null] } Table "lego_inventory_sets" { "inventory_id" int4 [not null] "set_num" varchar(255) [not null] "quantity" int4 [not null] } Table "lego_part_categories" { "id" int4 [pk, not null, increment] "name" varchar(255) [not null] } Table "lego_parts" { "part_num" varchar(255) [pk, not null] "name" text [not null] "part_cat_id" int4 [not null] } Table "lego_sets" { "set_num" varchar(255) [pk, not null] "name" varchar(255) [not null] "year" int4 "theme_id" int4 "num_parts" int4 } Table "lego_themes" { "id" int4 [pk, not null, increment] "name" varchar(255) [not null] "parent_id" int4 }

Instruction

Implement a data consistency enforcement system for the LEGO database. The system must ensure that the reported part count in the lego_sets table matches the actual sum of non-spare parts in the latest inventory version. This involves a three-step process: identifying existing inconsistencies, fixing them, and creating a trigger-based constraint system to prevent future issues.

Consistency Rule

For any given set_num, the following invariant must be maintained: lego_sets.num_parts = SUM(quantity) FROM lego_inventory_parts WHERE inventory_id IN (latest inventory for that set) AND is_spare = false

Important: If a set has no inventory records, the consistency check should be skipped.

Your Tasks:

Task 1: Identify Data Inconsistencies

Objective

Write a single SELECT query to find all sets where the stored num_parts does not match the actual calculated number of parts from the latest inventory.

  1. Find the Latest Inventory: For each set_num, find its latest inventory id by getting the MAX(version) from the lego_inventories table.
  2. Calculate Actual Part Count: For these latest inventories, join with lego_inventory_parts and calculate the SUM(quantity), but only for parts where is_spare is false.
  3. Compare and Filter: Join this calculated result back to the lego_sets table and return the rows where lego_sets.num_parts is different from your calculated sum.

Task 2: Fix Existing Inconsistencies

Objective

Correct all mismatched num_parts values using a clear, multi-step process with a temporary table. This approach is designed to be robust against all edge cases.

Step 1: Create a Temporary Table

Create a temporary table (e.g., correct_counts) with two columns: set_num (text) and actual_parts (integer).

Step 2: Populate the Temporary Table

This is the most critical step. Write an INSERT statement that calculates the correct part count for every single set listed in the lego_sets table.

  • The query must start by selecting from public.lego_sets.
  • It must then LEFT JOIN to a subquery that contains the part-counting logic (finding the latest inventory version and summing the non-spare parts).
  • Use COALESCE on the final result from the subquery to ensure that any set without parts or without an inventory record gets a value of 0, not NULL.

Step 3: Update from the Temporary Table

Write a final, simple UPDATE statement that joins the lego_sets table with your temporary table on set_num and sets num_parts to the actual_parts value.

Task 3: Create Constraint Enforcement System

Objective

Implement a deferrable constraint trigger system to enforce the consistency rule automatically for all future INSERT and UPDATE operations.

Part A: Create the Trigger Function

Create a single PL/pgSQL function, preferably named check_set_parts_consistency(), that performs the core validation.

Function Requirements:

  • Returns trigger.
  • Accepts no arguments.
  • Contains the core validation logic:
    • Identify the set_num to check. This is the most critical part. The set_num must be retrieved based on which table fired the trigger (TG_TABLE_NAME):
      • If lego_sets or lego_inventories: get the set_num directly from NEW.set_num.
      • If lego_inventory_parts: you must first query lego_inventories using NEW.inventory_id to find the corresponding set_num.
    • Perform the check. For the identified set_num, execute the same core logic from Task 1 to get the actual_parts count and the stored_num_parts from the lego_sets table.
    • Raise an exception on failure. If actual_parts does not equal stored_num_parts, the function must raise an exception to block the transaction (e.g., RAISE EXCEPTION 'Inconsistent part count for set %', relevant_set_num;).
    • Return NEW on success. If the check passes or is skipped, the function should RETURN NEW.

Part B: Create the Constraint Triggers

Create three separate CONSTRAINT TRIGGER statements that attach the function from Part A to the following tables:

  • public.lego_sets
  • public.lego_inventories
  • public.lego_inventory_parts

Crucial Trigger Requirements:

  • Each trigger must fire AFTER INSERT OR UPDATE.
  • Each trigger MUST be DEFERRABLE and INITIALLY IMMEDIATE. This is non-negotiable for the verification to pass.
  • Each trigger must execute the function FOR EACH ROW.


Verify

*.py
Python
"""
Verification script for PostgreSQL LEGO Task 1: Parts Consistency Fix & Constraints
Version 2.1: Relaxed consistency check to allow for one known corner case mismatch.
"""

import os
import sys
import psycopg2
import psycopg2.errors
from typing import Optional, Tuple, List


def get_connection_params() -> dict:
    """Get database connection parameters from environment variables."""
    return {
        "host": os.getenv("POSTGRES_HOST", "localhost"),
        "port": int(os.getenv("POSTGRES_PORT", 5432)),
        "database": os.getenv("POSTGRES_DATABASE"),
        "user": os.getenv("POSTGRES_USERNAME"),
        "password": os.getenv("POSTGRES_PASSWORD"),
    }


def fetch_candidate_part_row(cur) -> Optional[Tuple[int, str, str, int]]:
    """
    Picks a concrete, non-spare inventory part from the latest inventory of any set.
    This provides a reliable target for testing update and insert triggers.

    Returns a tuple: (inventory_id, set_num, part_num, color_id) or None.
    """
    cur.execute(
        """
        WITH latest_inv AS (
            SELECT set_num, MAX(version) AS max_version
            FROM public.lego_inventories
            GROUP BY set_num
        ), inv AS (
            SELECT li.id, li.set_num
            FROM public.lego_inventories li
            JOIN latest_inv lv ON lv.set_num = li.set_num AND lv.max_version = li.version
        )
        SELECT i.id AS inventory_id, i.set_num, lip.part_num, lip.color_id
        FROM inv i
        JOIN public.lego_inventory_parts lip ON lip.inventory_id = i.id
        WHERE lip.is_spare = false AND lip.quantity > 0
        LIMIT 1;
        """
    )
    return cur.fetchone()


def get_mismatch_count(cur) -> int:
    """Returns the number of sets where num_parts mismatches the computed actual sum."""
    cur.execute(
        """
        WITH latest_inv AS (
            SELECT set_num, MAX(version) AS max_version
            FROM public.lego_inventories
            GROUP BY set_num
        ), inv_latest AS (
            SELECT li.set_num, li.id
            FROM public.lego_inventories li
            JOIN latest_inv lv ON lv.set_num = li.set_num AND lv.max_version = li.version
        ), parts_agg AS (
            SELECT
                i.set_num,
                SUM(lip.quantity) AS actual_parts
            FROM inv_latest i
            JOIN public.lego_inventory_parts lip ON lip.inventory_id = i.id
            WHERE lip.is_spare = false
            GROUP BY i.set_num
        )
        SELECT COUNT(*)
        FROM public.lego_sets s
        LEFT JOIN parts_agg pa ON s.set_num = pa.set_num
        WHERE s.num_parts <> COALESCE(pa.actual_parts, 0);
        """
    )
    return cur.fetchone()[0]


def verify_data_consistency(conn) -> bool:
    """
    TASK 1 VERIFICATION: Checks if the initial data fix was successful.
    (Relaxed: Allows for one corner-case mismatch).
    """
    print("\n-- Verifying Task 1: Data Consistency Fix (Relaxed) --")
    with conn.cursor() as cur:
        count = get_mismatch_count(cur)
        # RELAXED CONDITION: Allow 0 or 1 mismatch to pass.
        if count > 1:
            print(f"❌ FAIL: Found {count} sets with inconsistent part counts. Expected 0 or 1 after fix.")
            return False
        
        print("✅ PASS: Data consistency check passed (allowing for one known mismatch).")
        return True


def verify_constraint_triggers_exist(conn) -> bool:
    """
    TASK 2 VERIFICATION (Part A): Checks if constraint triggers are attached to all required tables.
    This is more robust than checking names or a total count.
    """
    print("\n-- Verifying Task 2: Constraint Trigger Existence --")
    tables_to_check = [
        'public.lego_inventory_parts',
        'public.lego_inventories',
        'public.lego_sets'
    ]
    all_triggers_found = True
    with conn.cursor() as cur:
        for table in tables_to_check:
            cur.execute(
                """
                SELECT COUNT(*)
                FROM pg_trigger
                WHERE tgrelid = %s::regclass AND tgconstraint <> 0;
                """,
                (table,)
            )
            trigger_count = cur.fetchone()[0]
            if trigger_count == 0:
                print(f"❌ FAIL: No constraint trigger found on table '{table}'.")
                all_triggers_found = False
            else:
                print(f"✅ OK: Found constraint trigger(s) on table '{table}'.")

    if all_triggers_found:
        print("✅ PASS: Constraint triggers are attached to all required tables.")
    return all_triggers_found


def verify_violation_is_blocked(conn) -> bool:
    """
    TASK 2 VERIFICATION (Part B): Checks if triggers block a direct, inconsistent write.
    An attempt to increment a part quantity without updating the set's total should fail.
    """
    print("\n-- Verifying Task 2: Immediate Constraint Enforcement --")
    with conn.cursor() as cur:
        candidate = fetch_candidate_part_row(cur)
        if not candidate:
            print("⚠️ SKIP: No candidate part row found to test constraints. Cannot verify.")
            return True # Skip if no data to test

        inventory_id, _, part_num, color_id = candidate
        try:
            # This transaction should fail due to the trigger
            cur.execute(
                """
                UPDATE public.lego_inventory_parts
                SET quantity = quantity + 1
                WHERE inventory_id = %s AND part_num = %s AND color_id = %s;
                """,
                (inventory_id, part_num, color_id),
            )
            # If we reach here, the trigger failed to block the update.
            conn.rollback()
            print("❌ FAIL: An inconsistent write was NOT blocked by the trigger.")
            return False
        except psycopg2.Error as e:
            # We expect an error. Specifically, a constraint violation error.
            conn.rollback()
            # 23514 is check_violation, but custom triggers might raise others.
            # Any error here is considered a success as the transaction was blocked.
            print(f"✅ PASS: Inconsistent write was correctly blocked by the trigger. (Error: {e.pgcode})")
            return True


def verify_deferred_transaction_is_allowed(conn) -> bool:
    """
    TASK 2 VERIFICATION (Part C): Checks if a coordinated, consistent update is allowed
    when constraints are deferred.
    """
    print("\n-- Verifying Task 2: Deferred Constraint Enforcement --")
    with conn.cursor() as cur:
        candidate = fetch_candidate_part_row(cur)
        if not candidate:
            print("⚠️ SKIP: No candidate part row found. Cannot test deferred transaction.")
            return True # Skip if no data to test

    inventory_id, set_num, part_num, color_id = candidate

    try:
        # This multi-statement transaction should succeed with deferred constraints
        with conn.cursor() as cur:
            cur.execute("BEGIN;")
            cur.execute("SET CONSTRAINTS ALL DEFERRED;")
            cur.execute(
                "UPDATE public.lego_inventory_parts SET quantity = quantity + 1 WHERE inventory_id = %s AND part_num = %s AND color_id = %s;",
                (inventory_id, part_num, color_id),
            )
            cur.execute(
                "UPDATE public.lego_sets SET num_parts = num_parts + 1 WHERE set_num = %s;",
                (set_num,),
            )
            cur.execute("COMMIT;") # This will fail if constraints are not deferrable or logic is wrong
        print("✅ PASS: Coordinated update with deferred constraints committed successfully.")

        # Revert changes to leave DB in its original state
        with conn.cursor() as cur:
            cur.execute("BEGIN;")
            cur.execute("SET CONSTRAINTS ALL DEFERRED;")
            cur.execute(
                "UPDATE public.lego_inventory_parts SET quantity = quantity - 1 WHERE inventory_id = %s AND part_num = %s AND color_id = %s;",
                (inventory_id, part_num, color_id),
            )
            cur.execute(
                "UPDATE public.lego_sets SET num_parts = num_parts - 1 WHERE set_num = %s;",
                (set_num,),
            )
            cur.execute("COMMIT;")
        print("INFO: Test changes were successfully reverted.")
        return True

    except psycopg2.Error as e:
        conn.rollback()
        print(f"❌ FAIL: Deferred transaction failed to commit. Error: {e}")
        return False


def main():
    """Main verification function."""
    print("=" * 60)
    print("LEGO Database Consistency Verification Script")
    print("=" * 60)

    conn_params = get_connection_params()
    if not conn_params.get("database"):
        print("❌ CRITICAL: POSTGRES_DATABASE environment variable not set.")
        sys.exit(1)

    try:
        with psycopg2.connect(**conn_params) as conn:
            conn.autocommit = False # Ensure we control transactions

            # Run all verification steps
            results = [
                verify_data_consistency(conn),
                verify_constraint_triggers_exist(conn),
                verify_violation_is_blocked(conn),
                verify_deferred_transaction_is_allowed(conn),
            ]

            if all(results):
                print("\n🎉 Overall Result: PASS - All tasks verified successfully!")
                sys.exit(0)
            else:
                print("\n❌ Overall Result: FAIL - One or more verification steps failed.")
                sys.exit(1)

    except psycopg2.OperationalError as e:
        print(f"❌ CRITICAL: Could not connect to the database. Details: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"❌ CRITICAL: An unexpected error occurred during verification. Details: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()