PR Automation Workflow

L3 · Model Context Protocol · GitHub · MCPMark CI/CD

Create comprehensive PR automation with parallel jobs for code quality, testing, security scanning, and build validation.
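The verifier (the Python script under Verify below) checks the merged workflow file with plain substring matching: a `pull_request:` trigger with the `opened`, `synchronize`, and `reopened` events, plus the four job names `code-quality`, `testing-suite`, `security-scan`, and `build-validation`. As a rough local pre-check before opening the PR, something like the following sketch could mirror those checks; the file path and the required names come from the verifier, while the `precheck` helper itself is purely illustrative.

```python
# Illustrative pre-check only: mirrors the grader's substring checks on the
# local workflow file before the PR is opened. Not part of the benchmark.
from pathlib import Path

REQUIRED_EVENTS = ["opened", "synchronize", "reopened"]
REQUIRED_JOBS = ["code-quality", "testing-suite", "security-scan", "build-validation"]


def precheck(path: str = ".github/workflows/pr-automation.yml") -> list:
    """Return a list of problems found in the local workflow file."""
    text = Path(path).read_text()
    problems = []
    if "pull_request:" not in text:
        problems.append("missing pull_request trigger")
    problems += [f"missing event: {e}" for e in REQUIRED_EVENTS if e not in text]
    problems += [f"missing job: {j}" for j in REQUIRED_JOBS if f"{j}:" not in text]
    return problems


if __name__ == "__main__":
    issues = precheck()
    print("workflow looks complete" if not issues else "\n".join(issues))
```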

Created by Zijian Wu
2025-08-15
PR Workflows · CI/CD Automation · Workflow Automation

Model Ranking

| Model | Run Results | Pass@4 | Pass^4 | Avg Time | Avg Turns | Input Tokens | Output Tokens | Total Tokens |
|---|---|---|---|---|---|---|---|---|
| Claude / claude-4-1-opus | 0/1 | -- | -- | 174.8s | 1.0 | 16,406 | 174 | 16,580 |
| Claude / claude-4-sonnet | 0/4 | 0% | 0% | 125.5s | 5.0 | 109,219 | 3,257 | 112,476 |
| DeepSeek / deepseek-chat | 0/4 | 0% | 0% | 266.6s | 7.5 | 150,886 | 4,533 | 155,419 |
| Gemini / gemini-2-5-pro | 0/4 | 0% | 0% | 253.5s | 8.0 | 122,951 | 8,640 | 131,591 |
| OpenAI / gpt-5 | 0/4 | 0% | 0% | 230.8s | 5.5 | 81,383 | 15,792 | 97,175 |
| Grok / grok-4 | 0/4 | - | - | 30.0s | - | - | - | - |
| MoonshotAI / k2 | 0/4 | 0% | 0% | 109.3s | 2.0 | 35,877 | 1,216 | 37,092 |
| OpenAI / o3 | 0/4 | 0% | 0% | 76.8s | 5.3 | 70,070 | 4,639 | 74,709 |
| Qwen / qwen-3-coder | 0/4 | 0% | 0% | 75.0s | 6.0 | 148,000 | 3,198 | 151,198 |

Task State


Instruction



Verify

The verification script is written in Python:

import base64
import datetime
import os
import sys
import time
from typing import Dict, List, Optional, Tuple

import requests
from dotenv import load_dotenv


def _get_github_api(
    endpoint: str, headers: Dict[str, str], owner: str, repo: str
) -> Tuple[bool, Optional[Dict]]:
    """Make a GET request to GitHub API and return (success, response)."""
    url = f"https://api.github.com/repos/{owner}/{repo}/{endpoint}"
    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return True, response.json()
        elif response.status_code == 404:
            return False, None
        else:
            print(f"API error for {endpoint}: {response.status_code}", file=sys.stderr)
            return False, None
    except Exception as e:
        print(f"Exception for {endpoint}: {e}", file=sys.stderr)
        return False, None


def _post_github_api(
    endpoint: str, headers: Dict[str, str], owner: str, repo: str, data: Dict
) -> Tuple[bool, Optional[Dict]]:
    """Make a POST request to GitHub API and return (success, response)."""
    url = f"https://api.github.com/repos/{owner}/{repo}/{endpoint}"
    try:
        response = requests.post(url, headers=headers, json=data)
        if response.status_code in [200, 201]:
            return True, response.json()
        else:
            print(
                f"API error for {endpoint}: {response.status_code} - {response.text}",
                file=sys.stderr,
            )
            return False, None
    except Exception as e:
        print(f"Exception for {endpoint}: {e}", file=sys.stderr)
        return False, None


def _patch_github_api(
    endpoint: str, headers: Dict[str, str], owner: str, repo: str, data: Dict
) -> Tuple[bool, Optional[Dict]]:
    """Make a PATCH request to GitHub API and return (success, response)."""
    url = f"https://api.github.com/repos/{owner}/{repo}/{endpoint}"
    try:
        response = requests.patch(url, headers=headers, json=data)
        if response.status_code == 200:
            return True, response.json()
        else:
            print(
                f"API error for {endpoint}: {response.status_code} - {response.text}",
                file=sys.stderr,
            )
            return False, None
    except Exception as e:
        print(f"Exception for {endpoint}: {e}", file=sys.stderr)
        return False, None


def _get_file_content(
    file_path: str,
    headers: Dict[str, str],
    owner: str,
    repo: str,
    ref: str = "main",
) -> Optional[str]:
    """Get the content of a file from the repository."""
    success, result = _get_github_api(
        f"contents/{file_path}?ref={ref}", headers, owner, repo
    )
    if not success or not result:
        return None

    try:
        content = base64.b64decode(result.get("content", "")).decode("utf-8")
        return content
    except Exception as e:
        print(f"Content decode error for {file_path}: {e}", file=sys.stderr)
        return None


def _find_pr_by_title(
    title: str, headers: Dict[str, str], owner: str, repo: str
) -> Optional[Dict]:
    """Find a PR by exact title match."""
    for state in ["closed", "open"]:
        success, prs = _get_github_api(
            f"pulls?state={state}&per_page=100", headers, owner, repo
        )
        if success and prs:
            for pr in prs:
                if pr.get("title") == title:
                    return pr
    return None


def _wait_for_workflow_completion(
    headers: Dict[str, str],
    owner: str,
    repo: str,
    workflow_file: str,
    max_wait: int = 600,
) -> bool:
    """Wait for GitHub Actions workflows to complete processing."""
    print(f"⏳ Waiting for {workflow_file} workflows to complete...")

    start_time = time.time()
    no_workflow_check_count = 0

    while time.time() - start_time < max_wait:
        try:
            success, response = _get_github_api(
                f"actions/workflows/{workflow_file}/runs?per_page=10",
                headers,
                owner,
                repo,
            )

            if success and response:
                runs = response.get("workflow_runs", [])
                if len(runs) > 0:
                    running_count = 0
                    completed_count = 0

                    for run in runs[:5]:  # Check recent runs
                        status = run["status"]
                        if status == "completed":
                            completed_count += 1
                        elif status in ["in_progress", "queued"]:
                            running_count += 1

                    print(
                        f"   Status: {completed_count} completed, {running_count} running/queued"
                    )

                    if running_count == 0:
                        print(f"✅ All {workflow_file} workflows completed.")
                        return True
                else:
                    # No workflow runs found
                    no_workflow_check_count += 1
                    if no_workflow_check_count == 1:
                        print(
                            "   No workflow runs found yet, waiting 5 seconds and checking once more..."
                        )
                        time.sleep(5)
                        continue
                    elif no_workflow_check_count >= 2:
                        print(
                            f"⚠️ No workflow runs detected after 2 checks. {workflow_file} may not have been triggered."
                        )
                        print("   Continuing with verification...")
                        return False

            print(f"⏳ Still waiting... ({int(time.time() - start_time)}s elapsed)")
            time.sleep(10)

        except Exception as e:
            print(f"⚠️ Error checking workflow status: {e}")
            time.sleep(10)

    print(f"⚠️ Workflow completion wait timed out after {max_wait}s")
    return False


def _verify_workflow_file(
    headers: Dict[str, str], owner: str, repo: str
) -> Tuple[bool, List[str]]:
    """Verify that the workflow file exists and has correct content."""
    print("\n📄 Verifying workflow file...")
    errors = []

    workflow_content = _get_file_content(
        ".github/workflows/pr-automation.yml", headers, owner, repo
    )

    if not workflow_content:
        return False, [
            "Workflow file .github/workflows/pr-automation.yml not found in main branch"
        ]

    print("   ✅ Workflow file exists in main branch")

    # Verify required components
    required_events = ["opened", "synchronize", "reopened"]
    required_jobs = [
        "code-quality",
        "testing-suite",
        "security-scan",
        "build-validation",
    ]

    if "pull_request:" not in workflow_content:
        errors.append("Workflow missing pull_request trigger")
    else:
        print("   ✅ Pull request trigger found")

    for event in required_events:
        if event not in workflow_content:
            errors.append(f"Missing event trigger: {event}")

    if not errors:
        print(f"   ✅ Required events found: {required_events}")

    for job in required_jobs:
        if f"{job}:" not in workflow_content:
            errors.append(f"Missing job: {job}")

    if not errors:
        print(f"   ✅ All 4 required jobs found: {required_jobs}")

    return len(errors) == 0, errors


def _verify_main_pr_merged(
    headers: Dict[str, str], owner: str, repo: str
) -> Tuple[bool, List[str], Optional[Dict]]:
    """Verify that the main PR implementing the workflow was merged."""
    print("\n🔍 Verifying main PR was merged...")
    errors = []

    pr = _find_pr_by_title(
        "Implement Pull Request Automation Workflow", headers, owner, repo
    )

    if not pr:
        return (
            False,
            ["Main PR 'Implement Pull Request Automation Workflow' not found"],
            None,
        )

    pr_number = pr["number"]
    print(f"   Found PR #{pr_number}")

    if not pr.get("merged_at"):
        errors.append(f"PR #{pr_number} was not merged")
    else:
        print(f"   ✅ PR #{pr_number} was merged")

    if pr.get("head", {}).get("ref") != "pr-automation-workflow":
        errors.append(f"PR #{pr_number} was not from pr-automation-workflow branch")
    else:
        print("   ✅ PR was from pr-automation-workflow branch")

    if pr.get("base", {}).get("ref") != "main":
        errors.append(f"PR #{pr_number} was not merged to main branch")
    else:
        print("   ✅ PR was merged to main branch")

    return len(errors) == 0, errors, pr


def _verify_workflow_runs(
    pr_data: Dict, headers: Dict[str, str], owner: str, repo: str
) -> Tuple[bool, List[str]]:
    """Verify that workflow runs occurred for the PR and all 4 jobs ran in parallel."""
    print("\n⚙️ Verifying workflow runs...")
    errors = []

    pr_number = pr_data["number"]

    # Get workflow runs for the PR
    success, runs_response = _get_github_api(
        "actions/runs?event=pull_request&per_page=50", headers, owner, repo
    )

    if not success:
        return False, ["Failed to fetch workflow runs"]

    pr_runs = []
    pr_head_sha = pr_data.get("head", {}).get("sha")

    for run in runs_response.get("workflow_runs", []):
        # Method 1: Check if this run is associated with the PR's head SHA
        if pr_head_sha and run.get("head_sha") == pr_head_sha:
            pr_runs.append(run)
            continue

        # Method 2: Check pull_requests field (may be empty for merged PRs)
        for pr in run.get("pull_requests", []):
            if pr.get("number") == pr_number:
                pr_runs.append(run)
                break

    if not pr_runs:
        # Try alternative approach: get runs by head branch
        pr_head_ref = pr_data.get("head", {}).get("ref")
        if pr_head_ref:
            success, branch_runs = _get_github_api(
                f"actions/runs?branch={pr_head_ref}&per_page=50", headers, owner, repo
            )
            if success:
                pr_runs = branch_runs.get("workflow_runs", [])

    if not pr_runs:
        return False, [
            f"No workflow runs found for PR #{pr_number} (head_sha: {pr_head_sha})"
        ]

    print(f"   Found {len(pr_runs)} workflow run(s) for PR #{pr_number}")

    # Check the most recent run
    latest_run = pr_runs[0]  # GitHub returns runs in descending order by creation time
    run_id = latest_run["id"]

    if latest_run["conclusion"] != "success":
        errors.append(
            f"Latest workflow run {run_id} did not succeed (conclusion: {latest_run['conclusion']})"
        )
    else:
        print(f"   ✅ Latest workflow run {run_id} succeeded")

    # Get jobs for this run
    success, jobs_response = _get_github_api(
        f"actions/runs/{run_id}/jobs", headers, owner, repo
    )

    if not success:
        return False, ["Failed to fetch workflow jobs"]

    jobs = jobs_response.get("jobs", [])
    expected_jobs = [
        "code-quality",
        "testing-suite",
        "security-scan",
        "build-validation",
    ]

    found_jobs = [job["name"] for job in jobs]
    missing_jobs = [job for job in expected_jobs if job not in found_jobs]

    if missing_jobs:
        errors.append(f"Missing jobs: {missing_jobs}. Found: {found_jobs}")
    else:
        print(f"   ✅ All 4 required jobs found: {found_jobs}")

    # Verify all jobs succeeded
    failed_jobs = [job["name"] for job in jobs if job["conclusion"] != "success"]
    if failed_jobs:
        errors.append(f"Failed jobs: {failed_jobs}")
    else:
        print("   ✅ All jobs completed successfully")

    # Verify jobs ran in parallel (started around the same time)
    if len(jobs) >= 4:
        start_times = [job["started_at"] for job in jobs if job["started_at"]]
        if len(start_times) >= 4:
            # Check if all jobs started within 2 minutes of each other
            start_dt = [
                datetime.datetime.fromisoformat(t.replace("Z", "+00:00"))
                for t in start_times
            ]
            time_diff = max(start_dt) - min(start_dt)
            if time_diff.total_seconds() > 120:  # 2 minutes
                errors.append(
                    f"Jobs did not run in parallel (time span: {time_diff.total_seconds()}s)"
                )
            else:
                print("   ✅ Jobs ran in parallel")
        else:
            errors.append("Not enough job start times to verify parallel execution")

    return len(errors) == 0, errors


def _verify_pr_comments(
    pr_data: Dict, headers: Dict[str, str], owner: str, repo: str
) -> Tuple[bool, List[str]]:
    """Verify that PR has required automation comments from GitHub Actions bot."""
    print("\n💬 Verifying PR comments...")
    errors = []

    pr_number = pr_data["number"]

    success, comments = _get_github_api(
        f"issues/{pr_number}/comments", headers, owner, repo
    )

    if not success:
        return False, ["Failed to fetch PR comments"]

    # Filter for GitHub Actions bot comments only
    bot_comments = [
        comment
        for comment in comments
        if comment.get("user", {}).get("login") == "github-actions[bot]"
    ]

    if not bot_comments:
        return False, ["No comments found from GitHub Actions bot"]

    print(f"   Found {len(bot_comments)} comment(s) from GitHub Actions bot")

    # Get all bot comment bodies
    bot_comment_bodies = [comment.get("body", "") for comment in bot_comments]

    # Define required automation reports with their keywords
    required_reports = [
        {
            "name": "Code Quality Report",
            "main_keywords": ["Code Quality Report"],
            "sub_keywords": ["ESLint", "Prettier"],
            "found": False,
        },
        {
            "name": "Test Coverage Report",
            "main_keywords": ["Test Coverage Report"],
            "sub_keywords": [],
            "found": False,
        },
        {
            "name": "Security Scan Report",
            "main_keywords": ["Security Scan Report"],
            "sub_keywords": ["Vulnerabilities", "Dependencies"],
            "found": False,
        },
        {
            "name": "Build Validation Report",
            "main_keywords": ["Build Validation"],
            "sub_keywords": [],
            "found": False,
        },
    ]

    # Check each bot comment for the required reports
    for comment_body in bot_comment_bodies:
        for report in required_reports:
            # Check if this comment contains any of the main keywords for this report
            if any(keyword in comment_body for keyword in report["main_keywords"]):
                if not report["found"]:  # Only mark as found once
                    report["found"] = True
                    print(f"   ✅ Found {report['name']}")

                    # Verify sub-keywords are present in this specific comment
                    for sub_keyword in report["sub_keywords"]:
                        if sub_keyword not in comment_body:
                            errors.append(
                                f"Missing sub-keyword '{sub_keyword}' in {report['name']}"
                            )
                        else:
                            print(
                                f"   ✅ Found sub-keyword '{sub_keyword}' in {report['name']}"
                            )

    # Check if all required reports were found
    for report in required_reports:
        if not report["found"]:
            errors.append(f"Missing {report['name']} from GitHub Actions bot")

    # Verify we have exactly 4 automation reports
    found_reports = sum(1 for report in required_reports if report["found"])
    if found_reports != 4:
        errors.append(f"Expected 4 automation reports, but found {found_reports}")
    else:
        print("   ✅ All 4 required automation reports found from GitHub Actions bot")

    return len(errors) == 0, errors


def _create_test_pr(
    title: str,
    branch: str,
    content: str,
    file_path: str,
    headers: Dict[str, str],
    owner: str,
    repo: str,
) -> Optional[int]:
    """Create a test PR with specific content designed to fail a check."""
    print(f"   Creating test PR: {title}")

    # Create branch
    success, main_ref = _get_github_api("git/ref/heads/main", headers, owner, repo)
    if not success:
        print("   ❌ Failed to get main branch reference")
        return None

    main_sha = main_ref["object"]["sha"]

    branch_data = {"ref": f"refs/heads/{branch}", "sha": main_sha}

    success, _ = _post_github_api("git/refs", headers, owner, repo, branch_data)
    if not success:
        # Branch might already exist, try to delete and recreate
        print(f"   Branch {branch} already exists, trying to delete and recreate...")
        # Force delete existing branch
        delete_url = (
            f"https://api.github.com/repos/{owner}/{repo}/git/refs/heads/{branch}"
        )
        delete_response = requests.delete(delete_url, headers=headers)

        if delete_response.status_code == 204:
            print(f"   Successfully deleted existing branch {branch}")
            # Wait a moment for deletion to complete
            time.sleep(2)

            # Try creating again
            success, _ = _post_github_api("git/refs", headers, owner, repo, branch_data)
            if not success:
                print(f"   ❌ Failed to create branch {branch} after cleanup")
                return None
            else:
                print(f"   ✅ Successfully created branch {branch} after cleanup")
        else:
            print(
                f"   ❌ Failed to delete existing branch {branch}: {delete_response.status_code}"
            )
            return None

    # Create or update file
    file_content = base64.b64encode(content.encode()).decode()

    file_data = {
        "message": f"Test commit for {title}",
        "content": file_content,
        "branch": branch,
    }

    # Check if file exists in main branch first
    success, file_info = _get_github_api(
        f"contents/{file_path}?ref=main", headers, owner, repo
    )
    if success and file_info:
        # File exists, need SHA for update
        file_data["sha"] = file_info["sha"]
        print(f"   File {file_path} exists, updating with SHA")
    else:
        print(f"   Creating new file {file_path}")

    # Use PUT method for file creation/update
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}"
    try:
        response = requests.put(url, headers=headers, json=file_data)
        if response.status_code in [200, 201]:
            print(f"   ✅ Successfully created/updated file {file_path}")
        else:
            print(
                f"   ❌ Failed to create/update file {file_path}: {response.status_code} - {response.text}"
            )
            return None
    except Exception as e:
        print(f"   ❌ Exception creating file {file_path}: {e}")
        return None

    # Create PR
    pr_data = {
        "title": title,
        "head": branch,
        "base": "main",
        "body": f"Test PR to validate that {title.split(':')[1].strip()} check fails correctly.",
    }

    success, pr_response = _post_github_api("pulls", headers, owner, repo, pr_data)
    if not success:
        print("   ❌ Failed to create PR")
        return None

    pr_number = pr_response["number"]
    print(f"   ✅ Created test PR #{pr_number}")
    return pr_number


def _close_pr(pr_number: int, headers: Dict[str, str], owner: str, repo: str) -> bool:
    """Close a PR."""
    success, _ = _patch_github_api(
        f"pulls/{pr_number}", headers, owner, repo, {"state": "closed"}
    )
    return success


def _run_unit_tests(
    headers: Dict[str, str], owner: str, repo: str
) -> Tuple[bool, List[str]]:
    """Create test PRs to verify workflow correctly fails on bad code."""
    print("\n🧪 Running unit tests with failing PRs...")
    errors = []
    created_prs = []

    test_cases = [
        {
            "title": "Test: Code Quality Failure",
            "branch": "test-code-quality-fail",
            "file_path": "src/lint-fail-test.js",
            "content": "// This file contains intentional ESLint violations\nvar unused_variable = 'this will trigger unused-vars rule'\nconsole.log('missing semicolon - will trigger semi rule')\nconst   badly_spaced   =   'too many spaces'\nif(true){console.log('missing spaces around braces')}\nfunction unusedFunction() { return 'unused'; }\neeval('alert(\"dangerous eval\")');\nwith (Math) { var x = cos(3 * PI) + sin(LN10) }\nvar a = 1; var a = 2; // redeclared variable",
            "expected_failure": "code-quality",
        },
        {
            "title": "Test: Testing Suite Failure",
            "branch": "test-testing-fail",
            "file_path": "tests/fail-test.test.js",
            "content": "const request = require('supertest');\n\ndescribe('Intentional Test Failures', () => {\n  test('This test should always fail', () => {\n    expect(2 + 2).toBe(5); // Intentionally wrong\n  });\n  \n  test('Another failing test', () => {\n    expect(true).toBe(false); // Intentionally wrong\n  });\n  \n  test('Math failure', () => {\n    expect(Math.max(1, 2, 3)).toBe(1); // Intentionally wrong\n  });\n});",
            "expected_failure": "testing-suite",
        },
        {
            "title": "Test: Security Scan Failure",
            "branch": "test-security-fail",
            "file_path": "src/security-fail-test.js",
            "content": "// This file contains patterns that should trigger secret detection\nconst hardcodedPassword = 'admin123password';\nconst fakeApiKey = 'sk_test_' + 'fake123key456here789';\nconst awsLikeKey = 'AKIA' + 'FAKEKEY7EXAMPLE';\nconst dbPassword = 'password' + '=' + 'supersecret123';\nconst tokenPattern = 'token' + '=' + 'ghp_1234567890abcdef';\n\n// These patterns should trigger secret detection\nconsole.log('Password:', hardcodedPassword);\nconsole.log('API Key:', fakeApiKey);\nconsole.log('AWS Key:', awsLikeKey);\nconsole.log('DB Password:', dbPassword);\nconsole.log('Token:', tokenPattern);\n\nmodule.exports = {\n  password: hardcodedPassword,\n  apiKey: fakeApiKey\n};",
            "expected_failure": "security-scan",
        },
        {
            "title": "Test: Build Validation Failure",
            "branch": "test-build-fail",
            "file_path": "src/build-fail-test.js",
            "content": "// This file will cause build/startup failures\nconst express = require('express');\nconst nonExistentModule = require('this-module-does-not-exist-anywhere');\nconst anotherMissing = require('@fake/missing-package');\n\n// This will cause runtime errors during startup\nconst app = express();\n\n// Define a route that will cause issues\napp.get('/test', (req, res) => {\n  // Try to use non-existent modules\n  nonExistentModule.doSomething();\n  anotherMissing.initialize();\n  res.send('This should never work');\n});\n\n// Override the listen method to always fail\nconst originalListen = app.listen;\napp.listen = function(port, callback) {\n  console.log('Attempting to start server...');\n  // This will crash during build validation\n  throw new Error('Intentional build failure for testing');\n};\n\nmodule.exports = app;",
            "expected_failure": "build-validation",
        },
    ]

    for test_case in test_cases:
        pr_number = _create_test_pr(
            test_case["title"],
            test_case["branch"],
            test_case["content"],
            test_case["file_path"],
            headers,
            owner,
            repo,
        )

        if pr_number:
            created_prs.append((test_case, pr_number))
        else:
            errors.append(f"Failed to create test PR: {test_case['title']}")

    if created_prs:
        print(f"   Created {len(created_prs)} test PRs, waiting for workflows...")

        # Wait a bit for workflows to start
        time.sleep(5)

        # Wait for workflows to complete
        _wait_for_workflow_completion(
            headers, owner, repo, "pr-automation.yml", max_wait=300
        )

        # Verify each test PR failed appropriately
        for test_case, pr_number in created_prs:
            print(
                f"   Checking test PR #{pr_number} ({test_case['expected_failure']} failure)..."
            )

            # Get workflow runs for this PR
            success, runs_response = _get_github_api(
                "actions/runs?event=pull_request&per_page=20", headers, owner, repo
            )

            if success:
                pr_runs = []
                for run in runs_response.get("workflow_runs", []):
                    # Check pull_requests field
                    for pr in run.get("pull_requests", []):
                        if pr.get("number") == pr_number:
                            pr_runs.append(run)
                            break

                # If no runs found via pull_requests, try matching by branch
                if not pr_runs:
                    branch_name = test_case["branch"]
                    for run in runs_response.get("workflow_runs", []):
                        if run.get("head_branch") == branch_name:
                            pr_runs.append(run)

                if pr_runs:
                    latest_run = pr_runs[0]
                    if latest_run["conclusion"] != "failure":
                        errors.append(
                            f"Test PR #{pr_number} should have failed but got: {latest_run['conclusion']}"
                        )
                    else:
                        print(f"   ✅ Test PR #{pr_number} correctly failed")
                else:
                    errors.append(f"No workflow runs found for test PR #{pr_number}")

        # Clean up test PRs and branches
        print("   Cleaning up test PRs and branches...")
        for test_case, pr_number in created_prs:
            if _close_pr(pr_number, headers, owner, repo):
                print(f"   ✅ Closed test PR #{pr_number}")
            else:
                print(f"   ⚠️ Failed to close test PR #{pr_number}")

            # Delete test branch
            branch_name = test_case["branch"]
            url = f"https://api.github.com/repos/{owner}/{repo}/git/refs/heads/{branch_name}"
            response = requests.delete(url, headers=headers)
            if response.status_code == 204:
                print(f"   ✅ Deleted test branch {branch_name}")
            else:
                print(f"   ⚠️ Failed to delete test branch {branch_name}")

    return len(errors) == 0, errors


def verify() -> bool:
    """
    Verify that the PR automation workflow is working correctly.
    """
    load_dotenv(".mcp_env")

    github_token = os.environ.get("MCP_GITHUB_TOKEN")
    if not github_token:
        print("Error: MCP_GITHUB_TOKEN environment variable not set", file=sys.stderr)
        return False

    github_org = os.environ.get("GITHUB_EVAL_ORG")
    if not github_org:
        print("Error: GITHUB_EVAL_ORG environment variable not set", file=sys.stderr)
        return False

    owner = github_org
    repo = "mcpmark-cicd"

    headers = {
        "Authorization": f"token {github_token}",
        "Accept": "application/vnd.github.v3+json",
    }

    print("🔍 Starting PR Automation Workflow Verification")
    print("=" * 60)

    all_passed = True

    # 1. Verify workflow file exists
    workflow_ok, workflow_errors = _verify_workflow_file(headers, owner, repo)
    if not workflow_ok:
        all_passed = False
        print("❌ Workflow File Verification Failed:")
        for error in workflow_errors:
            print(f"   - {error}")
    else:
        print("✅ Workflow File Verification Passed")

    # 2. Verify main PR was merged
    pr_ok, pr_errors, pr_data = _verify_main_pr_merged(headers, owner, repo)
    if not pr_ok:
        all_passed = False
        print("❌ Main PR Verification Failed:")
        for error in pr_errors:
            print(f"   - {error}")
    else:
        print("✅ Main PR Verification Passed")

    # 3. Verify workflow runs (only if PR verification passed)
    if pr_ok and pr_data:
        runs_ok, runs_errors = _verify_workflow_runs(pr_data, headers, owner, repo)
        if not runs_ok:
            all_passed = False
            print("❌ Workflow Runs Verification Failed:")
            for error in runs_errors:
                print(f"   - {error}")
        else:
            print("✅ Workflow Runs Verification Passed")

        # 4. Verify PR comments
        comments_ok, comments_errors = _verify_pr_comments(
            pr_data, headers, owner, repo
        )
        if not comments_ok:
            all_passed = False
            print("❌ PR Comments Verification Failed:")
            for error in comments_errors:
                print(f"   - {error}")
        else:
            print("✅ PR Comments Verification Passed")

    # 5. Run unit tests with failing PRs
    tests_ok, tests_errors = _run_unit_tests(headers, owner, repo)
    if not tests_ok:
        all_passed = False
        print("❌ Unit Tests Failed:")
        for error in tests_errors:
            print(f"   - {error}")
    else:
        print("✅ Unit Tests Passed")

    print("\n" + "=" * 60)
    if all_passed:
        print("🎉 All PR Automation Workflow verifications PASSED!")
        print("\n📋 Summary:")
        print("   ✅ Workflow file exists with correct triggers and 4 parallel jobs")
        print("   ✅ Main PR was merged from pr-automation-workflow to main")
        print("   ✅ Workflow runs show all 4 jobs executed in parallel and succeeded")
        print("   ✅ PR comments contain required automation reports")
        print("   ✅ Unit tests confirmed workflow correctly fails on problematic code")
        print("\n🤖 The GitHub Actions PR automation workflow is working correctly!")
    else:
        print("❌ PR Automation Workflow verification FAILED!")
        print("   Some components did not meet the expected automation requirements.")

    return all_passed


if __name__ == "__main__":
    success = verify()
    sys.exit(0 if success else 1)
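
A minimal sketch of driving the verifier locally, assuming the script above is saved as `verify.py` (the page only shows `*.py`, so the file name is an assumption) and that `.mcp_env` defines the two variables `verify()` reads:

```python
# Hypothetical driver; "verify.py" is an assumed file name. .mcp_env must define
# MCP_GITHUB_TOKEN and GITHUB_EVAL_ORG, the two variables read by verify() above.
import subprocess
import sys

exit_code = subprocess.run([sys.executable, "verify.py"]).returncode
print("verification passed" if exit_code == 0 else "verification failed")
```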