Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions src/actors/stubutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ export class ActorAwaiter<
[Symbol.dispose](): void {
this.close();
}
async close() {
async close(reason?: any) {
if (this.ch === null) {
return;
}
const ch = await this.channel;
await ch.close();
await ch.close(reason);
this.ch = null;
}
get signal() {
Expand All @@ -102,25 +102,13 @@ export class ActorAwaiter<
async *recv(signal?: AbortSignal): AsyncIterableIterator<any> {
const ch = await this.channel;
const it = ch.recv(signal);
const retn = it.return;
it.return = (val) => {
ch.close();
this.ch = null;
return retn?.call(it, val) ?? val;
};
yield* it;
}

private get channel(): Promise<TChannel> {
if (this.ch) {
return this.ch;
}
const sendChan = makeChan();
const recvChan = makeChan();
const reliableCh = makeDuplexChannelWith(sendChan, recvChan) as TChannel;
const ch = Promise.resolve<TChannel>(reliableCh);
this.ch = ch;

const connect = () =>
this.invoker.invoke(
this.actorName,
Expand All @@ -129,6 +117,15 @@ export class ActorAwaiter<
this.mMetadata,
this.mode,
);
if (this.mode === "stream") {
return this.ch = connect();
}
const sendChan = makeChan();
const recvChan = makeChan();
const reliableCh = makeDuplexChannelWith(sendChan, recvChan) as TChannel;
const ch = Promise.resolve<TChannel>(reliableCh);
this.ch = ch;

const nextConnection = async () => {
const ch = await retry(connect, {
initialDelay: 1e3, // one second of initial delay
Expand Down Expand Up @@ -552,6 +549,7 @@ export const createHttpInvoker = <
}
} catch (e) {
err = e;
await reader.cancel(e);
} finally {
reader.releaseLock();
recvChan.close(err);
Expand All @@ -569,8 +567,9 @@ export const createHttpInvoker = <
}
} catch (e) {
err = e;
await requestWriter.abort(err);
} finally {
await requestWriter.close();
!err && await requestWriter.close();
sendChan.close(err);
}
})();
Expand Down
8 changes: 4 additions & 4 deletions src/actors/util/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ export const makeChan = <T>(capacity = 0): Channel<T> => {
export interface DuplexChannel<TSend, TReceive> extends Disposable {
send: Channel<TSend>["send"];
recv: Channel<TReceive>["recv"];
close: () => void | Promise<void>;
close: (reason?: any) => void | Promise<void>;
closed: Promise<void>;
signal: AbortSignal;
disconnected?: Promise<void>; // used when the channel allows reconnections
Expand Down Expand Up @@ -440,9 +440,9 @@ export const makeDuplexChannelWith = <TSend, TReceive>(
signal: link(sendChan.signal, recvChan.signal),
send: sendChan.send.bind(sendChan),
recv: recvChan.recv.bind(recvChan),
close: () => {
sendChan.close();
recvChan.close();
close: (reason?: any) => {
sendChan.close(reason);
recvChan.close(reason);
},
[Symbol.dispose]: () => {
sendChan.close();
Expand Down
14 changes: 9 additions & 5 deletions src/actors/util/channels/chunked.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,16 @@ export const makeChunkedChannel = (
const currentWriter = initializeResponse();
await currentWriter.write(message);
},
close: () => {
close: async (reason: any) => {
if (!responseInitiated) {
resolveResponse(new Response(null, { status: 204 }));
}
if (reason) {
await writer?.abort(reason).catch(() => {});
}
writer?.close().catch(() => {});
sendChan.close();
recvChan.close();
sendChan.close(reason);
recvChan.close(reason);
},
[Symbol.dispose]: () => {
if (!responseInitiated) {
Expand All @@ -155,6 +158,7 @@ export const makeChunkedChannel = (
(async () => {
if (!req.body) return;

let err: undefined | unknown = undefined;
const reader = req.body.getReader();
try {
while (true) {
Expand All @@ -163,13 +167,13 @@ export const makeChunkedChannel = (
await recvChan.send(value);
}
} catch (error) {
console.error("Error reading request body:", error);
err = error;
} finally {
reader.releaseLock();
if (!responseInitiated) {
resolveResponse(new Response(null, { status: 204 }));
}
channel.close();
channel.close(err);
}
})();

Expand Down
Loading