Skip to content

Commit

Permalink
fix(streams): handle Resource stream error
Browse files Browse the repository at this point in the history
  • Loading branch information
crowlKats committed Feb 5, 2025
1 parent b440d2d commit 17f3fb7
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 23 deletions.
68 changes: 45 additions & 23 deletions ext/web/06_streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -779,37 +779,35 @@ class ResourceStreamResourceSink {
* @param {any} sink
* @param {Uint8Array} chunk
*/
function readableStreamWriteChunkFn(reader, sink, chunk) {
async function readableStreamWriteChunkFn(reader, sink, chunk) {
// Empty chunk. Re-read.
if (chunk.length == 0) {
readableStreamReadFn(reader, sink);
await readableStreamReadFn(reader, sink);
return;
}

const res = op_readable_stream_resource_write_sync(sink.external, chunk);
if (res == 0) {
// Closed
reader.cancel("resource closed");
await reader.cancel("resource closed");
sink.close();
} else if (res == 1) {
// Successfully written (synchronous). Re-read.
readableStreamReadFn(reader, sink);
await readableStreamReadFn(reader, sink);
} else if (res == 2) {
// Full. If the channel is full, we perform an async await until we can write, and then return
// to a synchronous loop.
(async () => {
if (
await op_readable_stream_resource_write_buf(
sink.external,
chunk,
)
) {
readableStreamReadFn(reader, sink);
} else {
reader.cancel("resource closed");
sink.close();
}
})();
if (
await op_readable_stream_resource_write_buf(
sink.external,
chunk,
)
) {
await readableStreamReadFn(reader, sink);
} else {
await reader.cancel("resource closed");
sink.close();
}
}
}

Expand All @@ -822,17 +820,23 @@ function readableStreamReadFn(reader, sink) {
// real resource.
let reentrant = true;
let gotChunk = undefined;
const promise = new Deferred();
readableStreamDefaultReaderRead(reader, {
chunkSteps(chunk) {
// If the chunk has non-zero length, write it
if (reentrant) {
gotChunk = chunk;
} else {
readableStreamWriteChunkFn(reader, sink, chunk);
PromisePrototypeThen(
readableStreamWriteChunkFn(reader, sink, chunk),
() => promise.resolve(),
(e) => promise.reject(e),
);
}
},
closeSteps() {
sink.close();
promise.resolve();
},
errorSteps(error) {
const success = op_readable_stream_resource_write_error(
Expand All @@ -842,15 +846,29 @@ function readableStreamReadFn(reader, sink) {
// We don't cancel the reader if there was an error reading. We'll let the downstream
// consumer close the resource after it receives the error.
if (!success) {
reader.cancel("resource closed");
PromisePrototypeThen(
reader.cancel("resource closed"),
() => {
sink.close();
promise.resolve();
},
(e) => promise.reject(e),
);
} else {
sink.close();
promise.resolve();
}
sink.close();
},
});
reentrant = false;
if (gotChunk) {
readableStreamWriteChunkFn(reader, sink, gotChunk);
PromisePrototypeThen(
readableStreamWriteChunkFn(reader, sink, gotChunk),
() => promise.resolve(),
(e) => promise.reject(e),
);
}
return promise.promise;
}

/**
Expand All @@ -873,7 +891,9 @@ function resourceForReadableStream(stream, length) {
PromisePrototypeCatch(
PromisePrototypeThen(
op_readable_stream_resource_await_close(rid),
() => reader.cancel("resource closed"),
() => {
PromisePrototypeCatch(reader.cancel("resource closed"), () => {});
},
),
() => {},
);
Expand All @@ -884,7 +904,9 @@ function resourceForReadableStream(stream, length) {
);

// Trigger the first read
readableStreamReadFn(reader, sink);
PromisePrototypeCatch(readableStreamReadFn(reader, sink), (err) => {
PromisePrototypeCatch(reader.cancel(err), () => {});
});

return rid;
}
Expand Down
104 changes: 104 additions & 0 deletions tests/unit/streams_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -547,3 +547,107 @@ Deno.test(function readableStreamFromWithStringThrows() {
"Failed to execute 'ReadableStream.from': Argument 1 can not be converted to async iterable.",
);
});

Deno.test(async function readableStreamFromWithStringThrows() {
const serverPort = 4592;
const upstreamServerPort = 4593;

const stopSignal = new AbortController();
const promise = Promise.withResolvers();
// Response transforming server that crashes with an uncaught AbortError.
function startServer() {
Deno.serve({ port: serverPort, signal: stopSignal.signal }, async (req) => {
const upstreamResponse = await fetch(
`http://localhost:${upstreamServerPort}`,
req,
);

// Use a TransformStream to convert the response body to uppercase.
const transformStream = new TransformStream({
transform(chunk, controller) {
const decoder = new TextDecoder();
const encoder = new TextEncoder();
const chunk2 = encoder.encode(decoder.decode(chunk).toUpperCase());
controller.enqueue(chunk2);
},
});

upstreamResponse.body?.pipeTo(transformStream.writable).catch(() => {});

return new Response(transformStream.readable);
});
}

// ==== THE ISSUE IS NOT IN THE CODE BELOW ====

// Upstream server that sends a response with a body that never ends.
// This is not where the error happens (it handlers the cancellation correctly).
function startUpstreamServer() {
Deno.serve({ port: upstreamServerPort, signal: stopSignal.signal }, (_) => {
// Create an infinite readable stream that emits 'a'
let pushTimeout: number | null = null;
const readableStream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
const chunk = encoder.encode("a");

function push() {
controller.enqueue(chunk);
pushTimeout = setTimeout(push, 100);
}

push();
},

cancel(reason) {
assertEquals(reason, "resource closed");
promise.resolve(undefined);
clearTimeout(pushTimeout!);
},
});

return new Response(readableStream, {
headers: { "Content-Type": "text/plain" },
});
});
}

// The client is just there to simulate a client that cancels a request.
async function startClient() {
const controller = new AbortController();
const signal = controller.signal;

try {
const response = await fetch(`http://localhost:${serverPort}`, {
signal,
});
const reader = response.body?.getReader();
if (!reader) {
throw new Error("client: failed to get reader from response");
}

let received = 0;
while (true) {
const { done, value } = await reader.read();
if (done) break;

received += value.length;

if (received >= 5) {
controller.abort();
break;
}
}
} catch (_) {
//
}
}

startUpstreamServer();
startServer();
const p = startClient();

await promise.promise;
stopSignal.abort();
await p;
});

0 comments on commit 17f3fb7

Please sign in to comment.