Skip to content

[Bug] Partial partition consumers silent after topic fencing #62824

[Bug] Partial partition consumers silent after topic fencing

[Bug] Partial partition consumers silent after topic fencing #62824

Workflow file for this run

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Handles '/pulsarbot <command>' comments on pull requests: re-run failed workflow runs,
# re-run individual jobs by name, or cancel in-flight runs for the PR's head commit.
name: Pulsar Bot
# Fires for every newly created issue/PR comment; the job-level `if` and the script below
# filter out anything that is not a '/pulsarbot' command on a pull request.
on:
issue_comment:
types: [created]
# actions: write lets the script re-run and cancel workflow runs; everything else is read-only.
# NOTE(review): commands are accepted from any commenter -- neither this file nor the script
# checks the comment author or author_association. Confirm that is intended.
permissions:
actions: write
contents: read
pull-requests: read
issues: read
jobs:
pulsarbot:
runs-on: ubuntu-24.04
timeout-minutes: 10
# Cheap pre-filter so unrelated comments do not run the script; the script itself performs
# the exact command-token and "is this a pull request" checks.
if: github.event_name == 'issue_comment' && contains(github.event.comment.body, '/pulsarbot')
steps:
- name: Execute pulsarbot command
uses: actions/github-script@v8
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
# The script reads the comment from context.payload instead of interpolating it through an
# expression, so comment text is never spliced into the script source.
script: |
const commentBody = (context.payload.comment?.body || '').trim();
const prefix = '/pulsarbot';
if (!commentBody.startsWith(prefix)) {
console.log('Not a pulsarbot command, skipping ...');
return;
}
if (!context.payload.issue || !context.payload.issue.pull_request) {
console.error('This comment is not on a Pull Request. pulsarbot only works on PRs.');
return;
}
const parts = commentBody.split(/\s+/);
const sub = (parts[1] || '').toLowerCase();
const arg = parts.length > 2 ? parts.slice(2).join(' ') : '';
const supported = ['rerun', 'stop', 'cancel', 'rerun-failure-checks'];
if (!supported.includes(sub)) {
console.log(
`Unsupported command '${sub}'. Supported: ${supported
.map(cmd => `'/pulsarbot ${cmd}${cmd === 'rerun' ? ' [jobName?]' : ''}'`)
.join(', ')}.`
);
return;
}
const prNum = context.payload.issue.number;
// Get PR info
let pr;
try {
({ data: pr } = await github.rest.pulls.get({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: prNum,
}));
} catch (e) {
console.error(`Failed to fetch PR #${prNum}: ${e.message}`);
return;
}
const headSha = pr.head.sha;
const prBranch = pr.head.ref;
const prUser = pr.user.login;
const prUrl = pr.html_url;
console.log(`pulsarbot handling PR #${prNum} ${prUrl}`);
console.log(`PR branch='${prBranch}', headSha='${headSha}', author='${prUser}'`);
console.log(`Command parsed => sub='${sub}', arg='${arg || ''}'`);
// Most reliable: list workflow runs by head_sha (no guessing by actor/branch/event)
const runsAtHead = await github.paginate(
github.rest.actions.listWorkflowRunsForRepo,
{
owner: context.repo.owner,
repo: context.repo.repo,
head_sha: headSha,
per_page: 100,
},
);
console.log(`runsAtHead total=${runsAtHead.length} for head_sha=${headSha}`);
if (runsAtHead.length === 0) {
console.error(`No workflow runs found for head SHA ${headSha} (PR branch ${prBranch}).`);
return;
}
// Reduce runsAtHead to the most recent run of each workflow.
// runsAtHead is sorted in place. Workflows are ordered by workflow_id compared as decimal
// strings: shorter first, then lexicographic. Within one workflow, newer created_at comes
// first. The first run seen for each workflow_id is therefore its latest.
runsAtHead.sort((a, b) => {
  const aId = String(a.workflow_id);
  const bId = String(b.workflow_id);
  if (aId === bId) {
    const aTime = new Date(a.created_at).getTime();
    const bTime = new Date(b.created_at).getTime();
    if (bTime > aTime) return 1;
    return bTime < aTime ? -1 : 0;
  }
  if (aId.length !== bId.length) {
    return aId.length < bId.length ? -1 : 1;
  }
  return aId < bId ? -1 : 1;
});
const seen = new Set();
const latestRuns = runsAtHead.filter((run) => {
  if (seen.has(run.workflow_id)) return false;
  seen.add(run.workflow_id);
  return true;
});
/**
 * One-line, human-readable description of a workflow run for log output.
 * @param {object} run - workflow run object (reads id, name, status, conclusion, html_url).
 * @returns {string} e.g. "[run_id=1] CI | status=completed | conclusion=failure | <url>".
 */
function runKey(run) {
  const displayName = run.name || '(unnamed)';
  const displayConclusion = run.conclusion || '-';
  return `[run_id=${run.id}] ${displayName} | status=${run.status} | conclusion=${displayConclusion} | ${run.html_url}`;
}
// Print the runs the commands below will operate on, one line per workflow.
console.log('--- Latest workflow runs for this PR headSHA (one per workflow) ---');
latestRuns.forEach((run) => console.log(`- ${runKey(run)}`));
/**
 * Fetch every job (all pages) of one workflow run.
 * @param {number} runId - id of the workflow run.
 * @returns {Promise<object[]>} job objects as returned by the jobs-for-run endpoint.
 */
async function listAllJobs(runId) {
  const params = {
    owner: context.repo.owner,
    repo: context.repo.repo,
    run_id: runId,
    per_page: 100,
  };
  return github.paginate(github.rest.actions.listJobsForWorkflowRun, params);
}
/**
 * Ask GitHub to re-run a single job, logging the outcome.
 * @param {object} job - job object (reads id, name).
 * @param {object} run - the workflow run containing the job (reads name, html_url; logging only).
 * @returns {Promise<boolean>} true if the request succeeded, false if the API call threw.
 */
async function rerunJob(job, run) {
  const jobLabel = `'${job.name}' (job_id=${job.id})`;
  try {
    await github.rest.actions.reRunJobForWorkflowRun({
      owner: context.repo.owner,
      repo: context.repo.repo,
      job_id: job.id,
    });
  } catch (err) {
    console.log(`Failed to re-run job ${jobLabel} in run '${run.name}': ${err.message}`);
    return false;
  }
  console.log(`Re-ran job ${jobLabel} in run '${run.name}' | ${run.html_url}`);
  return true;
}
// Command 1: /pulsarbot rerun (or rerun-failure-checks)
if ((sub === 'rerun' || sub === 'rerun-failure-checks') && !arg) {
const targetConclusions = new Set(['failure', 'timed_out', 'cancelled', 'skipped']);
let rerunCount = 0;
let skippedRunning = 0;
let skippedConclusion = 0;
console.log('Mode: workflow re-run for completed runs with conclusions in [failure,timed_out,cancelled,skipped].');
for (const r of latestRuns) {
if (r.status !== 'completed') {
console.log(`Skip (still running) ${runKey(r)}. Cannot re-run whole workflow. Consider '/pulsarbot rerun <jobName>' for single job.`);
skippedRunning++;
continue;
}
if (!targetConclusions.has(r.conclusion)) {
console.log(`Skip (conclusion not eligible) ${runKey(r)}`);
skippedConclusion++;
continue;
}
try {
await github.rest.actions.reRunWorkflowFailedJobs({
owner: context.repo.owner,
repo: context.repo.repo,
run_id: r.id,
});
console.log(`Triggered re-run for ${runKey(r)}`);
rerunCount++;
} catch (e) {
console.log(`Failed to trigger re-run for ${runKey(r)}: ${e.message}`);
}
}
if (rerunCount === 0) {
console.error(`No eligible workflow runs to re-run. Skipped running=${skippedRunning}, skipped by conclusion=${skippedConclusion}.`);
} else {
console.log(`Finished. Triggered re-run for ${rerunCount} workflow run(s). Skipped running=${skippedRunning}, skipped by conclusion=${skippedConclusion}.`);
}
return;
}
// Command 2: /pulsarbot rerun jobname
if (sub === 'rerun' && arg) {
const keyword = arg.trim();
console.log(`Mode: job-level re-run. keyword='${keyword}'`);
let matchedJobs = 0;
let successJobs = 0;
for (const r of latestRuns) {
console.log(`Inspecting jobs in run for workflow '${r.name}' | ${r.html_url}`);
let jobs = [];
try {
jobs = await listAllJobs(r.id);
} catch (e) {
console.log(`Failed to list jobs for ${runKey(r)}: ${e.message}`);
continue;
}
for (const j of jobs) {
console.log(`Inspecting job '${j.name}' (job_id=${j.id})`);
if (j.name && j.name.includes(keyword)) {
matchedJobs++;
console.log(`Matched job '${j.name}'. Rerunning...`);
const ok = await rerunJob(j, r);
if (ok) successJobs++;
}
}
}
if (matchedJobs === 0) {
console.error(`No jobs matched keyword '${keyword}' among latest runs for this PR head.`);
} else {
console.log(`Finished. Matched ${matchedJobs} job(s); successfully requested re-run for ${successJobs} job(s).`);
}
return;
}
// Command 3: /pulsarbot stop or /pulsarbot cancel
if (sub === 'stop' || sub === 'cancel') {
console.log('Mode: cancel running workflow runs (queued/in_progress).');
let cancelCount = 0;
let alreadyCompleted = 0;
for (const r of latestRuns) {
if (r.status === 'completed') {
console.log(`Skip (already completed) ${runKey(r)}`);
alreadyCompleted++;
continue;
}
try {
await github.rest.actions.cancelWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
run_id: r.id,
});
console.log(`Cancel requested for ${runKey(r)}`);
cancelCount++;
} catch (e) {
console.log(`Failed to cancel ${runKey(r)}: ${e.message}`);
}
}
if (cancelCount === 0) {
console.error(`No running workflow runs to cancel. Already completed: ${alreadyCompleted}.`);
} else {
console.log(`Finished. Requested cancel for ${cancelCount} running workflow run(s). Already completed: ${alreadyCompleted}.`);
}
return;
}