Skip to content

Commit 76788d3

Browse files
committed
feat: add worker execution state
1 parent ed7ed13 commit 76788d3

5 files changed

Lines changed: 172 additions & 6 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,15 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio
148148

149149
### Graph Execution Runtime Boundary
150150

151-
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records.
151+
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, worker dispatch state, heartbeat/staleness transitions, structured worker reports, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records.
152152

153153
Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed.
154154

155155
Agent execution stays adapter-neutral. The `AgentAdapterContract` describes required launch/send/capture/health/readiness/report/substrate behavior for Pi, Codex, and Claude Code compatible workers without coupling the domain layer to any one CLI. Readiness requires a nonce-equivalent proof, worker reports require `taskId`, `status`, `evidenceRefs`, and `summary`, and health can be supported or best-effort but not absent.
156156

157157
The runtime state schema is separately versioned as `RuntimeState.schemaVersion: 1`. It defines additive boundaries for RunObjective, PolicySelection, TaskGraph refs, WorkerState, EvidenceRef, EvaluationResult, RewardRecord, and IntegrationCandidate data; unknown newer versions fail closed, and RunContract-facing artifact refs expose only objective, policy-selection, and evaluation artifacts.
158158

159-
The domain boundary is explicit: `Decomposer` creates the concrete graph, `Scheduler` computes ready tasks, `WorkerRuntime` dispatches and heartbeats work, `Verifier` validates evidence refs, and `GateEngine` decides transitions.
159+
The domain boundary is explicit: `Decomposer` creates the concrete graph, `Scheduler` computes ready tasks, `WorkerRuntime` dispatches and heartbeats work, `WorkerExecutionState` records dispatch/report progress for claimed tasks, `Verifier` validates evidence refs, and `GateEngine` decides transitions.
160160

161161
Runtime events are append-only `schemaVersion: 1` envelopes with monotonic `seq`, stable `eventId`, `idempotencyKey`, `type`, `category`, timestamp, run id, and a typed payload. The event taxonomy covers run lifecycle, objective/policy decisions, graph readiness, claim/lease transitions, worker readiness/heartbeat/retention/safe-close, evidence, evaluation, repair, integration, and reward records. Replay applies events by ascending sequence into derived state; duplicate event ids must be byte-equivalent, stale leases and missing workers recover only through explicit events, and `run.sealed` is terminal.
162162

src/domain/runtime-state.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ export const RUNTIME_STATE_SCHEMA_VERSION = 1;
22

33
export type RuntimeStatus = "draft" | "active" | "blocked" | "failed" | "sealed";
44
export type RuntimeTaskStatus = "pending" | "ready" | "claimed" | "verifying" | "completed" | "blocked" | "failed" | "repair_required";
5-
export type RuntimeWorkerStatus = "ready" | "busy" | "unhealthy" | "completed-retained" | "safe-to-close" | "stale-registry";
5+
export type RuntimeWorkerStatus = "ready" | "busy" | "unhealthy" | "completed-retained" | "safe-to-close" | "stale-registry" | "cleanup-released" | "closed";
66
export type RuntimeArtifactKind = "run-objective" | "policy-selection" | "task-graph" | "worker-state" | "evidence" | "evaluation" | "reward" | "integration-candidate" | "final-report";
77
export type EvaluationVerdict = "pass" | "fail" | "repair" | "blocked";
88
export type IntegrationCandidateStatus = "pending" | "accepted" | "rejected" | "repair_required";
@@ -36,7 +36,7 @@ export type RuntimeStateParseResult = { ok: true; state: RuntimeState } | { ok:
3636

3737
const runtimeStatuses = new Set<RuntimeStatus>(["draft", "active", "blocked", "failed", "sealed"]);
3838
const taskStatuses = new Set<RuntimeTaskStatus>(["pending", "ready", "claimed", "verifying", "completed", "blocked", "failed", "repair_required"]);
39-
const workerStatuses = new Set<RuntimeWorkerStatus>(["ready", "busy", "unhealthy", "completed-retained", "safe-to-close", "stale-registry"]);
39+
const workerStatuses = new Set<RuntimeWorkerStatus>(["ready", "busy", "unhealthy", "completed-retained", "safe-to-close", "stale-registry", "cleanup-released", "closed"]);
4040
const artifactKinds = new Set<RuntimeArtifactKind>(["run-objective", "policy-selection", "task-graph", "worker-state", "evidence", "evaluation", "reward", "integration-candidate", "final-report"]);
4141
const evaluationVerdicts = new Set<EvaluationVerdict>(["pass", "fail", "repair", "blocked"]);
4242
const integrationStatuses = new Set<IntegrationCandidateStatus>(["pending", "accepted", "rejected", "repair_required"]);

src/domain/task-graph.ts

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,34 @@ export interface WorkerRuntimeRef {
167167
id: string;
168168
state: WorkerRuntimeState;
169169
lastHeartbeatAt?: string;
170+
readinessNonce?: string;
171+
}
172+
173+
export type WorkerTaskReportStatus = "completed" | "failed" | "repair_required" | "blocked";
174+
175+
export interface WorkerTaskDispatch {
176+
taskId: string;
177+
workerId: string;
178+
claimToken: string;
179+
dispatchedAt: string;
180+
readinessNonce?: string;
181+
}
182+
183+
export interface WorkerTaskReport {
184+
taskId: string;
185+
workerId: string;
186+
claimToken: string;
187+
status: WorkerTaskReportStatus;
188+
summary: string;
189+
evidenceRefs: string[];
190+
reportedAt: string;
191+
}
192+
193+
export interface WorkerExecutionState {
194+
graph: TaskGraph;
195+
workers: WorkerRuntimeRef[];
196+
dispatches: WorkerTaskDispatch[];
197+
reports: WorkerTaskReport[];
170198
}
171199

172200
export function validateTaskGraph(graph: TaskGraph): TaskGraphValidationIssue[] {
@@ -362,6 +390,66 @@ export function getStaleWorkerIds(workers: WorkerRuntimeRef[], options: { now: s
362390
.map((worker) => worker.id);
363391
}
364392

393+
export function recordWorkerReady(workers: WorkerRuntimeRef[], input: { workerId: string; now: string; readinessNonce?: string }): WorkerRuntimeRef[] {
394+
parseTimestamp(input.now);
395+
return upsertWorker(workers, {
396+
id: input.workerId,
397+
state: "ready",
398+
lastHeartbeatAt: input.now,
399+
...(input.readinessNonce ? { readinessNonce: input.readinessNonce } : {}),
400+
});
401+
}
402+
403+
export function recordWorkerHeartbeat(workers: WorkerRuntimeRef[], input: { workerId: string; now: string }): WorkerRuntimeRef[] {
404+
parseTimestamp(input.now);
405+
return updateWorker(workers, input.workerId, (worker) => ({ ...worker, lastHeartbeatAt: input.now }));
406+
}
407+
408+
export function markMissingHeartbeats(workers: WorkerRuntimeRef[], options: { now: string; heartbeatTimeoutSeconds: number }): WorkerRuntimeRef[] {
409+
const stale = new Set(getStaleWorkerIds(workers, options));
410+
return workers.map((worker) => (stale.has(worker.id) ? { ...worker, state: "unhealthy" } : worker));
411+
}
412+
413+
export function dispatchClaimedTask(state: WorkerExecutionState, request: { taskId: string; workerId: string; claimToken: string; now: string; readinessNonce?: string }): WorkerExecutionState {
414+
const now = parseTimestamp(request.now);
415+
const worker = findWorker(state.workers, request.workerId);
416+
if (worker.state !== "ready") throw new Error(`worker ${worker.id} is not ready`);
417+
if (request.readinessNonce && worker.readinessNonce !== request.readinessNonce) throw new Error(`worker ${worker.id} readiness nonce mismatch`);
418+
const task = findTask(state.graph, request.taskId);
419+
const claim = requireActiveClaim(task, request.claimToken, now);
420+
if (claim.workerId !== request.workerId) throw new Error(`task ${task.id} is claimed by ${claim.workerId}`);
421+
const graph = updateTask(state.graph, task.id, (current) => ({ ...current, status: "in_progress" }));
422+
return {
423+
...state,
424+
graph,
425+
workers: updateWorker(state.workers, worker.id, (current) => ({ ...current, state: "busy", lastHeartbeatAt: request.now })),
426+
dispatches: [...state.dispatches, { taskId: task.id, workerId: worker.id, claimToken: request.claimToken, dispatchedAt: request.now, ...(request.readinessNonce ? { readinessNonce: request.readinessNonce } : {}) }],
427+
};
428+
}
429+
430+
export function captureWorkerReport(state: WorkerExecutionState, report: WorkerTaskReport): WorkerExecutionState {
431+
validateWorkerReport(state, report);
432+
const nextStatus = report.status === "completed" ? "verifying" : report.status;
433+
return {
434+
...state,
435+
graph: updateTask(state.graph, report.taskId, (task) => ({ ...task, status: nextStatus, evidenceRefs: [...report.evidenceRefs] })),
436+
workers: updateWorker(state.workers, report.workerId, (worker) => ({ ...worker, state: report.status === "completed" ? "completed-retained" : "unhealthy", lastHeartbeatAt: report.reportedAt })),
437+
reports: [...state.reports, { ...report, evidenceRefs: [...report.evidenceRefs] }],
438+
};
439+
}
440+
441+
export function completeTaskFromWorkerReport(state: WorkerExecutionState, request: { taskId: string; workerId: string; now: string; releaseWorker?: boolean }): WorkerExecutionState {
442+
const report = [...state.reports].reverse().find((item) => item.taskId === request.taskId && item.workerId === request.workerId);
443+
if (!report) throw new Error(`task ${request.taskId} has no worker report`);
444+
if (report.status !== "completed") throw new Error(`task ${request.taskId} report is not completed`);
445+
if (report.evidenceRefs.length === 0) throw new Error(`task ${request.taskId} report requires evidence refs`);
446+
return {
447+
...state,
448+
graph: completeTask(state.graph, { taskId: request.taskId, claimToken: report.claimToken, evidenceRefs: report.evidenceRefs, now: request.now }),
449+
workers: updateWorker(state.workers, request.workerId, (worker) => markWorkerTerminal(worker, { retained: !request.releaseWorker })),
450+
};
451+
}
452+
365453
export function markWorkerStale(worker: WorkerRuntimeRef): WorkerRuntimeRef {
366454
return { ...worker, state: "stale-registry" };
367455
}
@@ -375,6 +463,41 @@ function assertValidGraph(graph: TaskGraph): void {
375463
if (issues.length > 0) throw new Error(`invalid task graph: ${issues.map((issue) => issue.code).join(", ")}`);
376464
}
377465

466+
function validateWorkerReport(state: WorkerExecutionState, report: WorkerTaskReport): void {
467+
parseTimestamp(report.reportedAt);
468+
if (!report.summary.trim()) throw new Error(`task ${report.taskId} report requires summary`);
469+
if (report.status === "completed" && report.evidenceRefs.length === 0) throw new Error(`task ${report.taskId} report requires evidence refs`);
470+
const dispatch = state.dispatches.find((item) => item.taskId === report.taskId && item.workerId === report.workerId && item.claimToken === report.claimToken);
471+
if (!dispatch) throw new Error(`task ${report.taskId} was not dispatched to worker ${report.workerId}`);
472+
}
473+
474+
function findTask(graph: TaskGraph, taskId: string): TaskNode {
475+
const task = graph.tasks.find((item) => item.id === taskId);
476+
if (!task) throw new Error(`unknown task ${taskId}`);
477+
return task;
478+
}
479+
480+
function findWorker(workers: WorkerRuntimeRef[], workerId: string): WorkerRuntimeRef {
481+
const worker = workers.find((item) => item.id === workerId);
482+
if (!worker) throw new Error(`unknown worker ${workerId}`);
483+
return worker;
484+
}
485+
486+
function upsertWorker(workers: WorkerRuntimeRef[], worker: WorkerRuntimeRef): WorkerRuntimeRef[] {
487+
return workers.some((item) => item.id === worker.id) ? updateWorker(workers, worker.id, () => worker) : [...workers, worker];
488+
}
489+
490+
function updateWorker(workers: WorkerRuntimeRef[], workerId: string, update: (worker: WorkerRuntimeRef) => WorkerRuntimeRef): WorkerRuntimeRef[] {
491+
let found = false;
492+
const next = workers.map((worker) => {
493+
if (worker.id !== workerId) return worker;
494+
found = true;
495+
return update(worker);
496+
});
497+
if (!found) throw new Error(`unknown worker ${workerId}`);
498+
return next;
499+
}
500+
378501
function computeFailedBlockersByTask(graph: TaskGraph, failed: Set<string>): Map<string, string[]> {
379502
const tasksById = new Map(graph.tasks.map((task) => [task.id, task]));
380503
const memo = new Map<string, string[]>();

test/runtime-state.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ test("run contract runtime refs expose objective, policy, and evaluation artifac
4848
});
4949

5050
test("runtime state can represent successful and repair-required examples", () => {
51-
const success: RuntimeState = { ...state(), status: "sealed", objective: { id: "objective", goal: "ship MVP", successCriteria: ["tests pass"], constraints: ["no destructive cleanup"] }, tasks: [{ id: "build", title: "Build slice", status: "completed", dependsOn: [], evidenceRefs: [{ kind: "evidence", artifactId: "verify", path: "verify.md" }], evaluationRefs: [] }] };
52-
const repair: RuntimeState = { ...state(), status: "blocked", tasks: [{ id: "build", title: "Build slice", status: "repair_required", dependsOn: [], evidenceRefs: [], evaluationRefs: [{ kind: "evaluation", artifactId: "eval", path: "evaluation.json" }] }], integrationCandidates: [{ id: "candidate", taskIds: ["build"], status: "repair_required", evidenceRefs: [] }] };
51+
const success: RuntimeState = { ...state(), status: "sealed", objective: { id: "objective", goal: "ship MVP", successCriteria: ["tests pass"], constraints: ["no destructive cleanup"] }, tasks: [{ id: "build", title: "Build slice", status: "completed", dependsOn: [], evidenceRefs: [{ kind: "evidence", artifactId: "verify", path: "verify.md" }], evaluationRefs: [] }], workers: [{ id: "w1", adapterId: "local", status: "closed", readinessNonce: "nonce", lastHeartbeatAt: now }] };
52+
const repair: RuntimeState = { ...state(), status: "blocked", tasks: [{ id: "build", title: "Build slice", status: "repair_required", dependsOn: [], evidenceRefs: [], evaluationRefs: [{ kind: "evaluation", artifactId: "eval", path: "evaluation.json" }] }], workers: [{ id: "w2", adapterId: "local", status: "cleanup-released" }], integrationCandidates: [{ id: "candidate", taskIds: ["build"], status: "repair_required", evidenceRefs: [] }] };
5353
assert.equal(parseRuntimeState(success).ok, true);
5454
assert.equal(parseRuntimeState(repair).ok, true);
5555
});

test/task-graph.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,32 @@
11
import assert from "node:assert/strict";
22
import { test } from "node:test";
33
import {
4+
captureWorkerReport,
45
claimTask,
56
completeTask,
7+
completeTaskFromWorkerReport,
68
computeBlockedTaskIds,
79
computeReadyTaskIds,
810
computeTaskReadiness,
911
createClaimLease,
1012
createClaimLeaseRuntimeEvent,
1113
createClaimLeaseToken,
1214
createTaskGraphRuntimeEvents,
15+
dispatchClaimedTask,
1316
getStaleWorkerIds,
17+
markMissingHeartbeats,
1418
markWorkerStale,
1519
markWorkerTerminal,
1620
projectTaskGraphStatus,
1721
recoverExpiredClaim,
22+
recordWorkerHeartbeat,
23+
recordWorkerReady,
1824
releaseClaim,
1925
renewClaimLease,
2026
topologicalTaskIds,
2127
transitionReadyTasks,
2228
validateTaskGraph,
29+
type WorkerExecutionState,
2330
} from "../src/domain/task-graph.js";
2431

2532
test("task graph readiness waits for completed dependencies", () => {
@@ -215,6 +222,42 @@ test("failed dependencies block downstream tasks unless a repair supersedes them
215222
});
216223
});
217224

225+
test("worker execution dispatches claimed parallel tasks and completes only from evidence reports", () => {
226+
const graph = {
227+
tasks: [
228+
{ id: "branch-a", status: "ready" as const, dependencies: [] },
229+
{ id: "branch-b", status: "ready" as const, dependencies: [] },
230+
],
231+
};
232+
const claimedA = claimTask(graph, { taskId: "branch-a", workerId: "w1", token: "tok-a", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
233+
const claimed = claimTask(claimedA, { taskId: "branch-b", workerId: "w2", token: "tok-b", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
234+
const workers = recordWorkerReady(recordWorkerReady([], { workerId: "w1", now: "2026-01-01T00:00:01.000Z", readinessNonce: "n1" }), { workerId: "w2", now: "2026-01-01T00:00:01.000Z", readinessNonce: "n2" });
235+
let state: WorkerExecutionState = { graph: claimed, workers, dispatches: [], reports: [] };
236+
237+
assert.throws(() => dispatchClaimedTask(state, { taskId: "branch-a", workerId: "w1", claimToken: "tok-a", now: "2026-01-01T00:00:02.000Z", readinessNonce: "wrong" }), /nonce/);
238+
state = dispatchClaimedTask(state, { taskId: "branch-a", workerId: "w1", claimToken: "tok-a", now: "2026-01-01T00:00:02.000Z", readinessNonce: "n1" });
239+
state = dispatchClaimedTask(state, { taskId: "branch-b", workerId: "w2", claimToken: "tok-b", now: "2026-01-01T00:00:02.000Z", readinessNonce: "n2" });
240+
assert.deepEqual(state.graph.tasks.map((task) => task.status), ["in_progress", "in_progress"]);
241+
assert.deepEqual(state.workers.map((worker) => worker.state), ["busy", "busy"]);
242+
243+
state = { ...state, workers: recordWorkerHeartbeat(state.workers, { workerId: "w1", now: "2026-01-01T00:01:00.000Z" }) };
244+
assert.equal(state.workers[0]?.lastHeartbeatAt, "2026-01-01T00:01:00.000Z");
245+
assert.throws(
246+
() => captureWorkerReport(state, { taskId: "branch-a", workerId: "w1", claimToken: "tok-a", status: "completed", summary: "done", evidenceRefs: [], reportedAt: "2026-01-01T00:02:00.000Z" }),
247+
/evidence refs/,
248+
);
249+
250+
state = captureWorkerReport(state, { taskId: "branch-a", workerId: "w1", claimToken: "tok-a", status: "completed", summary: "done", evidenceRefs: ["logs/a.txt", "verify/a.md"], reportedAt: "2026-01-01T00:02:00.000Z" });
251+
assert.equal(state.graph.tasks[0]?.status, "verifying");
252+
state = completeTaskFromWorkerReport(state, { taskId: "branch-a", workerId: "w1", now: "2026-01-01T00:03:00.000Z", releaseWorker: true });
253+
assert.equal(state.graph.tasks[0]?.status, "completed");
254+
assert.deepEqual(state.graph.tasks[0]?.evidenceRefs, ["logs/a.txt", "verify/a.md"]);
255+
assert.equal(state.workers[0]?.state, "safe-to-close");
256+
257+
const stale = markMissingHeartbeats(state.workers, { now: "2026-01-01T00:30:00.000Z", heartbeatTimeoutSeconds: 60 });
258+
assert.equal(stale.find((worker) => worker.id === "w2")?.state, "unhealthy");
259+
});
260+
218261
test("worker heartbeat stale detection and terminal retention are modeled", () => {
219262
const workers = [
220263
{ id: "fresh", state: "busy" as const, lastHeartbeatAt: "2026-01-01T00:04:30.000Z" },

0 commit comments

Comments
 (0)