Skip to content

Commit 4562299

Browse files
authored
Allow users to define onBeforeInvokeFunction (#35)
Signed-off-by: Marcos Candeia <[email protected]>
1 parent d8c4c31 commit 4562299

File tree

5 files changed

+165
-114
lines changed

5 files changed

+165
-114
lines changed

src/actors/mod.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,7 @@ export type {
1919
export { getActorLocator } from "./util/locator.ts";
2020

2121
export { Actor, RuntimeClass } from "./discover.ts";
22+
export type {
23+
InvokeMiddleware,
24+
InvokeMiddlewareOptions,
25+
} from "./stub/stub.server.ts";

src/actors/silo.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ export class ActorSilo<TEnv extends object = object> {
7878
connect?: true,
7979
req?: Request,
8080
) {
81-
return invoke(
82-
await this.instance(actorName),
83-
actorName,
81+
return invoke({
82+
instance: await this.instance(actorName),
83+
stubName: actorName,
8484
methodName,
8585
args,
8686
metadata,
8787
connect,
88-
req,
89-
);
88+
request: req,
89+
});
9090
}
9191
}

src/actors/stub/invoker.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,21 @@ const isWellKnownEnrichMetadataMethod = (methodName: string) =>
2626
const isWellKnownRPCMethod = (methodName: string) =>
2727
methodName === WELL_KNOWN_RPC_MEHTOD;
2828

29+
export interface InvokeOptions<TInstance extends object> {
30+
instance: TInstance;
31+
stubName: string;
32+
methodName: string;
33+
args: unknown[];
34+
metadata: unknown;
35+
connect?: true;
36+
request?: Request;
37+
}
2938
/**
3039
* Invoke a method on a stub instance.
3140
*/
3241
export const invoke = async <TInstance extends object>(
33-
instance: TInstance,
34-
stubName: string,
35-
methodName: string,
36-
args: unknown[],
37-
metadata: unknown,
38-
connect?: true,
39-
req?: Request,
42+
{ request: req, connect, instance, stubName, methodName, args, metadata }:
43+
InvokeOptions<TInstance>,
4044
) => {
4145
const method = KNOWN_METHODS[methodName] ?? methodName;
4246
metadata = WELL_KNOWN_ENRICH_METADATA_METHOD in instance && req
@@ -50,7 +54,15 @@ export const invoke = async <TInstance extends object>(
5054
const chan = rpc({
5155
// @ts-ignore: this is a hack to make the invoke method work
5256
invoke: (name, method, args, metadata, connect) =>
53-
invoke(instance, name, method, args, metadata, connect),
57+
invoke({
58+
instance,
59+
stubName: name,
60+
methodName: method,
61+
args,
62+
metadata,
63+
connect,
64+
request: req,
65+
}),
5466
}, metadata);
5567
return chan;
5668
}

src/actors/stub/stub.server.ts

Lines changed: 133 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
type ServerSentEventMessage,
1313
ServerSentEventStream,
1414
} from "../util/sse.ts";
15-
import { invoke } from "./invoker.ts";
15+
import { invoke, type InvokeOptions } from "./invoker.ts";
1616
import {
1717
STUB_CONSTRUCTOR_NAME_HEADER,
1818
STUB_MAX_CHUNK_SIZE_QS_NAME,
@@ -43,6 +43,125 @@ const upgradeWebSocket = (req: Request) => {
4343
}),
4444
};
4545
};
46+
47+
export interface InvokeMiddlewareOptions {
48+
args: unknown[];
49+
metadata: unknown;
50+
request: Request;
51+
}
52+
53+
export type InvokeMiddleware = (
54+
options: InvokeMiddlewareOptions,
55+
next: (options: InvokeMiddlewareOptions) => Promise<Response>,
56+
) => Promise<Response>;
57+
58+
const hasInvokeMiddleware = (
59+
instance: unknown,
60+
): instance is { onBeforeInvoke: InvokeMiddleware } => {
61+
return typeof instance === "object" && instance != null &&
62+
"onBeforeInvoke" in instance &&
63+
typeof instance.onBeforeInvoke === "function";
64+
};
65+
66+
const invokeResponse = async (
67+
url: URL,
68+
options: InvokeOptions<any> & { request: Request },
69+
): Promise<Response> => {
70+
try {
71+
const { request: req } = options;
72+
const res = await invoke(options);
73+
if (isUpgrade(res) && req.headers.get("upgrade") === "websocket") {
74+
const { socket, response } = upgradeWebSocket(req);
75+
const chunkSize = url.searchParams.get(STUB_MAX_CHUNK_SIZE_QS_NAME);
76+
makeWebSocket(
77+
socket,
78+
typeof chunkSize === "string" ? +chunkSize : undefined,
79+
).then((ch) => res(ch)).catch((err) => {
80+
console.error(`socket error`, err);
81+
}).finally(() => socket.close());
82+
return response;
83+
}
84+
85+
if (isEventStreamResponse(res)) {
86+
req.signal.onabort = () => {
87+
res?.return?.();
88+
};
89+
90+
return new Response(
91+
new ReadableStream<ServerSentEventMessage>({
92+
async pull(controller) {
93+
for await (const content of res) {
94+
controller.enqueue({
95+
data: encodeURIComponent(
96+
JSON.stringify(content, serializeUint8Array),
97+
),
98+
id: Date.now().toString(),
99+
event: "message",
100+
});
101+
}
102+
controller.close();
103+
},
104+
cancel() {
105+
res?.return?.();
106+
},
107+
}).pipeThrough(new ServerSentEventStream()),
108+
{
109+
headers: {
110+
"Content-Type": EVENT_STREAM_RESPONSE_HEADER,
111+
},
112+
},
113+
);
114+
}
115+
if (res instanceof ReadableStream) {
116+
return new Response(res, {
117+
headers: {
118+
"content-type": "application/octet-stream",
119+
},
120+
});
121+
}
122+
if (typeof res === "undefined" || res === null) {
123+
return new Response(null, { status: 204 });
124+
}
125+
if (res instanceof Uint8Array) {
126+
return new Response(res, {
127+
headers: {
128+
"content-type": "application/octet-stream",
129+
"content-length": `${res.length}`,
130+
},
131+
status: 200,
132+
});
133+
}
134+
if (res instanceof Response) {
135+
return res;
136+
}
137+
return Response.json(res);
138+
} catch (err) {
139+
if (err instanceof StubError) {
140+
return new Response(err.message, {
141+
status: {
142+
METHOD_NOT_FOUND: 404,
143+
METHOD_NOT_INVOCABLE: 405,
144+
NOT_FOUND: 404,
145+
}[err.code] ?? 500,
146+
});
147+
}
148+
const constructorName = err?.constructor?.name;
149+
if (constructorName) {
150+
const serializedError = JSON.stringify(
151+
err,
152+
Object.getOwnPropertyNames(err),
153+
);
154+
return new Response(serializedError, {
155+
status: 400,
156+
headers: {
157+
[STUB_CONSTRUCTOR_NAME_HEADER]: constructorName,
158+
"content-type": "application/json",
159+
},
160+
});
161+
}
162+
throw err;
163+
}
164+
};
46165
/**
47166
* Create a server for a stub.
48167
* @param instanceCreator - A function that creates an instance of the stub.
@@ -100,106 +219,20 @@ export const server = <T extends object>(
100219
args = parsedArgs.args;
101220
metadata = parsedArgs.metadata;
102221
}
103-
try {
104-
const res = await invoke(
105-
objInstance,
222+
const next = (mid: InvokeMiddlewareOptions) => {
223+
return invokeResponse(url, {
224+
instance: objInstance,
106225
stubName,
107-
stubMethod,
108-
args,
109-
metadata,
110-
undefined,
111-
req,
112-
);
113-
if (isUpgrade(res) && req.headers.get("upgrade") === "websocket") {
114-
const { socket, response } = upgradeWebSocket(req);
115-
const chunkSize = url.searchParams.get(STUB_MAX_CHUNK_SIZE_QS_NAME);
116-
makeWebSocket(
117-
socket,
118-
typeof chunkSize === "string" ? +chunkSize : undefined,
119-
).then((ch) => res(ch)).catch((err) => {
120-
console.error(`socket error`, err);
121-
}).finally(() => socket.close());
122-
return response;
123-
}
124-
125-
if (isEventStreamResponse(res)) {
126-
req.signal.onabort = () => {
127-
res?.return?.();
128-
};
129-
130-
return new Response(
131-
new ReadableStream<ServerSentEventMessage>({
132-
async pull(controller) {
133-
for await (const content of res) {
134-
controller.enqueue({
135-
data: encodeURIComponent(
136-
JSON.stringify(content, serializeUint8Array),
137-
),
138-
id: Date.now().toString(),
139-
event: "message",
140-
});
141-
}
142-
controller.close();
143-
},
144-
cancel() {
145-
res?.return?.();
146-
},
147-
}).pipeThrough(new ServerSentEventStream()),
148-
{
149-
headers: {
150-
"Content-Type": EVENT_STREAM_RESPONSE_HEADER,
151-
},
152-
},
153-
);
154-
}
155-
if (res instanceof ReadableStream) {
156-
return new Response(res, {
157-
headers: {
158-
"content-type": "application/octet-stream",
159-
},
160-
});
161-
}
162-
if (typeof res === "undefined" || res === null) {
163-
return new Response(null, { status: 204 });
164-
}
165-
if (res instanceof Uint8Array) {
166-
return new Response(res, {
167-
headers: {
168-
"content-type": "application/octet-stream",
169-
"content-length": `${res.length}`,
170-
},
171-
status: 200,
172-
});
173-
}
174-
if (res instanceof Response) {
175-
return res;
176-
}
177-
return Response.json(res);
178-
} catch (err) {
179-
if (err instanceof StubError) {
180-
return new Response(err.message, {
181-
status: {
182-
METHOD_NOT_FOUND: 404,
183-
METHOD_NOT_INVOCABLE: 405,
184-
NOT_FOUND: 404,
185-
}[err.code] ?? 500,
186-
});
187-
}
188-
const constructorName = err?.constructor?.name;
189-
if (constructorName) {
190-
const serializedError = JSON.stringify(
191-
err,
192-
Object.getOwnPropertyNames(err),
193-
);
194-
return new Response(serializedError, {
195-
status: 400,
196-
headers: {
197-
[STUB_CONSTRUCTOR_NAME_HEADER]: constructorName,
198-
"content-type": "application/json",
199-
},
200-
});
201-
}
202-
throw err;
226+
methodName: stubMethod,
227+
args: mid.args,
228+
metadata: mid.metadata,
229+
request: mid.request,
230+
});
231+
};
232+
const options = { args, metadata, request: req };
233+
if (hasInvokeMiddleware(objInstance)) {
234+
return objInstance.onBeforeInvoke(options, next);
203235
}
236+
return next(options);
204237
};
205238
};

src/actors/stub/stubutil.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,9 @@ export interface BaseMetadata {
231231
*/
232232
export type ActorProxy<Actor> =
233233
& {
234-
[key in keyof Omit<Actor, "enrichMetadata" | "metadata">]: PromisifyKey<
234+
[
235+
key in keyof Omit<Actor, "enrichMetadata" | "metadata" | "onBeforeInvoke">
236+
]: PromisifyKey<
235237
Actor,
236238
key
237239
>;

0 commit comments

Comments
 (0)