forked from h3js/crossws
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdeno.ts
More file actions
107 lines (94 loc) · 3.08 KB
/
Copy pathdeno.ts
File metadata and controls
107 lines (94 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import type { AdapterOptions, AdapterInstance, Adapter } from "../adapter.ts";
import { toBufferLike } from "../utils.ts";
import { adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts";
import { WSError } from "../error.ts";
import { Peer } from "../peer.ts";
// --- types ---
/**
 * Adapter instance produced by the Deno adapter factory.
 *
 * Extends the common `AdapterInstance` members with `handleUpgrade`.
 */
export interface DenoAdapter extends AdapterInstance {
/**
 * Handles a WebSocket upgrade for an incoming request.
 *
 * @param req - The incoming HTTP request to upgrade.
 * @param info - Serve-handler info; its optional `remoteAddr` is what
 * `DenoPeer.remoteAddress` reads the hostname from.
 * @returns A promise of the response to hand back to the server.
 */
handleUpgrade(req: Request, info: ServeHandlerInfo): Promise<Response>;
}
/**
 * Options accepted by the Deno adapter.
 *
 * Currently declares nothing beyond the shared `AdapterOptions`.
 */
export interface DenoOptions extends AdapterOptions {}
/** Local alias for `Deno.WebSocketUpgrade`; `DenoPeer` uses its `socket` member type. */
type WebSocketUpgrade = Deno.WebSocketUpgrade;
/**
 * Shape of the handler info that `handleUpgrade` receives.
 *
 * NOTE(review): presumably a structural subset of Deno's serve-handler info
 * object — confirm against the Deno API.
 */
type ServeHandlerInfo = {
// Optional; `DenoPeer.remoteAddress` returns its `hostname` when present.
remoteAddr?: { transport: string; hostname: string; port: number };
};
// --- adapter ---
// https://deno.land/api?s=WebSocket
// https://deno.land/api?s=Deno.upgradeWebSocket
// https://examples.deno.land/http-server-websocket
/**
 * Creates the Deno WebSocket adapter.
 *
 * @param options - Adapter options; forwarded to `AdapterHookable`.
 * @returns The utilities from `adapterUtils(peers)` plus `handleUpgrade`.
 */
const denoAdapter: Adapter<DenoAdapter, DenoOptions> = (options = {}) => {
  const hooks = new AdapterHookable(options);
  // Every live peer; handed to each DenoPeer so `publish` can reach siblings.
  const peers = new Set<DenoPeer>();
  return {
    ...adapterUtils(peers),
    /**
     * Runs the `upgrade` hook, upgrades the request with
     * `Deno.upgradeWebSocket`, registers a peer and wires socket events to
     * the `open` / `message` / `close` / `error` hooks.
     *
     * @param request - Incoming request to upgrade.
     * @param info - Serve-handler info stored on the peer as `denoInfo`.
     * @returns The hook-provided `endResponse` when set, otherwise the
     * upgrade response.
     */
    handleUpgrade: async (request, info) => {
      const { upgradeHeaders, endResponse, context } =
        await hooks.upgrade(request);
      // The upgrade hook may short-circuit with its own response.
      if (endResponse) {
        return endResponse;
      }
      // prettier-ignore
      const headers = upgradeHeaders instanceof Headers ? upgradeHeaders : new Headers(upgradeHeaders);
      const upgrade = Deno.upgradeWebSocket(request, {
        // @ts-expect-error https://github.com/denoland/deno/pull/22242
        headers,
        protocol: headers.get("sec-websocket-protocol") ?? "",
      });
      const peer = new DenoPeer({
        ws: upgrade.socket,
        request,
        peers,
        denoInfo: info,
        context,
      });
      peers.add(peer);
      upgrade.socket.addEventListener("open", () => {
        hooks.callHook("open", peer);
      });
      upgrade.socket.addEventListener("message", (event) => {
        hooks.callHook("message", peer, new Message(event.data, peer, event));
      });
      upgrade.socket.addEventListener("close", (event) => {
        peers.delete(peer);
        // Forward the close code and reason so hook consumers can tell
        // why the socket closed instead of receiving an empty object.
        hooks.callHook("close", peer, {
          code: event.code,
          reason: event.reason,
        });
      });
      upgrade.socket.addEventListener("error", (error) => {
        // Drop the peer right away so it no longer receives published messages.
        peers.delete(peer);
        hooks.callHook("error", peer, new WSError(error));
      });
      return upgrade.response;
    },
  };
};
export default denoAdapter;
// --- peer ---
/**
 * Peer backed by the socket produced by a Deno WebSocket upgrade.
 *
 * Internal state holds the socket (`ws`), the original `request`, the shared
 * `peers` set used for publishing, the serve-handler info (`denoInfo`) and
 * the upgrade `context`.
 */
class DenoPeer extends Peer<{
  ws: WebSocketUpgrade["socket"];
  request: Request;
  peers: Set<DenoPeer>;
  denoInfo: ServeHandlerInfo;
  context: Peer["context"];
}> {
  /** Hostname from the serve-handler info, or `undefined` when no `remoteAddr` was given. */
  override get remoteAddress() {
    return this._internal.denoInfo.remoteAddr?.hostname;
  }

  /**
   * Sends `data` to this peer.
   *
   * @param data - Payload; converted with `toBufferLike` before sending.
   */
  send(data: unknown) {
    return this._internal.ws.send(toBufferLike(data));
  }

  /**
   * Sends `data` to every other peer whose `_topics` contains `topic`.
   *
   * @param topic - Topic name to match against each peer's subscriptions.
   * @param data - Payload; converted once and reused for all recipients.
   */
  publish(topic: string, data: unknown) {
    const dataBuff = toBufferLike(data);
    for (const peer of this._internal.peers) {
      // Skip the sender itself; only subscribed siblings receive the message.
      if (peer !== this && peer._topics.has(topic)) {
        peer._internal.ws.send(dataBuff);
      }
    }
  }

  /**
   * Closes the underlying socket.
   *
   * @param code - Optional close code passed to the socket.
   * @param reason - Optional close reason passed to the socket.
   */
  close(code?: number, reason?: string) {
    this._internal.ws.close(code, reason);
  }

  /**
   * Terminates the connection.
   *
   * The standard WebSocket interface has no `terminate()`, so the method is
   * feature-detected: it is called when the runtime socket provides it, and
   * otherwise the peer falls back to a regular `close()` instead of throwing
   * a `TypeError`.
   */
  override terminate(): void {
    const ws = this._internal.ws as WebSocketUpgrade["socket"] & {
      terminate?: () => void;
    };
    if (typeof ws.terminate === "function") {
      ws.terminate();
    } else {
      this.close();
    }
  }
}