-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathasync-iterable.ts
231 lines (224 loc) · 8.5 KB
/
async-iterable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import {
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING,
SolanaError,
} from '@solana/errors';
import { AbortController } from '@solana/event-target-impl';
import { DataPublisher } from './data-publisher';
type Config = Readonly<{
/**
* Triggering this abort signal will cause all iterators spawned from this iterator to return
* once they have published all queued messages.
*/
abortSignal: AbortSignal;
/**
* Messages from this channel of `dataPublisher` will be the ones yielded through the iterators.
*
* Messages only begin to be queued after the first time an iterator begins to poll. Channel
* messages published before that time will be dropped.
*/
dataChannelName: string;
// FIXME: It would be nice to be able to constrain the type of `dataPublisher` to one that
// definitely supports the `dataChannelName` and `errorChannelName` channels, and
// furthermore publishes `TData` on the `dataChannelName` channel. This is more difficult
// than it should be: https://tsplay.dev/NlZelW
dataPublisher: DataPublisher;
/**
* Messages from this channel of `dataPublisher` will be the ones thrown through the iterators.
*
* Any new iterators created after the first error is encountered will reject with that error
* when polled.
*/
errorChannelName: string;
}>;
const enum PublishType {
DATA,
ERROR,
}
type IteratorKey = symbol;
type IteratorState<TData> =
| {
__hasPolled: false;
publishQueue: (
| {
__type: PublishType.DATA;
data: TData;
}
| {
__type: PublishType.ERROR;
err: unknown;
}
)[];
}
| {
__hasPolled: true;
onData: (data: TData) => void;
onError: Parameters<ConstructorParameters<typeof Promise>[0]>[1];
};
let EXPLICIT_ABORT_TOKEN: symbol;
function createExplicitAbortToken() {
// This function is an annoying workaround to prevent `process.env.NODE_ENV` from appearing at
// the top level of this module and thwarting an optimizing compiler's attempt to tree-shake.
return Symbol(
__DEV__
? "This symbol is thrown from a socket's iterator when the connection is explicitly " +
'aborted by the user'
: undefined,
);
}
const UNINITIALIZED = Symbol();
/**
* Returns an `AsyncIterable` given a data publisher.
*
* The iterable will produce iterators that vend messages published to `dataChannelName` and will
* throw the first time a message is published to `errorChannelName`. Triggering the abort signal
* will cause all iterators spawned from this iterator to return once they have published all queued
* messages.
*
* Things to note:
*
* - If a message is published over a channel before the `AsyncIterator` attached to it has polled
* for the next result, the message will be queued in memory.
* - Messages only begin to be queued after the first time an iterator begins to poll. Channel
* messages published before that time will be dropped.
* - If there are messages in the queue and an error occurs, all queued messages will be vended to
* the iterator before the error is thrown.
* - If there are messages in the queue and the abort signal fires, all queued messages will be
* vended to the iterator after which it will return.
* - Any new iterators created after the first error is encountered will reject with that error when
* polled.
*
* @param config
*
* @example
* ```ts
* const iterable = createAsyncIterableFromDataPublisher({
* abortSignal: AbortSignal.timeout(10_000),
* dataChannelName: 'message',
* dataPublisher,
* errorChannelName: 'error',
* });
* try {
* for await (const message of iterable) {
* console.log('Got message', message);
* }
* } catch (e) {
* console.error('An error was published to the error channel', e);
* } finally {
* console.log("It's been 10 seconds; that's enough for now.");
* }
* ```
*/
export function createAsyncIterableFromDataPublisher<TData>({
abortSignal,
dataChannelName,
dataPublisher,
errorChannelName,
}: Config): AsyncIterable<TData> {
const iteratorState: Map<IteratorKey, IteratorState<TData>> = new Map();
function publishErrorToAllIterators(reason: unknown) {
for (const [iteratorKey, state] of iteratorState.entries()) {
if (state.__hasPolled) {
iteratorState.delete(iteratorKey);
state.onError(reason);
} else {
state.publishQueue.push({
__type: PublishType.ERROR,
err: reason,
});
}
}
}
const abortController = new AbortController();
abortSignal.addEventListener('abort', () => {
abortController.abort();
publishErrorToAllIterators((EXPLICIT_ABORT_TOKEN ||= createExplicitAbortToken()));
});
const options = { signal: abortController.signal } as const;
let firstError: unknown = UNINITIALIZED;
dataPublisher.on(
errorChannelName,
err => {
if (firstError === UNINITIALIZED) {
firstError = err;
abortController.abort();
publishErrorToAllIterators(err);
}
},
options,
);
dataPublisher.on(
dataChannelName,
data => {
iteratorState.forEach((state, iteratorKey) => {
if (state.__hasPolled) {
const { onData } = state;
iteratorState.set(iteratorKey, { __hasPolled: false, publishQueue: [] });
onData(data as TData);
} else {
state.publishQueue.push({
__type: PublishType.DATA,
data: data as TData,
});
}
});
},
options,
);
return {
async *[Symbol.asyncIterator]() {
if (abortSignal.aborted) {
return;
}
if (firstError !== UNINITIALIZED) {
throw firstError;
}
const iteratorKey = Symbol();
iteratorState.set(iteratorKey, { __hasPolled: false, publishQueue: [] });
try {
while (true) {
const state = iteratorState.get(iteratorKey);
if (!state) {
// There should always be state by now.
throw new SolanaError(SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING);
}
if (state.__hasPolled) {
// You should never be able to poll twice in a row.
throw new SolanaError(
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
);
}
const publishQueue = state.publishQueue;
try {
if (publishQueue.length) {
state.publishQueue = [];
for (const item of publishQueue) {
if (item.__type === PublishType.DATA) {
yield item.data;
} else {
throw item.err;
}
}
} else {
yield await new Promise<TData>((resolve, reject) => {
iteratorState.set(iteratorKey, {
__hasPolled: true,
onData: resolve,
onError: reject,
});
});
}
} catch (e) {
if (e === (EXPLICIT_ABORT_TOKEN ||= createExplicitAbortToken())) {
return;
} else {
throw e;
}
}
}
} finally {
iteratorState.delete(iteratorKey);
}
},
};
}