Skip to content

Commit 8fee177

Browse files
authored
Support streaming from client (#30)
* Support streaming from client Signed-off-by: Marcos Candeia <marrcooos@gmail.com> * Fixes typings Signed-off-by: Marcos Candeia <marrcooos@gmail.com> --------- Signed-off-by: Marcos Candeia <marrcooos@gmail.com>
1 parent 9f8ac32 commit 8fee177

File tree

5 files changed

+280
-20
lines changed

5 files changed

+280
-20
lines changed

src/actors/runtime.test.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ class Hello {
1313
return "Hello, World!";
1414
}
1515

16+
chunkedEcho(): ChannelUpgrader<Uint8Array, Uint8Array> {
17+
return (async ({ recv, send }) => {
18+
for await (const chunk of recv()) {
19+
await send(chunk);
20+
}
21+
});
22+
}
23+
1624
chan(name: string): ChannelUpgrader<string, string> {
1725
return (async ({ send }) => {
1826
for (let i = 0; i < HELLO_COUNT; i++) {
@@ -109,6 +117,36 @@ Deno.test("counter tests", async () => {
109117
const counterActor = counterStub.id(actorId);
110118
using rpcActor = counterStub.id("12345").rpc();
111119

120+
const helloActorChunkedStub = actors.stub(Hello);
121+
const actor = helloActorChunkedStub.id("chunked-test");
122+
123+
// Create test data
124+
const testData = new Uint8Array([1, 2, 3, 4, 5]);
125+
126+
// Get the channel
127+
const ch = actor.chunkedEcho();
128+
ch.mode = "stream";
129+
130+
// Set up receiving
131+
const received: Uint8Array[] = [];
132+
const receivePromise = (async () => {
133+
for await (const chunk of ch.recv()) {
134+
received.push(chunk as Uint8Array);
135+
break;
136+
}
137+
await ch.close();
138+
})();
139+
140+
// Send the data
141+
await ch.send(testData);
142+
143+
// Wait for all data to be received
144+
await receivePromise;
145+
146+
// Verify the data
147+
assertEquals(received.length, 1);
148+
assertEquals(received[0], testData);
149+
112150
for (const actor of [rpcActor, counterActor]) {
113151
const name = `Counter Actor`;
114152
const ch = actor.chan(name);

src/actors/runtime.ts

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import {
2020
} from "./stubutil.ts";
2121
import { serializeUint8Array } from "./util/buffers.ts";
2222
import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts";
23+
import { isChunked, makeChunkedChannel } from "./util/channels/chunked.ts";
2324
import { getActorLocator } from "./util/locator.ts";
2425
import {
2526
type ServerSentEventMessage,
2627
ServerSentEventStream,
2728
} from "./util/sse.ts";
29+
2830
/**
2931
* Represents a fetcher for actors.
3032
*/
@@ -216,22 +218,34 @@ export class StdActorRuntime<TEnv extends object = object>
216218
methodName,
217219
args,
218220
metadata,
219-
false,
221+
undefined,
220222
req,
221223
);
222-
if (req.headers.get("upgrade") === "websocket" && isUpgrade(res)) {
223-
if (!this.websocketHandler) {
224-
return new Response("WebSockets are not supported", { status: 400 });
224+
if (isUpgrade(res)) {
225+
if (req.headers.get("upgrade") === "websocket") {
226+
if (!this.websocketHandler) {
227+
return new Response("WebSockets are not supported", {
228+
status: 400,
229+
});
230+
}
231+
const { socket, response } = await this.websocketHandler(req);
232+
const chunkSize = url.searchParams.get(ACTOR_MAX_CHUNK_SIZE_QS_NAME);
233+
makeWebSocket(
234+
socket,
235+
typeof chunkSize === "string" ? +chunkSize : undefined,
236+
).then((ch) => res(ch)).catch((err) => {
237+
console.error(`socket error`, err);
238+
}).finally(() => socket.close());
239+
return response;
240+
} else if (isChunked(req)) {
241+
const { ch, res: response } = await makeChunkedChannel(req);
242+
Promise.resolve(res(ch)).catch((err) => {
243+
console.error(`chunked channel error`, err);
244+
}).finally(() => ch.close());
245+
return response;
246+
} else {
247+
return new Response("upgrade not supported", { status: 400 });
225248
}
226-
const { socket, response } = await this.websocketHandler(req);
227-
const chunkSize = url.searchParams.get(ACTOR_MAX_CHUNK_SIZE_QS_NAME);
228-
makeWebSocket(
229-
socket,
230-
typeof chunkSize === "string" ? +chunkSize : undefined,
231-
).then((ch) => res(ch)).catch((err) => {
232-
console.error(`socket error`, err);
233-
}).finally(() => socket.close());
234-
return response;
235249
}
236250
if (isEventStreamResponse(res)) {
237251
req.signal.onabort = () => {

src/actors/silo.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import { ActorState } from "./state.ts";
55
import type { ActorStorage } from "./storage.ts";
66
import {
77
type ActorInvoker,
8+
type ConnectMode,
89
create,
910
createHttpInvoker,
1011
type EnrichMetadataFn,
1112
WELL_KNOWN_RPC_MEHTOD,
1213
} from "./stubutil.ts";
1314
import { isUpgrade, makeDuplexChannel } from "./util/channels/channel.ts";
15+
1416
// deno-lint-ignore no-explicit-any
1517
type FunctionType = (...args: any) => any;
1618
const isInvocable = (f: unknown): f is FunctionType => {
@@ -23,8 +25,10 @@ const KNOWN_METHODS: Record<string, symbol> = {
2325
};
2426

2527
const WELL_KNOWN_ENRICH_METADATA_METHOD = "enrichMetadata";
28+
2629
const isWellKnownEnrichMetadataMethod = (methodName: string) =>
2730
methodName === WELL_KNOWN_ENRICH_METADATA_METHOD;
31+
2832
const isWellKnownRPCMethod = (methodName: string) =>
2933
methodName === WELL_KNOWN_RPC_MEHTOD;
3034

@@ -80,7 +84,7 @@ export class ActorSilo<TEnv extends object = object> {
8084
methodName: string,
8185
args: unknown[],
8286
metadata: unknown,
83-
connect?: boolean,
87+
connect?: ConnectMode,
8488
req?: Request,
8589
) {
8690
const actorInstance = this.actors.get(actorName);

src/actors/stubutil.ts

Lines changed: 115 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id";
2020
export const ACTOR_ID_QS_NAME = "deno_isolate_instance_id";
2121
export const ACTOR_CONSTRUCTOR_NAME_HEADER = "x-error-constructor-name";
2222

23+
export type ConnectMode = "websocket" | "stream";
2324
export type StubFactory<TInstance> = {
2425
id: StubFactoryFn<TInstance>;
2526
};
@@ -56,7 +57,7 @@ export interface ActorInvoker<
5657
method: string,
5758
methodArgs: unknown[],
5859
metadata: unknown,
59-
connect: true,
60+
connect: ConnectMode,
6061
): Promise<TChannel>;
6162
}
6263
export class ActorAwaiter<
@@ -67,6 +68,7 @@ export class ActorAwaiter<
6768
TResponse
6869
>,
6970
DuplexChannel<any, any> {
71+
mode: ConnectMode = "websocket";
7072
ch: Promise<TChannel> | null = null;
7173
ctrl: AbortController;
7274
_disconnected: PromiseWithResolvers<void> = Promise.withResolvers();
@@ -83,6 +85,9 @@ export class ActorAwaiter<
8385
this.close();
8486
}
8587
async close() {
88+
if (this.ch === null) {
89+
return;
90+
}
8691
const ch = await this.channel;
8792
await ch.close();
8893
this.ch = null;
@@ -122,7 +127,7 @@ export class ActorAwaiter<
122127
this.actorMethod,
123128
this.methodArgs,
124129
this.mMetadata,
125-
true,
130+
this.mode,
126131
);
127132
const nextConnection = async () => {
128133
const ch = await retry(connect, {
@@ -207,8 +212,15 @@ export type StubOptions<TInstance extends Actor> = ProxyOptions<TInstance>;
207212

208213
export type PromisifyKey<Actor, key extends keyof Actor> = Actor[key] extends
209214
(...args: infer Args) => Awaited<infer Return>
210-
? Return extends ChannelUpgrader<infer TSend, infer TReceive>
211-
? { (...args: Args): DuplexChannel<TReceive, TSend> }
215+
? Return extends ChannelUpgrader<infer TSend, infer TReceive> ? {
216+
(
217+
...args: Args
218+
):
219+
& DuplexChannel<TReceive, TSend>
220+
& (TReceive extends Uint8Array ? { mode?: ConnectMode }
221+
// deno-lint-ignore ban-types
222+
: {});
223+
}
212224
: { (...args: Args): Promise<Awaited<Return>> }
213225
: Actor[key];
214226

@@ -445,9 +457,15 @@ export const createHttpInvoker = <
445457
}
446458
const actorsServer = server ?? _server!;
447459
return {
448-
invoke: async (name, method, methodArgs, metadata, connect?: true) => {
460+
invoke: async (
461+
name,
462+
method,
463+
methodArgs,
464+
metadata,
465+
connect?: ConnectMode,
466+
) => {
449467
const endpoint = urlFor(actorsServer.url, name, method);
450-
if (connect) {
468+
if (connect === "websocket") {
451469
const url = new URL(`${endpoint}?args=${
452470
encodeURIComponent(
453471
btoa(
@@ -470,6 +488,97 @@ export const createHttpInvoker = <
470488
);
471489
return makeWebSocket(ws, options?.maxWsChunkSize) as Promise<TChannel>;
472490
}
491+
if (connect === "stream") {
492+
const url = new URL(`${endpoint}?args=${
493+
encodeURIComponent(
494+
btoa(
495+
JSON.stringify({
496+
args: methodArgs ?? [],
497+
metadata: metadata ?? {},
498+
}),
499+
),
500+
)
501+
}&${ACTOR_ID_QS_NAME}=${actorId}`);
502+
503+
const sendChan = makeChan<unknown>();
504+
const recvChan = makeChan<unknown>();
505+
506+
// Create streams for request body with an encoder
507+
const { readable: requestReadable, writable: requestWritable } =
508+
new TransformStream({
509+
transform(chunk, controller) {
510+
// Ensure we're sending Uint8Array
511+
if (chunk instanceof Uint8Array) {
512+
controller.enqueue(chunk);
513+
} else {
514+
controller.enqueue(
515+
new TextEncoder().encode(JSON.stringify(chunk)),
516+
);
517+
}
518+
},
519+
});
520+
521+
const requestWriter = requestWritable.getWriter();
522+
523+
const fetcher = options?.fetcher?.fetch ?? fetch;
524+
fetcher(url, {
525+
credentials: actorsServer?.credentials,
526+
method: "POST",
527+
headers: {
528+
"Content-Type": "application/octet-stream",
529+
"Transfer-Encoding": "chunked",
530+
[options?.actorIdHeaderName ?? ACTOR_ID_HEADER_NAME]: actorId,
531+
...actorsServer.deploymentId
532+
? { ["x-deno-deployment-id"]: actorsServer.deploymentId }
533+
: {},
534+
},
535+
body: requestReadable,
536+
}).then(async (response) => {
537+
if (!response.ok) {
538+
throw new Error(`HTTP error! status: ${response.status}`);
539+
}
540+
541+
const reader = response.body?.getReader();
542+
if (!reader) {
543+
throw new Error("Response body is null");
544+
}
545+
546+
let err: unknown | undefined = undefined;
547+
try {
548+
while (true) {
549+
const { done, value } = await reader.read();
550+
if (done) break;
551+
await recvChan.send(value);
552+
}
553+
} catch (e) {
554+
err = e;
555+
} finally {
556+
reader.releaseLock();
557+
recvChan.close(err);
558+
}
559+
}).catch((error) => {
560+
recvChan.close(error);
561+
});
562+
563+
// Handle sending data through the request body
564+
(async () => {
565+
let err: unknown | undefined = undefined;
566+
try {
567+
for await (const chunk of sendChan.recv()) {
568+
await requestWriter.write(chunk);
569+
}
570+
} catch (e) {
571+
err = e;
572+
} finally {
573+
await requestWriter.close();
574+
sendChan.close(err);
575+
}
576+
})();
577+
578+
// Create and return the duplex channel
579+
const channel = makeDuplexChannelWith(sendChan, recvChan);
580+
return channel as TChannel;
581+
}
473582
const abortCtrl = new AbortController();
474583
const fetcher = options?.fetcher?.fetch ?? fetch;
475584
const resp = await fetcher(

0 commit comments

Comments
 (0)