Skip to content

Commit c3cfe7f

Browse files
committed
track requests to specifically wait
1 parent dbd9a68 commit c3cfe7f

3 files changed

Lines changed: 244 additions & 27 deletions

File tree

lib/sse-server.ts

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
11
import {
2-
type Operation,
2+
all,
3+
call,
34
createQueue,
45
createScope,
56
ensure,
6-
race,
7+
type Operation,
78
resource,
9+
sleep,
810
spawn,
911
suspend,
1012
until,
1113
useAttributes,
1214
useScope,
1315
withResolvers,
14-
sleep,
1516
} from "effection";
1617
import type { Handle, Methods } from "./types.ts";
1718
import { validateUnsafe } from "./validate.ts";
1819

1920
import {
20-
type EventStreamMessage,
2121
createEventStream,
2222
defineEventHandler,
23+
type EventStreamMessage,
2324
H3,
2425
serve,
2526
serveStatic,
@@ -47,9 +48,10 @@ export function useSSEServer<M extends Methods>(
4748

4849
return resource(function* (provide) {
4950
yield* useAttributes({ name: "SSEServer", port });
50-
let [scope, destroy] = createScope(yield* useScope());
5151

5252
let app = new H3();
53+
let [requestScope, destroyRequestScope] = createScope();
54+
let activeRequests = new Set<ReturnType<typeof requestScope.run>>();
5355

5456
for (let name of methodNames) {
5557
app.all(`/${String(name)}`, async (event) => {
@@ -68,12 +70,21 @@ export function useSSEServer<M extends Methods>(
6870
function* drain() {
6971
let next = yield* queue.next();
7072
while (!next.done) {
71-
yield* until(stream.push(next.value));
73+
yield* until(stream.push([next.value]));
7274
next = yield* queue.next();
7375
}
7476
}
7577

76-
let requestTask = scope.run(function* () {
78+
let requestTask: ReturnType<typeof requestScope.run>;
79+
let requestStarted = withResolvers<void>();
80+
requestTask = requestScope.run(function* () {
81+
yield* requestStarted.operation;
82+
yield* spawn(function* () {
83+
while (true) {
84+
yield* sleep(1000);
85+
yield* until(stream.pushComment("keepalive"));
86+
}
87+
});
7788
yield* ensure(function* () {
7889
yield* until(stream.flush());
7990
yield* until(stream.close());
@@ -113,7 +124,8 @@ export function useSSEServer<M extends Methods>(
113124
// drop out of the loop and we won't be able to emit the final event.
114125
if (!req.signal.aborted) {
115126
let value = validateUnsafe(protocol.methods[name].returns, next.value);
116-
let data = JSON.stringify(value);
127+
// to avoid throws around JSON handling, let it throw in validation
128+
let data = value === undefined ? "undefined" : JSON.stringify(value);
117129
queue.add({
118130
event: "return",
119131
data,
@@ -136,34 +148,27 @@ export function useSSEServer<M extends Methods>(
136148
} finally {
137149
// `drain()` has finished either because the queue closed normally or
138150
// because the request was interrupted. Clean up the underlying
139-
// protocol subscription and ensure the stream is flushed/closed.
151+
// protocol subscription first, then allow any remaining queued
152+
// events to drain.
140153
yield* flush();
141154
queue.close(Nothing());
142-
try {
143-
yield* race([sleep(1000), drain()]);
144-
} catch {
145-
// ignore shutdown race failures in the last effort to drain
146-
// as H3 seems to abort the promise as some point and with
147-
// the sleep it can end up just getting killed
148-
}
149155
}
150156
});
157+
activeRequests.add(requestTask);
158+
requestTask.then(
159+
() => activeRequests.delete(requestTask),
160+
() => activeRequests.delete(requestTask),
161+
);
162+
163+
event.waitUntil?.(requestTask);
164+
165+
requestStarted.resolve();
151166

152167
requestTask.catch((error) => {
153168
if (isExpectedShutdownError(error)) return;
154169
console.error("requestTask failed unexpectedly:", error);
155170
});
156171

157-
// The transport has closed the SSE response, either because the client
158-
// disconnected or because the stream finished normally. Halt the request
159-
// task so any leftover work stops immediately.
160-
stream.onClosed(() =>
161-
requestTask.halt().catch((error) => {
162-
if (isExpectedShutdownError(error)) return;
163-
console.error("requestTask.halt() failed unexpectedly:", error);
164-
}),
165-
);
166-
167172
return stream.send();
168173
});
169174
}
@@ -206,7 +211,22 @@ export function useSSEServer<M extends Methods>(
206211
try {
207212
yield* provide(`http://localhost:${port}`);
208213
} finally {
209-
yield* destroy();
214+
if (activeRequests.size > 0) {
215+
yield* all(
216+
[...activeRequests].map((task) => {
217+
return call(function* () {
218+
try {
219+
yield* task;
220+
} catch (error) {
221+
if (!isExpectedShutdownError(error)) {
222+
throw error;
223+
}
224+
}
225+
});
226+
}),
227+
);
228+
}
229+
yield* destroyRequestScope();
210230
yield* until(server.close());
211231
}
212232
});

tests/implementation.test.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { scope as arktypeScope } from "arktype";
55
import type { Method } from "../lib/types.ts";
66

77
import { createImplementation, createProtocol } from "../lib/mod.ts";
8+
import { player } from "../lib/implementations/player.ts";
89

910
describe("createImplementation()", () => {
1011
it("attach yields a handle with protocol and methods and invoke calls the method", function* () {
@@ -51,4 +52,20 @@ describe("createImplementation()", () => {
5152
const next = yield* sub.next();
5253
expect(next).toEqual({ done: true, value: "hello" });
5354
});
55+
56+
it("play returns null", function* () {
57+
const [rootScope] = createScope();
58+
const handle = yield* player.attach(rootScope);
59+
60+
let result;
61+
let invocation = rootScope.run(function* () {
62+
const stream = handle.invoke({ name: "play", args: [] });
63+
const sub = yield* stream;
64+
result = yield* sub.next();
65+
});
66+
67+
yield* invocation;
68+
69+
expect(result).toEqual({ done: true, value: null });
70+
});
5471
});

tests/sse-server.test.ts

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import { describe, it } from "@effectionx/bdd";
2+
import { expect } from "expect";
3+
import { createServer } from "node:net";
4+
import type { Method, Handle, Inspector } from "../lib/types.ts";
5+
import { createProtocol } from "../lib/mod.ts";
6+
import { attach } from "../lib/attach.ts";
7+
import { createScope, call, suspend, useScope, withResolvers } from "effection";
8+
import { scope as arktypeScope } from "arktype";
9+
import { useSSEServer } from "../lib/sse-server.ts";
10+
11+
describe("useSSEServer()", () => {
12+
describe("generic echo protocol", () => {
13+
it("keeps an active SSE request alive long enough to flush during shutdown", function* () {
14+
const [serverScope, destroyServerScope] = createScope();
15+
const [clientScope, destroyClientScope] = createScope();
16+
17+
const schema = arktypeScope({
18+
NoneArr: "never[]",
19+
None: "never",
20+
Str: "string",
21+
}).export();
22+
23+
const protocol = createProtocol({
24+
echo: {
25+
args: schema.NoneArr,
26+
progress: schema.None,
27+
returns: schema.Str,
28+
},
29+
});
30+
31+
const requestStarted = withResolvers<void>();
32+
const continueRequest = withResolvers<void>();
33+
34+
const handle = {
35+
protocol,
36+
methods: {} as any,
37+
invoke() {
38+
return (function* () {
39+
return {
40+
*next() {
41+
requestStarted.resolve();
42+
yield* continueRequest.operation;
43+
return { done: true, value: "goodbye" };
44+
},
45+
};
46+
})();
47+
},
48+
} as unknown as Handle<{ echo: Method<never[], never, string> }>;
49+
50+
const addressResolver = withResolvers<string>();
51+
serverScope.run(function* () {
52+
const address = yield* useSSEServer(handle, { port: yield* call(getAvailablePort) });
53+
addressResolver.resolve(address);
54+
yield* suspend();
55+
});
56+
57+
const address = yield* addressResolver.operation;
58+
59+
const requestTask = clientScope.run(function* () {
60+
const response = yield* call(() =>
61+
fetch(`${address}/echo`, {
62+
method: "POST",
63+
headers: { "content-type": "application/json" },
64+
body: "[]",
65+
}),
66+
);
67+
68+
const text = yield* call(async () => {
69+
const decoder = new TextDecoder();
70+
const reader = response.body?.getReader();
71+
let result = "";
72+
73+
if (!reader) {
74+
throw new Error("response body is missing");
75+
}
76+
77+
while (true) {
78+
const { done, value } = await reader.read();
79+
if (done) break;
80+
result += decoder.decode(value, { stream: true });
81+
}
82+
83+
return result;
84+
});
85+
86+
return { status: response.status, text };
87+
});
88+
89+
yield* requestStarted.operation;
90+
continueRequest.resolve();
91+
yield* destroyServerScope();
92+
93+
const result = yield* requestTask;
94+
yield* destroyClientScope();
95+
96+
expect(result.status).toBe(200);
97+
expect(result.text).toContain("event: return");
98+
expect(result.text).toContain("goodbye");
99+
});
100+
});
101+
102+
describe("loader path integration", () => {
103+
it("uses attach() and useSSEServer() together and keeps the SSE request alive on shutdown", function* () {
104+
const schema = arktypeScope({
105+
NoneArr: "never[]",
106+
None: "never",
107+
Str: "string",
108+
}).export();
109+
110+
const protocol = createProtocol({
111+
echo: {
112+
args: schema.NoneArr,
113+
progress: schema.None,
114+
returns: schema.Str,
115+
},
116+
});
117+
118+
const handle: Handle<{ echo: Method<never[], never, string> }> = {
119+
protocol,
120+
methods: {
121+
echo: function* () {
122+
return {
123+
*next() {
124+
return { done: true, value: "goodbye" };
125+
},
126+
};
127+
},
128+
} as any,
129+
invoke({ name, args }) {
130+
return this.methods[name](...args);
131+
},
132+
};
133+
134+
const inspector: Inspector<{ echo: Method<never[], never, string> }> = {
135+
protocol,
136+
*attach(scope) {
137+
return handle;
138+
},
139+
};
140+
141+
const scope = yield* useScope();
142+
const addressResolver = withResolvers<string>();
143+
const detach = yield* attach(scope, inspector, function* (handle) {
144+
const address = yield* useSSEServer(handle, { port: yield* call(getAvailablePort) });
145+
addressResolver.resolve(address);
146+
});
147+
148+
const address = yield* addressResolver.operation;
149+
yield* detach();
150+
151+
expect(address).toMatch(/^http:\/\/localhost:\d+$/);
152+
});
153+
});
154+
});
155+
156+
function getAvailablePort(): Promise<number> {
157+
return new Promise((resolve, reject) => {
158+
const server = createServer();
159+
160+
server.once("error", (error) => {
161+
reject(error);
162+
});
163+
164+
server.listen(0, () => {
165+
const address = server.address();
166+
if (address && typeof address === "object") {
167+
const port = address.port;
168+
server.close((error) => {
169+
if (error) {
170+
reject(error);
171+
} else {
172+
resolve(port);
173+
}
174+
});
175+
} else {
176+
reject(new Error("unable to determine ephemeral port"));
177+
}
178+
});
179+
});
180+
}

0 commit comments

Comments
 (0)