Skip to content

Commit b9851f9

Browse files
committed
fix: reject stale worker reports
1 parent 76788d3 commit b9851f9

4 files changed

Lines changed: 35 additions & 11 deletions

File tree

src/domain/runtime-state.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
export const RUNTIME_STATE_SCHEMA_VERSION = 1;
22

33
export type RuntimeStatus = "draft" | "active" | "blocked" | "failed" | "sealed";
4-
export type RuntimeTaskStatus = "pending" | "ready" | "claimed" | "verifying" | "completed" | "blocked" | "failed" | "repair_required";
4+
export type RuntimeTaskStatus = "pending" | "ready" | "claimed" | "in_progress" | "verifying" | "completed" | "blocked" | "failed" | "repair_required";
55
export type RuntimeWorkerStatus = "ready" | "busy" | "unhealthy" | "completed-retained" | "safe-to-close" | "stale-registry" | "cleanup-released" | "closed";
66
export type RuntimeArtifactKind = "run-objective" | "policy-selection" | "task-graph" | "worker-state" | "evidence" | "evaluation" | "reward" | "integration-candidate" | "final-report";
77
export type EvaluationVerdict = "pass" | "fail" | "repair" | "blocked";
@@ -35,7 +35,7 @@ export interface RuntimeState {
3535
export type RuntimeStateParseResult = { ok: true; state: RuntimeState } | { ok: false; reason: "malformed" | "unsupported-newer-schema"; issues: string[] };
3636

3737
const runtimeStatuses = new Set<RuntimeStatus>(["draft", "active", "blocked", "failed", "sealed"]);
38-
const taskStatuses = new Set<RuntimeTaskStatus>(["pending", "ready", "claimed", "verifying", "completed", "blocked", "failed", "repair_required"]);
38+
const taskStatuses = new Set<RuntimeTaskStatus>(["pending", "ready", "claimed", "in_progress", "verifying", "completed", "blocked", "failed", "repair_required"]);
3939
const workerStatuses = new Set<RuntimeWorkerStatus>(["ready", "busy", "unhealthy", "completed-retained", "safe-to-close", "stale-registry", "cleanup-released", "closed"]);
4040
const artifactKinds = new Set<RuntimeArtifactKind>(["run-objective", "policy-selection", "task-graph", "worker-state", "evidence", "evaluation", "reward", "integration-candidate", "final-report"]);
4141
const evaluationVerdicts = new Set<EvaluationVerdict>(["pass", "fail", "repair", "blocked"]);

src/domain/task-graph.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,8 @@ export function getStaleWorkerIds(workers: WorkerRuntimeRef[], options: { now: s
391391
}
392392

393393
export function recordWorkerReady(workers: WorkerRuntimeRef[], input: { workerId: string; now: string; readinessNonce?: string }): WorkerRuntimeRef[] {
394+
requireText(input.workerId, "workerId");
395+
if (input.readinessNonce !== undefined) requireText(input.readinessNonce, "readinessNonce");
394396
parseTimestamp(input.now);
395397
return upsertWorker(workers, {
396398
id: input.workerId,
@@ -414,10 +416,11 @@ export function dispatchClaimedTask(state: WorkerExecutionState, request: { task
414416
const now = parseTimestamp(request.now);
415417
const worker = findWorker(state.workers, request.workerId);
416418
if (worker.state !== "ready") throw new Error(`worker ${worker.id} is not ready`);
417-
if (request.readinessNonce && worker.readinessNonce !== request.readinessNonce) throw new Error(`worker ${worker.id} readiness nonce mismatch`);
419+
if (worker.readinessNonce && request.readinessNonce !== worker.readinessNonce) throw new Error(`worker ${worker.id} readiness nonce mismatch`);
418420
const task = findTask(state.graph, request.taskId);
419421
const claim = requireActiveClaim(task, request.claimToken, now);
420422
if (claim.workerId !== request.workerId) throw new Error(`task ${task.id} is claimed by ${claim.workerId}`);
423+
if (state.dispatches.some((dispatch) => dispatch.taskId === task.id && dispatch.claimToken === request.claimToken)) throw new Error(`task ${task.id} claim is already dispatched`);
421424
const graph = updateTask(state.graph, task.id, (current) => ({ ...current, status: "in_progress" }));
422425
return {
423426
...state,
@@ -464,9 +467,12 @@ function assertValidGraph(graph: TaskGraph): void {
464467
}
465468

466469
function validateWorkerReport(state: WorkerExecutionState, report: WorkerTaskReport): void {
467-
parseTimestamp(report.reportedAt);
470+
const reportedAt = parseTimestamp(report.reportedAt);
468471
if (!report.summary.trim()) throw new Error(`task ${report.taskId} report requires summary`);
469472
if (report.status === "completed" && report.evidenceRefs.length === 0) throw new Error(`task ${report.taskId} report requires evidence refs`);
473+
const task = findTask(state.graph, report.taskId);
474+
const claim = requireActiveClaim(task, report.claimToken, reportedAt);
475+
if (claim.workerId !== report.workerId) throw new Error(`task ${report.taskId} is claimed by ${claim.workerId}`);
470476
const dispatch = state.dispatches.find((item) => item.taskId === report.taskId && item.workerId === report.workerId && item.claimToken === report.claimToken);
471477
if (!dispatch) throw new Error(`task ${report.taskId} was not dispatched to worker ${report.workerId}`);
472478
}
@@ -588,6 +594,10 @@ function findCycleTaskId(graph: TaskGraph): string | undefined {
588594
return undefined;
589595
}
590596

597+
function requireText(value: string, label: string): void {
598+
if (!value.trim()) throw new Error(`${label} is required`);
599+
}
600+
591601
function parseTimestamp(value: string): number {
592602
const parsed = Date.parse(value);
593603
if (!Number.isFinite(parsed)) throw new Error(`invalid timestamp: ${value}`);

test/runtime-state.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ test("run contract runtime refs expose objective, policy, and evaluation artifac
4848
});
4949

5050
test("runtime state can represent successful and repair-required examples", () => {
51-
const success: RuntimeState = { ...state(), status: "sealed", objective: { id: "objective", goal: "ship MVP", successCriteria: ["tests pass"], constraints: ["no destructive cleanup"] }, tasks: [{ id: "build", title: "Build slice", status: "completed", dependsOn: [], evidenceRefs: [{ kind: "evidence", artifactId: "verify", path: "verify.md" }], evaluationRefs: [] }], workers: [{ id: "w1", adapterId: "local", status: "closed", readinessNonce: "nonce", lastHeartbeatAt: now }] };
51+
const success: RuntimeState = { ...state(), status: "sealed", objective: { id: "objective", goal: "ship MVP", successCriteria: ["tests pass"], constraints: ["no destructive cleanup"] }, tasks: [{ id: "build", title: "Build slice", status: "completed", dependsOn: [], evidenceRefs: [{ kind: "evidence", artifactId: "verify", path: "verify.md" }], evaluationRefs: [] }, { id: "parallel", title: "Parallel slice", status: "in_progress", dependsOn: [], evidenceRefs: [], evaluationRefs: [] }], workers: [{ id: "w1", adapterId: "local", status: "closed", readinessNonce: "nonce", lastHeartbeatAt: now }] };
5252
const repair: RuntimeState = { ...state(), status: "blocked", tasks: [{ id: "build", title: "Build slice", status: "repair_required", dependsOn: [], evidenceRefs: [], evaluationRefs: [{ kind: "evaluation", artifactId: "eval", path: "evaluation.json" }] }], workers: [{ id: "w2", adapterId: "local", status: "cleanup-released" }], integrationCandidates: [{ id: "candidate", taskIds: ["build"], status: "repair_required", evidenceRefs: [] }] };
5353
assert.equal(parseRuntimeState(success).ok, true);
5454
assert.equal(parseRuntimeState(repair).ok, true);

test/task-graph.test.ts

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,18 +223,14 @@ test("failed dependencies block downstream tasks unless a repair supersedes them
223223
});
224224

225225
test("worker execution dispatches claimed parallel tasks and completes only from evidence reports", () => {
226-
const graph = {
227-
tasks: [
228-
{ id: "branch-a", status: "ready" as const, dependencies: [] },
229-
{ id: "branch-b", status: "ready" as const, dependencies: [] },
230-
],
231-
};
226+
const graph = { tasks: [{ id: "branch-a", status: "ready" as const, dependencies: [] }, { id: "branch-b", status: "ready" as const, dependencies: [] }] };
232227
const claimedA = claimTask(graph, { taskId: "branch-a", workerId: "w1", token: "tok-a", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
233228
const claimed = claimTask(claimedA, { taskId: "branch-b", workerId: "w2", token: "tok-b", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
234229
const workers = recordWorkerReady(recordWorkerReady([], { workerId: "w1", now: "2026-01-01T00:00:01.000Z", readinessNonce: "n1" }), { workerId: "w2", now: "2026-01-01T00:00:01.000Z", readinessNonce: "n2" });
235230
let state: WorkerExecutionState = { graph: claimed, workers, dispatches: [], reports: [] };
236231

237232
assert.throws(() => dispatchClaimedTask(state, { taskId: "branch-a", workerId: "w1", claimToken: "tok-a", now: "2026-01-01T00:00:02.000Z", readinessNonce: "wrong" }), /nonce/);
233+
assert.throws(() => dispatchClaimedTask(state, { taskId: "branch-a", workerId: "w1", claimToken: "tok-a", now: "2026-01-01T00:00:02.000Z" }), /nonce/);
238234
state = dispatchClaimedTask(state, { taskId: "branch-a", workerId: "w1", claimToken: "tok-a", now: "2026-01-01T00:00:02.000Z", readinessNonce: "n1" });
239235
state = dispatchClaimedTask(state, { taskId: "branch-b", workerId: "w2", claimToken: "tok-b", now: "2026-01-01T00:00:02.000Z", readinessNonce: "n2" });
240236
assert.deepEqual(state.graph.tasks.map((task) => task.status), ["in_progress", "in_progress"]);
@@ -258,6 +254,24 @@ test("worker execution dispatches claimed parallel tasks and completes only from
258254
assert.equal(stale.find((worker) => worker.id === "w2")?.state, "unhealthy");
259255
});
260256

257+
test("stale worker reports cannot mutate expired or recovered claims", () => {
258+
const state: WorkerExecutionState = {
259+
graph: { tasks: [{ id: "build", status: "in_progress" as const, dependencies: [], assignedWorker: "w1", claim: { token: "old", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", expiresAt: "2026-01-01T00:10:00.000Z" } }] },
260+
workers: [{ id: "w1", state: "busy", lastHeartbeatAt: "2026-01-01T00:01:00.000Z" }],
261+
dispatches: [{ taskId: "build", workerId: "w1", claimToken: "old", dispatchedAt: "2026-01-01T00:01:00.000Z" }],
262+
reports: [],
263+
};
264+
const report = { taskId: "build", workerId: "w1", claimToken: "old", status: "completed" as const, summary: "late", evidenceRefs: ["verify.md#late"], reportedAt: "2026-01-01T00:11:00.000Z" };
265+
266+
assert.throws(() => captureWorkerReport(state, report), /expired/);
267+
assert.equal(state.graph.tasks[0]?.status, "in_progress");
268+
assert.equal(state.workers[0]?.state, "busy");
269+
assert.deepEqual(state.reports, []);
270+
271+
const recovered = recoverExpiredClaim(state.graph, { taskId: "build", workerId: "w2", token: "new", now: "2026-01-01T00:11:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
272+
assert.throws(() => captureWorkerReport({ ...state, graph: recovered }, { ...report, reportedAt: "2026-01-01T00:12:00.000Z" }), /matching claim token/);
273+
});
274+
261275
test("worker heartbeat stale detection and terminal retention are modeled", () => {
262276
const workers = [
263277
{ id: "fresh", state: "busy" as const, lastHeartbeatAt: "2026-01-01T00:04:30.000Z" },

0 commit comments

Comments
 (0)