Skip to content

Commit 6462dc9

Browse files
authored
fix(agent): keep idle event streams alive (#2032)
1 parent ba035d8 commit 6462dc9

2 files changed

Lines changed: 113 additions & 21 deletions

File tree

packages/agent/src/server/agent-server.test.ts

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import {
1313
import { createTestRepo, type TestRepo } from "../test/fixtures/api";
1414
import { createPostHogHandlers } from "../test/mocks/msw-handlers";
1515
import type { TaskRun } from "../types";
16-
import { AgentServer } from "./agent-server";
16+
import { AgentServer, SSE_KEEPALIVE_INTERVAL_MS } from "./agent-server";
1717
import { type JwtPayload, SANDBOX_CONNECTION_AUDIENCE } from "./jwt";
1818

1919
interface TestableServer {
@@ -274,6 +274,64 @@ describe("AgentServer HTTP Mode", () => {
274274
expect(response.status).toBe(200);
275275
expect(response.headers.get("content-type")).toBe("text/event-stream");
276276
}, 20000);
277+
278+
it("emits transport keepalive comments while idle", async () => {
279+
const keepaliveCallback: { current: (() => void) | null } = {
280+
current: null,
281+
};
282+
const setIntervalSpy = vi
283+
.spyOn(globalThis, "setInterval")
284+
.mockImplementation(
285+
(callback: (_: undefined) => void, timeout?: number) => {
286+
if (timeout === SSE_KEEPALIVE_INTERVAL_MS) {
287+
keepaliveCallback.current = () => callback(undefined);
288+
}
289+
return setTimeout(() => undefined, 60_000);
290+
},
291+
);
292+
293+
let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
294+
try {
295+
await createServer().start();
296+
const token = createToken();
297+
298+
const response = await fetch(`http://localhost:${port}/events`, {
299+
headers: { Authorization: `Bearer ${token}` },
300+
});
301+
302+
expect(response.status).toBe(200);
303+
expect(response.body).not.toBeNull();
304+
reader = response.body?.getReader() ?? null;
305+
expect(reader).not.toBeNull();
306+
if (!reader) {
307+
throw new Error("Expected SSE response body reader");
308+
}
309+
310+
await vi.waitFor(() =>
311+
expect(keepaliveCallback.current).not.toBeNull(),
312+
);
313+
const emitKeepalive = keepaliveCallback.current;
314+
if (!emitKeepalive) {
315+
throw new Error("Expected keepalive callback to be registered");
316+
}
317+
emitKeepalive();
318+
319+
const decoder = new TextDecoder();
320+
let streamText = "";
321+
for (let attempts = 0; attempts < 5; attempts++) {
322+
const { done, value } = await reader.read();
323+
if (done) break;
324+
streamText += decoder.decode(value, { stream: true });
325+
if (streamText.includes(": keepalive\n\n")) break;
326+
}
327+
328+
expect(streamText).toContain(": keepalive\n\n");
329+
expect(streamText).not.toContain('"type":"keepalive"');
330+
} finally {
331+
await reader?.cancel();
332+
setIntervalSpy.mockRestore();
333+
}
334+
}, 20000);
277335
});
278336

279337
describe("POST /command", () => {

packages/agent/src/server/agent-server.ts

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ const errorWithClassificationSchema = z.object({
7373

7474
type MessageCallback = (message: unknown) => void;
7575

76+
export const SSE_KEEPALIVE_INTERVAL_MS = 25_000;
77+
7678
class NdJsonTap {
7779
private decoder = new TextDecoder();
7880
private buffer = "";
@@ -329,41 +331,73 @@ export class AgentServer {
329331
);
330332
}
331333

334+
let keepaliveInterval: ReturnType<typeof setInterval> | null = null;
335+
const clearKeepalive = (): void => {
336+
if (keepaliveInterval) {
337+
clearInterval(keepaliveInterval);
338+
keepaliveInterval = null;
339+
}
340+
};
341+
332342
const stream = new ReadableStream({
333343
start: async (controller) => {
334-
const sseController: SseController = {
344+
let sseController: SseController | null = null;
345+
const encoder = new TextEncoder();
346+
const detachCurrentSseController = (): void => {
347+
if (sseController) {
348+
this.detachSseController(sseController);
349+
}
350+
};
351+
const enqueueSseFrame = (frame: string): void => {
352+
try {
353+
controller.enqueue(encoder.encode(frame));
354+
} catch {
355+
clearKeepalive();
356+
detachCurrentSseController();
357+
}
358+
};
359+
360+
sseController = {
335361
send: (data: unknown) => {
336-
try {
337-
controller.enqueue(
338-
new TextEncoder().encode(`data: ${JSON.stringify(data)}\n\n`),
339-
);
340-
} catch {
341-
this.detachSseController(sseController);
342-
}
362+
enqueueSseFrame(`data: ${JSON.stringify(data)}\n\n`);
343363
},
344364
close: () => {
345365
try {
366+
clearKeepalive();
346367
controller.close();
347368
} catch {
348-
this.detachSseController(sseController);
369+
detachCurrentSseController();
349370
}
350371
},
351372
};
352373

353-
if (!this.session || this.session.payload.run_id !== payload.run_id) {
354-
await this.initializeSession(payload, sseController);
355-
} else {
356-
this.session.sseController = sseController;
357-
this.session.hasDesktopConnected = true;
358-
this.replayPendingEvents();
359-
}
374+
keepaliveInterval = setInterval(() => {
375+
enqueueSseFrame(": keepalive\n\n");
376+
}, SSE_KEEPALIVE_INTERVAL_MS);
377+
378+
try {
379+
if (
380+
!this.session ||
381+
this.session.payload.run_id !== payload.run_id
382+
) {
383+
await this.initializeSession(payload, sseController);
384+
} else {
385+
this.session.sseController = sseController;
386+
this.session.hasDesktopConnected = true;
387+
this.replayPendingEvents();
388+
}
360389

361-
this.sendSseEvent(sseController, {
362-
type: "connected",
363-
run_id: payload.run_id,
364-
});
390+
this.sendSseEvent(sseController, {
391+
type: "connected",
392+
run_id: payload.run_id,
393+
});
394+
} catch (error) {
395+
clearKeepalive();
396+
throw error;
397+
}
365398
},
366399
cancel: () => {
400+
clearKeepalive();
367401
this.logger.debug("SSE connection closed");
368402
if (this.session?.sseController) {
369403
this.session.sseController = null;

0 commit comments

Comments
 (0)