Skip to content

Commit 74573c3

Browse files
committed
feat(cloudflare): support global publish via rpc
1 parent b6c3fa5 commit 74573c3

6 files changed

Lines changed: 50 additions & 12 deletions

File tree

docs/2.adapters/cloudflare.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ export class $DurableObject extends DurableObject {
5353
return ws.handleDurableMessage(this, client, message);
5454
}
5555

56+
webSocketPublish(topic, message, opts) {
57+
return ws.handleDurablePublish(this, topic, message, opts);
58+
}
59+
5660
webSocketClose(client, code, reason, wasClean) {
5761
return ws.handleDurableClose(this, client, code, reason, wasClean);
5862
}

src/adapters/cloudflare.ts

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type * as CF from "@cloudflare/workers-types";
22
import type { DurableObject } from "cloudflare:workers";
33
import type { AdapterOptions, AdapterInstance, Adapter } from "../adapter.ts";
44
import type * as web from "../../types/web.ts";
5+
import { env as cfGlobalEnv } from "cloudflare:workers";
56
import { toBufferLike } from "../utils.ts";
67
import { adapterUtils, getPeers } from "../adapter.ts";
78
import { AdapterHookable } from "../hooks.ts";
@@ -10,14 +11,15 @@ import { Peer, type PeerContext } from "../peer.ts";
1011
import { StubRequest } from "../_request.ts";
1112
import { WSError } from "../error.ts";
1213

14+
type WSDurableObjectStub = CF.DurableObjectStub & {
15+
webSocketPublish?: (topic: string, data: unknown, opts: any) => Promise<void>;
16+
};
17+
1318
type ResolveDurableStub = (
14-
req: CF.Request,
19+
req: CF.Request | undefined,
1520
env: unknown,
16-
context: CF.ExecutionContext,
17-
) =>
18-
| CF.DurableObjectStub
19-
| undefined
20-
| Promise<CF.DurableObjectStub | undefined>;
21+
context: CF.ExecutionContext | undefined,
22+
) => WSDurableObjectStub | undefined | Promise<WSDurableObjectStub | undefined>;
2123

2224
export interface CloudflareOptions extends AdapterOptions {
2325
/**
@@ -60,17 +62,21 @@ const cloudflareAdapter: Adapter<
6062

6163
const resolveDurableStub: ResolveDurableStub =
6264
opts.resolveDurableStub ||
63-
((_req, env: any, _context): CF.DurableObjectStub | undefined => {
65+
((_req, env: any, _context): WSDurableObjectStub | undefined => {
6466
const bindingName = opts.bindingName || "$DurableObject";
65-
const binding = env[bindingName] as CF.DurableObjectNamespace;
67+
const binding = (env || cfGlobalEnv)[
68+
bindingName
69+
] as CF.DurableObjectNamespace;
6670
if (binding) {
6771
const instanceId = binding.idFromName(opts.instanceName || "crossws");
6872
return binding.get(instanceId);
6973
}
7074
});
7175

76+
const { publish: durablePublish, ...utils } = adapterUtils(globalPeers);
77+
7278
return {
73-
...adapterUtils(globalPeers),
79+
...utils,
7480
handleUpgrade: async (request, cfEnv, cfCtx) => {
7581
// Upgrade request with Durable Object binding
7682
const stub = await resolveDurableStub(
@@ -179,6 +185,23 @@ const cloudflareAdapter: Adapter<
179185
const details = { code, reason, wasClean };
180186
await hooks.callHook("close", peer, details);
181187
},
188+
handleDurablePublish: async (_obj, topic, data, opts) => {
189+
return durablePublish(topic, data, opts);
190+
},
191+
publish: async (topic, data, opts) => {
192+
const stub = await resolveDurableStub(undefined, cfGlobalEnv, undefined);
193+
if (!stub) {
194+
throw new Error("[crossws] Durable Object binding cannot be resolved.");
195+
}
196+
// - Compatibility date >= 2024-04-03 or "rpc" feature flag is required
197+
// - We cannot check if webSocketPublish is exposed or not without RPC call
198+
try {
199+
return await stub.webSocketPublish!(topic, data, opts);
200+
} catch (error) {
201+
console.error(error);
202+
throw error;
203+
}
204+
},
182205
};
183206
};
184207

@@ -359,6 +382,13 @@ export interface CloudflareDurableAdapter extends AdapterInstance {
359382
message: ArrayBuffer | string,
360383
): Promise<void>;
361384

385+
handleDurablePublish: (
386+
obj: DurableObject,
387+
topic: string,
388+
data: unknown,
389+
opts: any,
390+
) => Promise<void>;
391+
362392
handleDurableClose(
363393
obj: DurableObject,
364394
ws: WebSocket | CF.WebSocket | web.WebSocket,

test/fixture/cloudflare-durable.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ export class $DurableObject extends DurableObject {
3636
return ws.handleDurableUpgrade(this, request);
3737
}
3838

39+
webSocketPublish(topic: string, message: unknown, opts: any) {
40+
return ws.handleDurablePublish(this, topic, message, opts);
41+
}
42+
3943
override async webSocketMessage(
4044
client: WebSocket,
4145
message: ArrayBuffer | string,

test/fixture/wrangler-durable.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# https://developers.cloudflare.com/workers/wrangler/configuration/
22

3-
compatibility_date = "2024-01-01"
3+
compatibility_date = "2024-04-03"
44
workers_dev = false
55

66
main = "cloudflare-durable.ts"

test/fixture/wrangler.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# https://developers.cloudflare.com/workers/wrangler/configuration/
22

3-
compatibility_date = "2024-01-01"
3+
compatibility_date = "2024-04-03"
44
workers_dev = false
55

66
main = "cloudflare.ts"

test/tests.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ export function wsTests(getURL: () => string, opts: WSTestOpts): void {
175175
expect(peers1).toMatchObject(peers2);
176176
});
177177

178-
test.skipIf(opts.adapter.startsWith("cloudflare"))(
178+
test.skipIf(opts.adapter === "cloudflare" /* durable only */)(
179179
"publish to all peers from adapter",
180180
async () => {
181181
const ws1 = await wsConnect(getURL(), { skip: 1 });

0 commit comments

Comments
 (0)