User Permission Audit

L3
ModelContextProtocolPostgresSecurity

Conduct comprehensive security audit identifying users with insufficient or dangling permissions in business database environment.

Created by Fanshi Zhang
2025-08-17
Security And Access ControlAudit And Compliance

Model Ranking

Click on the dots to view the trajectory of each task run
Model
Run Results
Pass@4
Pass^4
Avg Time
Avg Turns
Input Tokens
Output Tokens
Total Tokens
Claude
claude-4-sonnet
3
/4
93.3s
13.3
128,955
4,586
133,540
Grok
grok-4
3
/4
187.5s
10.8
-
-
-
MoonshotAI
k2
2
/4
198.1s
22.5
207,481
5,041
212,522
Claude
claude-4-1-opus
1
/1
--
403.8s
18.0
174,179
6,917
181,096
DeepSeek
deepseek-chat
0
/4
13.0s
1.0
3,167
61
3,228
Gemini
gemini-2-5-pro
0
/4
56.8s
4.0
12,770
5,089
17,860
OpenAI
gpt-5
0
/4
274.1s
8.3
63,498
18,847
82,345
OpenAI
o3
0
/4
90.3s
9.3
39,304
5,789
45,093
Qwen
qwen-3-coder
0
/4
144.8s
33.5
401,414
4,491
405,905

Task State

Table "user_profiles" { "user_id" int4 [pk, not null, increment] "username" varchar(50) [unique, not null] "email" varchar(100) [unique, not null] "first_name" varchar(50) [not null] "last_name" varchar(50) [not null] "phone" varchar(20) "address" text "city" varchar(50) "state" varchar(2) "zip_code" varchar(10) "date_created" timestamp [default: `CURRENT_TIMESTAMP`] "last_updated" timestamp [default: `CURRENT_TIMESTAMP`] "is_active" bool [default: true] "profile_picture_url" text "bio" text } Table "user_credentials" { "credential_id" int4 [pk, not null, increment] "user_id" int4 "password_hash" varchar(255) [not null] "salt" varchar(100) [not null] "login_attempts" int4 [default: 0] "last_login" timestamp "password_created" timestamp [default: `CURRENT_TIMESTAMP`] "password_expires" timestamp "is_locked" bool [default: false] "two_factor_enabled" bool [default: false] "two_factor_secret" varchar(32) "backup_codes" "text[]" "security_questions" jsonb } Table "user_stat_analysis" { "analysis_id" int4 [pk, not null, increment] "user_id" int4 "session_id" varchar(100) "page_views" int4 [default: 0] "time_spent_minutes" int4 [default: 0] "actions_performed" jsonb "device_info" jsonb "ip_address" inet "location_data" jsonb "referrer_url" text "conversion_events" jsonb "analysis_date" date [default: `CURRENT_DATE`] "created_at" timestamp [default: `CURRENT_TIMESTAMP`] } Table "product_catalog" { "product_id" int4 [pk, not null, increment] "product_name" varchar(100) [not null] "description" text "category" varchar(50) "price" numeric(10,2) [not null] "cost" numeric(10,2) "sku" varchar(50) [unique] "inventory_count" int4 [default: 0] "is_active" bool [default: true] "created_at" timestamp [default: `CURRENT_TIMESTAMP`] "updated_at" timestamp [default: `CURRENT_TIMESTAMP`] "supplier_info" jsonb "weight_kg" numeric(6,2) "dimensions" jsonb } Table "order_management" { "order_id" int4 [pk, not null, increment] "user_id" int4 "order_number" varchar(50) [unique, not null] "order_status" varchar(20) [default: 'pending'] "total_amount" numeric(12,2) [not null] "tax_amount" numeric(12,2) "shipping_amount" numeric(12,2) "discount_amount" numeric(12,2) [default: 0] "payment_method" varchar(50) "payment_status" varchar(20) [default: 'pending'] "shipping_address" jsonb "billing_address" jsonb "order_date" timestamp [default: `CURRENT_TIMESTAMP`] "shipped_date" timestamp "delivered_date" timestamp "tracking_number" varchar(100) } Table "financial_transactions" { "transaction_id" int4 [pk, not null, increment] "order_id" int4 "user_id" int4 "transaction_type" varchar(20) [not null] "amount" numeric(12,2) [not null] "currency" varchar(3) [default: 'USD'] "payment_gateway" varchar(50) "gateway_transaction_id" varchar(100) "credit_card_last_four" bpchar(4) "bank_account_last_four" bpchar(4) "transaction_status" varchar(20) [default: 'pending'] "processed_at" timestamp "created_at" timestamp [default: `CURRENT_TIMESTAMP`] "fee_amount" numeric(8,2) "refund_amount" numeric(12,2) [default: 0] "notes" text } Table "audit_logs" { "log_id" int4 [pk, not null, increment] "user_id" int4 "action_type" varchar(50) [not null] "table_name" varchar(50) "record_id" int4 "old_values" jsonb "new_values" jsonb "ip_address" inet "user_agent" text "session_id" varchar(100) "timestamp" timestamp [default: `CURRENT_TIMESTAMP`] "success" bool [default: true] "error_message" text } Ref "audit_logs_user_id_fkey":"user_profiles"."user_id" < "audit_logs"."user_id" Ref "financial_transactions_order_id_fkey":"order_management"."order_id" < "financial_transactions"."order_id" Ref "financial_transactions_user_id_fkey":"user_profiles"."user_id" < "financial_transactions"."user_id" Ref "order_management_user_id_fkey":"user_profiles"."user_id" < "order_management"."user_id" Ref "user_credentials_user_id_fkey":"user_profiles"."user_id" < "user_credentials"."user_id" [delete: cascade] Ref "user_stat_analysis_user_id_fkey":"user_profiles"."user_id" < "user_stat_analysis"."user_id" [delete: cascade]

Instruction



Verify

*.py
Python
import os
import psycopg2
import sys


def verify_security_audit():
    """
    Verify that the security audit correctly identified all permission issues.
    """

    # Database connection parameters from environment
    db_params = {
        'host': os.getenv('POSTGRES_HOST', 'localhost'),
        'port': os.getenv('POSTGRES_PORT', '5432'),
        'user': os.getenv('POSTGRES_USERNAME', 'postgres'),
        'password': os.getenv('POSTGRES_PASSWORD', 'password'),
        'database': os.getenv('POSTGRES_DATABASE', 'postgres')
    }

    try:
        conn = psycopg2.connect(**db_params)
        cur = conn.cursor()

        print("| Verifying security audit findings...")

        # Check if security_audit_results table exists
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'security_audit_results'
            );
        """)

        if not cur.fetchone()[0]:
            print("FAIL: security_audit_results table not found")
            return False

        # Check if security_audit_details table exists
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'security_audit_details'
            );
        """)

        if not cur.fetchone()[0]:
            print("FAIL: security_audit_details table not found")
            return False

        # Get all detailed findings
        cur.execute("SELECT * FROM security_audit_details ORDER BY detail_id;")
        findings = cur.fetchall()

        if not findings:
            print("FAIL: No findings in security_audit_details table")
            return False

        print(f"| Found {len(findings)} audit findings")

        # Expected findings based on the ground truth:
        expected_findings = {
            # Expected dangling users
            'dangling_users': {'temp_contractor', 'old_employee', 'test_account'},

            # Expected missing permissions (should be granted)
            'missing_permissions': {
                ('analytics_user', 'user_profiles', 'SELECT'),
                ('analytics_user', 'product_catalog', 'SELECT'),
                ('analytics_user', 'order_management', 'SELECT'),
                ('marketing_user', 'product_catalog', 'SELECT'),
                ('customer_service', 'product_catalog', 'SELECT'),
                ('finance_user', 'user_profiles', 'SELECT'),
                ('product_manager', 'user_stat_analysis', 'SELECT'),
                ('security_auditor', 'audit_logs', 'SELECT'),
                ('developer_user', 'product_catalog', 'SELECT'),
                ('backup_user', 'order_management', 'SELECT'),
                ('backup_user', 'financial_transactions', 'SELECT'),
                ('backup_user', 'user_stat_analysis', 'SELECT'),
                ('backup_user', 'user_credentials', 'SELECT')
            },

            # Expected excessive permissions (should be revoked)
            'excessive_permissions': {
                ('analytics_user', 'financial_transactions', 'SELECT'),
                ('marketing_user', 'financial_transactions', 'SELECT'),
                ('customer_service', 'user_credentials', 'SELECT'),
                ('product_manager', 'financial_transactions', 'SELECT'),
                ('security_auditor', 'financial_transactions', 'UPDATE'),
                ('developer_user', 'user_credentials', 'SELECT'),
                ('developer_user', 'order_management', 'UPDATE'),
                ('backup_user', 'product_catalog', 'DELETE'),
                ('temp_contractor', 'product_catalog', 'SELECT'),
                ('temp_contractor', 'user_profiles', 'SELECT'),
                ('old_employee', 'audit_logs', 'SELECT'),
                ('old_employee', 'user_stat_analysis', 'UPDATE'),
                ('test_account', 'user_profiles', 'SELECT')
            }
        }

        found_dangling = set()
        found_missing_permissions = set()
        found_excessive_permissions = set()

        # Analyze findings (detail_id, username, issue_type, table_name, permission_type, expected_access)
        for finding in findings:
            username = finding[1]
            issue_type = finding[2]
            table_name = finding[3]
            permission_type = finding[4]
            expected_access = finding[5]

            if issue_type == 'DANGLING_USER':
                found_dangling.add(username)
            elif issue_type == 'MISSING_PERMISSION' and expected_access:
                if table_name and permission_type:
                    found_missing_permissions.add((username, table_name, permission_type))
            elif issue_type == 'EXCESSIVE_PERMISSION' and not expected_access:
                if table_name and permission_type:
                    found_excessive_permissions.add((username, table_name, permission_type))

        # Verify dangling users
        missing_dangling = expected_findings['dangling_users'] - found_dangling
        extra_dangling = found_dangling - expected_findings['dangling_users']

        # Verify missing permissions
        missing_missing_perms = expected_findings['missing_permissions'] - found_missing_permissions
        extra_missing_perms = found_missing_permissions - expected_findings['missing_permissions']

        # Verify excessive permissions
        missing_excessive_perms = expected_findings['excessive_permissions'] - found_excessive_permissions
        extra_excessive_perms = found_excessive_permissions - expected_findings['excessive_permissions']

        # Validate structure
        structure_valid = True
        for i, finding in enumerate(findings):
            if len(finding) != 6:  # Should have 6 columns
                print(f"| FAIL: Finding {i + 1} has wrong number of columns (expected 6, got {len(finding)})")
                structure_valid = False
                continue

            detail_id, username, issue_type, table_name, permission_type, expected_access = finding

            if not username:
                print(f"| FAIL: Finding {i + 1} missing username")
                structure_valid = False

            if issue_type not in ['DANGLING_USER', 'MISSING_PERMISSION', 'EXCESSIVE_PERMISSION']:
                print(f"| FAIL: Finding {i + 1} invalid issue_type: {issue_type}")
                structure_valid = False

            if expected_access not in [True, False]:
                print(f"| FAIL: Finding {i + 1} invalid expected_access: {expected_access}")
                structure_valid = False

        if structure_valid:
            print(f"| ✓ structure is valid")

        # Check for missing findings
        all_correct = True

        print(f"| Expected dangling users: {expected_findings['dangling_users']} Found: {found_dangling}")
        if missing_dangling:
            print(f"| Missing dangling users: {missing_dangling}")
            all_correct = False

        print(
            f"| Expected missing permissions: {len(expected_findings['missing_permissions'])} Found: {len(found_missing_permissions)} Missing: {len(missing_missing_perms)}")
        if missing_missing_perms:
            print(f"| Missing 'missing permission' findings:")
            for perm in sorted(missing_missing_perms):
                print(f"|   - {perm[0]} should be granted {perm[2]} on {perm[1]}")
            all_correct = False

        print(
            f"| Expected excessive permissions: {len(expected_findings['excessive_permissions'])} Found: {len(found_excessive_permissions)} Missing: {len(missing_excessive_perms)}")
        if missing_excessive_perms:
            print(f"| Missing 'excessive permission' findings:")
            for perm in sorted(missing_excessive_perms):
                print(f"|   - {perm[0]} should have {perm[2]} revoked on {perm[1]}")
            all_correct = False

        # Check audit summary table
        cur.execute(
            "SELECT audit_type, total_issues, users_affected, tables_affected FROM security_audit_results ORDER BY audit_type;")
        summary_results = cur.fetchall()

        # Expected summary numbers based on ground truth
        expected_summary = {
            'DANGLING_USERS': (3, 3, 0),          # 3 issues, 3 users affected, 0 tables affected
            'EXCESSIVE_PERMISSIONS': (13, 10, 7), # 13 issues, 10 users affected, 7 tables affected
            'MISSING_PERMISSIONS': (13, 8, 7)     # 13 issues, 8 users affected, 7 tables affected
        }

        summary_correct = True
        for result in summary_results:
            audit_type, total_issues, users_affected, tables_affected = result
            print(f"| Summary result: [{audit_type}] {total_issues} issues, {users_affected} users affected, {tables_affected} tables affected")
            
            if audit_type in expected_summary:
                expected = expected_summary[audit_type]
                if (total_issues, users_affected, tables_affected) != expected:
                    print(f"| FAIL: {audit_type} summary mismatch - Expected: {expected}, Got: ({total_issues}, {users_affected}, {tables_affected})")
                    summary_correct = False
                else:
                    print(f"| ✓ {audit_type} summary matches expected values")

        # Assert exact counts match expected
        assert len(found_dangling) == 3, f"Expected 3 dangling users, found {len(found_dangling)}"
        assert len(found_missing_permissions) == 13, f"Expected 13 missing permissions, found {len(found_missing_permissions)}"
        assert len(found_excessive_permissions) == 13, f"Expected 13 excessive permissions, found {len(found_excessive_permissions)}"

        if all_correct and structure_valid and summary_correct:
            print("| ✓ All assertions passed")
            return True
        else:
            return False

    except Exception as e:
        print(f"FAIL: Error during verification: {e}")
        return False
    finally:
        if 'cur' in locals():
            cur.close()
        if 'conn' in locals():
            conn.close()


if __name__ == "__main__":
    success = verify_security_audit()
    sys.exit(0 if success else 1)