Need API for producers to clean up after consumers are done #10

@jedwards1211

With a Node Readable I can clean up when the consumer cancels a stream like so:

  import { Readable } from 'node:stream'

  function consumer(stream: Readable) {
    let chunkCount = 0

    stream.on('data', (chunk) => {
      console.log(chunk)
      if (++chunkCount >= 2) {
        stream.destroy() // this causes the producer's `destroy` to get called
      }
    })
  }

  consumer(
    new Readable({
      read(size) {
        // ...
      },
      destroy(err, callback) {
        console.log('freeing resources...')
        // ...
        callback(err) // tell the stream that cleanup has finished
      },
    })
  )

With a web ReadableStream I can do it like this:

  async function consumer(stream: ReadableStream<Buffer>) {
    const reader = stream.getReader()

    let chunkCount = 0
    let next
    while (!(next = await reader.read()).done) {
      console.log(next.value)
      if (++chunkCount >= 2) {
        await reader.cancel() // this causes the producer's `cancel` to get called
        return
      }
    }
  }
  await consumer(
    new ReadableStream<Buffer>({
      start(controller) {
        // ...
      },
      cancel(reason) {
        console.log('freeing resources...')
        // ...
      },
    })
  )

I don't see any analogous API in the docs that allows me to do this with new-streams.

  async function consumer(
    stream: AsyncIterable<Buffer>
    // I really hope you're not going to say we should pass an `AbortController`
    // here; it would be a mess to pass separate cancelation mechanisms along with
    // streams everywhere
  ) {
    let chunkCount = 0

    for await (const chunks of stream) {
      for (const chunk of chunks) {
        console.log(chunk)
        if (++chunkCount >= 2) {
          return
          // when the for..await loop .return()s the stream's iterator
          // that could trigger a call to some kind of cleanup method;
          // right now there's no way for the producing code to get
          // notified that the consumer is no longer interested in the
          // stream.
        }
      }
    }

  }

  const { writer, readable } = Stream.push({
    highWaterMark: 10,
    backpressure: 'drop-oldest',
    // need some kind of cleanup() { ... } method here
  })
  await consumer(readable)
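
To make the request concrete, here is a rough sketch of the kind of hook I have in mind. `pushStream` and its `cleanup` option are hypothetical names, not part of new-streams. The sketch assumes a single consumer and ignores `highWaterMark` and backpressure policies entirely. It only shows the iterator's `return()` notifying the producer.

```typescript
// Hypothetical sketch: `pushStream` and `cleanup` are NOT new-streams APIs.
interface PushStream<T> {
  write(chunk: T): void
  end(): void
  readable: AsyncIterable<T[]>
}

function pushStream<T>(options: { cleanup?: () => void } = {}): PushStream<T> {
  const queue: T[] = []
  let ended = false
  let wake: (() => void) | undefined // resolves the waiting next(), if any

  // Marks the stream as finished; returns true only the first time.
  const finish = (): boolean => {
    if (ended) return false
    ended = true
    wake?.()
    return true
  }

  const iterator: AsyncIterableIterator<T[]> = {
    [Symbol.asyncIterator]() {
      return iterator
    },
    async next() {
      // Assumes one outstanding next() at a time (single consumer).
      while (queue.length === 0 && !ended) {
        await new Promise<void>((resolve) => (wake = resolve))
        wake = undefined
      }
      if (queue.length > 0) return { done: false, value: queue.splice(0) }
      return { done: true, value: undefined }
    },
    async return() {
      // for await calls this when the consumer breaks, returns, or throws.
      if (finish()) options.cleanup?.()
      queue.length = 0
      return { done: true, value: undefined }
    },
  }

  return {
    write(chunk) {
      if (ended) return
      queue.push(chunk)
      wake?.()
    },
    end() {
      finish()
    },
    readable: iterator,
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = []
  const { write, readable } = pushStream<string>({
    cleanup: () => log.push('freeing resources...'),
  })
  write('a')
  write('b')
  write('c')

  let chunkCount = 0
  outer: for await (const chunks of readable) {
    for (const chunk of chunks) {
      log.push(chunk)
      if (++chunkCount >= 2) break outer // triggers iterator.return()
    }
  }
  return log // ['a', 'b', 'freeing resources...']
}
```

With something like this the consumer needs no extra cancelation handle. Leaving the `for await` loop is the signal, and the producer finds out through its own `cleanup` callback.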

It seems that in spec-defined async iterators such as async generators, .return() doesn't take effect until outstanding .next() calls resolve. As discussed elsewhere, that wrecks the ability to clean up safely, so it would be better if your .return() immediately aborted the stream and any outstanding .next() calls.
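
Here is a small sketch of the difference. The first half uses a plain async generator to show `return()` queuing behind a pending `next()`. The second half is a hypothetical hand-written iterator (`eagerSource` is an illustrative name, not a new-streams API) whose `return()` settles the outstanding `next()` right away.

```typescript
// Async generator whose first next() stays pending until `gate` resolves.
async function* stalled(gate: Promise<void>): AsyncGenerator<string> {
  try {
    await gate
    yield 'chunk'
  } finally {
    // producer cleanup would go here; it cannot run while `gate` is pending
  }
}

async function demoQueuedReturn(): Promise<string[]> {
  const events: string[] = []
  let release!: () => void
  const gate = new Promise<void>((resolve) => (release = resolve))

  const it = stalled(gate)
  const pendingNext = it.next().then(() => events.push('next settled'))
  const pendingReturn = it.return(undefined).then(() => events.push('return settled'))

  // Let timers run: return() is still queued behind the pending next().
  await new Promise((resolve) => setTimeout(resolve, 10))
  events.push('after waiting')

  release()
  await Promise.all([pendingNext, pendingReturn])
  return events // ['after waiting', 'next settled', 'return settled']
}

// Hypothetical alternative: return() aborts the outstanding next() at once.
function eagerSource<T>(): AsyncIterator<T> {
  let pending: ((result: IteratorResult<T>) => void) | undefined
  let closed = false
  return {
    next() {
      if (closed) {
        return Promise.resolve<IteratorResult<T>>({ done: true, value: undefined })
      }
      // This sketch never produces data; next() just waits to be aborted.
      return new Promise<IteratorResult<T>>((resolve) => {
        pending = resolve
      })
    },
    async return() {
      closed = true
      pending?.({ done: true, value: undefined }) // settle the pending read
      pending = undefined
      return { done: true, value: undefined }
    },
  }
}

async function demoEagerReturn(): Promise<string[]> {
  const events: string[] = []
  const it = eagerSource<string>()
  const pendingNext = it.next().then((r) => events.push(`next settled, done=${r.done}`))
  await it.return!()
  await pendingNext
  return events // ['next settled, done=true']
}
```

In the generator case the cleanup in `finally` is blocked for as long as the producer is stuck. In the eager case the consumer's `return()` unblocks everything on its own.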
