diff --git a/index.bs b/index.bs index 568dfaf0e..54b791b71 100644 --- a/index.bs +++ b/index.bs @@ -2175,7 +2175,7 @@ The following abstract operations operate on {{ReadableStream}} instances at a h |source|.[=ReadableStream/[[storedError]]=]. 1. Otherwise, [=shutdown=] with |source|.[=ReadableStream/[[storedError]]=]. 1. Errors must be propagated backward: if |dest|.[=WritableStream/[[state]]=] - is or becomes "`errored`", then + is or becomes "`erroring`" or "`errored`", then 1. If |preventCancel| is false, [=shutdown with an action=] of ! [$ReadableStreamCancel$](|source|, |dest|.[=WritableStream/[[storedError]]=]) and with |dest|.[=WritableStream/[[storedError]]=]. @@ -2198,6 +2198,17 @@ The following abstract operations operate on {{ReadableStream}} instances at a h |originalError|, then: 1. If |shuttingDown| is true, abort these substeps. 1. Set |shuttingDown| to true. + 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform + ! [$ReadableStreamBYOBReaderRelease$](|reader|). + 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). + 1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|). +
The initial reader is released to ensure that any pending read requests + are immediately aborted, and no more chunks are pulled from |source|. A new reader is + acquired in order to keep |source| locked until the shutdown is [=finalized=], for example + to [=cancel a readable stream|cancel=] |source| if necessary. + This exchange of readers is not observable to author code and the user agent is free to + implement this differently, for example by keeping the same reader and internally aborting + its pending read requests. 1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and ! [$WritableStreamCloseQueuedOrInFlight$](|dest|) is false, 1. If any [=chunks=] have been read but not yet written, write them to |dest|. @@ -2210,6 +2221,10 @@ The following abstract operations operate on {{ReadableStream}} instances at a h ask to shutdown, optionally with an error |error|, then: 1. If |shuttingDown| is true, abort these substeps. 1. Set |shuttingDown| to true. + 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform + ! [$ReadableStreamBYOBReaderRelease$](|reader|). + 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). + 1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|). 1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and ! [$WritableStreamCloseQueuedOrInFlight$](|dest|) is false, 1. If any [=chunks=] have been read but not yet written, write them to |dest|. @@ -2218,10 +2233,9 @@ The following abstract operations operate on {{ReadableStream}} instances at a h 1. [=Finalize=], passing along |error| if it was given. * Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error |error|, which means to perform the following steps: + 1. Assert: |reader| [=implements=] {{ReadableStreamDefaultReader}}. 1. Perform ! [$WritableStreamDefaultWriterRelease$](|writer|). - 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform - ! [$ReadableStreamBYOBReaderRelease$](|reader|). - 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). + 1. Perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). 1. If |signal| is not undefined, [=AbortSignal/remove=] |abortAlgorithm| from |signal|. 1. If |error| was given, [=reject=] |promise| with |error|. 1. Otherwise, [=resolve=] |promise| with undefined. diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index db1da4c73..4ca45d816 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -10,7 +10,8 @@ const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js') const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, - WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = require('./writable-streams.js'); + WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight, writerAddStateChangeListener } = + require('./writable-streams.js'); const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js'); const ReadableByteStreamController = require('../../generated/ReadableByteStreamController.js'); @@ -134,7 +135,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC assert(IsReadableStreamLocked(source) === false); assert(IsWritableStreamLocked(dest) === false); - const reader = AcquireReadableStreamDefaultReader(source); + let reader = AcquireReadableStreamDefaultReader(source); const writer = AcquireWritableStreamDefaultWriter(dest); source._disturbed = true; @@ -200,6 +201,12 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } return transformPromiseWith(writer._readyPromise, () => { + if (shuttingDown === true) { + return promiseResolvedWith(true); + } + if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) { + return promiseResolvedWith(true); + } return new Promise((resolveRead, rejectRead) => { ReadableStreamDefaultReaderRead( reader, @@ -219,31 +226,50 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } // Errors must be propagated forward - isOrBecomesErrored(source, reader._closedPromise, storedError => { + function sourceIsOrBecomesErrored() { + const storedError = source._storedError; if (preventAbort === false) { shutdownWithAction(() => WritableStreamAbort(dest, storedError), true, storedError); } else { shutdown(true, storedError); } - }); + } // Errors must be propagated backward - isOrBecomesErrored(dest, writer._closedPromise, storedError => { + function destIsOrBecomesErroringOrErrored() { + const storedError = dest._storedError; if (preventCancel === false) { shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); } else { shutdown(true, storedError); } - }); + } // Closing must be propagated forward - isOrBecomesClosed(source, reader._closedPromise, () => { + function sourceIsOrBecomesClosed() { if (preventClose === false) { shutdownWithAction(() => WritableStreamDefaultWriterCloseWithErrorPropagation(writer)); } else { shutdown(); } - }); + } + + function checkState() { + const sourceState = source._state; + const destState = dest._state; + if (sourceState === 'errored') { + // Errors must be propagated forward + sourceIsOrBecomesErrored(); + } else if (destState === 'erroring' || destState === 'errored') { + // Errors must be propagated backward + destIsOrBecomesErroringOrErrored(); + } else if (sourceState === 'closed') { + // Closing must be propagated forward + sourceIsOrBecomesClosed(); + } + } + + checkState(); // Closing must be propagated backward if (WritableStreamCloseQueuedOrInFlight(dest) === true || dest._state === 'closed') { @@ -256,7 +282,13 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } } - setPromiseIsHandledToTrue(pipeLoop()); + if (!shuttingDown) { + assert(source._state === 'readable' && dest._state === 'writable'); + readerAddStateChangeListener(reader, checkState); + writerAddStateChangeListener(writer, checkState); + + setPromiseIsHandledToTrue(pipeLoop()); + } function waitForWritesToFinish() { // Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait @@ -268,27 +300,13 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC ); } - function isOrBecomesErrored(stream, promise, action) { - if (stream._state === 'errored') { - action(stream._storedError); - } else { - uponRejection(promise, action); - } - } - - function isOrBecomesClosed(stream, promise, action) { - if (stream._state === 'closed') { - action(); - } else { - uponFulfillment(promise, action); - } - } - function shutdownWithAction(action, originalIsError, originalError) { if (shuttingDown === true) { return; } shuttingDown = true; + ReadableStreamDefaultReaderRelease(reader); + reader = AcquireReadableStreamDefaultReader(source); if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { uponFulfillment(waitForWritesToFinish(), doTheRest); @@ -310,6 +328,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC return; } shuttingDown = true; + ReadableStreamDefaultReaderRelease(reader); + reader = AcquireReadableStreamDefaultReader(source); if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error)); @@ -319,6 +339,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } function finalize(isError, error) { + assert(ReadableStreamDefaultReader.isImpl(reader)); WritableStreamDefaultWriterRelease(writer); ReadableStreamDefaultReaderRelease(reader); @@ -770,6 +791,7 @@ function ReadableStreamClose(stream) { } resolvePromise(reader._closedPromise, undefined); + readerRunStateChangeListeners(reader); if (ReadableStreamDefaultReader.isImpl(reader)) { const readRequests = reader._readRequests; @@ -794,6 +816,7 @@ function ReadableStreamError(stream, e) { rejectPromise(reader._closedPromise, e); setPromiseIsHandledToTrue(reader._closedPromise); + readerRunStateChangeListeners(reader); if (ReadableStreamDefaultReader.isImpl(reader)) { ReadableStreamDefaultReaderErrorReadRequests(reader, e); @@ -877,6 +900,8 @@ function ReadableStreamReaderGenericInitialize(reader, stream) { reader._stream = stream; stream._reader = reader; + reader._stateChangeListeners = []; + if (stream._state === 'readable') { reader._closedPromise = newPromise(); } else if (stream._state === 'closed') { @@ -910,6 +935,22 @@ function ReadableStreamReaderGenericRelease(reader) { stream._reader = undefined; reader._stream = undefined; + + reader._stateChangeListeners = []; +} + +function readerAddStateChangeListener(reader, stateChangeListener) { + const stream = reader._stream; + assert(stream !== undefined); + reader._stateChangeListeners.push(stateChangeListener); +} + +function readerRunStateChangeListeners(reader) { + const stateChangeListeners = reader._stateChangeListeners; + reader._stateChangeListeners = []; + for (const stateChangeListener of stateChangeListeners) { + stateChangeListener(); + } } function ReadableStreamBYOBReaderRead(reader, view, readIntoRequest) { diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index cf303bfe7..a83da61f0 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -32,7 +32,8 @@ Object.assign(exports, { WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterGetDesiredSize, WritableStreamDefaultWriterRelease, - WritableStreamDefaultWriterWrite + WritableStreamDefaultWriterWrite, + writerAddStateChangeListener }); // Working with writable streams @@ -143,6 +144,8 @@ function SetUpWritableStreamDefaultWriter(writer, stream) { writer._stream = stream; stream._writer = writer; + writer._stateChangeListeners = []; + const state = stream._state; if (state === 'writable') { @@ -239,6 +242,11 @@ function WritableStreamFinishErroring(stream) { } stream._writeRequests = []; + const writer = stream._writer; + if (writer !== undefined) { + writerRunStateChangeListeners(writer); + } + if (stream._pendingAbortRequest === undefined) { WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream); return; @@ -289,6 +297,7 @@ function WritableStreamFinishInFlightClose(stream) { const writer = stream._writer; if (writer !== undefined) { resolvePromise(writer._closedPromise, undefined); + writerRunStateChangeListeners(writer); } assert(stream._pendingAbortRequest === undefined); @@ -378,6 +387,7 @@ function WritableStreamStartErroring(stream, reason) { const writer = stream._writer; if (writer !== undefined) { WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); + writerRunStateChangeListeners(writer, reason); } if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) { @@ -491,6 +501,8 @@ function WritableStreamDefaultWriterRelease(writer) { stream._writer = undefined; writer._stream = undefined; + + writer._stateChangeListeners = []; } function WritableStreamDefaultWriterWrite(writer, chunk) { @@ -526,6 +538,20 @@ function WritableStreamDefaultWriterWrite(writer, chunk) { return promise; } +function writerAddStateChangeListener(writer, stateChangeListener) { + const stream = writer._stream; + assert(stream !== undefined); + writer._stateChangeListeners.push(stateChangeListener); +} + +function writerRunStateChangeListeners(writer) { + const stateChangeListeners = writer._stateChangeListeners; + writer._stateChangeListeners = []; + for (const stateChangeListener of stateChangeListeners) { + stateChangeListener(); + } +} + // Default controllers function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, diff --git a/reference-implementation/web-platform-tests b/reference-implementation/web-platform-tests index 99d74f952..1646d657b 160000 --- a/reference-implementation/web-platform-tests +++ b/reference-implementation/web-platform-tests @@ -1 +1 @@ -Subproject commit 99d74f9529e16ec0722ef11136ab29b9e80fff26 +Subproject commit 1646d657bf258f850afb5f22860ec02dccbc6b7a