|
1 | | -import { Readable, type ReadableOptions } from 'readable-stream'; |
| 1 | +import { Readable, type ReadableOptions } from 'readable-stream' |
2 | 2 |
|
| 3 | +/** |
| 4 | + * Wrapper for ReadableStream that converts it into a readable-stream Readable. |
| 5 | + * The implementation loosely follows the example laid out by Node's internal implementation: |
| 6 | + * https://github.com/nodejs/node/blob/0b676736a0e9ab4939c195a516aa7e82fcd839aa/lib/internal/webstreams/adapters.js#L512 |
| 7 | + */ |
3 | 8 | class ReadableFromWeb<T> extends Readable { |
4 | | - private readonly reader: ReadableStreamDefaultReader<T>; |
5 | | - private readerClosed: boolean; |
| 9 | + private readonly reader: ReadableStreamDefaultReader<T> |
| 10 | + private readerClosed: boolean |
6 | 11 |
|
7 | | - public constructor(stream: ReadableStream<T>, options?: ReadableOptions) { |
8 | | - super(options); |
9 | | - this.reader = stream.getReader(); |
10 | | - this.readerClosed = false; |
11 | | - this.reader.closed.then(() => { |
12 | | - this.readerClosed = true; |
13 | | - }).catch((error: Error) => { |
14 | | - this.readerClosed = true; |
15 | | - this.destroy(error); |
16 | | - }); |
| 12 | + public constructor(stream: Readonly<ReadableStream<T>>, options?: Readonly<ReadableOptions>) { |
| 13 | + super(options) |
| 14 | + this.reader = stream.getReader() |
| 15 | + this.readerClosed = false |
| 16 | + this.reader.closed.catch((error: unknown) => { |
| 17 | + this.destroy(error) |
| 18 | + }).finally(() => { |
| 19 | + this.readerClosed = true |
| 20 | + }) |
17 | 21 | } |
18 | 22 |
|
19 | | - // eslint-disable-next-line ts/naming-convention |
20 | 23 | public _read(): void { |
21 | 24 | this.reader.read() |
22 | | - .then(chunk => this.push(chunk.done ? null : chunk.value)) |
23 | | - .catch((error: Error) => this.destroy(error)); |
| 25 | + .then((chunk: Readonly<ReadableStreamReadResult<T>>) => { |
| 26 | + if (chunk.done) { |
| 27 | + this.push(null) |
| 28 | + } else { |
| 29 | + this.push(chunk.value) |
| 30 | + } |
| 31 | + }) |
| 32 | + .catch((error: unknown) => { |
| 33 | + this.destroy(error) |
| 34 | + }) |
24 | 35 | } |
25 | 36 |
|
26 | | - public destroy(error?: Error): this { |
| 37 | + public destroy(error: unknown): this { |
| 38 | + let finalError: unknown = error |
27 | 39 | if (!this.readerClosed) { |
28 | | - this.reader.cancel(error).then().catch(() => { |
29 | | - // Ideally, the error from cancel should be handled here. |
30 | | - // However, an error thrown in cancel does not seem to reach this callback. |
31 | | - // Therefore, the error is simply not handled here. |
32 | | - }); |
| 40 | + this.reader.cancel(error).then().catch((cancelError: unknown) => { |
| 41 | + finalError = cancelError |
| 42 | + }) |
33 | 43 | } |
34 | | - return super.destroy(error); |
| 44 | + if (finalError instanceof Error) { |
| 45 | + return super.destroy(finalError) |
| 46 | + } |
| 47 | + return super.destroy() |
35 | 48 | } |
36 | 49 | } |
37 | 50 |
|
38 | | -function readableFromWeb<T>(stream: ReadableStream<T>, options?: ReadableOptions): Readable { |
39 | | - return new ReadableFromWeb<T>(stream, options); |
40 | | -} |
| 51 | +const readableFromWeb = <T>( |
| 52 | + stream: Readonly<ReadableStream<T>>, |
| 53 | + options?: Readonly<ReadableOptions> |
| 54 | +): Readable => new ReadableFromWeb<T>(stream, options) |
41 | 55 |
|
42 | | -export { ReadableFromWeb, readableFromWeb }; |
| 56 | +export { ReadableFromWeb, readableFromWeb } |
0 commit comments