Skip to content

Approve Test Queue #37451

Approve Test Queue

Approve Test Queue #37451

# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: Approve Test Queue
on:
  # Runs every 5 minutes.
  schedule:
    - cron: '*/5 * * * *'
  # Allows manual triggering.
  workflow_dispatch:
jobs:
  approve-queue:
    runs-on: ubuntu-latest
    environment: main
    # Only run when the workflow lives in NVIDIA-NeMo/Megatron-Bridge.
    if: github.repository == 'NVIDIA-NeMo/Megatron-Bridge'
    strategy:
      # Every (branch, contributor_type) cell manages its own queue, so a
      # failure in one cell must not cancel the approvals of the others.
      fail-fast: false
      matrix:
        branch:
          - main
          - others
          - workflow_dispatch
        contributor_type:
          - internal
          - external
    steps:
# Fetch the repository contents into the runner workspace.
- name: Checkout repository
  uses: actions/checkout@v4
# Provide the interpreter used by the inline approval script.
- name: Set up Python
  uses: actions/setup-python@v5
  with:
    python-version: '3.12'
# The approval script imports `requests`, which is installed here.
- name: Install dependencies
  run: |
    python -m pip install --upgrade pip
    pip install requests
# Fetch the SSO user list that the approval script reads through
# SSO_USERS_FILE. The download stays best-effort: on failure an empty JSON
# object is written instead, which makes the script find no member role for
# anyone, so the fallback is surfaced as a warning rather than hidden.
- name: Download SSO users list
  run: |
    if ! gh release download v0.1.0 \
      --repo NVIDIA-GitHub-Management/github-audits \
      --pattern users_sso.json \
      --output users_sso.json; then
      echo "::warning::Could not download users_sso.json; every contributor will be classified as external"
      echo '{}' > users_sso.json
    fi
  env:
    GH_TOKEN: ${{ secrets.NVIDIA_MANAGEMENT_ORG_PAT }}
# Runs the inline Python queue manager for one (branch, contributor_type) cell.
- name: Approve waiting deployments
  env:
    GITHUB_TOKEN: ${{ secrets.PAT }}
    # Concurrency limits; each falls back to a built-in default when the
    # repository variable is not defined.
    MAX_CONCURRENCY: ${{ vars.MAX_CONCURRENCY || 1 }}
    MAX_CONCURRENCY_EXTERNAL: ${{ vars.MAX_CONCURRENCY_EXTERNAL || 3 }}
    # Prefer a dedicated variable for the workflow_dispatch queue and fall
    # back to the shared MAX_CONCURRENCY so existing setups keep their limit.
    MAX_CONCURRENCY_WORKFLOW_DISPATCH: ${{ vars.MAX_CONCURRENCY_WORKFLOW_DISPATCH || vars.MAX_CONCURRENCY || 1 }}
    CONTRIBUTOR_TYPE: ${{ matrix.contributor_type }}
    MATRIX_BRANCH: ${{ matrix.branch }}
    SSO_USERS_FILE: users_sso.json
    PYTHONUNBUFFERED: "1"
  shell: python
  run: |
import os
import json
import requests
import re
# GitHub API configuration (all values are provided through the step's env).
GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
REPO = os.environ["GITHUB_REPOSITORY"]
CONTRIBUTOR_TYPE = os.environ["CONTRIBUTOR_TYPE"]
MATRIX_BRANCH = os.environ["MATRIX_BRANCH"]

# The API root and the managed workflow are the same for every queue cell, so
# they are defined once and derived from the repository the job runs in
# instead of repeating a hard-coded repository slug per branch.
API_BASE = f"https://api.github.com/repos/{REPO}"
WORKFLOW_NAME = "CICD NeMo"

# Pick the concurrency limit that applies to this queue cell:
# the workflow_dispatch queue has its own limit, PR queues are split by
# contributor type.
if MATRIX_BRANCH == "workflow_dispatch":
    MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY_WORKFLOW_DISPATCH"])
elif CONTRIBUTOR_TYPE == "external":
    MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY_EXTERNAL"])
else:
    MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY"])

# Load SSO users for internal/external classification.
# The script looks entries up by login and reads their "org_roles" list.
with open(os.environ["SSO_USERS_FILE"], encoding="utf-8") as f:
    sso_users = json.load(f)

# Headers sent with every GitHub API request.
headers = {
    "Authorization": f"token {GITHUB_TOKEN}",
    "Accept": "application/vnd.github.v3+json",
    "X-GitHub-Api-Version": "2022-11-28",
}
def make_request(endpoint, method="GET", data=None):
    """Call the GitHub API for this repository and return the decoded JSON.

    Args:
        endpoint: Path relative to API_BASE, e.g. "pulls/12".
        method: "GET" issues a GET request; any other value issues a POST.
        data: JSON-serialisable payload, only sent with POST requests.

    Returns:
        The parsed JSON body, or None when the request fails. Failures are
        printed rather than raised, so callers must check for None.
    """
    url = f"{API_BASE}/{endpoint}"
    # Without a timeout `requests` waits indefinitely on a stalled connection,
    # which would hang the whole job.
    timeout_seconds = 30
    try:
        if method == "GET":
            response = requests.get(url, headers=headers, timeout=timeout_seconds)
        else:
            response = requests.post(url, headers=headers, json=data, timeout=timeout_seconds)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error making request to {endpoint}: {str(e)}")
        # Connection-level errors carry no response. Compare against None
        # explicitly, because an error response object is itself falsy.
        error_response = getattr(e, "response", None)
        if error_response is not None and hasattr(error_response, "text"):
            print(f"Response: {error_response.text}")
        return None
def is_internal_contributor(pr_info):
    """Return True if the PR author holds an NVIDIA or NVIDIA-NeMo member role.

    Args:
        pr_info: Pull-request payload; the author login is read from
            pr_info["user"]["login"].

    Returns:
        True when the author's entry in ``sso_users`` lists "NVIDIA:Member" or
        "NVIDIA-NeMo:Member" under "org_roles", otherwise False. A missing or
        null user, SSO entry or role list counts as not internal.
    """
    # `or {}` / `or []` also cover keys that are present but null, matching
    # how is_internal_actor reads the actor.
    login = (pr_info.get("user") or {}).get("login", "")
    org_roles = (sso_users.get(login) or {}).get("org_roles") or []
    return any(role in ("NVIDIA:Member", "NVIDIA-NeMo:Member") for role in org_roles)
def get_pr_base_branch(workflow_run):
    """Look up the PR behind a workflow run and return its base branch.

    The PR number is extracted from a head branch such as
    'pull-request/1913' and the PR is then fetched from the GitHub API.

    Args:
        workflow_run: Workflow-run payload; only "head_branch" is read.

    Returns:
        A (base_branch, pr_info) tuple. (None, None) is returned when the head
        branch is missing, null or not a PR branch, or when the PR could not
        be fetched.
    """
    # "head_branch" may be present but null, which re.match cannot handle.
    head_branch = workflow_run.get("head_branch") or ""
    print(head_branch)
    match = re.match(r"pull-request/(\d+)", head_branch)
    if not match:
        return None, None  # Not a PR branch pattern
    pr_number = int(match.group(1))
    # Fetch PR info from GitHub API
    pr_info = make_request(f"pulls/{pr_number}")
    if not pr_info:
        print(f"Failed to fetch PR #{pr_number}")
        return None, None
    base_branch = (pr_info.get("base") or {}).get("ref")
    return base_branch, pr_info
def is_internal_actor(workflow_run):
    """Tell whether the user who triggered the run has an NVIDIA/NVIDIA-NeMo member role.

    The login comes from "triggering_actor", falling back to "actor"; its
    roles are looked up in ``sso_users``.
    """
    actor_info = workflow_run.get("triggering_actor") or workflow_run.get("actor") or {}
    username = actor_info.get("login", "")
    sso_entry = sso_users.get(username, {})
    for role in sso_entry.get("org_roles", []):
        if role in ("NVIDIA:Member", "NVIDIA-NeMo:Member"):
            return True
    return False
def is_pr_run(workflow_run):
    """Return True if this run was triggered by a PR.

    A run counts as a PR run when its "head_branch" starts with
    pull-request/<number>. A missing or null head branch yields False.
    """
    # "head_branch" may be present but null; re.match needs a string.
    head_branch = workflow_run.get("head_branch") or ""
    return re.match(r"pull-request/\d+", head_branch) is not None
def is_workflow_dispatch_run(workflow_run):
    """Return True if this run was manually triggered.

    Manual runs are recognised by a "head_branch" starting with
    "mcore-testing-". A missing or null head branch yields False.
    """
    # "head_branch" may be present but null; None has no startswith().
    head_branch = workflow_run.get("head_branch") or ""
    return head_branch.startswith("mcore-testing-")
def matches_queue(workflow_run, target_branch, contributor_type):
    """Decide whether a workflow run belongs to one queue cell.

    A cell is a (target_branch, contributor_type) pair:
      * 'workflow_dispatch' only accepts manual runs (head_branch:
        mcore-testing-*) and classifies them by the triggering actor.
      * 'main' and 'others' only accept PR runs (head_branch:
        pull-request/<n>) and classify them by the PR author. 'others'
        takes PRs whose base branch is neither 'main' nor 'dev'.

    Returns True when both the branch and the contributor type match.
    """
    wants_internal = contributor_type == "internal"

    # Manual-run queue
    if target_branch == "workflow_dispatch":
        if not is_workflow_dispatch_run(workflow_run):
            return False
        internal = is_internal_actor(workflow_run)
        if wants_internal != internal:
            return False
        actor_info = workflow_run.get("triggering_actor") or workflow_run.get("actor") or {}
        actor = actor_info.get("login", "unknown")
        print(f"workflow_dispatch run by {actor}, contributor_type={contributor_type} (internal={internal})")
        return True

    # PR queues: anything that is not a PR run is ignored
    if not is_pr_run(workflow_run):
        return False
    base_branch, pr_info = get_pr_base_branch(workflow_run)
    if base_branch is None:
        return False

    if base_branch != target_branch:
        # NOTE(review): a PR based on 'dev' is excluded from 'others' here, so
        # it only matches a cell whose target_branch is 'dev' -- confirm that
        # this is intended.
        falls_into_others = target_branch == "others" and base_branch not in ("main", "dev")
        if not falls_into_others:
            return False

    pr_number = re.match(r"pull-request/(\d+)", workflow_run.get("head_branch", "")).group(1)
    internal = is_internal_contributor(pr_info)
    if wants_internal != internal:
        return False
    print(f"PR #{pr_number} targets {target_branch}, contributor_type={contributor_type} (internal={internal})")
    return True
# Get current running and queued workflows
print(f"\n=== Queue cell: branch={MATRIX_BRANCH}, contributor_type={CONTRIBUTOR_TYPE} ===")
print("Fetching workflow runs...")
fetched_runs = {}
for run_status in ("queued", "in_progress"):
    # Ask for the largest page the API allows (100); the default page of 30
    # can hide active runs and make the queue look emptier than it is.
    payload = make_request(f"actions/runs?status={run_status}&per_page=100")
    if payload is None:
        # make_request already printed the cause. Stop with a clear failure
        # instead of approving runs based on an unknown queue state.
        print(f"Failed to fetch {run_status} workflow runs")
        raise SystemExit(1)
    fetched_runs[run_status] = payload.get("workflow_runs", [])
queued_workflow_runs = fetched_runs["queued"]
in_progress_workflow_runs = fetched_runs["in_progress"]
def log_and_filter(runs, label):
    """Log the runs of the managed workflow and keep those in this queue cell.

    Args:
        runs: Workflow-run payloads as returned by the GitHub API.
        label: Status name used as the prefix of the summary log line.

    Returns:
        The runs named WORKFLOW_NAME for which matches_queue() is true for
        this cell (MATRIX_BRANCH, CONTRIBUTOR_TYPE), in their original order.
    """
    cicd_runs = [r for r in runs if r["name"] == WORKFLOW_NAME]
    print(f"{label}: {len(runs)} total, {len(cicd_runs)} {WORKFLOW_NAME}")
    selected = []
    for r in cicd_runs:
        actor = (r.get("triggering_actor") or r.get("actor") or {}).get("login", "unknown")
        # Evaluate the match exactly once per run: for PR runs matches_queue
        # fetches the PR from the API and prints its own log line, so a
        # second call would double both.
        matched = matches_queue(r, MATRIX_BRANCH, CONTRIBUTOR_TYPE)
        print(f"  run={r['id']} head_branch={r.get('head_branch')} event={r.get('event')} actor={actor} -> matched={matched}")
        if matched:
            selected.append(r)
    return selected
# Keep only the runs that belong to this queue cell.
queued_workflow_runs = log_and_filter(queued_workflow_runs, "queued")
in_progress_workflow_runs = log_and_filter(in_progress_workflow_runs, "in_progress")
# Count running and queued workflows
queued_workflows = len(queued_workflow_runs)
in_progress_workflows = len(in_progress_workflow_runs)
total_workflows = queued_workflows + in_progress_workflows
print(f"Current queued workflows (PRs targeting {MATRIX_BRANCH}, {CONTRIBUTOR_TYPE}): {queued_workflows}")
print(f"Current running workflows (PRs targeting {MATRIX_BRANCH}, {CONTRIBUTOR_TYPE}): {in_progress_workflows}")
print(f"Total workflows: {total_workflows}")
print(f"Max concurrency: {MAX_CONCURRENCY}")
if total_workflows >= MAX_CONCURRENCY:
    print("Maximum concurrency reached, no new approvals will be made")
    raise SystemExit(0)
# Get waiting CI workflows for test environment
print("Fetching waiting deployments...")
waiting_payload = make_request("actions/runs?status=waiting&per_page=100")
if waiting_payload is None:
    # make_request already printed the cause; fail the job explicitly.
    print("Failed to fetch waiting workflow runs")
    raise SystemExit(1)
pending_workflows = log_and_filter(waiting_payload.get("workflow_runs", []), "waiting")
# Sort deployments by creation date (oldest first)
print("Sorting workflows...")
pending_workflows = sorted(pending_workflows, key=lambda x: x["created_at"])
# Process each deployment
print(f"Processing {len(pending_workflows)} pending workflows...")
for workflow in pending_workflows:
    if total_workflows >= MAX_CONCURRENCY:
        print("Maximum concurrency reached, stopping approvals")
        break
    workflow_id = workflow["id"]
    workflow_name = workflow["display_title"]
    print(f"Approving workflow {workflow_name} with Run Id: {workflow_id}")
    deployment_url = f"actions/runs/{workflow_id}/pending_deployments"
    pending_deployments = make_request(deployment_url)
    if pending_deployments is None:
        print(f"Failed to fetch pending deployments for run {workflow_id}")
        raise SystemExit(1)
    if not pending_deployments:
        # Nothing is awaiting approval any more, so there is nothing to do
        # for this run.
        print(f"Run {workflow_id} has no pending deployments, skipping")
        continue
    # Only the first pending deployment's environment is approved.
    environment_id = pending_deployments[0]["environment"]["id"]
    # Approve the deployment
    status_data = {
        "environment_ids": [environment_id],
        "state": "approved",
        "comment": "Automatically approved by queue manager"
    }
    result = make_request(deployment_url, method="POST", data=status_data)
    if result:
        total_workflows += 1
    else:
        # Report identifiers already in hand so that building the failure
        # message cannot itself raise.
        print(f"Failed to approve run {workflow_id} (environment {environment_id})")
        raise SystemExit(1)
# Posts a Slack message when the approve-queue job failed.
notify:
  if: failure()
  runs-on: ubuntu-latest
  needs: [approve-queue]
  steps:
    - name: Notify
      env:
        SLACK_WEBHOOK: ${{ secrets.SLACK_CI_CHANNEL_WEBHOOK }}
        SLACK_WEBHOOK_ADMIN: "<!subteam^${{ secrets.SLACK_TEAM_GROUP_ID }}>"
        GITHUB_RUN_ID: ${{ github.run_id }}
        GITHUB_REPOSITORY: ${{ github.repository }}
      # --fail makes the step fail when the webhook answers with an HTTP
      # error; the URL is quoted so the shell passes it as one argument.
      run: |
        curl --fail -X POST \
          -H 'Content-type: application/json' \
          --data "{\"text\":\":robot_joy: <https://github.com/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}|Test-queue-approval-bot workflow> failed. Please review manually.\n\ncc ${SLACK_WEBHOOK_ADMIN}\"}" \
          "$SLACK_WEBHOOK"