Database Security Policies
L3
PostgresLego
Implement Row-Level Security policies with role-based access control for theme-based data isolation in LEGO database.
Created by Jiawei Wang
2025-08-15
Security And Access ControlStored Procedures And Functions
Model Ranking
Click on the dots to view the trajectory of each task run
Model | Run Results | Pass@4 | Pass^4 | Avg Time | Avg Turns | Input Tokens | Output Tokens | Total Tokens |
---|---|---|---|---|---|---|---|---|
gpt-5 | 4 /4 | 370.7s | 17.5 | 315,174 | 17,984 | 333,158 | ||
gemini-2-5-pro | 3 /4 | 71.6s | 6.5 | 17,917 | 6,357 | 24,274 | ||
grok-4 | 3 /4 | 200.3s | 45.5 | - | - | - | ||
qwen-3-coder | 3 /4 | 326.0s | 33.5 | 1,137,994 | 3,625 | 1,141,619 | ||
o3 | 2 /4 | 102.8s | 16.5 | 257,129 | 5,396 | 262,525 | ||
deepseek-chat | 1 /4 | 543.5s | 39.8 | 318,401 | 7,928 | 326,329 | ||
k2 | 1 /4 | 645.1s | 34.8 | 472,740 | 5,515 | 478,255 | ||
claude-4-1-opus | 0 /1 | - | - | 888.4s | 30.0 | 353,413 | 18,779 | 372,192 |
claude-4-sonnet | 0 /4 | 224.6s | 35.3 | 275,000 | 8,479 | 283,479 |
Task State
Table "lego_colors" {
"id" int4 [pk, not null, increment]
"name" varchar(255) [not null]
"rgb" varchar(6) [not null]
"is_trans" bpchar(1) [not null]
}
Table "lego_inventories" {
"id" int4 [pk, not null, increment]
"version" int4 [not null]
"set_num" varchar(255) [not null]
}
Table "lego_inventory_parts" {
"inventory_id" int4 [not null]
"part_num" varchar(255) [not null]
"color_id" int4 [not null]
"quantity" int4 [not null]
"is_spare" bool [not null]
}
Table "lego_inventory_sets" {
"inventory_id" int4 [not null]
"set_num" varchar(255) [not null]
"quantity" int4 [not null]
}
Table "lego_part_categories" {
"id" int4 [pk, not null, increment]
"name" varchar(255) [not null]
}
Table "lego_parts" {
"part_num" varchar(255) [pk, not null]
"name" text [not null]
"part_cat_id" int4 [not null]
}
Table "lego_sets" {
"set_num" varchar(255) [pk, not null]
"name" varchar(255) [not null]
"year" int4
"theme_id" int4
"num_parts" int4
}
Table "lego_themes" {
"id" int4 [pk, not null, increment]
"name" varchar(255) [not null]
"parent_id" int4
}
Instruction
Verify
Python
"""
Verification script for PostgreSQL LEGO Task 4: Database Security and RLS Implementation
(Version 2 - Improved Robustness)
"""
import os
import sys
import psycopg2
import psycopg2.errors
from typing import Dict
def get_connection_params() -> Dict[str, any]:
"""Get database connection parameters from environment variables."""
return {
"host": os.getenv("POSTGRES_HOST", "localhost"),
"port": int(os.getenv("POSTGRES_PORT", 5432)),
"database": os.getenv("POSTGRES_DATABASE"),
"user": os.getenv("POSTGRES_USERNAME"),
"password": os.getenv("POSTGRES_PASSWORD"),
}
def verify_role_creation(conn) -> bool:
"""
TASK 1 VERIFICATION: Check if theme_analyst role was created with proper permissions.
"""
print("\n-- Verifying Task 1: Role Creation and Permissions --")
with conn.cursor() as cur:
# Check if role exists
cur.execute("SELECT 1 FROM pg_roles WHERE rolname = 'theme_analyst';")
if not cur.fetchone():
print("❌ FAIL: The 'theme_analyst' role was not created.")
return False
print("✅ OK: Role 'theme_analyst' exists.")
# Check SELECT permissions on reference and main tables
all_tables = [
'lego_themes', 'lego_colors', 'lego_parts', 'lego_part_categories',
'lego_sets', 'lego_inventories', 'lego_inventory_parts'
]
for table in all_tables:
cur.execute(
"""
SELECT has_table_privilege('theme_analyst', %s, 'SELECT');
""",
(table,)
)
if not cur.fetchone()[0]:
print(f"❌ FAIL: 'theme_analyst' role is missing SELECT permission on '{table}'.")
return False
print("✅ OK: Role has correct SELECT permissions on all required tables.")
# Check that no INSERT/UPDATE/DELETE permissions exist
for table in all_tables:
cur.execute(
"""
SELECT
has_table_privilege('theme_analyst', %s, 'INSERT') OR
has_table_privilege('theme_analyst', %s, 'UPDATE') OR
has_table_privilege('theme_analyst', %s, 'DELETE');
""",
(table, table, table)
)
if cur.fetchone()[0]:
print(f"❌ FAIL: 'theme_analyst' role has unauthorized INSERT, UPDATE, or DELETE permission on '{table}'.")
return False
print("✅ OK: Role does not have modification permissions.")
print("✅ PASS: 'theme_analyst' role created with correct permissions.")
return True
def verify_rls_enabled(conn) -> bool:
"""
TASK 2 VERIFICATION: Check if Row-Level Security is enabled on required tables.
"""
print("\n-- Verifying Task 2: Row-Level Security Enablement --")
tables_to_check = ['lego_sets', 'lego_inventories', 'lego_inventory_parts']
with conn.cursor() as cur:
for table in tables_to_check:
cur.execute(
"SELECT relrowsecurity FROM pg_class WHERE relname = %s;", (table,)
)
rls_enabled = cur.fetchone()
if not rls_enabled or not rls_enabled[0]:
print(f"❌ FAIL: RLS is not enabled on table '{table}'.")
return False
print(f"✅ OK: RLS is enabled on table '{table}'.")
print("✅ PASS: Row-Level Security is enabled on all required tables.")
return True
def verify_rls_policies(conn) -> bool:
"""
TASK 3 VERIFICATION: Check if RLS policies were created on required tables.
"""
print("\n-- Verifying Task 3: RLS Policy Creation --")
expected_policies = {
'lego_sets': 'theme_sets_policy',
'lego_inventories': 'theme_inventories_policy',
'lego_inventory_parts': 'theme_inventory_parts_policy'
}
with conn.cursor() as cur:
for table, policy_name in expected_policies.items():
cur.execute(
"SELECT 1 FROM pg_policies WHERE tablename = %s AND policyname = %s;",
(table, policy_name)
)
if not cur.fetchone():
print(f"❌ FAIL: RLS policy '{policy_name}' not found on table '{table}'.")
return False
print(f"✅ OK: RLS policy '{policy_name}' found on table '{table}'.")
print("✅ PASS: All required RLS policies are created.")
return True
def verify_theme_function(conn) -> bool:
"""
TASK 4 VERIFICATION: Check if get_user_theme_id() function was created and works correctly.
"""
print("\n-- Verifying Task 4: Theme Assignment Function --")
with conn.cursor() as cur:
cur.execute(
"SELECT 1 FROM pg_proc WHERE proname = 'get_user_theme_id';"
)
if not cur.fetchone():
print("❌ FAIL: The 'get_user_theme_id' function was not created.")
return False
print("✅ OK: Function 'get_user_theme_id' exists.")
try:
# Test the function's output specifically for the 'theme_analyst' role
cur.execute("SET ROLE theme_analyst;")
cur.execute("SELECT get_user_theme_id();")
theme_id = cur.fetchone()[0]
cur.execute("RESET ROLE;") # IMPORTANT: Switch back
if theme_id != 18:
print(f"❌ FAIL: get_user_theme_id() returned {theme_id} for 'theme_analyst', but expected 18.")
return False
print("✅ OK: Function returns correct theme_id (18) for 'theme_analyst'.")
print("✅ PASS: Theme assignment function is correct.")
return True
except Exception as e:
conn.rollback() # Rollback any failed transaction state
print(f"❌ FAIL: Error testing get_user_theme_id() function: {e}")
return False
def test_theme_analyst_access(conn) -> bool:
"""
TASK 5 VERIFICATION: Test data access by assuming the theme_analyst role.
"""
print("\n-- Verifying Task 5: Theme-Based Data Access --")
try:
with conn.cursor() as cur:
# Assume the role of theme_analyst for this session
cur.execute("SET ROLE theme_analyst;")
# Test 1: Check Star Wars sets access (should return 2 sets)
cur.execute("SELECT set_num FROM lego_sets ORDER BY set_num;")
star_wars_sets = [row[0] for row in cur.fetchall()]
expected_sets = ['65081-1', 'K8008-1']
if sorted(star_wars_sets) != sorted(expected_sets):
print(f"❌ FAIL: Expected Star Wars sets {expected_sets}, but got {star_wars_sets}.")
cur.execute("RESET ROLE;")
return False
print("✅ PASS: Star Wars sets access is correct (2 sets returned).")
# Test 2: Check that Technic sets are not accessible (should return 0)
cur.execute("SELECT COUNT(*) FROM lego_sets WHERE theme_id = 1;")
technic_count = cur.fetchone()[0]
if technic_count != 0:
print(f"❌ FAIL: Technic sets should be blocked, but query returned {technic_count} sets.")
cur.execute("RESET ROLE;")
return False
print("✅ PASS: Technic theme is correctly blocked (0 sets returned).")
# Test 3: Check reference tables are fully accessible
cur.execute("SELECT COUNT(*) > 10 FROM lego_themes;") # Check for a reasonable number
if not cur.fetchone()[0]:
print("❌ FAIL: 'lego_themes' table seems inaccessible or empty.")
cur.execute("RESET ROLE;")
return False
print("✅ PASS: Reference tables appear to be accessible.")
# Test 4 & 5: Check related tables
cur.execute("SELECT COUNT(*) FROM lego_inventories;")
if cur.fetchone()[0] == 0:
print("❌ FAIL: No inventories are visible for the allowed sets.")
cur.execute("RESET ROLE;")
return False
cur.execute("SELECT COUNT(*) FROM lego_inventory_parts;")
if cur.fetchone()[0] == 0:
print("❌ FAIL: No inventory parts are visible for the allowed sets.")
cur.execute("RESET ROLE;")
return False
print("✅ PASS: Related tables (inventories, inventory_parts) are correctly filtered.")
# IMPORTANT: Always reset the role at the end
cur.execute("RESET ROLE;")
return True
except Exception as e:
conn.rollback() # Ensure transaction is clean
print(f"❌ FAIL: An error occurred while testing data access as 'theme_analyst': {e}")
# Try to reset role even on failure to clean up session state
try:
with conn.cursor() as cleanup_cur:
cleanup_cur.execute("RESET ROLE;")
except:
pass
return False
def main():
"""Main verification function."""
print("=" * 60)
print("LEGO Database Security and RLS Verification Script")
print("=" * 60)
conn_params = get_connection_params()
if not conn_params.get("database"):
print("❌ CRITICAL: POSTGRES_DATABASE environment variable not set.")
sys.exit(1)
conn = None
try:
conn = psycopg2.connect(**conn_params)
results = [
verify_role_creation(conn),
verify_rls_enabled(conn),
verify_rls_policies(conn),
verify_theme_function(conn),
test_theme_analyst_access(conn),
]
if all(results):
print("\n🎉 Overall Result: PASS - All security tasks verified successfully!")
sys.exit(0)
else:
print("\n❌ Overall Result: FAIL - One or more verification steps failed.")
sys.exit(1)
except psycopg2.OperationalError as e:
print(f"❌ CRITICAL: Could not connect to the database. Check credentials and host. Details: {e}")
sys.exit(1)
except Exception as e:
print(f"❌ CRITICAL: An unexpected error occurred. Details: {e}")
sys.exit(1)
finally:
if conn:
conn.close()
if __name__ == "__main__":
main()