RLS Business Access

L3
ModelContextProtocolPostgresSecurity

Implement Row Level Security policies for social platform with proper access control for posts, comments, and channels.

Created by Fanshi Zhang
2025-08-17
Security And Access ControlStored Procedures And FunctionsSchema Design

Model Ranking

Click on the dots to view the trajectory of each task run
Model
Run Results
Pass@4
Pass^4
Avg Time
Avg Turns
Input Tokens
Output Tokens
Total Tokens
Grok
grok-4
4
/4
246.5s
15.8
-
-
-
OpenAI
gpt-5
3
/4
253.8s
10.8
42,253
17,689
59,942
Claude
claude-4-1-opus
1
/1
--
1202.6s
41.0
699,229
22,522
721,751
Claude
claude-4-sonnet
1
/4
324.2s
26.3
280,479
8,322
288,801
OpenAI
o3
1
/4
67.0s
7.8
25,827
5,323
31,150
Qwen
qwen-3-coder
1
/4
172.0s
29.0
252,608
6,195
258,802
DeepSeek
deepseek-chat
0
/4
29.8s
0.8
2,124
57
2,181
Gemini
gemini-2-5-pro
0
/4
60.3s
3.3
9,495
5,957
15,452
MoonshotAI
k2
0
/4
306.9s
31.0
260,922
6,600
267,522

Task State

Table "users" { "id" uuid [pk, not null, default: `gen_random_uuid()`] "username" varchar(50) [unique, not null] "email" varchar(100) [unique, not null] "is_public" bool [default: false] "created_at" timestamp [default: `CURRENT_TIMESTAMP`] Indexes { is_public [type: btree, name: "idx_users_is_public"] } } Table "channels" { "id" uuid [pk, not null, default: `gen_random_uuid()`] "name" varchar(100) [not null] "description" text "is_public" bool [default: true] "owner_id" uuid "created_at" timestamp [default: `CURRENT_TIMESTAMP`] Indexes { is_public [type: btree, name: "idx_channels_is_public"] owner_id [type: btree, name: "idx_channels_owner_id"] } } Table "channel_moderators" { "channel_id" uuid [not null] "user_id" uuid [not null] "created_at" timestamp [default: `CURRENT_TIMESTAMP`] Indexes { (channel_id, user_id) [type: btree, name: "channel_moderators_pkey"] (channel_id, user_id) [type: btree, name: "idx_channel_moderators_channel_user"] user_id [type: btree, name: "idx_channel_moderators_user"] } } Table "posts" { "id" uuid [pk, not null, default: `gen_random_uuid()`] "channel_id" uuid "author_id" uuid "title" varchar(200) [not null] "content" text "created_at" timestamp [default: `CURRENT_TIMESTAMP`] "updated_at" timestamp [default: `CURRENT_TIMESTAMP`] Indexes { author_id [type: btree, name: "idx_posts_author_id"] channel_id [type: btree, name: "idx_posts_channel_id"] created_at [type: btree, name: "idx_posts_created_at"] } } Table "comments" { "id" uuid [pk, not null, default: `gen_random_uuid()`] "post_id" uuid "author_id" uuid "content" text [not null] "created_at" timestamp [default: `CURRENT_TIMESTAMP`] "updated_at" timestamp [default: `CURRENT_TIMESTAMP`] Indexes { author_id [type: btree, name: "idx_comments_author_id"] created_at [type: btree, name: "idx_comments_created_at"] post_id [type: btree, name: "idx_comments_post_id"] } } Ref "channel_moderators_channel_id_fkey":"channels"."id" < "channel_moderators"."channel_id" [delete: cascade] Ref "channel_moderators_user_id_fkey":"users"."id" < "channel_moderators"."user_id" [delete: cascade] Ref "channels_owner_id_fkey":"users"."id" < "channels"."owner_id" [delete: cascade] Ref "comments_author_id_fkey":"users"."id" < "comments"."author_id" [delete: cascade] Ref "comments_post_id_fkey":"posts"."id" < "comments"."post_id" [delete: cascade] Ref "posts_author_id_fkey":"users"."id" < "posts"."author_id" [delete: cascade] Ref "posts_channel_id_fkey":"channels"."id" < "posts"."channel_id" [delete: cascade]

Instruction



Verify

*.py
Python
#!/usr/bin/env python3

import os
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import sys

def verify_rls_implementation():
    """
    Verify that Row Level Security policies have been properly implemented
    for the social media platform with Users, Posts, Comments, and Channels.
    """

    # Database connection parameters from environment
    admin_db_params = {
        'host': os.getenv('POSTGRES_HOST', 'localhost'),
        'port': os.getenv('POSTGRES_PORT', '5432'),
        'user': os.getenv('POSTGRES_USERNAME', 'postgres'),
        'password': os.getenv('POSTGRES_PASSWORD', 'password'),
        'database': os.getenv('POSTGRES_DATABASE', 'postgres')
    }

    # Test user parameters (non-superuser for proper RLS testing)
    test_db_params = {
        'host': os.getenv('POSTGRES_HOST', 'localhost'),
        'port': os.getenv('POSTGRES_PORT', '5432'),
        'user': 'test_user',
        'password': 'testpass',
        'database': os.getenv('POSTGRES_DATABASE', 'postgres')
    }

    try:
        # First connect as admin to ensure test user exists
        admin_conn = psycopg2.connect(**admin_db_params)
        admin_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        admin_cur = admin_conn.cursor()

        # Create test user if it doesn't exist
        try:
            admin_cur.execute("CREATE ROLE test_user LOGIN PASSWORD 'testpass';")
        except psycopg2.Error:
            pass  # User already exists

        # Grant necessary permissions to test user on the current database
        admin_cur.execute("SELECT current_database();")
        current_db_name = admin_cur.fetchone()[0]

        admin_cur.execute(f"GRANT CONNECT ON DATABASE \"{current_db_name}\" TO test_user;")
        admin_cur.execute("GRANT USAGE ON SCHEMA public TO test_user;")
        admin_cur.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO test_user;")
        admin_cur.execute("GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO test_user;")
        admin_cur.execute("GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO test_user;")

        admin_cur.close()
        admin_conn.close()

        # Update test_db_params with the correct database name
        test_db_params['database'] = current_db_name

        # Now connect as test user for RLS verification
        conn = psycopg2.connect(**test_db_params)
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cur = conn.cursor()

        print("Verifying...")

        test_results = []

        # Test 1: Check if RLS is enabled on all tables
        print("\n1. Checking RLS enablement...")
        expected_tables = ['users', 'channels', 'channel_moderators', 'posts', 'comments']

        for table in expected_tables:
            cur.execute("""
                SELECT relrowsecurity
                FROM pg_class
                WHERE relname = %s AND relkind = 'r'
            """, (table,))
            result = cur.fetchone()

            if result and result[0]:
                test_results.append(f"✓ RLS enabled on {table}")
            else:
                test_results.append(f"✗ RLS NOT enabled on {table}")

        # Test 2: Users can only update their own profile
        print("\n2. Testing user profile access control...")

        # Alice tries to update her own profile (should work)
        try:
            cur.execute("SET app.current_user_id = '11111111-1111-1111-1111-111111111111';")  # Alice
            cur.execute("""
                UPDATE users
                SET email = 'alice.updated@example.com'
                WHERE id = '11111111-1111-1111-1111-111111111111'
            """)
            test_results.append("✓ Users can update their own profile")
        except Exception as e:
            test_results.append(f"✗ User cannot update own profile: {e}")

        # Alice tries to update Bob's profile (should fail)
        try:
            cur.execute("SET app.current_user_id = '11111111-1111-1111-1111-111111111111';")  # Alice
            cur.execute("""
                UPDATE users
                SET email = 'bob.hacked@example.com'
                WHERE id = '22222222-2222-2222-2222-222222222222'
            """)
            # Check if the update actually affected any rows (RLS blocks by affecting 0 rows)
            if cur.rowcount == 0:
                test_results.append("✓ Users blocked from updating other users' profiles")
            else:
                test_results.append("✗ User was able to update another user's profile (should be blocked)")
        except psycopg2.Error:
            test_results.append("✓ Users blocked from updating other users' profiles")

        # Test 3: Channel ownership controls
        print("\n3. Testing channel ownership controls...")

        # Alice (owner of general channel) tries to update her channel
        try:
            cur.execute("SET app.current_user_id = '11111111-1111-1111-1111-111111111111';")  # Alice
            cur.execute("""
                UPDATE channels
                SET description = 'Updated by Alice'
                WHERE id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
            """)
            test_results.append("✓ Channel owners can update their channels")
        except Exception as e:
            test_results.append(f"✗ Channel owner cannot update channel: {e}")

        # Charlie tries to update Alice's channel (should fail)
        try:
            cur.execute("SET app.current_user_id = '33333333-3333-3333-3333-333333333333';")  # Charlie
            cur.execute("""
                UPDATE channels
                SET description = 'Hacked by Charlie'
                WHERE id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
            """)
            # Check if the update actually affected any rows (RLS blocks by affecting 0 rows)
            if cur.rowcount == 0:
                test_results.append("✓ Non-owners blocked from updating channels")
            else:
                test_results.append("✗ Non-owner was able to update channel (should be blocked)")
        except psycopg2.Error:
            test_results.append("✓ Non-owners blocked from updating channels")

        # Test 4: Post authorship and moderation controls
        print("\n4. Testing post access controls...")

        # Alice (author) tries to update her own post
        try:
            cur.execute("SET app.current_user_id = '11111111-1111-1111-1111-111111111111';")  # Alice
            cur.execute("""
                UPDATE posts
                SET title = 'Updated by Alice'
                WHERE id = 'dddddddd-dddd-dddd-dddd-dddddddddddd'
            """)
            test_results.append("✓ Post authors can update their posts")
        except Exception as e:
            test_results.append(f"✗ Post author cannot update post: {e}")

        # Bob (moderator of general) tries to update Alice's post (should work)
        try:
            cur.execute("SET app.current_user_id = '22222222-2222-2222-2222-222222222222';")  # Bob (moderator)
            cur.execute("""
                UPDATE posts
                SET content = 'Moderated by Bob'
                WHERE id = 'dddddddd-dddd-dddd-dddd-dddddddddddd'
            """)
            test_results.append("✓ Channel moderators can update posts in their channels")
        except Exception as e:
            test_results.append(f"✗ Channel moderator cannot update post: {e}")

        # Eve tries to update Alice's post (should fail - not author, owner, or moderator)
        try:
            cur.execute("SET app.current_user_id = '55555555-5555-5555-5555-555555555555';")  # Eve
            cur.execute("""
                UPDATE posts
                SET content = 'Hacked by Eve'
                WHERE id = 'dddddddd-dddd-dddd-dddd-dddddddddddd'
            """)
            # Check if the update actually affected any rows (RLS blocks by affecting 0 rows)
            if cur.rowcount == 0:
                test_results.append("✓ Unauthorized users blocked from updating posts")
            else:
                test_results.append("✗ Unauthorized user was able to update post (should be blocked)")
        except psycopg2.Error:
            test_results.append("✓ Unauthorized users blocked from updating posts")

        # Test 5: Comment access controls
        print("\n5. Testing comment access controls...")

        # Bob (comment author) tries to update his own comment
        try:
            cur.execute("SET app.current_user_id = '22222222-2222-2222-2222-222222222222';")  # Bob
            cur.execute("""
                UPDATE comments
                SET content = 'Updated by Bob himself'
                WHERE id = '99999999-9999-9999-9999-999999999999'
            """)
            test_results.append("✓ Comment authors can update their comments")
        except Exception as e:
            test_results.append(f"✗ Comment author cannot update comment: {e}")

        # Alice (post author) tries to update Bob's comment on her post (should work)
        try:
            cur.execute("SET app.current_user_id = '11111111-1111-1111-1111-111111111111';")  # Alice (post author)
            cur.execute("""
                UPDATE comments
                SET content = 'Moderated by post author Alice'
                WHERE id = '99999999-9999-9999-9999-999999999999'
            """)
            test_results.append("✓ Post authors can moderate comments on their posts")
        except Exception as e:
            test_results.append(f"✗ Post author cannot moderate comment: {e}")

        # Test 6: Channel moderator assignment controls
        print("\n6. Testing moderator assignment controls...")

        # Alice (channel owner) tries to add a moderator
        try:
            cur.execute("SET app.current_user_id = '11111111-1111-1111-1111-111111111111';")  # Alice (owner of general)
            cur.execute("""
                INSERT INTO channel_moderators (channel_id, user_id)
                VALUES ('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', '33333333-3333-3333-3333-333333333333')
            """)
            test_results.append("✓ Channel owners can add moderators")
        except Exception as e:
            test_results.append(f"✗ Channel owner cannot add moderator: {e}")

        # Charlie tries to add himself as moderator to Bob's channel (should fail)
        try:
            cur.execute("SET app.current_user_id = '33333333-3333-3333-3333-333333333333';")  # Charlie
            cur.execute("""
                INSERT INTO channel_moderators (channel_id, user_id)
                VALUES ('bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb', '33333333-3333-3333-3333-333333333333')
            """)
            # Check if the insert actually affected any rows (RLS blocks by affecting 0 rows)
            if cur.rowcount == 0:
                test_results.append("✓ Non-owners blocked from adding moderators")
            else:
                test_results.append("✗ Non-owner was able to add moderator (should be blocked)")
        except psycopg2.Error:
            test_results.append("✓ Non-owners blocked from adding moderators")

        # Test 7: Content visibility based on user context
        print("\n7. Testing content visibility...")

        # Count posts visible to Alice
        cur.execute("SET app.current_user_id = '11111111-1111-1111-1111-111111111111';")  # Alice
        cur.execute("SELECT COUNT(*) FROM posts;")
        alice_posts = cur.fetchone()[0]

        # Count posts visible to Eve
        cur.execute("SET app.current_user_id = '55555555-5555-5555-5555-555555555555';")  # Eve
        cur.execute("SELECT COUNT(*) FROM posts;")
        eve_posts = cur.fetchone()[0]

        if alice_posts >= 2 and eve_posts >= 1:  # Alice should see posts in channels she has access to
            test_results.append("✓ Content visibility varies correctly based on user context")
        else:
            test_results.append(f"✗ Content visibility issue: Alice sees {alice_posts}, Eve sees {eve_posts}")

        # Test 8: Anonymous user access
        print("\n8. Testing anonymous user restrictions...")

        try:
            cur.execute("SET app.current_user_id = '';")  # Anonymous user
            cur.execute("SELECT COUNT(*) FROM users;")
            anon_users = cur.fetchone()[0]

            # Anonymous users should be able to see public user profiles per requirements
            # Count public users that should be visible
            cur.execute("SELECT COUNT(*) FROM users WHERE is_public = true;")
            public_users = cur.fetchone()[0] if cur.rowcount > 0 else 0

            if anon_users == public_users and anon_users > 0:
                test_results.append(f"✓ Anonymous users can see {anon_users} public user profiles (correct)")
            elif anon_users == 0:
                test_results.append("✗ Anonymous users cannot see any users (should see public profiles)")
            else:
                test_results.append(f"✗ Anonymous users can see {anon_users} users but expected {public_users} public users")
        except Exception as e:
            test_results.append("✓ Anonymous users properly restricted")

        # Print results
        print("\n" + "="*60)
        print("RLS VERIFICATION RESULTS - SOCIAL MEDIA PLATFORM")
        print("="*60)

        passed = sum(1 for result in test_results if result.startswith("✓"))
        failed = sum(1 for result in test_results if result.startswith("✗"))

        for result in test_results:
            print(result)

        print(f"\nSummary: {passed} passed, {failed} failed")

        cur.close()
        conn.close()

        if failed == 0:
            print("\nAll tests passed.")
            return True
        else:
            print(f"\n{failed} test(s) failed.")
            return False

    except Exception as e:
        print(f"Error during verification: {e}")
        return False

if __name__ == "__main__":
    success = verify_rls_implementation()
    sys.exit(0 if success else 1)