# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Queue manager for the "CICD Megatron-LM" workflow.
#
# Every 5 minutes (or on manual dispatch) one job per matrix branch counts the
# queued and in-progress CICD runs that belong to PRs targeting that branch and,
# while the count is below the concurrency limit, approves the oldest runs that
# are waiting on a pending deployment. A Slack message is sent if any job fails.
name: Approve Test Queue

on:
  schedule:
    - cron: "*/5 * * * *" # Runs every 5 minutes
  workflow_dispatch: # Allows manual triggering

jobs:
  approve-queue:
    runs-on: ubuntu-latest
    environment: main
    strategy:
      matrix:
        # "others" is a catch-all bucket: PRs whose base branch is neither
        # "main" nor "dev" (see is_pr_targeting_branch below).
        branch: [main, dev, others]
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install requests

      - name: Approve waiting deployments
        env:
          GITHUB_TOKEN: ${{ secrets.PAT }}
          MAX_CONCURRENCY: ${{ vars.MAX_CONCURRENCY || 1 }}
          # Passed through the environment instead of being templated into the
          # Python source, so the script text is the same for every matrix entry.
          TARGET_BRANCH: ${{ matrix.branch }}
          PYTHONUNBUFFERED: 1
        shell: python
        run: |
          import os
          import re
          import sys

          import requests

          # GitHub API configuration
          GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
          TARGET_BRANCH = os.environ["TARGET_BRANCH"]
          # NOTE(review): with the default MAX_CONCURRENCY of 1 this evaluates to
          # 0, so the limit check below always stops before approving anything.
          # Confirm that is intended, or clamp the result to a minimum of 1.
          MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY"]) // 2
          # The target repository is hard-coded rather than read from
          # GITHUB_REPOSITORY.
          API_BASE = "https://api.github.com/repos/NVIDIA/Megatron-LM"
          WORKFLOW_NAME = "CICD Megatron-LM"
          # Seconds to wait for the API before giving up; without a timeout
          # `requests` can block indefinitely on a stalled connection.
          REQUEST_TIMEOUT = 30

          # Headers for GitHub API
          headers = {
              "Authorization": f"token {GITHUB_TOKEN}",
              "Accept": "application/vnd.github.v3+json",
              "X-GitHub-Api-Version": "2022-11-28",
          }

          # PR number -> base branch name, so each PR is fetched at most once
          # even though its runs are inspected in several listings.
          pr_base_branches = {}


          def make_request(endpoint, method="GET", data=None):
              """Call the GitHub API and return the decoded JSON body.

              `endpoint` is relative to API_BASE. Any method other than "GET" is
              sent as a POST with `data` as the JSON payload. Returns None (after
              printing the error) when the request fails or returns an HTTP error
              status.
              """
              url = f"{API_BASE}/{endpoint}"
              try:
                  if method == "GET":
                      response = requests.get(
                          url, headers=headers, timeout=REQUEST_TIMEOUT
                      )
                  else:
                      response = requests.post(
                          url, headers=headers, json=data, timeout=REQUEST_TIMEOUT
                      )
                  response.raise_for_status()
                  return response.json()
              except requests.exceptions.RequestException as e:
                  print(f"Error making request to {endpoint}: {str(e)}")
                  # Connection-level errors carry no response object.
                  failed_response = getattr(e, "response", None)
                  if failed_response is not None:
                      print(f"Response: {failed_response.text}")
                  return None


          def fetch_workflow_runs(status):
              """Return the workflow runs in `status`; exit(1) if the call fails.

              Requests the maximum page size (100); the API default is 30.
              Runs beyond the first page are not fetched.
              """
              payload = make_request(f"actions/runs?status={status}&per_page=100")
              if payload is None:
                  print(f"Failed to fetch workflow runs with status '{status}'")
                  sys.exit(1)
              return payload.get("workflow_runs", [])


          def is_pr_targeting_branch(workflow_run, target_branch):
              """
              Check if a workflow run belongs to a PR targeting the given branch.
              Extract PR number from head branch like 'pull-request/1913' and
              verify base branch. A target of "others" matches every base branch
              except "main" and "dev".
              """
              # head_branch can be present but null, so `.get(key, "")` alone is
              # not enough to guarantee a string for re.match.
              head_branch = workflow_run.get("head_branch") or ""
              print(head_branch)
              match = re.match(r"pull-request/(\d+)", head_branch)
              if not match:
                  return False  # Not a PR branch pattern

              pr_number = int(match.group(1))

              if pr_number not in pr_base_branches:
                  # Fetch PR info from GitHub API
                  pr_info = make_request(f"pulls/{pr_number}")
                  if not pr_info:
                      print(f"Failed to fetch PR #{pr_number}")
                      return False
                  pr_base_branches[pr_number] = pr_info.get("base", {}).get("ref")
              base_branch = pr_base_branches[pr_number]

              if base_branch == target_branch or (
                  target_branch == "others" and base_branch not in ("main", "dev")
              ):
                  print(f"PR #{pr_number} targets {target_branch}")
                  return True

              return False


          def select_target_runs(runs):
              """Keep only CICD runs that belong to PRs targeting TARGET_BRANCH."""
              return [
                  run
                  for run in runs
                  if run["name"] == WORKFLOW_NAME
                  and is_pr_targeting_branch(run, TARGET_BRANCH)
              ]


          # Get current running and queued workflows
          print("Fetching workflow runs...")
          queued_workflow_runs = fetch_workflow_runs("queued")
          in_progress_workflow_runs = fetch_workflow_runs("in_progress")

          # Filter for workflows belonging to PRs targeting the matrix branch
          queued_workflow_runs = select_target_runs(queued_workflow_runs)
          in_progress_workflow_runs = select_target_runs(in_progress_workflow_runs)

          # Count running and queued workflows
          queued_workflows = len(queued_workflow_runs)
          in_progress_workflows = len(in_progress_workflow_runs)

          total_workflows = queued_workflows + in_progress_workflows
          print(f"Current queued workflows (PRs targeting {TARGET_BRANCH}): {queued_workflows}")
          print(f"Current running workflows (PRs targeting {TARGET_BRANCH}): {in_progress_workflows}")
          print(f"Total workflows: {total_workflows}")
          print(f"Max concurrency: {MAX_CONCURRENCY}")

          if total_workflows >= MAX_CONCURRENCY:
              print("Maximum concurrency reached, no new approvals will be made")
              sys.exit(0)

          # Get waiting CI workflows for test environment
          print("Fetching deployments...")
          pending_workflows = fetch_workflow_runs("waiting")
          print("Pending workflows:", len(pending_workflows))
          pending_workflows = select_target_runs(pending_workflows)

          # Sort deployments by creation date (oldest first)
          print("Sorting workflows...")
          pending_workflows = sorted(pending_workflows, key=lambda x: x["created_at"])

          # Process each deployment
          print(f"Processing {len(pending_workflows)} pending workflows...")
          for workflow in pending_workflows:
              if total_workflows >= MAX_CONCURRENCY:
                  print("Maximum concurrency reached, stopping approvals")
                  break

              workflow_id = workflow["id"]
              workflow_name = workflow["display_title"]
              print(f"Approving workflow {workflow_name} with Run Id: {workflow_id}")

              deployment_url = f"actions/runs/{workflow_id}/pending_deployments"
              deployments = make_request(deployment_url)
              if deployments is None:
                  print(f"Failed to fetch pending deployments for run {workflow_id}")
                  sys.exit(1)
              if not deployments:
                  # Nothing left to approve for this run (e.g. it was approved
                  # between the listing and this call); move on to the next one.
                  print(f"No pending deployments for run {workflow_id}, skipping")
                  continue

              # Only the first pending deployment of the run is approved.
              deployment = deployments[0]
              environment_id = deployment["environment"]["id"]

              # Approve the deployment
              status_data = {
                  "environment_ids": [environment_id],
                  "state": "approved",
                  "comment": "Automatically approved by queue manager",
              }
              result = make_request(deployment_url, method="POST", data=status_data)

              if result:
                  total_workflows += 1
              else:
                  # Pending-deployment objects have no top-level "id", so report
                  # the run and environment ids instead.
                  print(
                      f"Failed to approve deployment for run {workflow_id} "
                      f"(environment {environment_id})"
                  )
                  sys.exit(1)

  notify:
    if: failure()
    runs-on: ubuntu-latest
    needs: [approve-queue]
    steps:
      - name: Notify
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
          # NOTE(review): this value is empty, so the "cc" in the message below
          # names nobody. GITHUB_RUN_ID and GITHUB_REPOSITORY are likewise unused
          # by the command. Confirm whether a mention and a run link are missing.
          SLACK_WEBHOOK_ADMIN:
          GITHUB_RUN_ID: ${{ github.run_id }}
          GITHUB_REPOSITORY: ${{ github.repository }}
        run: |
          curl -X POST \
            -H 'Content-type: application/json' \
            --data "{\"text\":\":robot_joy: failed. Please review manually.\n\ncc ${SLACK_WEBHOOK_ADMIN}\"}" \
            "$SLACK_WEBHOOK"