Skip to content

Commit ed7ed13

Browse files
authored
feat: add task claim leases (#233)
Co-authored-by: devkade <devkade@users.noreply.github.com>
1 parent e4b1659 commit ed7ed13

3 files changed

Lines changed: 168 additions & 11 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio
148148

149149
### Graph Execution Runtime Boundary
150150

151-
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claims, leases, evidence expectations, and task-graph/readiness event records.
151+
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claim/lease ownership, stale-claim recovery, evidence expectations, and task-graph/readiness/claim event records.
152152

153153
Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed.
154154

src/domain/task-graph.ts

Lines changed: 96 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@ export interface TaskClaim {
44
token: string;
55
workerId: string;
66
expiresAt: string;
7+
claimedAt?: string;
8+
recoveredFromToken?: string;
9+
}
10+
11+
export interface ClaimLease extends TaskClaim {
12+
taskId: string;
13+
claimedAt: string;
714
}
815

916
export interface TaskNode {
@@ -52,7 +59,12 @@ export interface TaskGraphStatusProjection {
5259

5360
export type TaskGraphRuntimeEvent =
5461
| { type: "task_graph.created"; taskIds: string[]; topologicalOrder: string[] }
55-
| { type: "task.ready"; taskId: string; reason: string };
62+
| { type: "task.ready"; taskId: string; reason: string }
63+
| { type: "claim.created"; taskId: string; workerId: string; token: string; expiresAt: string }
64+
| { type: "lease.renewed"; taskId: string; workerId: string; token: string; expiresAt: string }
65+
| { type: "lease.expired"; taskId: string; workerId: string; token: string; expiredAt: string }
66+
| { type: "claim.released"; taskId: string; workerId: string; token: string; releasedAt: string }
67+
| { type: "claim.recovered"; taskId: string; workerId: string; token: string; recoveredFromToken: string; expiresAt: string };
5668

5769
export interface PolicyGraphSketch {
5870
id: string;
@@ -247,28 +259,97 @@ export function computeBlockedTaskIds(graph: TaskGraph): string[] {
247259
return computeTaskReadiness(graph).filter((item) => item.status === "blocked").map((item) => item.taskId);
248260
}
249261

250-
export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token: string; now: string; leaseExpiresAt: string }): TaskGraph {
251-
const now = parseTimestamp(request.now);
252-
const leaseExpiresAt = parseTimestamp(request.leaseExpiresAt);
253-
if (leaseExpiresAt <= now) throw new Error("claim lease must expire after now");
262+
export function createClaimLease(input: { taskId: string; workerId: string; claimedAt: string; leaseExpiresAt: string; token?: string; recoveredFromToken?: string }): ClaimLease {
263+
const claimedAt = parseTimestamp(input.claimedAt);
264+
const leaseExpiresAt = parseTimestamp(input.leaseExpiresAt);
265+
if (leaseExpiresAt <= claimedAt) throw new Error("claim lease must expire after claimedAt");
266+
return {
267+
taskId: input.taskId,
268+
workerId: input.workerId,
269+
claimedAt: input.claimedAt,
270+
expiresAt: input.leaseExpiresAt,
271+
token: input.token ?? createClaimLeaseToken(input),
272+
...(input.recoveredFromToken ? { recoveredFromToken: input.recoveredFromToken } : {}),
273+
};
274+
}
275+
276+
export function createClaimLeaseToken(input: { taskId: string; workerId: string; claimedAt: string; leaseExpiresAt: string }): string {
277+
const source = `${input.taskId}\u001f${input.workerId}\u001f${input.claimedAt}\u001f${input.leaseExpiresAt}`;
278+
let hash = 2166136261;
279+
for (let index = 0; index < source.length; index += 1) {
280+
hash ^= source.charCodeAt(index);
281+
hash = Math.imul(hash, 16777619) >>> 0;
282+
}
283+
return `claim-${hash.toString(36)}`;
284+
}
285+
286+
export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token?: string; now: string; leaseExpiresAt: string }): TaskGraph {
287+
assertValidGraph(graph);
288+
const lease = createClaimLease({ taskId: request.taskId, workerId: request.workerId, claimedAt: request.now, leaseExpiresAt: request.leaseExpiresAt, token: request.token });
254289
return updateTask(graph, request.taskId, (task) => {
255290
if (task.status !== "ready") throw new Error(`task ${task.id} is not ready to claim`);
256-
if (task.claim && parseTimestamp(task.claim.expiresAt) > now) throw new Error(`task ${task.id} already has an active claim`);
291+
if (task.claim) throw new Error(`task ${task.id} already has a claim; use explicit recovery for expired leases`);
257292
return {
258293
...task,
259294
status: "claimed",
260295
assignedWorker: request.workerId,
261296
attempts: (task.attempts ?? 0) + 1,
262-
claim: { token: request.token, workerId: request.workerId, expiresAt: request.leaseExpiresAt },
297+
claim: lease,
298+
};
299+
});
300+
}
301+
302+
export function renewClaimLease(graph: TaskGraph, request: { taskId: string; claimToken: string; now: string; leaseExpiresAt: string }): TaskGraph {
303+
const now = parseTimestamp(request.now);
304+
const leaseExpiresAt = parseTimestamp(request.leaseExpiresAt);
305+
if (leaseExpiresAt <= now) throw new Error("claim lease must expire after now");
306+
return updateTask(graph, request.taskId, (task) => {
307+
const claim = requireActiveClaim(task, request.claimToken, now);
308+
return { ...task, claim: { ...claim, expiresAt: request.leaseExpiresAt } };
309+
});
310+
}
311+
312+
export function releaseClaim(graph: TaskGraph, request: { taskId: string; claimToken: string; now: string }): TaskGraph {
313+
const now = parseTimestamp(request.now);
314+
return updateTask(graph, request.taskId, (task) => {
315+
requireActiveClaim(task, request.claimToken, now);
316+
return { ...task, status: "ready", assignedWorker: undefined, claim: undefined };
317+
});
318+
}
319+
320+
export function recoverExpiredClaim(graph: TaskGraph, request: { taskId: string; workerId: string; now: string; leaseExpiresAt: string; token?: string }): TaskGraph {
321+
const now = parseTimestamp(request.now);
322+
return updateTask(graph, request.taskId, (task) => {
323+
if (!task.claim) throw new Error(`task ${task.id} has no claim to recover`);
324+
if (parseTimestamp(task.claim.expiresAt) > now) throw new Error(`task ${task.id} claim is still active`);
325+
const leaseInput = {
326+
taskId: task.id,
327+
workerId: request.workerId,
328+
claimedAt: request.now,
329+
leaseExpiresAt: request.leaseExpiresAt,
330+
token: request.token,
331+
recoveredFromToken: task.claim.token,
263332
};
333+
const lease = createClaimLease(leaseInput);
334+
return { ...task, status: "claimed", assignedWorker: request.workerId, attempts: (task.attempts ?? 0) + 1, claim: lease };
264335
});
265336
}
266337

338+
export function createClaimLeaseRuntimeEvent(type: "claim.created" | "lease.renewed", lease: ClaimLease): TaskGraphRuntimeEvent;
339+
export function createClaimLeaseRuntimeEvent(type: "lease.expired", lease: ClaimLease, occurredAt: string): TaskGraphRuntimeEvent;
340+
export function createClaimLeaseRuntimeEvent(type: "claim.released", lease: ClaimLease, occurredAt: string): TaskGraphRuntimeEvent;
341+
export function createClaimLeaseRuntimeEvent(type: "claim.recovered", lease: ClaimLease): TaskGraphRuntimeEvent;
342+
export function createClaimLeaseRuntimeEvent(type: "claim.created" | "lease.renewed" | "lease.expired" | "claim.released" | "claim.recovered", lease: ClaimLease, occurredAt?: string): TaskGraphRuntimeEvent {
343+
if (type === "lease.expired") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, expiredAt: occurredAt ?? lease.expiresAt };
344+
if (type === "claim.released") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, releasedAt: occurredAt ?? lease.expiresAt };
345+
if (type === "claim.recovered") return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, recoveredFromToken: lease.recoveredFromToken ?? "", expiresAt: lease.expiresAt };
346+
return { type, taskId: lease.taskId, workerId: lease.workerId, token: lease.token, expiresAt: lease.expiresAt };
347+
}
348+
267349
export function completeTask(graph: TaskGraph, request: { taskId: string; claimToken: string; evidenceRefs: string[]; now: string }): TaskGraph {
268350
const now = parseTimestamp(request.now);
269351
return updateTask(graph, request.taskId, (task) => {
270-
if (!task.claim || task.claim.token !== request.claimToken) throw new Error(`task ${task.id} requires a matching claim token`);
271-
if (parseTimestamp(task.claim.expiresAt) <= now) throw new Error(`task ${task.id} claim expired`);
352+
requireActiveClaim(task, request.claimToken, now);
272353
if (request.evidenceRefs.length === 0) throw new Error(`task ${task.id} requires evidence refs before completion`);
273354
return { ...task, status: "completed", evidenceRefs: [...request.evidenceRefs] };
274355
});
@@ -342,6 +423,12 @@ function getCompletedOrRepairedTaskIds(graph: TaskGraph): Set<string> {
342423
return completedOrRepaired;
343424
}
344425

426+
function requireActiveClaim(task: TaskNode, claimToken: string, now: number): TaskClaim {
427+
if (!task.claim || task.claim.token !== claimToken) throw new Error(`task ${task.id} requires a matching claim token`);
428+
if (parseTimestamp(task.claim.expiresAt) <= now) throw new Error(`task ${task.id} claim expired`);
429+
return task.claim;
430+
}
431+
345432
function updateTask(graph: TaskGraph, taskId: string, update: (task: TaskNode) => TaskNode): TaskGraph {
346433
let found = false;
347434
const tasks = graph.tasks.map((task) => {

test/task-graph.test.ts

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,17 @@ import {
66
computeBlockedTaskIds,
77
computeReadyTaskIds,
88
computeTaskReadiness,
9+
createClaimLease,
10+
createClaimLeaseRuntimeEvent,
11+
createClaimLeaseToken,
912
createTaskGraphRuntimeEvents,
1013
getStaleWorkerIds,
1114
markWorkerStale,
1215
markWorkerTerminal,
1316
projectTaskGraphStatus,
17+
recoverExpiredClaim,
18+
releaseClaim,
19+
renewClaimLease,
1420
topologicalTaskIds,
1521
transitionReadyTasks,
1622
validateTaskGraph,
@@ -65,6 +71,31 @@ test("task graph exposes topological order, readiness reasons, projection, and e
6571
]);
6672
});
6773

74+
test("claim leases generate tokens, reject non-ready tasks, renew, release, and record events", () => {
75+
const token = createClaimLeaseToken({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" });
76+
assert.equal(token, createClaimLeaseToken({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" }));
77+
78+
const lease = createClaimLease({ taskId: "build", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" });
79+
assert.deepEqual(createClaimLeaseRuntimeEvent("claim.created", lease), { type: "claim.created", taskId: "build", workerId: "w1", token, expiresAt: "2026-01-01T00:10:00.000Z" });
80+
81+
const graph = { tasks: [{ id: "build", status: "ready" as const, dependencies: [] }] };
82+
const claimed = claimTask(graph, { taskId: "build", workerId: "w1", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" });
83+
assert.equal(claimed.tasks[0]?.status, "claimed");
84+
assert.equal(claimed.tasks[0]?.claim?.token, token);
85+
86+
const renewed = renewClaimLease(claimed, { taskId: "build", claimToken: token, now: "2026-01-01T00:05:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
87+
assert.equal(renewed.tasks[0]?.claim?.expiresAt, "2026-01-01T00:20:00.000Z");
88+
assert.deepEqual(createClaimLeaseRuntimeEvent("lease.renewed", { ...lease, expiresAt: "2026-01-01T00:20:00.000Z" }), { type: "lease.renewed", taskId: "build", workerId: "w1", token, expiresAt: "2026-01-01T00:20:00.000Z" });
89+
90+
const released = releaseClaim(renewed, { taskId: "build", claimToken: token, now: "2026-01-01T00:06:00.000Z" });
91+
assert.equal(released.tasks[0]?.status, "ready");
92+
assert.equal(released.tasks[0]?.claim, undefined);
93+
94+
for (const status of ["pending", "blocked", "completed"] as const) {
95+
assert.throws(() => claimTask({ tasks: [{ id: status, status, dependencies: [] }] }, { taskId: status, workerId: "w1", now: "2026-01-01T00:00:00.000Z", leaseExpiresAt: "2026-01-01T00:10:00.000Z" }), /not ready/);
96+
}
97+
});
98+
6899
test("claim manager rejects duplicate active ownership", () => {
69100
const graph = {
70101
tasks: [
@@ -86,7 +117,7 @@ test("claim manager rejects duplicate active ownership", () => {
86117
now: "2026-01-01T00:00:00.000Z",
87118
leaseExpiresAt: "2026-01-01T00:10:00.000Z",
88119
}),
89-
/active claim/,
120+
/already has a claim/,
90121
);
91122
});
92123

@@ -121,6 +152,45 @@ test("task completion requires matching unexpired claim and evidence refs", () =
121152
assert.deepEqual(completed.tasks[0]?.evidenceRefs, ["verify.md#pass"]);
122153
});
123154

155+
test("expired leases block normal completion and recover through explicit recovery", () => {
156+
const graph = {
157+
tasks: [
158+
{
159+
id: "build",
160+
status: "claimed" as const,
161+
dependencies: [],
162+
assignedWorker: "w1",
163+
attempts: 1,
164+
claim: { token: "old", workerId: "w1", claimedAt: "2026-01-01T00:00:00.000Z", expiresAt: "2026-01-01T00:10:00.000Z" },
165+
},
166+
],
167+
};
168+
169+
assert.throws(() => completeTask(graph, { taskId: "build", claimToken: "old", evidenceRefs: ["verify.md#pass"], now: "2026-01-01T00:11:00.000Z" }), /expired/);
170+
assert.deepEqual(createClaimLeaseRuntimeEvent("lease.expired", { ...graph.tasks[0]!.claim!, taskId: "build", claimedAt: "2026-01-01T00:00:00.000Z" }, "2026-01-01T00:11:00.000Z"), {
171+
type: "lease.expired",
172+
taskId: "build",
173+
workerId: "w1",
174+
token: "old",
175+
expiredAt: "2026-01-01T00:11:00.000Z",
176+
});
177+
178+
const recovered = recoverExpiredClaim(graph, { taskId: "build", workerId: "w2", token: "new", now: "2026-01-01T00:11:00.000Z", leaseExpiresAt: "2026-01-01T00:20:00.000Z" });
179+
assert.equal(recovered.tasks[0]?.assignedWorker, "w2");
180+
assert.equal(recovered.tasks[0]?.attempts, 2);
181+
assert.deepEqual(recovered.tasks[0]?.claim, { taskId: "build", workerId: "w2", token: "new", claimedAt: "2026-01-01T00:11:00.000Z", expiresAt: "2026-01-01T00:20:00.000Z", recoveredFromToken: "old" });
182+
assert.deepEqual(createClaimLeaseRuntimeEvent("claim.recovered", recovered.tasks[0]!.claim! as typeof recovered.tasks[0]["claim"] & { taskId: string; claimedAt: string }), {
183+
type: "claim.recovered",
184+
taskId: "build",
185+
workerId: "w2",
186+
token: "new",
187+
recoveredFromToken: "old",
188+
expiresAt: "2026-01-01T00:20:00.000Z",
189+
});
190+
assert.doesNotThrow(() => completeTask(recovered, { taskId: "build", claimToken: "new", evidenceRefs: ["verify.md#pass"], now: "2026-01-01T00:12:00.000Z" }));
191+
assert.throws(() => recoverExpiredClaim(recovered, { taskId: "build", workerId: "w3", now: "2026-01-01T00:12:00.000Z", leaseExpiresAt: "2026-01-01T00:30:00.000Z" }), /still active/);
192+
});
193+
124194
test("failed dependencies block downstream tasks unless a repair supersedes them", () => {
125195
const graph = {
126196
tasks: [

0 commit comments

Comments
 (0)