Skip to content

Commit 12dfdbf

Browse files
authored
Merge pull request #2922 from armanbilge/fix/synchronous-channel-fifo
Respect FIFO for bounded Channel
2 parents aa30a5e + d7a0d85 commit 12dfdbf

File tree

2 files changed

+31
-6
lines changed

2 files changed

+31
-6
lines changed

core/shared/src/main/scala/fs2/concurrent/Channel.scala

+13-6
Original file line numberDiff line numberDiff line change
@@ -218,19 +218,25 @@ object Channel {
218218
else (state.copy(waiting = waiting.some), state)
219219
}
220220
.flatMap {
221-
case s @ State(values, stateSize, ignorePreviousWaiting @ _, producers, closed) =>
221+
case s @ State(
222+
initValues,
223+
stateSize,
224+
ignorePreviousWaiting @ _,
225+
producers,
226+
closed
227+
) =>
222228
if (shouldEmit(s)) {
223229
var size = stateSize
224-
var allValues = values
230+
val tailValues = List.newBuilder[A]
225231
var unblock = F.unit
226232

227233
producers.foreach { case (value, producer) =>
228234
size += 1
229-
allValues = value :: allValues
235+
tailValues += value
230236
unblock = unblock <* producer.complete(())
231237
}
232238

233-
val toEmit = makeChunk(allValues, size)
239+
val toEmit = makeChunk(initValues, tailValues.result(), size)
234240

235241
unblock.as(Pull.output(toEmit) >> consumeLoop)
236242
} else {
@@ -258,11 +264,12 @@ object Channel {
258264

259265
@inline private def shouldEmit(s: State) = s.values.nonEmpty || s.producers.nonEmpty
260266

261-
private def makeChunk(allValues: List[A], size: Int): Chunk[A] = {
267+
private def makeChunk(init: List[A], tail: List[A], size: Int): Chunk[A] = {
262268
val arr = new Array[Any](size)
263269
var i = size - 1
264-
var values = allValues
270+
var values = tail
265271
while (i >= 0) {
272+
if (values.isEmpty) values = init
266273
arr(i) = values.head
267274
values = values.tail
268275
i -= 1

core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala

+18
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package concurrent
2424

2525
import cats.syntax.all._
2626
import cats.effect.IO
27+
import cats.effect.testkit.TestControl
2728
import scala.concurrent.duration._
2829

2930
import org.scalacheck.effect.PropF.forAllF
@@ -132,4 +133,21 @@ class ChannelSuite extends Fs2Suite {
132133
p.assertEquals(true)
133134
}
134135

136+
test("Channel.synchronous respects fifo") {
137+
val l = for {
138+
chan <- Channel.synchronous[IO, Int]
139+
_ <- (0 until 5).toList.traverse_ { i =>
140+
val f = for {
141+
_ <- IO.sleep(i.second)
142+
_ <- chan.send(i)
143+
_ <- if (i == 4) chan.close.void else IO.unit
144+
} yield ()
145+
f.start
146+
}
147+
result <- IO.sleep(5.seconds) *> chan.stream.compile.toList
148+
} yield result
149+
150+
TestControl.executeEmbed(l).assertEquals((0 until 5).toList)
151+
}
152+
135153
}

0 commit comments

Comments
 (0)