Skip to content

Commit b24ade0

Browse files
committed
feat(tasks): wire agent server event ingest
1 parent 0c05cf5 commit b24ade0

4 files changed

Lines changed: 298 additions & 3 deletions

File tree

packages/agent/src/server/agent-server.test.ts

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,247 @@ describe("AgentServer HTTP Mode", () => {
203203
});
204204

205205
describe("turn completion", () => {
206+
function stubSessionCleanup(testServer: unknown): {
207+
cleanupSession: (options?: {
208+
completeEventStream?: boolean;
209+
}) => Promise<void>;
210+
eventStreamSender: {
211+
enqueue: ReturnType<typeof vi.fn>;
212+
stop: ReturnType<typeof vi.fn>;
213+
};
214+
} {
215+
const cleanupServer = testServer as {
216+
session: unknown;
217+
eventStreamSender: {
218+
enqueue: ReturnType<typeof vi.fn>;
219+
stop: ReturnType<typeof vi.fn>;
220+
};
221+
captureCheckpointState: ReturnType<typeof vi.fn>;
222+
cleanupSession: (options?: {
223+
completeEventStream?: boolean;
224+
}) => Promise<void>;
225+
};
226+
cleanupServer.captureCheckpointState = vi.fn(async () => {});
227+
cleanupServer.eventStreamSender = {
228+
enqueue: vi.fn(),
229+
stop: vi.fn(async () => {}),
230+
};
231+
cleanupServer.session = {
232+
payload: { run_id: "run-1" },
233+
pendingHandoffGitState: undefined,
234+
logWriter: { flush: vi.fn(async () => {}) },
235+
acpConnection: { cleanup: vi.fn(async () => {}) },
236+
sseController: { close: vi.fn() },
237+
};
238+
return cleanupServer;
239+
}
240+
241+
it("keeps event ingest open for non-terminal session cleanup", async () => {
242+
const testServer = stubSessionCleanup(createServer());
243+
244+
await testServer.cleanupSession();
245+
246+
expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled();
247+
expect(testServer.eventStreamSender.stop).not.toHaveBeenCalled();
248+
});
249+
250+
it("stops event ingest for terminal session cleanup without fake task completion", async () => {
251+
const testServer = stubSessionCleanup(createServer());
252+
253+
await testServer.cleanupSession({ completeEventStream: true });
254+
255+
expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled();
256+
expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce();
257+
});
258+
259+
it("writes terminal failure status before completing event ingest", async () => {
260+
const order: string[] = [];
261+
const testServer = new AgentServer({
262+
port,
263+
jwtPublicKey: TEST_PUBLIC_KEY,
264+
repositoryPath: repo.path,
265+
apiUrl: "http://localhost:8000",
266+
apiKey: "test-api-key",
267+
projectId: 1,
268+
mode: "interactive",
269+
taskId: "test-task-id",
270+
runId: "test-run-id",
271+
}) as unknown as {
272+
eventStreamSender: {
273+
enqueue: (event: Record<string, unknown>) => void;
274+
stop: () => Promise<void>;
275+
};
276+
posthogAPI: {
277+
updateTaskRun: (
278+
taskId: string,
279+
runId: string,
280+
payload: Record<string, unknown>,
281+
) => Promise<unknown>;
282+
};
283+
signalTaskComplete(
284+
payload: JwtPayload,
285+
stopReason: string,
286+
errorMessage?: string,
287+
): Promise<void>;
288+
};
289+
testServer.eventStreamSender = {
290+
enqueue: vi.fn(() => {
291+
order.push("enqueue");
292+
}),
293+
stop: vi.fn(async () => {
294+
order.push("stop");
295+
}),
296+
};
297+
testServer.posthogAPI = {
298+
updateTaskRun: vi.fn(async () => {
299+
order.push("update");
300+
return {};
301+
}),
302+
};
303+
304+
await testServer.signalTaskComplete(
305+
{
306+
run_id: "run-1",
307+
task_id: "task-1",
308+
team_id: 1,
309+
user_id: 1,
310+
distinct_id: "distinct-id",
311+
mode: "interactive",
312+
},
313+
"error",
314+
"boom",
315+
);
316+
317+
expect(order).toEqual(["enqueue", "update", "stop"]);
318+
expect(testServer.eventStreamSender.enqueue).toHaveBeenCalledWith(
319+
expect.objectContaining({
320+
type: "notification",
321+
notification: expect.objectContaining({
322+
method: "_posthog/error",
323+
params: expect.objectContaining({ error: "boom" }),
324+
}),
325+
}),
326+
);
327+
expect(testServer.posthogAPI.updateTaskRun).toHaveBeenCalledWith(
328+
"task-1",
329+
"run-1",
330+
{
331+
status: "failed",
332+
error_message: "boom",
333+
},
334+
);
335+
});
336+
337+
it("still stops event ingest when terminal failure status update fails", async () => {
338+
const testServer = new AgentServer({
339+
port,
340+
jwtPublicKey: TEST_PUBLIC_KEY,
341+
repositoryPath: repo.path,
342+
apiUrl: "http://localhost:8000",
343+
apiKey: "test-api-key",
344+
projectId: 1,
345+
mode: "interactive",
346+
taskId: "test-task-id",
347+
runId: "test-run-id",
348+
}) as unknown as {
349+
eventStreamSender: {
350+
enqueue: (event: Record<string, unknown>) => void;
351+
stop: () => Promise<void>;
352+
};
353+
posthogAPI: {
354+
updateTaskRun: (
355+
taskId: string,
356+
runId: string,
357+
payload: Record<string, unknown>,
358+
) => Promise<unknown>;
359+
};
360+
signalTaskComplete(
361+
payload: JwtPayload,
362+
stopReason: string,
363+
errorMessage?: string,
364+
): Promise<void>;
365+
};
366+
testServer.eventStreamSender = {
367+
enqueue: vi.fn(),
368+
stop: vi.fn(async () => {}),
369+
};
370+
testServer.posthogAPI = {
371+
updateTaskRun: vi.fn(async () => {
372+
throw new Error("update failed");
373+
}),
374+
};
375+
376+
await testServer.signalTaskComplete(
377+
{
378+
run_id: "run-1",
379+
task_id: "task-1",
380+
team_id: 1,
381+
user_id: 1,
382+
distinct_id: "distinct-id",
383+
mode: "interactive",
384+
},
385+
"error",
386+
"boom",
387+
);
388+
389+
expect(testServer.eventStreamSender.enqueue).toHaveBeenCalledOnce();
390+
expect(testServer.posthogAPI.updateTaskRun).toHaveBeenCalledOnce();
391+
expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce();
392+
});
393+
394+
it("leaves event ingest open for non-error stop reasons", async () => {
395+
const testServer = new AgentServer({
396+
port,
397+
jwtPublicKey: TEST_PUBLIC_KEY,
398+
repositoryPath: repo.path,
399+
apiUrl: "http://localhost:8000",
400+
apiKey: "test-api-key",
401+
projectId: 1,
402+
mode: "interactive",
403+
taskId: "test-task-id",
404+
runId: "test-run-id",
405+
}) as unknown as {
406+
eventStreamSender: {
407+
enqueue: (event: Record<string, unknown>) => void;
408+
stop: () => Promise<void>;
409+
};
410+
posthogAPI: {
411+
updateTaskRun: (
412+
taskId: string,
413+
runId: string,
414+
payload: Record<string, unknown>,
415+
) => Promise<unknown>;
416+
};
417+
signalTaskComplete(
418+
payload: JwtPayload,
419+
stopReason: string,
420+
): Promise<void>;
421+
};
422+
testServer.eventStreamSender = {
423+
enqueue: vi.fn(),
424+
stop: vi.fn(async () => {}),
425+
};
426+
testServer.posthogAPI = {
427+
updateTaskRun: vi.fn(async () => ({})),
428+
};
429+
430+
await testServer.signalTaskComplete(
431+
{
432+
run_id: "run-1",
433+
task_id: "task-1",
434+
team_id: 1,
435+
user_id: 1,
436+
distinct_id: "distinct-id",
437+
mode: "interactive",
438+
},
439+
"end_turn",
440+
);
441+
442+
expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled();
443+
expect(testServer.eventStreamSender.stop).not.toHaveBeenCalled();
444+
expect(testServer.posthogAPI.updateTaskRun).not.toHaveBeenCalled();
445+
});
446+
206447
it("persists structured turn completion notifications", () => {
207448
const appendRawLine = vi.fn();
208449
const testServer = new AgentServer({

packages/agent/src/server/agent-server.ts

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import {
5454
normalizeCloudPromptContent,
5555
promptBlocksToText,
5656
} from "./cloud-prompt";
57+
import { TaskRunEventStreamSender } from "./event-stream-sender";
5758
import { type JwtPayload, JwtValidationError, validateJwt } from "./jwt";
5859
import {
5960
handoffLocalGitStateSchema,
@@ -217,6 +218,7 @@ export class AgentServer {
217218
private session: ActiveSession | null = null;
218219
private app: Hono;
219220
private posthogAPI: PostHogAPIClient;
221+
private eventStreamSender: TaskRunEventStreamSender | null = null;
220222
private questionRelayedToSlack = false;
221223
private detectedPrUrl: string | null = null;
222224
private lastReportedBranch: string | null = null;
@@ -281,6 +283,16 @@ export class AgentServer {
281283
getApiKey: () => config.apiKey,
282284
userAgent: `posthog/cloud.hog.dev; version: ${config.version ?? packageJson.version}`,
283285
});
286+
if (config.eventIngestToken) {
287+
this.eventStreamSender = new TaskRunEventStreamSender({
288+
apiUrl: config.apiUrl,
289+
projectId: config.projectId,
290+
taskId: config.taskId,
291+
runId: config.runId,
292+
token: config.eventIngestToken,
293+
logger: this.logger.child("EventIngest"),
294+
});
295+
}
284296
this.app = this.createApp();
285297
}
286298

@@ -544,7 +556,9 @@ export class AgentServer {
544556
this.logger.debug("Stopping agent server...");
545557

546558
if (this.session) {
547-
await this.cleanupSession();
559+
await this.cleanupSession({ completeEventStream: true });
560+
} else {
561+
await this.eventStreamSender?.stop();
548562
}
549563

550564
if (this.server) {
@@ -1772,6 +1786,12 @@ ${attributionInstructions}
17721786

17731787
const status = "failed";
17741788

1789+
this.enqueueTaskTerminalEvent(POSTHOG_NOTIFICATIONS.ERROR, {
1790+
source: "agent_server",
1791+
stopReason,
1792+
error: errorMessage ?? "Agent error",
1793+
});
1794+
17751795
try {
17761796
await this.posthogAPI.updateTaskRun(payload.task_id, payload.run_id, {
17771797
status,
@@ -1780,9 +1800,28 @@ ${attributionInstructions}
17801800
this.logger.debug("Task completion signaled", { status, stopReason });
17811801
} catch (error) {
17821802
this.logger.error("Failed to signal task completion", error);
1803+
} finally {
1804+
await this.eventStreamSender?.stop();
17831805
}
17841806
}
17851807

1808+
private enqueueTaskTerminalEvent(
1809+
method:
1810+
| typeof POSTHOG_NOTIFICATIONS.TASK_COMPLETE
1811+
| typeof POSTHOG_NOTIFICATIONS.ERROR,
1812+
params: Record<string, unknown>,
1813+
): void {
1814+
this.eventStreamSender?.enqueue({
1815+
type: "notification",
1816+
timestamp: new Date().toISOString(),
1817+
notification: {
1818+
jsonrpc: "2.0",
1819+
method,
1820+
params,
1821+
},
1822+
});
1823+
}
1824+
17861825
private configureEnvironment({
17871826
isInternal = false,
17881827
}: {
@@ -2180,7 +2219,11 @@ ${attributionInstructions}
21802219
}
21812220
}
21822221

2183-
private async cleanupSession(): Promise<void> {
2222+
private async cleanupSession({
2223+
completeEventStream = false,
2224+
}: {
2225+
completeEventStream?: boolean;
2226+
} = {}): Promise<void> {
21842227
if (!this.session) return;
21852228

21862229
this.logger.debug("Cleaning up session");
@@ -2219,6 +2262,10 @@ ${attributionInstructions}
22192262
this.session.sseController.close();
22202263
}
22212264

2265+
if (completeEventStream) {
2266+
await this.eventStreamSender?.stop();
2267+
}
2268+
22222269
this.pendingEvents = [];
22232270
this.lastReportedBranch = null;
22242271
this.session = null;
@@ -2302,9 +2349,13 @@ ${attributionInstructions}
23022349
}
23032350

23042351
private broadcastEvent(event: Record<string, unknown>): void {
2352+
if (!this.session) return;
2353+
2354+
this.eventStreamSender?.enqueue(event);
2355+
23052356
if (this.session?.sseController) {
23062357
this.sendSseEvent(this.session.sseController, event);
2307-
} else if (this.session) {
2358+
} else {
23082359
// Buffer events during initialization (sseController not yet attached)
23092360
this.pendingEvents.push(event);
23102361
}

packages/agent/src/server/bin.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const envSchema = z.object({
3232
POSTHOG_CODE_REASONING_EFFORT: z
3333
.enum(["low", "medium", "high", "xhigh", "max"])
3434
.optional(),
35+
POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(),
3536
});
3637

3738
const program = new Command();
@@ -148,6 +149,7 @@ program
148149
const server = new AgentServer({
149150
port: parseInt(options.port, 10),
150151
jwtPublicKey: env.JWT_PUBLIC_KEY,
152+
eventIngestToken: env.POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN,
151153
repositoryPath: options.repositoryPath,
152154
apiUrl: env.POSTHOG_API_URL,
153155
apiKey: env.POSTHOG_PERSONAL_API_KEY,

packages/agent/src/server/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export interface AgentServerConfig {
1515
apiKey: string;
1616
projectId: number;
1717
jwtPublicKey: string; // RS256 public key for JWT verification
18+
eventIngestToken?: string;
1819
mode: AgentMode;
1920
taskId: string;
2021
runId: string;

0 commit comments

Comments
 (0)