Skip to content

Commit 2a33ca3

Browse files
committed
fix: require claimed status before dispatch
1 parent ac1af2b commit 2a33ca3

2 files changed

Lines changed: 14 additions & 1 deletion

File tree

src/domain/task-graph.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ export function dispatchClaimedTask(state: WorkerExecutionState, request: { task
418418
if (worker.state !== "ready") throw new Error(`worker ${worker.id} is not ready`);
419419
if (worker.readinessNonce && request.readinessNonce !== worker.readinessNonce) throw new Error(`worker ${worker.id} readiness nonce mismatch`);
420420
const task = findTask(state.graph, request.taskId);
421+
if (task.status !== "claimed") throw new Error(`task ${task.id} is not claimed`);
421422
const claim = requireActiveClaim(task, request.claimToken, now);
422423
if (claim.workerId !== request.workerId) throw new Error(`task ${task.id} is claimed by ${claim.workerId}`);
423424
if (state.dispatches.some((dispatch) => dispatch.taskId === task.id && dispatch.claimToken === request.claimToken)) throw new Error(`task ${task.id} claim is already dispatched`);

test/task-graph.test.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,15 @@ test("worker execution dispatches claimed parallel tasks and completes only from
254254
assert.equal(stale.find((worker) => worker.id === "w2")?.state, "unhealthy");
255255
});
256256

257+
test("worker dispatch rejects non-claimed tasks with active claims without mutation", () => {
258+
const graph = { tasks: [{ id: "done", status: "completed" as const, dependencies: [], assignedWorker: "w1", claim: { token: "tok", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", expiresAt: "2026-01-01T00:20:00.000Z" } }] };
259+
const state: WorkerExecutionState = { graph, workers: [{ id: "w1", state: "ready", lastHeartbeatAt: "2026-01-01T00:01:00.000Z" }], dispatches: [], reports: [] };
260+
261+
assert.throws(() => dispatchClaimedTask(state, { taskId: "done", workerId: "w1", claimToken: "tok", now: "2026-01-01T00:02:00.000Z" }), /not claimed/);
262+
assert.equal(state.graph.tasks[0]?.status, "completed");
263+
assert.deepEqual(state.dispatches, []);
264+
});
265+
257266
test("stale worker reports cannot mutate expired or recovered claims", () => {
258267
const state: WorkerExecutionState = {
259268
graph: { tasks: [{ id: "build", status: "in_progress" as const, dependencies: [], assignedWorker: "w1", claim: { token: "old", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", expiresAt: "2026-01-01T00:10:00.000Z" } }] },
@@ -269,7 +278,10 @@ test("stale worker reports cannot mutate expired or recovered claims", () => {
269278
assert.deepEqual(state.reports, []);
270279

271280
const recovered = recoverExpiredClaim(state.graph, { taskId: "build", workerId: "w2", token: "new", now: "2026-01-01T00:11:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
272-
assert.throws(() => captureWorkerReport({ ...state, graph: recovered }, { ...report, reportedAt: "2026-01-01T00:12:00.000Z" }), /matching claim token/);
281+
const recoveredState = { ...state, graph: recovered };
282+
assert.throws(() => captureWorkerReport(recoveredState, { ...report, reportedAt: "2026-01-01T00:12:00.000Z" }), /matching claim token/);
283+
assert.equal(recoveredState.workers[0]?.state, "busy");
284+
assert.deepEqual(recoveredState.reports, []);
273285
});
274286

275287
test("worker heartbeat stale detection and terminal retention are modeled", () => {

0 commit comments

Comments
 (0)