-
Notifications
You must be signed in to change notification settings - Fork 52
Expand file tree
/
Copy pathapi.ts
More file actions
73 lines (61 loc) · 2.33 KB
/
api.ts
File metadata and controls
73 lines (61 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import * as colors from "@std/fmt/colors";
import {
type ServerSentEventMessage,
ServerSentEventStream,
} from "@std/http/server-sent-event-stream";
import { Hono } from "../../runtime/deps.ts";
import { start as startFS } from "../fs/api.ts";
import { start as startMeta } from "../meta.ts";
import { start as startWorker } from "../worker.ts";
import { channel, type DaemonEvent } from "./channel.ts";
/**
 * Builds a Hono app exposing `GET /watch`, a server-sent-event stream of
 * `DaemonEvent`s.
 *
 * For each request the stream:
 *  1. subscribes to `"broadcast"` events on the shared `channel` and forwards
 *     each event's `detail`;
 *  2. forwards every event yielded by `startFS(since)`;
 *  3. forwards the value returned by `startWorker()`;
 *  4. forwards the value resolved by `startMeta(since)` when it is truthy.
 *
 * The optional `since` query parameter is parsed with `Number()`; anything
 * that is not a finite number falls back to `0`.
 *
 * Every message is sent as event `"message"` whose data is the event
 * JSON-serialized and then passed through `encodeURIComponent`.
 *
 * @returns The Hono app carrying the `/watch` route.
 */
export const createSSE = () => {
  const app = new Hono();

  app.get("/watch", (c) => {
    const signal = c.req.raw.signal;
    const done = Promise.withResolvers<void>();

    const sinceParam = c.req.query("since");
    const since = Number(sinceParam);
    const normalizedSince = Number.isFinite(since) ? since : 0;
    console.log("[SSE /watch] Query params:", { sinceParam, since, normalizedSince });

    // Set once the stream is finished for any reason (cancel, request abort,
    // or a failure while producing the initial events).
    let closed = false;

    // Idempotent teardown: stops further enqueues and resolves `done`, which
    // in turn detaches the broadcast listener registered in `start`.
    const close = () => {
      closed = true;
      done.resolve();
    };

    const enqueue = (
      controller: ReadableStreamDefaultController<ServerSentEventMessage>,
      event: DaemonEvent,
    ) =>
      // Enqueueing on a closed or errored controller throws, so skip it once
      // the stream has been torn down or the request has been aborted.
      !closed && !signal.aborted && controller.enqueue({
        data: encodeURIComponent(JSON.stringify(event)),
        event: "message",
      });

    return new Response(
      new ReadableStream<ServerSentEventMessage>({
        async start(controller) {
          console.log(colors.bold(`[sse]:`), "stream is", colors.green("open"));

          const handler = (e: CustomEvent<DaemonEvent>) =>
            enqueue(controller, e.detail);
          // @ts-expect-error TS does not handle well this case
          channel.addEventListener("broadcast", handler);
          done.promise.then(() => {
            // @ts-expect-error TS does not handle well this case
            channel.removeEventListener("broadcast", handler);
          });

          // Do not rely on `cancel()` alone: it is not invoked when the
          // stream errors, so also tear down when the request itself aborts.
          if (signal.aborted) {
            close();
          } else {
            signal.addEventListener("abort", close, { once: true });
          }

          try {
            for await (const event of startFS(normalizedSince)) {
              if (closed || signal.aborted) {
                close();
                return;
              }
              enqueue(controller, event);
            }
            enqueue(controller, startWorker());
          } catch (err) {
            // Rethrowing errors the stream, after which `cancel()` will never
            // run; detach the broadcast listener here so it does not leak.
            close();
            throw err;
          }

          startMeta(normalizedSince)
            .then((meta) => meta && enqueue(controller, meta))
            .catch(console.error);

          // Keep `start` pending until the stream is torn down.
          return done.promise;
        },
        cancel() {
          close();
          console.log(colors.bold(`[sse]:`), "stream is", colors.red("closed"));
        },
      }).pipeThrough(new ServerSentEventStream()),
      { headers: { "Content-Type": "text/event-stream" } },
    );
  });

  return app;
};