Skip to content

Commit d0c8d21

Browse files
authored
feat: add worker execution state (#234)
* feat: add worker execution state * fix: reject stale worker reports * fix: require claimed status before dispatch * fix: reject terminal worker reports --------- Co-authored-by: devkade <devkade@users.noreply.github.com>
1 parent ed7ed13 commit d0c8d21

5 files changed

Lines changed: 217 additions & 8 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,15 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio
148148

149149
### Graph Execution Runtime Boundary
150150

151-
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records.
151+
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, worker dispatch state, heartbeat/staleness transitions, structured worker reports, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records.
152152

153153
Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed.
154154

155155
Agent execution stays adapter-neutral. The `AgentAdapterContract` describes required launch/send/capture/health/readiness/report/substrate behavior for Pi, Codex, and Claude Code compatible workers without coupling the domain layer to any one CLI. Readiness requires a nonce-equivalent proof, worker reports require `taskId`, `status`, `evidenceRefs`, and `summary`, and health can be supported or best-effort but not absent.
156156

157157
The runtime state schema is separately versioned as `RuntimeState.schemaVersion: 1`. It defines additive boundaries for RunObjective, PolicySelection, TaskGraph refs, WorkerState, EvidenceRef, EvaluationResult, RewardRecord, and IntegrationCandidate data; unknown newer versions fail closed, and RunContract-facing artifact refs expose only objective, policy-selection, and evaluation artifacts.
158158

159-
The domain boundary is explicit: `Decomposer` creates the concrete graph, `Scheduler` computes ready tasks, `WorkerRuntime` dispatches and heartbeats work, `Verifier` validates evidence refs, and `GateEngine` decides transitions.
159+
The domain boundary is explicit: `Decomposer` creates the concrete graph, `Scheduler` computes ready tasks, `WorkerRuntime` dispatches and heartbeats work, `WorkerExecutionState` records dispatch/report progress for claimed tasks, `Verifier` validates evidence refs, and `GateEngine` decides transitions.
160160

161161
Runtime events are append-only `schemaVersion: 1` envelopes with monotonic `seq`, stable `eventId`, `idempotencyKey`, `type`, `category`, timestamp, run id, and a typed payload. The event taxonomy covers run lifecycle, objective/policy decisions, graph readiness, claim/lease transitions, worker readiness/heartbeat/retention/safe-close, evidence, evaluation, repair, integration, and reward records. Replay applies events by ascending sequence into derived state; duplicate event ids must be byte-equivalent, stale leases and missing workers recover only through explicit events, and `run.sealed` is terminal.
162162

src/domain/runtime-state.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
export const RUNTIME_STATE_SCHEMA_VERSION = 1;
22

33
export type RuntimeStatus = "draft" | "active" | "blocked" | "failed" | "sealed";
4-
export type RuntimeTaskStatus = "pending" | "ready" | "claimed" | "verifying" | "completed" | "blocked" | "failed" | "repair_required";
5-
export type RuntimeWorkerStatus = "ready" | "busy" | "unhealthy" | "completed-retained" | "safe-to-close" | "stale-registry";
4+
export type RuntimeTaskStatus = "pending" | "ready" | "claimed" | "in_progress" | "verifying" | "completed" | "blocked" | "failed" | "repair_required";
5+
export type RuntimeWorkerStatus = "ready" | "busy" | "unhealthy" | "completed-retained" | "safe-to-close" | "stale-registry" | "cleanup-released" | "closed";
66
export type RuntimeArtifactKind = "run-objective" | "policy-selection" | "task-graph" | "worker-state" | "evidence" | "evaluation" | "reward" | "integration-candidate" | "final-report";
77
export type EvaluationVerdict = "pass" | "fail" | "repair" | "blocked";
88
export type IntegrationCandidateStatus = "pending" | "accepted" | "rejected" | "repair_required";
@@ -35,8 +35,8 @@ export interface RuntimeState {
3535
export type RuntimeStateParseResult = { ok: true; state: RuntimeState } | { ok: false; reason: "malformed" | "unsupported-newer-schema"; issues: string[] };
3636

3737
const runtimeStatuses = new Set<RuntimeStatus>(["draft", "active", "blocked", "failed", "sealed"]);
38-
const taskStatuses = new Set<RuntimeTaskStatus>(["pending", "ready", "claimed", "verifying", "completed", "blocked", "failed", "repair_required"]);
39-
const workerStatuses = new Set<RuntimeWorkerStatus>(["ready", "busy", "unhealthy", "completed-retained", "safe-to-close", "stale-registry"]);
38+
const taskStatuses = new Set<RuntimeTaskStatus>(["pending", "ready", "claimed", "in_progress", "verifying", "completed", "blocked", "failed", "repair_required"]);
39+
const workerStatuses = new Set<RuntimeWorkerStatus>(["ready", "busy", "unhealthy", "completed-retained", "safe-to-close", "stale-registry", "cleanup-released", "closed"]);
4040
const artifactKinds = new Set<RuntimeArtifactKind>(["run-objective", "policy-selection", "task-graph", "worker-state", "evidence", "evaluation", "reward", "integration-candidate", "final-report"]);
4141
const evaluationVerdicts = new Set<EvaluationVerdict>(["pass", "fail", "repair", "blocked"]);
4242
const integrationStatuses = new Set<IntegrationCandidateStatus>(["pending", "accepted", "rejected", "repair_required"]);

src/domain/task-graph.ts

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,34 @@ export interface WorkerRuntimeRef {
167167
id: string;
168168
state: WorkerRuntimeState;
169169
lastHeartbeatAt?: string;
170+
readinessNonce?: string;
171+
}
172+
173+
export type WorkerTaskReportStatus = "completed" | "failed" | "repair_required" | "blocked";
174+
175+
export interface WorkerTaskDispatch {
176+
taskId: string;
177+
workerId: string;
178+
claimToken: string;
179+
dispatchedAt: string;
180+
readinessNonce?: string;
181+
}
182+
183+
export interface WorkerTaskReport {
184+
taskId: string;
185+
workerId: string;
186+
claimToken: string;
187+
status: WorkerTaskReportStatus;
188+
summary: string;
189+
evidenceRefs: string[];
190+
reportedAt: string;
191+
}
192+
193+
export interface WorkerExecutionState {
194+
graph: TaskGraph;
195+
workers: WorkerRuntimeRef[];
196+
dispatches: WorkerTaskDispatch[];
197+
reports: WorkerTaskReport[];
170198
}
171199

172200
export function validateTaskGraph(graph: TaskGraph): TaskGraphValidationIssue[] {
@@ -362,6 +390,70 @@ export function getStaleWorkerIds(workers: WorkerRuntimeRef[], options: { now: s
362390
.map((worker) => worker.id);
363391
}
364392

393+
export function recordWorkerReady(workers: WorkerRuntimeRef[], input: { workerId: string; now: string; readinessNonce?: string }): WorkerRuntimeRef[] {
394+
requireText(input.workerId, "workerId");
395+
if (input.readinessNonce !== undefined) requireText(input.readinessNonce, "readinessNonce");
396+
parseTimestamp(input.now);
397+
return upsertWorker(workers, {
398+
id: input.workerId,
399+
state: "ready",
400+
lastHeartbeatAt: input.now,
401+
...(input.readinessNonce ? { readinessNonce: input.readinessNonce } : {}),
402+
});
403+
}
404+
405+
export function recordWorkerHeartbeat(workers: WorkerRuntimeRef[], input: { workerId: string; now: string }): WorkerRuntimeRef[] {
406+
parseTimestamp(input.now);
407+
return updateWorker(workers, input.workerId, (worker) => ({ ...worker, lastHeartbeatAt: input.now }));
408+
}
409+
410+
export function markMissingHeartbeats(workers: WorkerRuntimeRef[], options: { now: string; heartbeatTimeoutSeconds: number }): WorkerRuntimeRef[] {
411+
const stale = new Set(getStaleWorkerIds(workers, options));
412+
return workers.map((worker) => (stale.has(worker.id) ? { ...worker, state: "unhealthy" } : worker));
413+
}
414+
415+
export function dispatchClaimedTask(state: WorkerExecutionState, request: { taskId: string; workerId: string; claimToken: string; now: string; readinessNonce?: string }): WorkerExecutionState {
416+
const now = parseTimestamp(request.now);
417+
const worker = findWorker(state.workers, request.workerId);
418+
if (worker.state !== "ready") throw new Error(`worker ${worker.id} is not ready`);
419+
if (worker.readinessNonce && request.readinessNonce !== worker.readinessNonce) throw new Error(`worker ${worker.id} readiness nonce mismatch`);
420+
const task = findTask(state.graph, request.taskId);
421+
if (task.status !== "claimed") throw new Error(`task ${task.id} is not claimed`);
422+
const claim = requireActiveClaim(task, request.claimToken, now);
423+
if (claim.workerId !== request.workerId) throw new Error(`task ${task.id} is claimed by ${claim.workerId}`);
424+
if (state.dispatches.some((dispatch) => dispatch.taskId === task.id && dispatch.claimToken === request.claimToken)) throw new Error(`task ${task.id} claim is already dispatched`);
425+
const graph = updateTask(state.graph, task.id, (current) => ({ ...current, status: "in_progress" }));
426+
return {
427+
...state,
428+
graph,
429+
workers: updateWorker(state.workers, worker.id, (current) => ({ ...current, state: "busy", lastHeartbeatAt: request.now })),
430+
dispatches: [...state.dispatches, { taskId: task.id, workerId: worker.id, claimToken: request.claimToken, dispatchedAt: request.now, ...(request.readinessNonce ? { readinessNonce: request.readinessNonce } : {}) }],
431+
};
432+
}
433+
434+
export function captureWorkerReport(state: WorkerExecutionState, report: WorkerTaskReport): WorkerExecutionState {
435+
validateWorkerReport(state, report);
436+
const nextStatus = report.status === "completed" ? "verifying" : report.status;
437+
return {
438+
...state,
439+
graph: updateTask(state.graph, report.taskId, (task) => ({ ...task, status: nextStatus, evidenceRefs: [...report.evidenceRefs] })),
440+
workers: updateWorker(state.workers, report.workerId, (worker) => ({ ...worker, state: report.status === "completed" ? "completed-retained" : "unhealthy", lastHeartbeatAt: report.reportedAt })),
441+
reports: [...state.reports, { ...report, evidenceRefs: [...report.evidenceRefs] }],
442+
};
443+
}
444+
445+
export function completeTaskFromWorkerReport(state: WorkerExecutionState, request: { taskId: string; workerId: string; now: string; releaseWorker?: boolean }): WorkerExecutionState {
446+
const report = [...state.reports].reverse().find((item) => item.taskId === request.taskId && item.workerId === request.workerId);
447+
if (!report) throw new Error(`task ${request.taskId} has no worker report`);
448+
if (report.status !== "completed") throw new Error(`task ${request.taskId} report is not completed`);
449+
if (report.evidenceRefs.length === 0) throw new Error(`task ${request.taskId} report requires evidence refs`);
450+
return {
451+
...state,
452+
graph: completeTask(state.graph, { taskId: request.taskId, claimToken: report.claimToken, evidenceRefs: report.evidenceRefs, now: request.now }),
453+
workers: updateWorker(state.workers, request.workerId, (worker) => markWorkerTerminal(worker, { retained: !request.releaseWorker })),
454+
};
455+
}
456+
365457
export function markWorkerStale(worker: WorkerRuntimeRef): WorkerRuntimeRef {
366458
return { ...worker, state: "stale-registry" };
367459
}
@@ -375,6 +467,45 @@ function assertValidGraph(graph: TaskGraph): void {
375467
if (issues.length > 0) throw new Error(`invalid task graph: ${issues.map((issue) => issue.code).join(", ")}`);
376468
}
377469

470+
function validateWorkerReport(state: WorkerExecutionState, report: WorkerTaskReport): void {
471+
const reportedAt = parseTimestamp(report.reportedAt);
472+
if (!report.summary.trim()) throw new Error(`task ${report.taskId} report requires summary`);
473+
if (report.status === "completed" && report.evidenceRefs.length === 0) throw new Error(`task ${report.taskId} report requires evidence refs`);
474+
const task = findTask(state.graph, report.taskId);
475+
if (task.status !== "in_progress") throw new Error(`task ${report.taskId} is not reportable`);
476+
const claim = requireActiveClaim(task, report.claimToken, reportedAt);
477+
if (claim.workerId !== report.workerId) throw new Error(`task ${report.taskId} is claimed by ${claim.workerId}`);
478+
const dispatch = state.dispatches.find((item) => item.taskId === report.taskId && item.workerId === report.workerId && item.claimToken === report.claimToken);
479+
if (!dispatch) throw new Error(`task ${report.taskId} was not dispatched to worker ${report.workerId}`);
480+
}
481+
482+
function findTask(graph: TaskGraph, taskId: string): TaskNode {
483+
const task = graph.tasks.find((item) => item.id === taskId);
484+
if (!task) throw new Error(`unknown task ${taskId}`);
485+
return task;
486+
}
487+
488+
function findWorker(workers: WorkerRuntimeRef[], workerId: string): WorkerRuntimeRef {
489+
const worker = workers.find((item) => item.id === workerId);
490+
if (!worker) throw new Error(`unknown worker ${workerId}`);
491+
return worker;
492+
}
493+
494+
function upsertWorker(workers: WorkerRuntimeRef[], worker: WorkerRuntimeRef): WorkerRuntimeRef[] {
495+
return workers.some((item) => item.id === worker.id) ? updateWorker(workers, worker.id, () => worker) : [...workers, worker];
496+
}
497+
498+
function updateWorker(workers: WorkerRuntimeRef[], workerId: string, update: (worker: WorkerRuntimeRef) => WorkerRuntimeRef): WorkerRuntimeRef[] {
499+
let found = false;
500+
const next = workers.map((worker) => {
501+
if (worker.id !== workerId) return worker;
502+
found = true;
503+
return update(worker);
504+
});
505+
if (!found) throw new Error(`unknown worker ${workerId}`);
506+
return next;
507+
}
508+
378509
function computeFailedBlockersByTask(graph: TaskGraph, failed: Set<string>): Map<string, string[]> {
379510
const tasksById = new Map(graph.tasks.map((task) => [task.id, task]));
380511
const memo = new Map<string, string[]>();
@@ -465,6 +596,10 @@ function findCycleTaskId(graph: TaskGraph): string | undefined {
465596
return undefined;
466597
}
467598

599+
function requireText(value: string, label: string): void {
600+
if (!value.trim()) throw new Error(`${label} is required`);
601+
}
602+
468603
function parseTimestamp(value: string): number {
469604
const parsed = Date.parse(value);
470605
if (!Number.isFinite(parsed)) throw new Error(`invalid timestamp: ${value}`);

test/runtime-state.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ test("run contract runtime refs expose objective, policy, and evaluation artifac
4848
});
4949

5050
test("runtime state can represent successful and repair-required examples", () => {
51-
const success: RuntimeState = { ...state(), status: "sealed", objective: { id: "objective", goal: "ship MVP", successCriteria: ["tests pass"], constraints: ["no destructive cleanup"] }, tasks: [{ id: "build", title: "Build slice", status: "completed", dependsOn: [], evidenceRefs: [{ kind: "evidence", artifactId: "verify", path: "verify.md" }], evaluationRefs: [] }] };
52-
const repair: RuntimeState = { ...state(), status: "blocked", tasks: [{ id: "build", title: "Build slice", status: "repair_required", dependsOn: [], evidenceRefs: [], evaluationRefs: [{ kind: "evaluation", artifactId: "eval", path: "evaluation.json" }] }], integrationCandidates: [{ id: "candidate", taskIds: ["build"], status: "repair_required", evidenceRefs: [] }] };
51+
const success: RuntimeState = { ...state(), status: "sealed", objective: { id: "objective", goal: "ship MVP", successCriteria: ["tests pass"], constraints: ["no destructive cleanup"] }, tasks: [{ id: "build", title: "Build slice", status: "completed", dependsOn: [], evidenceRefs: [{ kind: "evidence", artifactId: "verify", path: "verify.md" }], evaluationRefs: [] }, { id: "parallel", title: "Parallel slice", status: "in_progress", dependsOn: [], evidenceRefs: [], evaluationRefs: [] }], workers: [{ id: "w1", adapterId: "local", status: "closed", readinessNonce: "nonce", lastHeartbeatAt: now }] };
52+
const repair: RuntimeState = { ...state(), status: "blocked", tasks: [{ id: "build", title: "Build slice", status: "repair_required", dependsOn: [], evidenceRefs: [], evaluationRefs: [{ kind: "evaluation", artifactId: "eval", path: "evaluation.json" }] }], workers: [{ id: "w2", adapterId: "local", status: "cleanup-released" }], integrationCandidates: [{ id: "candidate", taskIds: ["build"], status: "repair_required", evidenceRefs: [] }] };
5353
assert.equal(parseRuntimeState(success).ok, true);
5454
assert.equal(parseRuntimeState(repair).ok, true);
5555
});

0 commit comments

Comments
 (0)