Skip to content

Commit 00367a0

Browse files
committed
fix(async): pooledMap surfaces errors thrown by the input iterable
The catch block in pooledMap's writer loop discarded the error that came out of `for await (const item of array)` and built an AggregateError populated solely from the executing transformations. When the iterable itself threw (and no transformation had rejected), that meant an empty `AggregateError.errors` — callers saw only the generic "Cannot complete the mapping" message with no way to recover the underlying cause (#6716). Bind the caught value as `iterError` and include it in `errors` if it isn't already there. Promise.race rejections from the executing pool are already surfaced via the existing allSettled walk, so the `includes` check prevents duplicates for the common transformation- rejection path. Adds a regression test covering the iterable-throws case and verifies the existing 'handles errors' test still produces exactly the expected pair of rejections. Fixes #6716
1 parent cdf74a8 commit 00367a0

2 files changed

Lines changed: 43 additions & 1 deletion

File tree

async/pool.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,20 @@ export function pooledMap<T, R>(
8989
// Wait until all ongoing events have processed, then close the writer.
9090
await Promise.all(executing);
9191
writer.close();
92-
} catch {
92+
} catch (iterError) {
9393
const errors = [];
9494
for (const result of await Promise.allSettled(executing)) {
9595
if (result.status === "rejected") {
9696
errors.push(result.reason);
9797
}
9898
}
99+
// The catch fires both when a transformation rejects and when the
100+
// input iterable itself throws. When it's a transformation rejection
101+
// the same reason is already in `executing`, but when the iterable
102+
// throws the original error would otherwise be swallowed, leaving
103+
// callers with an empty AggregateError (see #6716). Add it only if
104+
// it isn't already accounted for.
105+
if (!errors.includes(iterError)) errors.push(iterError);
99106
writer.write(Promise.reject(
100107
new AggregateError(errors, ERROR_WHILE_MAPPING_MESSAGE),
101108
)).catch(() => {});

async/pool_test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import { delay } from "./delay.ts";
33
import { pooledMap } from "./pool.ts";
44
import {
5+
assert,
56
assertEquals,
67
assertGreaterOrEqual,
78
assertLess,
@@ -78,6 +79,40 @@ Deno.test("pooledMap() returns ordered items", async () => {
7879
assertEquals(returned, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
7980
});
8081

82+
Deno.test(
83+
"pooledMap() surfaces errors thrown by the input iterable (#6716)",
84+
async () => {
85+
const sentinel = new Error("Iterator failed on first step!");
86+
async function* errorThrowing() {
87+
throw sentinel;
88+
yield 1;
89+
}
90+
const results = pooledMap(
91+
2,
92+
errorThrowing(),
93+
(i: number) => Promise.resolve(i),
94+
);
95+
let caught: unknown;
96+
try {
97+
for await (const _ of results) {
98+
// drain
99+
}
100+
} catch (e) {
101+
caught = e;
102+
}
103+
assert(caught instanceof AggregateError);
104+
const ag = caught as AggregateError;
105+
assertEquals(
106+
ag.message,
107+
"Cannot complete the mapping as an error was thrown from an item",
108+
);
109+
// The previous behavior left `errors` empty; the iterable's error was
110+
// swallowed. Now it must be present so callers can introspect it.
111+
assertEquals(ag.errors.length, 1);
112+
assertEquals(ag.errors[0], sentinel);
113+
},
114+
);
115+
81116
Deno.test("pooledMap() checks browser compat", async () => {
82117
// Simulates the environment where Symbol.asyncIterator is not available
83118
const asyncIterFunc = ReadableStream.prototype[Symbol.asyncIterator];

0 commit comments

Comments
 (0)