Skip to content

Commit e4b1659

Browse files
authored
feat: add task graph readiness projection (#232)
Co-authored-by: devkade <devkade@users.noreply.github.com>
1 parent 31a5c82 commit e4b1659

3 files changed

Lines changed: 182 additions & 10 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ Kapi does not own subagent orchestration. Use `pi-subagents` for agent delegatio
148148

149149
### Graph Execution Runtime Boundary
150150

151-
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, dependencies, readiness, claims, leases, and evidence expectations.
151+
The `graph-execution` preset treats `TaskGraph` as the concrete execute-phase runtime primitive for single-agent, sequential, DAG-parallel, and team-parallel work. Policy selection can provide advisory `PolicyGraphSketch` inputs, but the execute phase owns concrete task ids, titles/descriptions, dependencies, topological ordering, readiness reasons, blocked/downstream status projection, claims, leases, evidence expectations, and task-graph/readiness event records.
152152

153153
Phase presets serialize as a versioned `schemaVersion: 1` catalog. Legacy arrays migrate explicitly; unsupported versions, malformed catalogs, and gate evaluation with missing top-level required evidence refs or artifact ids fail closed.
154154

src/domain/task-graph.ts

Lines changed: 129 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ export interface TaskClaim {
88

99
export interface TaskNode {
1010
id: string;
11+
title?: string;
12+
description?: string;
1113
status: TaskStatus;
1214
dependencies: string[];
1315
supersedes?: string[];
@@ -22,6 +24,36 @@ export interface TaskGraph {
2224
tasks: TaskNode[];
2325
}
2426

27+
export interface RuntimeTask extends TaskNode {
28+
title: string;
29+
description: string;
30+
}
31+
32+
export type TaskReadinessStatus = "ready" | "waiting" | "blocked" | "not_pending";
33+
34+
export interface TaskReadiness {
35+
taskId: string;
36+
status: TaskReadinessStatus;
37+
ready: boolean;
38+
waitingFor: string[];
39+
blockedBy: string[];
40+
reasons: string[];
41+
}
42+
43+
export interface TaskGraphStatusProjection {
44+
total: number;
45+
ready: string[];
46+
blocked: string[];
47+
waiting: string[];
48+
completed: string[];
49+
failed: string[];
50+
readiness: TaskReadiness[];
51+
}
52+
53+
export type TaskGraphRuntimeEvent =
54+
| { type: "task_graph.created"; taskIds: string[]; topologicalOrder: string[] }
55+
| { type: "task.ready"; taskId: string; reason: string };
56+
2557
export interface PolicyGraphSketch {
2658
id: string;
2759
purpose: string;
@@ -145,11 +177,64 @@ export function validateTaskGraph(graph: TaskGraph): TaskGraphValidationIssue[]
145177
}
146178

147179
export function computeReadyTaskIds(graph: TaskGraph): string[] {
180+
return computeTaskReadiness(graph).filter((item) => item.ready).map((item) => item.taskId);
181+
}
182+
183+
export function computeTaskReadiness(graph: TaskGraph): TaskReadiness[] {
184+
assertValidGraph(graph);
185+
const tasksById = new Map(graph.tasks.map((task) => [task.id, task]));
148186
const completedOrRepaired = getCompletedOrRepairedTaskIds(graph);
149-
const blocked = new Set(computeBlockedTaskIds(graph));
150-
return graph.tasks
151-
.filter((task) => task.status === "pending" && !blocked.has(task.id) && task.dependencies.every((dependency) => completedOrRepaired.has(dependency)))
152-
.map((task) => task.id);
187+
const failed = getFailedBlockingTaskIds(graph);
188+
const failedBlockersByTask = computeFailedBlockersByTask(graph, failed);
189+
return graph.tasks.map((task) => {
190+
if (task.status !== "pending") {
191+
return { taskId: task.id, status: "not_pending", ready: false, waitingFor: [], blockedBy: [], reasons: [`task is ${task.status}`] };
192+
}
193+
const blockedBy = failedBlockersByTask.get(task.id) ?? [];
194+
const waitingFor = task.dependencies.filter((dependency) => !completedOrRepaired.has(dependency) && !failed.has(dependency) && (failedBlockersByTask.get(dependency) ?? []).length === 0);
195+
if (blockedBy.length > 0) {
196+
return { taskId: task.id, status: "blocked", ready: false, waitingFor: [], blockedBy, reasons: blockedBy.map((dependency) => describeFailedBlocker(task, dependency, tasksById, failed)) };
197+
}
198+
if (waitingFor.length > 0) return { taskId: task.id, status: "waiting", ready: false, waitingFor, blockedBy, reasons: waitingFor.map((dependency) => `waiting for dependency ${dependency}`) };
199+
return { taskId: task.id, status: "ready", ready: true, waitingFor, blockedBy, reasons: [task.dependencies.length === 0 ? "no dependencies" : "all dependencies completed"] };
200+
});
201+
}
202+
203+
export function projectTaskGraphStatus(graph: TaskGraph): TaskGraphStatusProjection {
204+
const readiness = computeTaskReadiness(graph);
205+
const readinessById = new Map(readiness.map((item) => [item.taskId, item]));
206+
return {
207+
total: graph.tasks.length,
208+
ready: readiness.filter((item) => item.ready).map((item) => item.taskId),
209+
blocked: readiness.filter((item) => item.status === "blocked").map((item) => item.taskId),
210+
waiting: readiness.filter((item) => item.status === "waiting").map((item) => item.taskId),
211+
completed: graph.tasks.filter((task) => task.status === "completed").map((task) => task.id),
212+
failed: graph.tasks.filter((task) => task.status === "failed" && readinessById.get(task.id)?.status !== "blocked").map((task) => task.id),
213+
readiness,
214+
};
215+
}
216+
217+
export function topologicalTaskIds(graph: TaskGraph): string[] {
218+
assertValidGraph(graph);
219+
const remaining = new Map(graph.tasks.map((task) => [task.id, new Set(task.dependencies)]));
220+
const ordered: string[] = [];
221+
while (remaining.size > 0) {
222+
const next = Array.from(remaining.entries()).filter(([, dependencies]) => dependencies.size === 0).map(([taskId]) => taskId).sort();
223+
if (next.length === 0) throw new Error("invalid task graph: dependency_cycle");
224+
for (const taskId of next) {
225+
ordered.push(taskId);
226+
remaining.delete(taskId);
227+
for (const dependencies of Array.from(remaining.values())) dependencies.delete(taskId);
228+
}
229+
}
230+
return ordered;
231+
}
232+
233+
export function createTaskGraphRuntimeEvents(graph: TaskGraph): TaskGraphRuntimeEvent[] {
234+
return [
235+
{ type: "task_graph.created", taskIds: graph.tasks.map((task) => task.id), topologicalOrder: topologicalTaskIds(graph) },
236+
...computeTaskReadiness(graph).filter((item) => item.ready).map((item) => ({ type: "task.ready" as const, taskId: item.taskId, reason: item.reasons.join("; ") })),
237+
];
153238
}
154239

155240
export function transitionReadyTasks(graph: TaskGraph): TaskGraph {
@@ -159,9 +244,7 @@ export function transitionReadyTasks(graph: TaskGraph): TaskGraph {
159244
}
160245

161246
export function computeBlockedTaskIds(graph: TaskGraph): string[] {
162-
const completedOrRepaired = getCompletedOrRepairedTaskIds(graph);
163-
const failed = new Set(graph.tasks.filter((task) => task.status === "failed" && !completedOrRepaired.has(task.id)).map((task) => task.id));
164-
return graph.tasks.filter((task) => task.status === "pending" && task.dependencies.some((dependency) => failed.has(dependency))).map((task) => task.id);
247+
return computeTaskReadiness(graph).filter((item) => item.status === "blocked").map((item) => item.taskId);
165248
}
166249

167250
export function claimTask(graph: TaskGraph, request: { taskId: string; workerId: string; token: string; now: string; leaseExpiresAt: string }): TaskGraph {
@@ -211,6 +294,45 @@ function assertValidGraph(graph: TaskGraph): void {
211294
if (issues.length > 0) throw new Error(`invalid task graph: ${issues.map((issue) => issue.code).join(", ")}`);
212295
}
213296

297+
function computeFailedBlockersByTask(graph: TaskGraph, failed: Set<string>): Map<string, string[]> {
298+
const tasksById = new Map(graph.tasks.map((task) => [task.id, task]));
299+
const memo = new Map<string, string[]>();
300+
const collect = (taskId: string): string[] => {
301+
const cached = memo.get(taskId);
302+
if (cached) return cached;
303+
const task = tasksById.get(taskId);
304+
if (!task) return [];
305+
const blockers = new Set<string>();
306+
for (const dependency of task.dependencies) {
307+
if (failed.has(dependency)) blockers.add(dependency);
308+
for (const upstream of collect(dependency)) blockers.add(upstream);
309+
}
310+
const result = Array.from(blockers).sort();
311+
memo.set(taskId, result);
312+
return result;
313+
};
314+
for (const task of graph.tasks) collect(task.id);
315+
return memo;
316+
}
317+
318+
function describeFailedBlocker(task: TaskNode, failedTaskId: string, tasksById: Map<string, TaskNode>, failed: Set<string>): string {
319+
if (task.dependencies.includes(failedTaskId) && failed.has(failedTaskId)) return `dependency ${failedTaskId} failed`;
320+
const via = task.dependencies.find((dependency) => dependsOnFailedTask(dependency, failedTaskId, tasksById));
321+
return via ? `blocked by failed upstream dependency ${failedTaskId} via ${via}` : `blocked by failed upstream dependency ${failedTaskId}`;
322+
}
323+
324+
function dependsOnFailedTask(taskId: string, failedTaskId: string, tasksById: Map<string, TaskNode>): boolean {
325+
const task = tasksById.get(taskId);
326+
if (!task) return false;
327+
if (task.dependencies.includes(failedTaskId)) return true;
328+
return task.dependencies.some((dependency) => dependsOnFailedTask(dependency, failedTaskId, tasksById));
329+
}
330+
331+
function getFailedBlockingTaskIds(graph: TaskGraph): Set<string> {
332+
const completedOrRepaired = getCompletedOrRepairedTaskIds(graph);
333+
return new Set(graph.tasks.filter((task) => task.status === "failed" && !completedOrRepaired.has(task.id)).map((task) => task.id));
334+
}
335+
214336
function getCompletedOrRepairedTaskIds(graph: TaskGraph): Set<string> {
215337
const completedOrRepaired = new Set(graph.tasks.filter((task) => task.status === "completed").map((task) => task.id));
216338
for (const task of graph.tasks) {

test/task-graph.test.ts

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ import {
55
completeTask,
66
computeBlockedTaskIds,
77
computeReadyTaskIds,
8+
computeTaskReadiness,
9+
createTaskGraphRuntimeEvents,
810
getStaleWorkerIds,
911
markWorkerStale,
1012
markWorkerTerminal,
13+
projectTaskGraphStatus,
14+
topologicalTaskIds,
1115
transitionReadyTasks,
1216
validateTaskGraph,
1317
} from "../src/domain/task-graph.js";
@@ -24,6 +28,43 @@ test("task graph readiness waits for completed dependencies", () => {
2428
assert.deepEqual(ready, ["build"]);
2529
});
2630

31+
test("task graph exposes topological order, readiness reasons, projection, and events", () => {
32+
const graph = {
33+
tasks: [
34+
{ id: "prepare", title: "Prepare", description: "Prepare inputs", status: "completed" as const, dependencies: [] },
35+
{ id: "branch-a", title: "A", description: "Parallel A", status: "pending" as const, dependencies: ["prepare"] },
36+
{ id: "branch-b", title: "B", description: "Parallel B", status: "pending" as const, dependencies: ["prepare"] },
37+
{ id: "join", title: "Join", description: "Join branches", status: "pending" as const, dependencies: ["branch-a", "branch-b"] },
38+
{ id: "seal", title: "Seal", description: "Seal run", status: "pending" as const, dependencies: ["join"] },
39+
],
40+
};
41+
42+
assert.deepEqual(topologicalTaskIds(graph), ["prepare", "branch-a", "branch-b", "join", "seal"]);
43+
assert.deepEqual(computeReadyTaskIds(graph), ["branch-a", "branch-b"]);
44+
assert.deepEqual(computeTaskReadiness(graph).find((item) => item.taskId === "join"), {
45+
taskId: "join",
46+
status: "waiting",
47+
ready: false,
48+
waitingFor: ["branch-a", "branch-b"],
49+
blockedBy: [],
50+
reasons: ["waiting for dependency branch-a", "waiting for dependency branch-b"],
51+
});
52+
assert.deepEqual(projectTaskGraphStatus(graph), {
53+
total: 5,
54+
ready: ["branch-a", "branch-b"],
55+
blocked: [],
56+
waiting: ["join", "seal"],
57+
completed: ["prepare"],
58+
failed: [],
59+
readiness: computeTaskReadiness(graph),
60+
});
61+
assert.deepEqual(createTaskGraphRuntimeEvents(graph), [
62+
{ type: "task_graph.created", taskIds: ["prepare", "branch-a", "branch-b", "join", "seal"], topologicalOrder: ["prepare", "branch-a", "branch-b", "join", "seal"] },
63+
{ type: "task.ready", taskId: "branch-a", reason: "all dependencies completed" },
64+
{ type: "task.ready", taskId: "branch-b", reason: "all dependencies completed" },
65+
]);
66+
});
67+
2768
test("claim manager rejects duplicate active ownership", () => {
2869
const graph = {
2970
tasks: [
@@ -86,13 +127,22 @@ test("failed dependencies block downstream tasks unless a repair supersedes them
86127
{ id: "build", status: "failed" as const, dependencies: [] },
87128
{ id: "repair-build", status: "completed" as const, dependencies: [], supersedes: ["build"] },
88129
{ id: "verify", status: "pending" as const, dependencies: ["build"] },
89-
{ id: "publish", status: "pending" as const, dependencies: ["missing-fix"] },
90130
{ id: "missing-fix", status: "failed" as const, dependencies: [] },
131+
{ id: "publish", status: "pending" as const, dependencies: ["missing-fix"] },
132+
{ id: "announce", status: "pending" as const, dependencies: ["publish"] },
91133
],
92134
};
93135

94136
assert.deepEqual(computeReadyTaskIds(graph), ["verify"]);
95-
assert.deepEqual(computeBlockedTaskIds(graph), ["publish"]);
137+
assert.deepEqual(computeBlockedTaskIds(graph), ["publish", "announce"]);
138+
assert.deepEqual(computeTaskReadiness(graph).find((item) => item.taskId === "announce"), {
139+
taskId: "announce",
140+
status: "blocked",
141+
ready: false,
142+
waitingFor: [],
143+
blockedBy: ["missing-fix"],
144+
reasons: ["blocked by failed upstream dependency missing-fix via publish"],
145+
});
96146
});
97147

98148
test("worker heartbeat stale detection and terminal retention are modeled", () => {

0 commit comments

Comments
 (0)