-
Notifications
You must be signed in to change notification settings - Fork 162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Release reader immediately when shutting down a pipe #1208
base: main
Are you sure you want to change the base?
Changes from all commits
eb22e7e
15a9768
7008ee5
7b446d5
36a9ad9
256e70f
0eb6177
8cfaed4
01a4f49
3ee5c8f
a0364b9
0c23857
e333293
285bfb7
817293c
124582a
f6d8b73
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,8 @@ const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js') | |
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); | ||
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, | ||
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, | ||
WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = require('./writable-streams.js'); | ||
WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight, writerAddStateChangeListener } = | ||
require('./writable-streams.js'); | ||
const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js'); | ||
|
||
const ReadableByteStreamController = require('../../generated/ReadableByteStreamController.js'); | ||
|
@@ -134,7 +135,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
assert(IsReadableStreamLocked(source) === false); | ||
assert(IsWritableStreamLocked(dest) === false); | ||
|
||
const reader = AcquireReadableStreamDefaultReader(source); | ||
let reader = AcquireReadableStreamDefaultReader(source); | ||
const writer = AcquireWritableStreamDefaultWriter(dest); | ||
|
||
source._disturbed = true; | ||
|
@@ -200,6 +201,12 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
} | ||
|
||
return transformPromiseWith(writer._readyPromise, () => { | ||
if (shuttingDown === true) { | ||
return promiseResolvedWith(true); | ||
} | ||
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) { | ||
return promiseResolvedWith(true); | ||
} | ||
Comment on lines
+207
to
+209
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implements @domenic's suggestion from #1207 (comment). I don't know if we need to update the spec text for this. It already specifies that these checks must happen before performing any reads and writes:
We should still add a test for this particular case (although that might not be easy looking at the discussion in #1207). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I personally don't think we need to update the spec text. |
||
return new Promise((resolveRead, rejectRead) => { | ||
ReadableStreamDefaultReaderRead( | ||
reader, | ||
|
@@ -219,31 +226,50 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
} | ||
|
||
// Errors must be propagated forward | ||
isOrBecomesErrored(source, reader._closedPromise, storedError => { | ||
function sourceIsOrBecomesErrored() { | ||
const storedError = source._storedError; | ||
if (preventAbort === false) { | ||
shutdownWithAction(() => WritableStreamAbort(dest, storedError), true, storedError); | ||
} else { | ||
shutdown(true, storedError); | ||
} | ||
}); | ||
} | ||
|
||
// Errors must be propagated backward | ||
isOrBecomesErrored(dest, writer._closedPromise, storedError => { | ||
function destIsOrBecomesErroringOrErrored() { | ||
const storedError = dest._storedError; | ||
if (preventCancel === false) { | ||
shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); | ||
} else { | ||
shutdown(true, storedError); | ||
} | ||
}); | ||
} | ||
|
||
// Closing must be propagated forward | ||
isOrBecomesClosed(source, reader._closedPromise, () => { | ||
function sourceIsOrBecomesClosed() { | ||
if (preventClose === false) { | ||
shutdownWithAction(() => WritableStreamDefaultWriterCloseWithErrorPropagation(writer)); | ||
} else { | ||
shutdown(); | ||
} | ||
}); | ||
} | ||
|
||
function checkState() { | ||
const sourceState = source._state; | ||
const destState = dest._state; | ||
if (sourceState === 'errored') { | ||
// Errors must be propagated forward | ||
sourceIsOrBecomesErrored(); | ||
} else if (destState === 'erroring' || destState === 'errored') { | ||
// Errors must be propagated backward | ||
destIsOrBecomesErroringOrErrored(); | ||
} else if (sourceState === 'closed') { | ||
// Closing must be propagated forward | ||
sourceIsOrBecomesClosed(); | ||
} | ||
} | ||
|
||
checkState(); | ||
|
||
// Closing must be propagated backward | ||
if (WritableStreamCloseQueuedOrInFlight(dest) === true || dest._state === 'closed') { | ||
|
@@ -256,7 +282,13 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
} | ||
} | ||
|
||
setPromiseIsHandledToTrue(pipeLoop()); | ||
if (!shuttingDown) { | ||
assert(source._state === 'readable' && dest._state === 'writable'); | ||
readerAddStateChangeListener(reader, checkState); | ||
writerAddStateChangeListener(writer, checkState); | ||
|
||
setPromiseIsHandledToTrue(pipeLoop()); | ||
} | ||
|
||
function waitForWritesToFinish() { | ||
// Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait | ||
|
@@ -268,27 +300,13 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
); | ||
} | ||
|
||
function isOrBecomesErrored(stream, promise, action) { | ||
if (stream._state === 'errored') { | ||
action(stream._storedError); | ||
} else { | ||
uponRejection(promise, action); | ||
} | ||
} | ||
|
||
function isOrBecomesClosed(stream, promise, action) { | ||
if (stream._state === 'closed') { | ||
action(); | ||
} else { | ||
uponFulfillment(promise, action); | ||
} | ||
} | ||
|
||
function shutdownWithAction(action, originalIsError, originalError) { | ||
if (shuttingDown === true) { | ||
return; | ||
} | ||
shuttingDown = true; | ||
ReadableStreamDefaultReaderRelease(reader); | ||
reader = AcquireReadableStreamDefaultReader(source); | ||
|
||
if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { | ||
uponFulfillment(waitForWritesToFinish(), doTheRest); | ||
|
@@ -310,6 +328,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
return; | ||
} | ||
shuttingDown = true; | ||
ReadableStreamDefaultReaderRelease(reader); | ||
reader = AcquireReadableStreamDefaultReader(source); | ||
|
||
if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { | ||
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error)); | ||
|
@@ -319,6 +339,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
} | ||
|
||
function finalize(isError, error) { | ||
assert(ReadableStreamDefaultReader.isImpl(reader)); | ||
WritableStreamDefaultWriterRelease(writer); | ||
ReadableStreamDefaultReaderRelease(reader); | ||
|
||
|
@@ -770,6 +791,7 @@ function ReadableStreamClose(stream) { | |
} | ||
|
||
resolvePromise(reader._closedPromise, undefined); | ||
readerRunStateChangeListeners(reader); | ||
|
||
if (ReadableStreamDefaultReader.isImpl(reader)) { | ||
const readRequests = reader._readRequests; | ||
|
@@ -794,6 +816,7 @@ function ReadableStreamError(stream, e) { | |
|
||
rejectPromise(reader._closedPromise, e); | ||
setPromiseIsHandledToTrue(reader._closedPromise); | ||
readerRunStateChangeListeners(reader); | ||
|
||
if (ReadableStreamDefaultReader.isImpl(reader)) { | ||
ReadableStreamDefaultReaderErrorReadRequests(reader, e); | ||
|
@@ -877,6 +900,8 @@ function ReadableStreamReaderGenericInitialize(reader, stream) { | |
reader._stream = stream; | ||
stream._reader = reader; | ||
|
||
reader._stateChangeListeners = []; | ||
|
||
if (stream._state === 'readable') { | ||
reader._closedPromise = newPromise(); | ||
} else if (stream._state === 'closed') { | ||
|
@@ -910,6 +935,22 @@ function ReadableStreamReaderGenericRelease(reader) { | |
|
||
stream._reader = undefined; | ||
reader._stream = undefined; | ||
|
||
reader._stateChangeListeners = []; | ||
} | ||
|
||
function readerAddStateChangeListener(reader, stateChangeListener) { | ||
const stream = reader._stream; | ||
assert(stream !== undefined); | ||
reader._stateChangeListeners.push(stateChangeListener); | ||
} | ||
|
||
function readerRunStateChangeListeners(reader) { | ||
const stateChangeListeners = reader._stateChangeListeners; | ||
reader._stateChangeListeners = []; | ||
for (const stateChangeListener of stateChangeListeners) { | ||
stateChangeListener(); | ||
} | ||
} | ||
|
||
function ReadableStreamBYOBReaderRead(reader, view, readIntoRequest) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's useful to keep the pipe going when dest has already become
"erroring"
? Any new writes will just error immediately, as per step 9 of WritableStreamDefaultWriterWrite.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed.