Skip to content

ci: gate images on the system closure, not the OCI tar #3402

ci: gate images on the system closure, not the OCI tar

ci: gate images on the system closure, not the OCI tar #3402

Workflow file for this run

# Workflow that runs the AI reviewer on pull requests and exposes one final
# gate job for branch protection. It can be called from another workflow or
# triggered directly by the pull request events listed below.
name: AI review gate
on:
# Reusable entry point: the caller forwards its own event name, optional
# reviewer overrides, and the secrets declared below.
workflow_call:
inputs:
caller_event_name:
description: Original event name from the caller.
required: true
type: string
model:
description: Model passed to the current OpenAI-backed reviewer.
required: false
type: string
default: ""
effort:
description: Reasoning effort passed to the current OpenAI-backed reviewer.
required: false
type: string
default: ""
required_check_name:
description: Final gate job name that branch protection should require.
required: false
type: string
default: ai review approved
secrets:
openai_api_key:
description: OpenAI API key used by the current reviewer implementation.
required: false
repository_token:
description: GitHub token from the caller with pull request write access.
required: true
# Direct trigger for pull requests that target main.
pull_request:
branches:
- main
types:
- opened
- synchronize
- reopened
- ready_for_review
# Same branch and type filter, for the pull_request_target event.
pull_request_target:
branches:
- main
types:
- opened
- synchronize
- reopened
- ready_for_review
# Review activity also triggers the workflow.
pull_request_review:
types:
- submitted
- edited
- dismissed
# Workflow-level token scope; every job below declares its own permissions.
permissions:
contents: read
# One concurrency group per (event name, pull request number); a newer run
# cancels the run that is still in progress.
concurrency:
group: ai-review-${{ inputs.caller_event_name || github.event_name }}-${{ github.event.pull_request.number }}
cancel-in-progress: true
jobs:
# Runs the secret-backed reviewer. The condition limits it to non-draft,
# same-repository pull_request events whose author is not Dependabot.
ai-structured-review:
name: ai structured review
if: >-
github.event.pull_request.draft == false &&
(inputs.caller_event_name || github.event_name) == 'pull_request' &&
github.event.pull_request.head.repo.full_name == github.repository &&
github.event.pull_request.user.login != 'dependabot[bot]'
runs-on: ubuntu-latest
permissions:
contents: read
# Model and effort resolve in order: workflow_call input, repository variable,
# built-in default. The remaining values come from the event payload.
env:
AI_REVIEW_EFFORT: ${{ inputs.effort || vars.AI_REVIEW_EFFORT || 'xhigh' }}
BASE_SHA: ${{ github.event.pull_request.base.sha }}
AI_REVIEW_MODEL: ${{ inputs.model || vars.AI_REVIEW_MODEL || 'gpt-5.5' }}
HEAD_SHA: ${{ github.event.pull_request.head.sha }}
PR_NUMBER: ${{ github.event.pull_request.number }}
REPOSITORY: ${{ github.repository }}
steps:
# Fail fast when neither the caller-supplied nor the repository-level OpenAI
# key is present, so later steps never run the reviewer without a key.
- name: Check AI review availability
  env:
    OPENAI_API_KEY: ${{ secrets.openai_api_key || secrets.OPENAI_API_KEY }}
  run: |
    [ -n "${OPENAI_API_KEY:-}" ] && exit 0
    echo "::error::Missing OpenAI API key secret; AI review cannot run."
    exit 1
# Check out the pull request's base commit with full history. The checkout
# token is not persisted into the local git configuration.
- name: Checkout base commit
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6
with:
fetch-depth: 0
persist-credentials: false
ref: ${{ github.event.pull_request.base.sha }}
# Fetch refs/pull/<number>/head into a private remote-tracking ref and refuse
# to continue unless the fetched commit equals HEAD_SHA from the event payload.
- name: Fetch pull request refs
env:
GITHUB_TOKEN: ${{ secrets.repository_token || github.token }}
run: |
set -euo pipefail
# Basic-auth header value, supplied to this one fetch through `git -c`.
auth_header="$(printf 'x-access-token:%s' "${GITHUB_TOKEN}" | base64 | tr -d '\n')"
git \
-c "http.https://github.com/.extraheader=AUTHORIZATION: basic ${auth_header}" \
fetch --no-tags origin \
"+refs/pull/${PR_NUMBER}/head:refs/remotes/ai-review-pr/${PR_NUMBER}/head"
# Fail when the fetched ref does not point at the expected head commit.
fetched_head="$(git rev-parse "refs/remotes/ai-review-pr/${PR_NUMBER}/head")"
if [ "${fetched_head}" != "${HEAD_SHA}" ]; then
echo "::error::Fetched pull request head ${fetched_head}, expected ${HEAD_SHA}."
exit 1
fi
# Move the working tree to the pull request head (detached) so the reviewer
# can read the changed files directly.
- name: Checkout pull request head for review exploration
run: git checkout --detach "${HEAD_SHA}"
# Replace every AGENTS.md / CLAUDE.md in the working tree with the version
# recorded in the base commit, and delete those that the base does not have.
- name: Restore trusted reviewer instructions
run: |
set -euo pipefail
# Print the base-tree path that "$1" finally points at, following symlink
# entries recorded in BASE_SHA for at most 20 hops. Fails when the path is
# missing from the base, is not a file, or the chain is too deep.
resolve_base_instruction_path() {
local resolved="$1"
for _ in {1..20}; do
local entry mode object_type link_target
entry="$(git ls-tree "${BASE_SHA}" -- "${resolved}")"
if [ -z "${entry}" ]; then
echo "::error::Trusted instruction path ${resolved} is missing from base ${BASE_SHA}."
return 1
fi
IFS=$' \t' read -r mode object_type _ <<< "${entry}"
# Mode 120000 marks a symlink entry; anything else must be a plain blob.
if [ "${mode}" != "120000" ]; then
if [ "${object_type}" != "blob" ]; then
echo "::error::Trusted instruction path ${resolved} is ${object_type}, expected a file."
return 1
fi
printf '%s\n' "${resolved}"
return 0
fi
# The blob content of a symlink entry is its target; resolve it relative to
# the link's directory and reject absolute or repository-escaping targets.
link_target="$(git show "${BASE_SHA}:${resolved}")"
resolved="$(
python3 - "${resolved}" "${link_target}" <<'PY'
import posixpath
import sys
current, target = sys.argv[1:]
if target.startswith("/"):
raise SystemExit(f"absolute symlink target is not allowed: {target}")
normalized = posixpath.normpath(posixpath.join(posixpath.dirname(current), target))
if normalized == ".." or normalized.startswith("../"):
raise SystemExit(f"symlink target escapes repository: {target}")
print(normalized)
PY
)"
done
echo "::error::Trusted instruction symlink chain is too deep for $1."
return 1
}
# Union of instruction files tracked in the checked-out tree and in the base
# tree, NUL-separated and de-duplicated.
mapfile -d '' instruction_paths < <(
{
git ls-files -z | grep -z -E '(^|/)(AGENTS|CLAUDE)\.md$' || true
git ls-tree -r -z --name-only "${BASE_SHA}" | grep -z -E '(^|/)(AGENTS|CLAUDE)\.md$' || true
} | sort -zu
)
# Overwrite with the base content when the base has the path; otherwise remove
# the file so only base-provided instructions remain.
for path in "${instruction_paths[@]}"; do
if git cat-file -e "${BASE_SHA}:${path}" 2>/dev/null; then
resolved_path="$(resolve_base_instruction_path "${path}")"
mkdir -p -- "$(dirname -- "${path}")"
rm -f -- "${path}"
git show "${BASE_SHA}:${resolved_path}" > "${path}"
else
rm -f -- "${path}"
fi
done
# Write the JSON Schema that is later passed to the reviewer as its
# output-schema-file: a list of findings (title, body, confidence, priority
# 0-3, file and line range, suggested replacement) plus an overall verdict,
# explanation and confidence. Every object forbids additional properties.
- name: Generate structured output schema
run: |
cat > ai-review-output-schema.json <<'JSON'
{
"type": "object",
"properties": {
"findings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {
"type": "string",
"minLength": 1,
"maxLength": 80
},
"body": {
"type": "string",
"minLength": 1
},
"confidence_score": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"priority": {
"type": "integer",
"minimum": 0,
"maximum": 3
},
"code_location": {
"type": "object",
"properties": {
"relative_file_path": {
"type": "string",
"minLength": 1
},
"line_range": {
"type": "object",
"properties": {
"start": {
"type": "integer",
"minimum": 1
},
"end": {
"type": "integer",
"minimum": 1
}
},
"required": [
"start",
"end"
],
"additionalProperties": false
}
},
"required": [
"relative_file_path",
"line_range"
],
"additionalProperties": false
},
"suggested_replacement": {
"type": "string"
}
},
"required": [
"title",
"body",
"confidence_score",
"priority",
"code_location",
"suggested_replacement"
],
"additionalProperties": false
}
},
"overall_correctness": {
"type": "string",
"enum": [
"patch is correct",
"patch is incorrect"
]
},
"overall_explanation": {
"type": "string",
"minLength": 1
},
"overall_confidence_score": {
"type": "number",
"minimum": 0,
"maximum": 1
}
},
"required": [
"findings",
"overall_correctness",
"overall_explanation",
"overall_confidence_score"
],
"additionalProperties": false
}
JSON
# Assemble ai-review-prompt.md: fixed reviewer instructions, pull request
# identifiers, the changed-file list and diff stat. The unified diff itself is
# written to a separate file and appended by the Python script that follows.
- name: Build AI review prompt
run: |
set -euo pipefail
# Everything printed inside this group is redirected into the prompt file.
{
cat <<'PROMPT'
You are acting as a reviewer for a proposed code change made by another engineer.
Focus on issues that impact correctness, performance, security, maintainability, or developer experience.
Flag only actionable issues introduced by the pull request.
When you flag an issue, cite a file path and line range from the changed side of the pull request diff.
Prioritize severe issues and avoid nit-level comments unless they block understanding of the diff.
The workspace is checked out at the pull request head, with reviewer instruction files restored from the trusted base commit before this action runs. Before returning JSON, inspect the changed files and any nearby code needed to validate assumptions; do not rely only on the pasted diff when broader context matters.
For GitHub Actions and CI changes, explicitly check event coverage, fork and Dependabot token/secrets behavior, workflow_call caller context, branch-protection check names, stale/skipped check rows, job permissions, pagination of GitHub API queries, and whether fallback paths can actually retrigger and pass.
For automation that uses secrets or models, check that untrusted PR text cannot reach a secret-backed agent with write permissions, and that the final gate fails closed when the reviewer output is absent, malformed, or incomplete.
After listing findings, produce an overall correctness verdict ("patch is correct" or "patch is incorrect") with a concise justification and a confidence score between 0 and 1.
If a finding can be fixed by replacing the exact cited line range, set suggested_replacement to the complete replacement text for that range. Only provide a suggested_replacement when it is safe for GitHub's suggestion UI to apply directly. Use an empty string when the fix needs wider edits, depends on context outside the cited range, or is not obvious.
Return only JSON that matches the supplied schema.
PROMPT
echo
echo "Repository: ${REPOSITORY}"
echo "Pull Request #: ${PR_NUMBER}"
echo "Base SHA: ${BASE_SHA}"
echo "Head SHA: ${HEAD_SHA}"
echo
echo "Changed files:"
git --no-pager diff --name-status "${BASE_SHA}...${HEAD_SHA}"
echo
echo "Diff stat:"
git --no-pager diff --stat=200 "${BASE_SHA}...${HEAD_SHA}"
echo
echo "Unified diff:"
} > ai-review-prompt.md
# Full three-dot diff with five lines of context, kept in its own file so the
# script below can decide how much of it fits into the prompt.
git --no-pager diff --unified=5 "${BASE_SHA}...${HEAD_SHA}" > ai-review-full.diff
python3 - <<'PY'
# codex-action rejects turn input over 1,048,576 characters (input_too_large),
# so a very large pull request cannot paste its whole diff. The diff is appended
# file by file, in order, until the first per-file diff that no longer fits; a
# closing note then says how many per-file diffs were left out. Every changed
# file is already named earlier in the prompt, and the prompt tells the
# reviewer to read the checked-out head rather than rely only on the paste.
BUDGET = 900_000  # headroom under the cap for schema and turn overhead
DIFF_PATH = "ai-review-full.diff"
PROMPT_PATH = "ai-review-prompt.md"
FILE_BOUNDARY = "\ndiff --git "

with open(DIFF_PATH, encoding="utf-8", errors="replace") as handle:
    full_diff = handle.read()
with open(PROMPT_PATH, encoding="utf-8", errors="replace") as handle:
    budget_left = BUDGET - len(handle.read())

if len(full_diff) <= budget_left:
    # The whole diff fits: paste it unchanged and add no note.
    appended = [full_diff]
else:
    # Split on file boundaries, keeping the boundary text with the file that
    # follows it so the pieces concatenate back to the original diff.
    first, *rest = full_diff.split(FILE_BOUNDARY)
    file_diffs = [first, *(FILE_BOUNDARY + piece for piece in rest)]
    appended = []
    for file_diff in file_diffs:
        if len(file_diff) > budget_left:
            # Once one file does not fit, every later file is omitted too.
            break
        appended.append(file_diff)
        budget_left -= len(file_diff)
    skipped = len(file_diffs) - len(appended)
    appended.append(
        "\n\n[Prompt budget reached: inline diffs omitted for "
        f"{skipped} of {len(file_diffs)} changed file(s). The omitted "
        "files appear in the Changed files list above; review them "
        "from the checked-out head.]\n"
    )

with open(PROMPT_PATH, "a", encoding="utf-8") as prompt:
    prompt.write("".join(appended))
PY
# Delete any pre-existing output file so later steps only ever see a file
# written by this run's reviewer.
- name: Remove stale AI review output
run: rm -f ai-review-output.json
# Invoke the pinned codex-action with the generated prompt and schema. The
# agent is configured with a read-only sandbox and the drop-sudo safety
# strategy, and writes its structured result to ai-review-output.json.
- name: Run AI structured review
id: run-ai-review
uses: openai/codex-action@e0fdf01220eb9a88167c4898839d273e3f2609d1 # v1.8
with:
openai-api-key: ${{ secrets.openai_api_key || secrets.OPENAI_API_KEY }}
prompt-file: ai-review-prompt.md
output-schema-file: ai-review-output-schema.json
output-file: ai-review-output.json
sandbox: read-only
safety-strategy: drop-sudo
model: ${{ env.AI_REVIEW_MODEL }}
effort: ${{ env.AI_REVIEW_EFFORT }}
# Always runs, even after a failure: pretty-print the reviewer output into the
# job log when the file exists and is non-empty, otherwise say it is missing.
- name: Inspect structured review output
  if: always()
  run: |
    if [ ! -s ai-review-output.json ]; then
      echo "AI review output file missing."
      exit 0
    fi
    jq '.' ai-review-output.json
# After a successful reviewer run, insist on a non-empty output file and copy
# it into the directory that the upload step publishes.
- name: Stage AI review artifact
  if: ${{ success() }}
  run: |
    set -euo pipefail
    [ -s ai-review-output.json ] || {
      echo "::error::AI review output file missing or empty after a successful reviewer run."
      exit 1
    }
    mkdir -p ai-review-artifact
    cp ai-review-output.json ai-review-artifact/ai-review-output.json
# Upload the staged output as an artifact named after this run id (kept for
# one day); the publish job downloads it under the same name.
- name: Upload AI review output
if: ${{ success() }}
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
name: ai-review-${{ github.run_id }}
path: ai-review-artifact/ai-review-output.json
retention-days: 1
# Publishes the reviewer's findings to the pull request. `always()` lets this
# job start even when the review job did not succeed; its first step then
# fails explicitly in that case. The remaining conditions match the review job.
publish-ai-review:
name: publish ai structured review
if: >-
always() &&
github.event.pull_request.draft == false &&
(inputs.caller_event_name || github.event_name) == 'pull_request' &&
github.event.pull_request.head.repo.full_name == github.repository &&
github.event.pull_request.user.login != 'dependabot[bot]'
needs:
- ai-structured-review
runs-on: ubuntu-latest
# Write scopes are granted only here, in the job that posts comments.
permissions:
actions: read
issues: write
pull-requests: write
env:
HEAD_SHA: ${{ github.event.pull_request.head.sha }}
PR_NUMBER: ${{ github.event.pull_request.number }}
REPOSITORY: ${{ github.repository }}
steps:
# Stop unless the review job reported exactly "success"; any other result
# (failure, cancelled, skipped, ...) is reported and fails this job.
- name: Require successful AI review job
  env:
    AI_STRUCTURED_REVIEW_RESULT: ${{ needs.ai-structured-review.result }}
  run: |
    case "${AI_STRUCTURED_REVIEW_RESULT}" in
      success)
        ;;
      *)
        echo "::error::AI structured review did not complete successfully (${AI_STRUCTURED_REVIEW_RESULT})."
        exit 1
        ;;
    esac
# Download this run's review artifact into the working directory, where the
# publish script reads ai-review-output.json.
- name: Download AI review output
uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1
with:
name: ai-review-${{ github.run_id }}
path: .
# Run the inline Python publisher. It posts findings as review comments using
# the caller's repository token, or the workflow token when none was passed,
# and exits non-zero unless the review is clean.
- name: Publish GitHub review suggestions
env:
GITHUB_TOKEN: ${{ secrets.repository_token || github.token }}
run: |
python3 - <<'PY'
from __future__ import annotations
import hashlib
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
def require_env(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: when the variable is unset or set to an empty string.
    """
    value = os.environ.get(name, "")
    if value:
        return value
    raise RuntimeError(f"missing required environment variable {name}")
# Inputs supplied by the workflow step's environment; each one is mandatory.
REPOSITORY = require_env("REPOSITORY")
PR_NUMBER = int(require_env("PR_NUMBER"))
HEAD_SHA = require_env("HEAD_SHA")
GITHUB_TOKEN = require_env("GITHUB_TOKEN")
# Reviewer output file read by load_review_output().
OUTPUT_PATH = "ai-review-output.json"
# Logins always treated as this publisher's own comments.
BOT_AUTHORS = {"github-actions", "github-actions[bot]"}
# Hidden HTML-comment markers embedded in published comments. Both include the
# head SHA, so markers written for a different head never match.
MARKER_PREFIX = f"<!-- ai-review:{HEAD_SHA}:"
SUMMARY_MARKER = f"<!-- ai-review-summary:{HEAD_SHA} -->"
def request_api_json(method: str, path: str, data: dict[str, Any] | None = None) -> Any:
    """Send one authenticated request to the GitHub API and decode the reply.

    Args:
        method: HTTP method, for example ``"GET"`` or ``"POST"``.
        path: Absolute API path appended to ``https://api.github.com``.
        data: Optional JSON-serialisable request body.

    Returns:
        The decoded JSON payload, or ``None`` when the response body is empty.

    Raises:
        RuntimeError: when GitHub answers with an HTTP error status, or when
            the request never reaches GitHub (DNS, TLS, connection failure).
    """
    url = f"https://api.github.com{path}"
    body = None if data is None else json.dumps(data).encode()
    request = urllib.request.Request(url, data=body, method=method)
    request.add_header("Accept", "application/vnd.github+json")
    request.add_header("Authorization", f"Bearer {GITHUB_TOKEN}")
    request.add_header("X-GitHub-Api-Version", "2022-11-28")
    if data is not None:
        request.add_header("Content-Type", "application/json")
    try:
        with urllib.request.urlopen(request) as response:
            payload = response.read().decode()
    except urllib.error.HTTPError as error:
        # HTTPError must be handled first: it is a subclass of URLError.
        detail = error.read().decode(errors="replace")
        raise RuntimeError(f"GitHub API {method} {path} failed: {error.code} {detail}") from error
    except urllib.error.URLError as error:
        # Transport failures carry no status code. Report them as RuntimeError
        # as well, because that is the only error type callers handle.
        raise RuntimeError(f"GitHub API {method} {path} failed: {error.reason}") from error
    return None if not payload else json.loads(payload)
def request_json(method: str, path: str, data: dict[str, Any] | None = None) -> Any:
    """Call the GitHub API with ``path`` taken relative to ``/repos/{REPOSITORY}``."""
    repository_path = f"/repos/{REPOSITORY}{path}"
    return request_api_json(method, repository_path, data)
def try_request_api_json(method: str, path: str, data: dict[str, Any] | None = None) -> Any:
    """Best-effort variant of ``request_api_json``.

    Returns the decoded payload, or ``None`` after printing a warning to
    stderr when the request fails.
    """
    try:
        result = request_api_json(method, path, data)
    except (RuntimeError, urllib.error.URLError) as error:
        print(f"warning: could not resolve publisher identity from {path}: {error}", file=sys.stderr)
        result = None
    return result
def request_pages(path: str) -> list[dict[str, Any]]:
    """Collect every item of a paginated repository-relative list endpoint.

    Pages of 100 are requested until an empty page or a page with fewer than
    100 items is returned.

    Raises:
        RuntimeError: when a non-empty page is not a JSON list.
    """
    joiner = "&" if "?" in path else "?"
    collected: list[dict[str, Any]] = []
    page_number = 1
    while True:
        query = urllib.parse.urlencode({"per_page": 100, "page": page_number})
        batch = request_json("GET", f"{path}{joiner}{query}")
        if not batch:
            break
        if not isinstance(batch, list):
            raise RuntimeError(f"GitHub API {path} returned a non-list payload")
        collected += batch
        if len(batch) < 100:
            break
        page_number += 1
    return collected
def load_review_output() -> dict[str, Any]:
    """Read and parse the reviewer output file at ``OUTPUT_PATH``.

    Raises:
        RuntimeError: when the file is missing or empty, or when its top-level
            JSON value is not an object.
    """
    missing = not os.path.exists(OUTPUT_PATH)
    if missing or os.path.getsize(OUTPUT_PATH) == 0:
        raise RuntimeError("AI review output file missing; failing review gate.")
    with open(OUTPUT_PATH, encoding="utf-8") as handle:
        parsed = json.load(handle)
    if isinstance(parsed, dict):
        return parsed
    raise RuntimeError("AI review output must be a JSON object")
def author_login(item: dict[str, Any]) -> str:
    """Return the login stored under ``item["user"]``, or ``""`` when absent."""
    user = item.get("user")
    return str(user.get("login") or "") if isinstance(user, dict) else ""
def publisher_authors() -> set[str]:
    """Return the logins whose comments count as written by this publisher.

    Starts from ``BOT_AUTHORS`` and adds the token's own login when the GraphQL
    ``viewer`` query or the REST ``/user`` endpoint reveals one.
    """
    # Installation tokens may not expose `/user` or `/app`; the default
    # Actions authors are kept when identity discovery is unavailable.
    authors = set(BOT_AUTHORS)
    node = try_request_api_json("POST", "/graphql", {"query": "query { viewer { login } }"})
    for key in ("data", "viewer"):
        node = node.get(key) if isinstance(node, dict) else None
    identities = [node, try_request_api_json("GET", "/user")]
    for identity in identities:
        if not isinstance(identity, dict):
            continue
        login = str(identity.get("login") or "")
        if login:
            authors.add(login)
    return authors
def marker_bodies(path: str, authors: set[str]) -> list[str]:
    """Return the bodies of all items at ``path`` whose author is in ``authors``.

    A missing or null body is returned as an empty string.
    """
    return [
        str(item.get("body") or "")
        for item in request_pages(path)
        if author_login(item) in authors
    ]
def existing_markers() -> set[str]:
    """Collect the markers this publisher already posted for the current head.

    Scans review comments, reviews and issue comments written by the publisher
    and returns every line that is the summary marker or starts with the
    finding-marker prefix.
    """
    authors = publisher_authors()
    endpoints = (
        f"/pulls/{PR_NUMBER}/comments",
        f"/pulls/{PR_NUMBER}/reviews",
        f"/issues/{PR_NUMBER}/comments",
    )
    return {
        line
        for endpoint in endpoints
        for body in marker_bodies(endpoint, authors)
        for line in body.splitlines()
        if line == SUMMARY_MARKER or line.startswith(MARKER_PREFIX)
    }
def priority_badge(priority: int) -> str:
    """Return the Markdown image tag for a P0-P3 priority badge.

    Raises:
        RuntimeError: when ``priority`` is not one of 0, 1, 2 or 3.
    """
    palette = {0: "red", 1: "orange", 2: "yellow", 3: "blue"}
    if priority not in palette:
        raise RuntimeError(f"finding priority must be between 0 and 3: {priority}")
    return f"![P{priority} Badge](https://img.shields.io/badge/P{priority}-{palette[priority]}?style=flat)"
def finding_marker(path: str, start: int, end: int, priority: int, title: str, body: str, suggestion: str | None) -> str:
    """Build the hidden marker that identifies one finding on the current head.

    The marker is ``MARKER_PREFIX`` followed by the first 24 hex digits of the
    SHA-256 of the finding's fields (serialised as compact JSON with sorted
    keys), so an identical finding always yields the same marker.
    """
    fields = dict(
        body=body,
        end=end,
        path=path,
        priority=priority,
        start=start,
        suggestion=suggestion or "",
        title=title,
    )
    canonical = json.dumps(fields, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode()).hexdigest()
    return f"{MARKER_PREFIX}{digest[:24]} -->"
def normalize_finding(finding: dict[str, Any], index: int) -> dict[str, Any]:
    """Validate one reviewer finding and convert it into a review comment.

    Args:
        finding: One entry of the reviewer's ``findings`` list.
        index: 1-based position of the finding, used in error messages.

    Returns:
        A dict with ``marker`` (the de-duplication marker), ``comment`` (the
        payload for GitHub's review-comment API, anchored on the RIGHT side)
        and ``summary`` (one Markdown bullet for the summary comment).

    Raises:
        RuntimeError: when a required field is missing, has the wrong type, or
            holds an out-of-range value.
    """
    location = finding.get("code_location")
    if not isinstance(location, dict):
        raise RuntimeError(f"finding {index} is missing code_location")
    line_range = location.get("line_range")
    if not isinstance(line_range, dict):
        raise RuntimeError(f"finding {index} is missing code_location.line_range")
    path = str(location.get("relative_file_path") or "").strip()
    if not path or path.startswith("/") or ".." in path.split("/"):
        raise RuntimeError(f"finding {index} has invalid relative_file_path: {path!r}")
    # int()/float() raise TypeError on None and ValueError on junk text. Both
    # are converted so a malformed finding is reported as a normal validation
    # error instead of escaping the script's RuntimeError handling.
    try:
        start = int(line_range.get("start"))
        end = int(line_range.get("end"))
    except (TypeError, ValueError) as error:
        raise RuntimeError(f"finding {index} has a non-integer line range: {line_range!r}") from error
    if start <= 0 or end < start:
        raise RuntimeError(f"finding {index} has invalid line range: {start}-{end}")
    try:
        priority = int(finding.get("priority"))
        confidence = float(finding.get("confidence_score"))
    except (TypeError, ValueError) as error:
        raise RuntimeError(f"finding {index} has a non-numeric priority or confidence_score") from error
    if confidence < 0 or confidence > 1:
        raise RuntimeError(f"finding {index} confidence must be between 0 and 1: {confidence}")
    title = str(finding.get("title") or "").strip()
    body = str(finding.get("body") or "").strip()
    if not title or not body:
        raise RuntimeError(f"finding {index} is missing title or body")
    suggestion = finding.get("suggested_replacement")
    if suggestion is not None:
        suggestion = str(suggestion).rstrip("\n")
        # A suggestion containing a code fence would break out of the
        # ```suggestion block; an empty one means "no suggestion".
        if "```" in suggestion or suggestion == "":
            suggestion = None
    marker = finding_marker(path, start, end, priority, title, body, suggestion)
    comment_body = "\n".join(
        [
            marker,
            f"{priority_badge(priority)} **{title}**",
            "",
            body,
        ]
    )
    if suggestion is not None:
        comment_body = "\n".join(
            [
                comment_body,
                "",
                "```suggestion",
                suggestion,
                "```",
            ]
        )
    comment = {
        "body": comment_body,
        "path": path,
        "side": "RIGHT",
        "line": end,
    }
    # Multi-line comments additionally need the first line of the range.
    if start != end:
        comment["start_line"] = start
        comment["start_side"] = "RIGHT"
    return {
        "marker": marker,
        "comment": comment,
        "summary": f"- P{priority} `{path}:{start}` {title}",
    }
def create_issue_comment(body: str) -> None:
    """Post ``body`` as a regular (non-inline) comment on the pull request."""
    endpoint = f"/issues/{PR_NUMBER}/comments"
    request_json("POST", endpoint, {"body": body})
def publish_review(findings: list[dict[str, Any]], review_output: dict[str, Any]) -> None:
    """Publish the reviewer's findings on the pull request, skipping duplicates.

    Findings whose marker is already present are not re-posted. New findings
    are first submitted as one review that carries the summary. If GitHub
    rejects that review, each finding is posted as an individual comment, and
    the summary (with any findings that could not be placed inline) is posted
    as an issue comment.

    Args:
        findings: Raw findings from the reviewer output.
        review_output: The full reviewer output, used for the summary text.

    Raises:
        RuntimeError: when a finding is invalid, or when a request outside the
            guarded publication attempts fails.
    """
    markers = existing_markers()
    normalized = [
        normalize_finding(finding, index)
        for index, finding in enumerate(findings, start=1)
    ]
    pending = [item for item in normalized if item["marker"] not in markers]
    # `or 0` keeps a null confidence score from crashing float().
    confidence = float(review_output.get("overall_confidence_score") or 0)
    summary_lines = [
        SUMMARY_MARKER,
        "AI review found issues in this pull request.",
        "",
        f"Verdict: {review_output.get('overall_correctness')}",
        f"Confidence: {confidence:.2f}",
        "",
        str(review_output.get("overall_explanation") or "").strip(),
        "",
        *[item["summary"] for item in normalized],
    ]
    summary_body = "\n".join(summary_lines)
    if not pending:
        print("All AI review findings were already published for this head.")
        if SUMMARY_MARKER not in markers:
            create_issue_comment(summary_body)
        return
    try:
        request_json(
            "POST",
            f"/pulls/{PR_NUMBER}/reviews",
            {
                "commit_id": HEAD_SHA,
                "event": "COMMENT",
                "body": summary_body,
                "comments": [item["comment"] for item in pending],
            },
        )
        print(f"Published {len(pending)} AI review comment(s).")
        return
    except RuntimeError as error:
        print(f"Bulk review publication failed; falling back to individual comments: {error}")
    failed: list[str] = []
    for item in pending:
        try:
            payload = {"commit_id": HEAD_SHA, **item["comment"]}
            request_json("POST", f"/pulls/{PR_NUMBER}/comments", payload)
        except RuntimeError as error:
            failed.append(f"{item['summary']} ({error})")
    if failed:
        create_issue_comment(
            "\n".join(
                [
                    *summary_lines,
                    "",
                    "Some inline comments could not be placed on the diff:",
                    *failed,
                ]
            )
        )
        print(f"{len(failed)} AI review finding(s) could not be placed inline.")
        return
    # Individual comments carry no summary, unlike the rejected bulk review.
    # Post it here so the verdict and explanation are not silently dropped.
    if SUMMARY_MARKER not in markers:
        create_issue_comment(summary_body)
    print("Published AI review comments individually.")
def main() -> int:
    """Publish the review result and return the process exit status.

    Returns:
        ``0`` only when there are no findings and the verdict is exactly
        "patch is correct"; ``1`` in every other case.

    Raises:
        RuntimeError: when the reviewer output is missing or malformed.
    """
    review_output = load_review_output()
    findings = review_output.get("findings")
    if not isinstance(findings, list):
        raise RuntimeError("AI review output field findings must be a list")
    verdict = str(review_output.get("overall_correctness") or "")
    passed = not findings and verdict == "patch is correct"
    if findings:
        publish_review(findings, review_output)
    elif verdict == "patch is incorrect" and SUMMARY_MARKER not in existing_markers():
        # An "incorrect" verdict without findings still gets one explanatory comment.
        explanation = str(review_output.get("overall_explanation") or "").strip()
        notice = [
            SUMMARY_MARKER,
            "AI review marked this pull request incorrect but did not return inline findings.",
            "",
            explanation,
        ]
        create_issue_comment("\n".join(notice))
    if passed:
        print("AI review found no issues.")
        return 0
    print("AI review found issues.")
    return 1
# Script entry point: exit with main()'s status, and turn a RuntimeError into
# a one-line stderr message with exit status 1.
if __name__ == "__main__":
try:
raise SystemExit(main())
except RuntimeError as error:
print(f"error: {error}", file=sys.stderr)
raise SystemExit(1)
PY
# Gate for pull requests that do not get the secret-backed reviewer: non-draft
# pull requests from another repository or authored by Dependabot, evaluated
# on pull_request_target and pull_request_review events.
no-secret-review-fallback:
name: no-secret review fallback
if: >-
github.event.pull_request.draft == false &&
(
(inputs.caller_event_name || github.event_name) == 'pull_request_target' ||
(inputs.caller_event_name || github.event_name) == 'pull_request_review'
) &&
(
github.event.pull_request.head.repo.full_name != github.repository ||
github.event.pull_request.user.login == 'dependabot[bot]'
)
runs-on: ubuntu-latest
# This job only reads pull request data.
permissions:
pull-requests: read
env:
HEAD_SHA: ${{ github.event.pull_request.head.sha }}
PR_NUMBER: ${{ github.event.pull_request.number }}
REPOSITORY: ${{ github.repository }}
steps:
# Run the inline Python check that passes only when a collaborator, member or
# owner has approved the current head commit.
- name: Require maintainer approval on no-secret head
env:
GITHUB_TOKEN: ${{ secrets.repository_token || github.token }}
run: |
python3 - <<'PY'
from __future__ import annotations
import json
import os
import sys
import urllib.error
import urllib.request
from typing import Any
def require_env(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: when the variable is unset or set to an empty string.
    """
    value = os.environ.get(name, "")
    if value:
        return value
    raise RuntimeError(f"missing required environment variable {name}")
def request_graphql(token: str, query: str, variables: dict[str, Any]) -> dict[str, Any]:
    """Execute one GitHub GraphQL query and return the decoded response object.

    Args:
        token: Bearer token used for authentication.
        query: GraphQL query text.
        variables: Values for the query's variables.

    Returns:
        The full response object, including its ``data`` member.

    Raises:
        RuntimeError: on an HTTP error status, on a transport failure, when
            the response is not a JSON object, or when it reports ``errors``.
    """
    payload = json.dumps(
        {
            "query": query,
            "variables": variables,
        }
    ).encode()
    request = urllib.request.Request("https://api.github.com/graphql", data=payload, method="POST")
    request.add_header("Authorization", f"Bearer {token}")
    request.add_header("Content-Type", "application/json")
    request.add_header("X-GitHub-Api-Version", "2022-11-28")
    try:
        with urllib.request.urlopen(request) as response:
            response_payload = json.loads(response.read().decode())
    except urllib.error.HTTPError as error:
        # HTTPError must be handled first: it is a subclass of URLError.
        detail = error.read().decode(errors="replace")
        raise RuntimeError(f"GitHub GraphQL request failed: {error.code} {detail}") from error
    except urllib.error.URLError as error:
        raise RuntimeError(f"GitHub GraphQL request failed: {error.reason}") from error
    # Check the shape before calling .get(); a non-object payload would
    # otherwise raise AttributeError, which the script does not report cleanly.
    if not isinstance(response_payload, dict):
        raise RuntimeError("GitHub GraphQL returned a non-object response")
    if response_payload.get("errors"):
        raise RuntimeError(f"GitHub GraphQL returned errors: {response_payload['errors']}")
    return response_payload
def main() -> int:
    """Pass only when a trusted reviewer approved the current head commit.

    Pages through the pull request's ``latestReviews`` and looks for a review
    in state ``APPROVED`` whose author association is COLLABORATOR, MEMBER or
    OWNER and whose commit is the current head SHA.

    Returns:
        ``0`` when at least one such approval exists, otherwise ``1``.

    Raises:
        RuntimeError: when required environment is missing or the GraphQL
            response does not have the expected shape.
    """
    repository = require_env("REPOSITORY")
    owner, repo = repository.split("/", 1)
    number = int(require_env("PR_NUMBER"))
    head_sha = require_env("HEAD_SHA")
    token = require_env("GITHUB_TOKEN")
    query = """
query($owner:String!,$repo:String!,$number:Int!,$cursor:String) {
repository(owner:$owner, name:$repo) {
pullRequest(number:$number) {
latestReviews(first:100, after:$cursor) {
nodes {
state
authorAssociation
author { login }
commit { oid }
}
pageInfo {
hasNextPage
endCursor
}
}
}
}
}
"""

    def nested(review: dict[str, Any], parent: str, child: str) -> Any:
        # GraphQL returns JSON null for nullable objects, so a ``{}`` default
        # passed to .get() does not protect the chained lookup.
        value = review.get(parent)
        return value.get(child) if isinstance(value, dict) else None

    reviews: list[dict[str, Any]] = []
    cursor: str | None = None
    while True:
        response_payload = request_graphql(
            token,
            query,
            {
                "owner": owner,
                "repo": repo,
                "number": number,
                "cursor": cursor,
            },
        )
        # Walk down one key at a time; a missing or null level is reported as
        # a shape error instead of crashing with AttributeError.
        latest_reviews: Any = response_payload
        for key in ("data", "repository", "pullRequest", "latestReviews"):
            latest_reviews = latest_reviews.get(key) if isinstance(latest_reviews, dict) else None
        if not isinstance(latest_reviews, dict):
            raise RuntimeError("GitHub GraphQL response is missing latestReviews")
        nodes = latest_reviews.get("nodes", [])
        if not isinstance(nodes, list):
            raise RuntimeError("GitHub GraphQL latestReviews.nodes must be a list")
        reviews.extend(review for review in nodes if isinstance(review, dict))
        page_info = latest_reviews.get("pageInfo", {})
        if not isinstance(page_info, dict) or not page_info.get("hasNextPage"):
            break
        cursor = str(page_info.get("endCursor") or "")
        if not cursor:
            raise RuntimeError("GitHub GraphQL latestReviews pagination returned no endCursor")
    trusted_associations = {"COLLABORATOR", "MEMBER", "OWNER"}
    approvers = [
        str(nested(review, "author", "login") or "")
        for review in reviews
        if review.get("state") == "APPROVED"
        and review.get("authorAssociation") in trusted_associations
        and nested(review, "commit", "oid") == head_sha
    ]
    if not approvers:
        print(
            "::error::This pull request does not run the secret-backed AI reviewer. "
            "A repository maintainer must approve the current head commit before this gate can pass."
        )
        return 1
    print(f"No-secret fallback accepted maintainer approval from: {', '.join(approvers)}")
    return 0
# Script entry point: exit with main()'s status, and turn a RuntimeError into
# a one-line stderr message with exit status 1.
if __name__ == "__main__":
try:
raise SystemExit(main())
except RuntimeError as error:
print(f"error: {error}", file=sys.stderr)
raise SystemExit(1)
PY
# Final job mirrored into branch protection. Its display name is computed:
# the required check name (input, then repository variable, then
# 'ai review approved') when the run is a non-draft pull request into main on
# a supported event, otherwise 'ai review gate skipped'.
required-review-gate:
name: >-
${{
github.event.pull_request.draft == false &&
github.event.pull_request.base.ref == 'main' &&
(
(inputs.caller_event_name || github.event_name) == 'pull_request' ||
(inputs.caller_event_name || github.event_name) == 'pull_request_target' ||
(inputs.caller_event_name || github.event_name) == 'pull_request_review'
) &&
(
(
github.event.pull_request.head.repo.full_name == github.repository &&
github.event.pull_request.user.login != 'dependabot[bot]'
) ||
(inputs.caller_event_name || github.event_name) != 'pull_request'
) &&
(inputs.required_check_name || vars.AI_REVIEW_REQUIRED_CHECK_NAME || 'ai review approved') ||
'ai review gate skipped'
}}
# always() lets the gate run whatever the needed jobs did. It runs for
# same-repository, non-Dependabot pull requests on any supported event, and
# for fork or Dependabot pull requests on every event except pull_request.
if: >-
always() &&
github.event.pull_request.draft == false &&
github.event.pull_request.base.ref == 'main' &&
(
(inputs.caller_event_name || github.event_name) == 'pull_request' ||
(inputs.caller_event_name || github.event_name) == 'pull_request_target' ||
(inputs.caller_event_name || github.event_name) == 'pull_request_review'
) &&
(
(
github.event.pull_request.head.repo.full_name == github.repository &&
github.event.pull_request.user.login != 'dependabot[bot]'
) ||
(
(inputs.caller_event_name || github.event_name) != 'pull_request' &&
(
github.event.pull_request.head.repo.full_name != github.repository ||
github.event.pull_request.user.login == 'dependabot[bot]'
)
)
)
needs:
- no-secret-review-fallback
- publish-ai-review
runs-on: ubuntu-latest
# Read-only scopes: the gate inspects check runs and the pull request.
permissions:
checks: read
contents: read
pull-requests: read
env:
HEAD_SHA: ${{ github.event.pull_request.head.sha }}
PR_NUMBER: ${{ github.event.pull_request.number }}
REPOSITORY: ${{ github.repository }}
# Which result the gate mirrors:
#   publish-current  - pull_request event, same repository, not Dependabot
#   publish-existing - same repository, not Dependabot, on any other event
#   fallback-current - every remaining case (fork or Dependabot)
RESULT_SOURCE: >-
${{
((inputs.caller_event_name || github.event_name) == 'pull_request' &&
github.event.pull_request.head.repo.full_name == github.repository &&
github.event.pull_request.user.login != 'dependabot[bot]') &&
'publish-current' ||
((github.event.pull_request.head.repo.full_name == github.repository &&
github.event.pull_request.user.login != 'dependabot[bot]') &&
'publish-existing' ||
'fallback-current')
}}
steps:
# Resolve the gate outcome from the source selected by RESULT_SOURCE. The
# upstream job results reach the script through environment variables, as in
# the publish job's "Require successful AI review job" step, instead of being
# expanded inline into the shell text.
- name: Mirror AI review result for branch protection
env:
GITHUB_TOKEN: ${{ secrets.repository_token || github.token }}
PUBLISH_AI_REVIEW_RESULT: ${{ needs.publish-ai-review.result }}
NO_SECRET_FALLBACK_RESULT: ${{ needs.no-secret-review-fallback.result }}
run: |
if [ "${RESULT_SOURCE}" = "publish-current" ]; then
result="${PUBLISH_AI_REVIEW_RESULT}"
elif [ "${RESULT_SOURCE}" = "fallback-current" ]; then
result="${NO_SECRET_FALLBACK_RESULT}"
else
# "publish-existing": poll check runs for a publisher result on this head.
if python3 - <<'PY'
from __future__ import annotations
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
def require_env(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: when the variable is unset or set to an empty string.
    """
    value = os.environ.get(name, "")
    if value:
        return value
    raise RuntimeError(f"missing required environment variable {name}")
def request_json(path: str) -> dict[str, Any]:
    """GET a repository-relative GitHub API path and return the JSON object.

    Args:
        path: Path appended to ``https://api.github.com/repos/{REPOSITORY}``.

    Raises:
        RuntimeError: on an HTTP error status, on a transport failure, or when
            the response is not a JSON object.
    """
    repository = require_env("REPOSITORY")
    token = require_env("GITHUB_TOKEN")
    request = urllib.request.Request(f"https://api.github.com/repos/{repository}{path}")
    request.add_header("Accept", "application/vnd.github+json")
    request.add_header("Authorization", f"Bearer {token}")
    request.add_header("X-GitHub-Api-Version", "2022-11-28")
    try:
        with urllib.request.urlopen(request) as response:
            payload = json.loads(response.read().decode())
    except urllib.error.HTTPError as error:
        # HTTPError must be handled first: it is a subclass of URLError.
        detail = error.read().decode(errors="replace")
        raise RuntimeError(f"GitHub API GET {path} failed: {error.code} {detail}") from error
    except urllib.error.URLError as error:
        # Transport failures are reported like HTTP failures so the script's
        # RuntimeError handler prints one clear line instead of a traceback.
        raise RuntimeError(f"GitHub API GET {path} failed: {error.reason}") from error
    if not isinstance(payload, dict):
        raise RuntimeError("GitHub API returned a non-object response")
    return payload
def main() -> int:
    """Wait for the AI review publisher check on the current head and mirror it.

    Polls the check runs of the head SHA (and of the pull request's merge
    commit while the pull request still points at that head). The most recent
    run among the publisher job and the structured-review job decides:

    * not completed yet: keep waiting;
    * completed with a failing conclusion: return ``1``;
    * the publisher job completed with ``success``: return ``0``;
    * anything else (for example the review job succeeded but the publisher
      has not reported yet): keep waiting.

    Returns:
        ``0`` on a successful publisher check; ``1`` on failure or after the
        polling budget (180 attempts, 10 seconds apart) is used up.

    Raises:
        RuntimeError: when required environment is missing or an API response
            does not have the expected shape.
    """
    head_sha = require_env("HEAD_SHA")
    pr_number = int(require_env("PR_NUMBER"))
    publish_job = "publish ai structured review"
    structured_job = "ai structured review"
    ignored_conclusions = {"neutral", "skipped"}
    terminal_failures = {"action_required", "cancelled", "failure", "stale", "startup_failure", "timed_out"}
    max_attempts = 180
    poll_seconds = 10

    def candidate_shas() -> list[str]:
        # The merge commit is only trusted while the pull request still points
        # at the head this run was started for.
        shas = [head_sha]
        pull = request_json(f"/pulls/{pr_number}")
        pull_head = pull.get("head", {})
        pull_head_sha = ""
        if isinstance(pull_head, dict):
            pull_head_sha = str(pull_head.get("sha") or "")
        merge_sha = str(pull.get("merge_commit_sha") or "")
        if pull_head_sha == head_sha and merge_sha and merge_sha not in shas:
            shas.append(merge_sha)
        return shas

    def belongs_to_pull_request(run: dict[str, Any]) -> bool:
        # Runs that list no pull requests are kept; otherwise this pull
        # request must be among the listed ones.
        pull_requests = run.get("pull_requests")
        if not isinstance(pull_requests, list) or not pull_requests:
            return True
        return any(
            isinstance(pull_request, dict)
            and pull_request.get("number") == pr_number
            for pull_request in pull_requests
        )

    def is_job_run(run: dict[str, Any], job_name: str) -> bool:
        # Matches the bare job name and the "<caller> / <job>" form.
        name = str(run.get("name") or "")
        return name == job_name or name.endswith(f" / {job_name}")

    def run_time(run: dict[str, Any]) -> str:
        # First available timestamp; values are compared as strings.
        for key in ("completed_at", "started_at", "created_at"):
            value = run.get(key)
            if isinstance(value, str) and value:
                return value
        return ""

    def latest_run(runs: list[dict[str, Any]]) -> dict[str, Any] | None:
        if not runs:
            return None
        return max(runs, key=run_time)

    def fetch_check_runs(sha: str) -> list[dict[str, Any]]:
        # All check runs of one commit that belong to this pull request.
        found: list[dict[str, Any]] = []
        page = 1
        while True:
            query = urllib.parse.urlencode(
                {
                    "filter": "all",
                    "page": page,
                    "per_page": 100,
                }
            )
            payload = request_json(f"/commits/{sha}/check-runs?{query}")
            page_runs = payload.get("check_runs", [])
            if not isinstance(page_runs, list):
                raise RuntimeError("GitHub API check_runs field must be a list")
            found.extend(
                run
                for run in page_runs
                if isinstance(run, dict) and belongs_to_pull_request(run)
            )
            if len(page_runs) < 100:
                return found
            page += 1

    for attempt in range(1, max_attempts + 1):
        check_runs: list[dict[str, Any]] = []
        for sha in candidate_shas():
            check_runs.extend(fetch_check_runs(sha))
        watched_runs = [
            run
            for run in check_runs
            if (
                is_job_run(run, publish_job)
                or is_job_run(run, structured_job)
            )
            and run.get("conclusion") not in ignored_conclusions
        ]
        publisher = latest_run([run for run in watched_runs if is_job_run(run, publish_job)])
        structured = latest_run([run for run in watched_runs if is_job_run(run, structured_job)])
        # The newer of the two decides; a newer review run supersedes an older
        # publisher result.
        selected = publisher
        if structured is not None and (
            publisher is None or run_time(structured) > run_time(publisher)
        ):
            selected = structured
        if selected is None:
            print("Waiting for a current-head AI review check to appear.")
        else:
            name = str(selected.get("name") or "AI review")
            status = str(selected.get("status") or "")
            conclusion = str(selected.get("conclusion") or "")
            if status != "completed":
                print(f"Waiting for current-head AI review check: {name}.")
            elif conclusion in terminal_failures:
                print(f"::error::{name} failed for the current head.")
                return 1
            elif is_job_run(selected, publish_job) and conclusion == "success":
                print("Found latest successful AI review publisher check for current head.")
                return 0
            else:
                print("Waiting for the current-head AI review publisher check to finish.")
        if attempt < max_attempts:
            time.sleep(poll_seconds)
    # Reached only when every attempt ended in "keep waiting". Returning here,
    # after the loop, keeps the gate failing closed whatever the attempt count.
    print(
        "::error::Timed out waiting for a successful AI review publisher check "
        "on the current pull request head or merge SHA."
    )
    return 1
# Script entry point: exit with main()'s status, and turn a RuntimeError into
# a one-line stderr message with exit status 1.
if __name__ == "__main__":
try:
raise SystemExit(main())
except RuntimeError as error:
print(f"error: {error}", file=sys.stderr)
raise SystemExit(1)
PY
then
# The polling script exited 0: a successful publisher check exists.
result="success"
else
result="failure"
fi
fi
# Any value other than "success" fails the gate.
if [ "${result}" != "success" ]; then
echo "::error::AI review did not pass."
exit 1
fi
echo "AI review passed."