.conflateChunks does not respect the provided chunk limit #3661

@domaspoliakas

When running the following code:

    import cats.effect.IO
    import fs2.Stream

    Stream(1, 2, 3, 4, 5, 6, 7)
      .covary[IO]
      .chunkLimit(1)
      .unchunks
      .conflateChunks(3)
      .compile
      .toList
      .flatMap(IO.println(_))

The console output will occasionally contain chunks of 4 elements, even though the limit passed to .conflateChunks is 3. I have a diff with a unit test that reproduces this behavior: 13ac7a5

I think I also have found the cause for this.

The implementation of .conflateChunks relies on a bounded Channel, but on its implicit behavior rather than its explicit contract (or at least it is implicit as far as I understand). When pulled, the bounded channel emits all the values it currently has buffered as a single chunk. However, it also appends the values of any producers that are currently blocked on publishing because the channel is full (https://github.com/typelevel/fs2/blob/main/core/shared/src/main/scala/fs2/concurrent/Channel.scala#L242-L248). This means that a single pull can produce a chunk whose maximum size is the provided limit plus the number of blocked producers. .conflateChunks inherits this behavior, which breaks its contract.
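To make the mechanism concrete, here is a minimal sketch of the behavior described above. It is a deliberately simplified, pure model and not fs2's actual Channel implementation: the name `BoundedChannelModel` and its `send` and `pull` methods are hypothetical, and the model assumes each blocked producer holds exactly one pending element.

```scala
// Hypothetical, simplified model of a bounded channel. NOT fs2's real code.
// `buffered` holds accepted elements; `blocked` holds one pending element
// per producer that found the buffer full.
final case class BoundedChannelModel[A](
    capacity: Int,
    buffered: Vector[A] = Vector.empty[A],
    blocked: Vector[A] = Vector.empty[A]
) {
  // Offer one element: buffer it if there is room, otherwise record the
  // producer as blocked together with its pending element.
  def send(a: A): BoundedChannelModel[A] =
    if (buffered.size < capacity) BoundedChannelModel(capacity, buffered :+ a, blocked)
    else BoundedChannelModel(capacity, buffered, blocked :+ a)

  // A pull drains the buffer AND the pending elements of blocked producers
  // into a single chunk, as the issue describes.
  def pull: (Vector[A], BoundedChannelModel[A]) =
    (buffered ++ blocked, BoundedChannelModel[A](capacity))
}

object BoundedChannelModelDemo {
  def main(args: Array[String]): Unit = {
    val limit = 3
    // Four sends before any pull: three fill the buffer, the fourth
    // leaves the single producer blocked.
    val full = (1 to 4).foldLeft(BoundedChannelModel[Int](limit))((ch, a) => ch.send(a))
    val (chunk, _) = full.pull
    println(chunk.size) // prints 4, one more than the limit
  }
}
```

In this model the chunk returned by a single pull has `capacity + blocked.size` elements at most, which is how a limit of 3 with one producer can yield 4.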

I don't mind taking a stab at this, but I would appreciate the maintainers' advice on the preferred implementation strategy. Within .conflateChunks we know the number of producers (exactly one), so one solution would be to create a channel of size limit - 1, which would solve the issue as things currently stand. However, .conflateChunks would still be tied to implementation details of the bounded Channel. Is this an acceptable tradeoff, or would a fresh implementation be necessary or preferred in this case?
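The arithmetic behind the limit - 1 proposal can be written out as follows. This assumes, as described above, that each blocked producer contributes exactly one pending element; the symbols c (channel capacity) and p (number of blocked producers) are introduced here only for illustration.

```latex
\[
  \text{max chunk size} = c + p
\]
\[
  c = \text{limit},\; p = 1 \;\Rightarrow\; \text{limit} + 1
  \qquad\qquad
  c = \text{limit} - 1,\; p = 1 \;\Rightarrow\; \text{limit}
\]
```

With a limit of 3, the first case gives 4, which matches the 4-element chunks seen in the reproduction, and the second case gives 3.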

Thank you for your time.
