Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ Backpressure {#concept-backpressure}

Backpressure is the mechanism by which a consumer signals to a producer that it should slow down. In this API, backpressure is managed through [=backpressure policies=]:

* **`"strict"`** (default): Catches producers that ignore backpressure. Properly awaited writes wait for buffer space; un-awaited writes exceeding the buffer limit cause a rejection. Both the slots buffer and pending writes queue are limited by `highWaterMark`.
* **`"strict"`** (default): Catches producers that ignore backpressure. Properly awaited asynchronous writes wait for buffer space; un-awaited writes exceeding the buffer limit cause a rejection. Both the slots buffer and pending writes queue are limited to `highWaterMark` entries. Synchronous writes (`writeSync`, `writevSync`) only attempt the slots buffer; if it is full, they return `false` without placing data into the pending writes queue.

* **`"block"`**: Async writes wait for buffer space. Sync writes return `false` when the buffer is full. The pending writes queue is unbounded.
* **`"block"`**: Asynchronous writes that cannot be placed in the slots buffer are placed into the pending writes queue, which is unbounded, and resolve when promoted to the slots buffer. Synchronous writes return `false` when the slots buffer is full — the data is not accepted. The caller should fall back to the asynchronous method, which handles the pending queue.

* **`"drop-oldest"`**: Oldest buffered data is discarded to make room for new data. Writes never block.
* **`"drop-oldest"`**: Oldest buffered data is discarded to make room for new data. Writes never block or wait.

* **`"drop-newest"`**: Incoming data is silently discarded when buffer is full. Writes never block.
* **`"drop-newest"`**: Incoming data is silently discarded when buffer is full. Writes never block or wait.

The `highWaterMark` option controls the buffer size in slots (chunk batches), not bytes. It is clamped to a minimum of 1. Implementations may also apply a reasonable upper limit; the specific maximum is implementation-defined, but must be at least 1024.

Expand Down Expand Up @@ -367,7 +367,7 @@ Like {{Writer}}, {{SyncWriter}} is an interface, not a concrete class. Implement

{{SyncWriter}} follows the same [=backpressure policies=] as {{Writer}} with adaptations for synchronous operation:

* With `"block"`, {{SyncWriter/writeSync()}} and {{SyncWriter/writevSync()}} always enqueue the chunk(s) and return `true` when the buffer has space, or enqueue and return `false` when the buffer is full. The `false` return is a backpressure signal; the data is still accepted, but the caller should slow down.
* With `"block"`, {{SyncWriter/writeSync()}} and {{SyncWriter/writevSync()}} enqueue the chunk(s) and return `true` when the buffer has space, or return `false` when the buffer is full. The data is not accepted; the caller should retry when capacity becomes available.
* With `"strict"`, writes that exceed the buffer capacity throw a {{RangeError}}.
* With `"drop-oldest"` and `"drop-newest"`, writes behave as described in [[#concept-backpressure]]: the oldest or newest data is discarded respectively. Writes never fail.
* {{SyncWriter/endSync()}} throws a {{TypeError}} if the writer is already closed or errored.
Expand Down Expand Up @@ -519,7 +519,7 @@ When called, it performs the following steps:
<li>Let |backpressure| be |options|["{{PushStreamOptions/backpressure}}"].
<li>Let |signal| be |options|["{{PushStreamOptions/signal}}"] if present; otherwise `undefined`.
<li>Create an internal slots buffer with capacity |highWaterMark|.
<li>Create a pending writes queue. If |backpressure| is `"strict"`, limit its capacity to |highWaterMark|.
<li>Create a pending writes queue. If |backpressure| is `"strict"`, limit its capacity to |highWaterMark|. The pending writes queue is used exclusively by the asynchronous write methods ({{Writer/write()}}, {{Writer/writev()}}); the synchronous methods ({{Writer/writeSync()}}, {{Writer/writevSync()}}) do not interact with it.
<li>Let |writer| be a new {{Writer}} backed by the internal buffer, the pending writes queue, the [=backpressure policy=] |backpressure|, and |signal| if present.
<li>Let |pipelineController| be a new {{AbortController}}. If |signal| is not `undefined`, set |pipelineController|'s signal to follow |signal|.
<li>Let |bufferIterable| be an async iterable that dequeues the next batch from the slots buffer on each step (waiting if empty).
Expand All @@ -538,6 +538,17 @@ The Writer interface {#writer-section}

A {{Writer}} provides the interface for producing data. Implementations of {{Writer}} are returned by {{Stream/push()}}, {{Stream/broadcast()}}, and {{Stream/duplex()}}, but any object conforming to this interface can serve as a writer.

A {{Writer}} backed by a [=push stream=] maintains two internal queues:

* The **slots buffer**: a bounded queue of batches ready for the consumer to read. Its capacity is `highWaterMark` entries.
* The **pending writes queue**: a queue of writes waiting for space in the slots buffer. Under `"strict"`, its capacity is also `highWaterMark` entries. Under `"block"`, it is unbounded. Under `"drop-oldest"` and `"drop-newest"`, it is not used (writes are resolved immediately).

The synchronous methods ({{Writer/writeSync()}}, {{Writer/writevSync()}}) operate exclusively on the slots buffer. If the slots buffer has space, the batch is placed directly into it and `true` is returned. If the slots buffer is full, the synchronous method returns `false` — it never places data into the pending writes queue.

The asynchronous methods ({{Writer/write()}}, {{Writer/writev()}}) first attempt to place the batch into the slots buffer. If the slots buffer is full, the write is placed into the pending writes queue (for `"strict"` and `"block"` policies). When the consumer reads and frees slots, pending writes are promoted from the pending queue into the slots buffer in order, and their promises resolve.

If the bonded readable's consumer stops iterating (the async iterator returns or throws), the writer's consumer is no longer <dfn>active</dfn>. Subsequent synchronous writes return `false` and asynchronous writes reject, since the data has no consumer to receive it.

### `Writer.desiredSize` ### {#writer-desiredsize}

<div algorithm>
Expand All @@ -555,8 +566,9 @@ The <dfn method for="Writer">write(chunk, options)</dfn> method writes a single

<ol>
<li>If |chunk| is a {{USVString}}, set |chunk| to the result of [=UTF-8 encode|UTF-8 encoding=] |chunk|.
<li>If the writer is closed, return [=a promise rejected with=] a {{TypeError}}.
<li>If the writer is closed or [=closing=], return [=a promise rejected with=] a {{TypeError}}.
<li>If the writer is errored, return [=a promise rejected with=] the stored error.
<li>If the bonded consumer is not [=active=], return [=a promise rejected with=] a {{TypeError}} (or, if the consumer's iterator threw an error, reject with that error).
<li>If |options|["{{WriteOptions/signal}}"] is present and [=AbortSignal/aborted=], return [=a promise rejected with=] its abort reason.
<li>Let |batch| be « |chunk| ».
<li>If the slots buffer has space, enqueue |batch|, [=notify drain waiters=], and return [=a promise resolved with=] `undefined`.
Expand Down Expand Up @@ -584,15 +596,15 @@ The <dfn method for="Writer">writeSync(chunk)</dfn> method attempts a synchronou

<ol>
<li>If |chunk| is a {{USVString}}, set |chunk| to the result of [=UTF-8 encode|UTF-8 encoding=] |chunk|.
<li>If the writer is closed or errored, return `false`.
<li>If the writer is closed, [=closing=], or errored, or if the bonded consumer is not [=active=], return `false`.
<li>Let |batch| be « |chunk| ».
<li>If the slots buffer has space, enqueue |batch|, [=notify drain waiters=], and return `true`.
<li>The slots buffer is full. Proceed based on the [=backpressure policy=]:
<ol>
<li>If `"drop-oldest"`: dequeue the oldest batch, enqueue |batch|, and return `true`.
<li>If `"drop-newest"`: discard |batch| and return `true`.
<li>If `"strict"`: return `false`.
<li>If `"block"`: enqueue |batch| and return `false`. The data is accepted, but the `false` return signals backpressure.
<li>If `"block"`: return `false`.
</ol>
</ol>

Expand Down
Loading