-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconcurrent-map.ts
117 lines (100 loc) · 2.91 KB
/
concurrent-map.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
function resolvedConcurrency(concurrency?: number | undefined) {
if (concurrency === undefined) {
return navigator.hardwareConcurrency;
} else {
const c = Math.ceil(concurrency);
if (c < 1) {
throw new Error(`concurrency must be greater than 0; got ${c}`);
}
return Math.ceil(concurrency);
}
}
/**
* Map the sequence from one type to another, concurrently.
*
* Items are iterated in order.
*
* @param iterable An iterable collection.
* @param mapFn The mapping function.
* @returns An iterator of mapped values.
*/
export async function* concurrentMap<T, U>(
items: AsyncIterable<T>,
mapFn: (item: T) => Promise<U>,
concurrency?: number,
): AsyncIterableIterator<U> {
const c = resolvedConcurrency(concurrency);
const buffer: Promise<U>[] = [];
for await (const item of items) {
if (buffer.length >= c) {
yield await buffer.shift()!;
}
buffer.push(mapFn(item));
}
while (buffer.length > 0) {
yield await buffer.shift()!;
}
}
/**
* Map the sequence from one type to another, concurrently.
*
* Items are iterated out of order. This allows maximum concurrency
* at all times, but the output order cannot be assumed to be the
* same as the input order.
*
* @param iterable An iterable collection.
* @param mapFn The mapping function.
* @returns An iterator of mapped values.
*/
export async function* concurrentUnorderedMap<T, U>(
items: AsyncIterable<T>,
mapFn: (item: T) => Promise<U>,
concurrency?: number,
): AsyncIterableIterator<U> {
const c = resolvedConcurrency(concurrency);
/*
* Two queues. The same Esimorps are pushed into each in the same order.
* The shifts are done at different times - one at the backend (aft) when
* a promise is resolved, and one at the frontend (fore) when the result
* of that promise can be yielded.
*
* Kind of hard to see how it works, but it was an "Aha!" moment for me.
*/
const buffAft: Esimorp<U>[] = [];
const buffFore: Esimorp<U>[] = [];
for await (const item of items) {
if (buffFore.length >= c) {
yield await buffFore.shift()!.promise;
}
const p: Esimorp<U> = esimorp();
buffAft.push(p);
buffFore.push(p);
(async () => {
try {
const transItem = await mapFn(item);
buffAft.shift()!.resolve(transItem);
} catch (e) {
buffAft.shift()!.reject(e);
}
})();
}
while (buffFore.length > 0) {
yield await buffFore.shift()!.promise;
}
}
type Resolve<T> = (value: T) => void;
// deno-lint-ignore no-explicit-any
type Reject = (reason?: any) => void;
type Esimorp<T> = { promise: Promise<T>; resolve: Resolve<T>; reject: Reject };
/**
* An unresolved/unrejected promise turned inside-out.
*/
function esimorp<T>(): Esimorp<T> {
let rs: Resolve<T>;
let rj: Reject;
const p = new Promise<T>((resolve, reject) => {
rs = resolve;
rj = reject;
});
return { promise: p, resolve: rs!, reject: rj! };
}