Skip to content

Commit 5a522c5

Browse files
committed
Reimplement conflate operations
1 parent 3604eed commit 5a522c5

File tree

2 files changed

+17
-50
lines changed

2 files changed

+17
-50
lines changed

core/shared/src/main/scala/fs2/Stream.scala

+15-47
Original file line numberDiff line numberDiff line change
@@ -568,55 +568,24 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
568568
Stream.eval(fstream)
569569
}
570570

571-
def conflate[F2[x] >: F[x], O2 >: O](implicit
572-
F: Concurrent[F2],
573-
O: Semigroup[O2]
574-
): Stream[F2, O2] =
575-
conflateWithSeed[F2, O2](identity)((s, o) => O.combine(s, o))
571+
def conflateChunks[F2[x] >: F[x]: Concurrent](chunkLimit: Int): Stream[F2, Chunk[O]] =
572+
Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit)).flatMap { chan =>
573+
val producer = chunks.through(chan.sendAll)
574+
val consumer = chan.stream.underlying.unconsFlatMap(chunks => Pull.output1(chunks.iterator.reduce(_ ++ _))).stream
575+
consumer.concurrently(producer)
576+
}
576577

577-
def conflateWithSeed[F2[x] >: F[x], S](
578-
seed: O => S
579-
)(aggregate: (S, O) => S)(implicit F: Concurrent[F2]): Stream[F2, S] = {
578+
def conflate[F2[x] >: F[x]: Concurrent, O2](chunkLimit: Int, zero: O2)(f: (O2, O) => O2): Stream[F2, O2] =
579+
conflateChunks[F2](chunkLimit).map(_.foldLeft(zero)(f))
580580

581-
sealed trait State
582-
case object Closed extends State
583-
case object Quiet extends State
584-
case class Accumulating(value: S) extends State
585-
case class Awaiting(waiter: Deferred[F2, S]) extends State
586-
587-
Stream.eval(F.ref(Quiet: State)).flatMap { ref =>
588-
val producer = foreach { o =>
589-
ref.flatModify {
590-
case Closed =>
591-
(Closed, F.unit)
592-
case Quiet =>
593-
(Accumulating(seed(o)), F.unit)
594-
case Accumulating(acc) =>
595-
(Accumulating(aggregate(acc, o)), F.unit)
596-
case Awaiting(waiter) =>
597-
(Quiet, waiter.complete(seed(o)).void)
598-
}
599-
}
581+
def conflate1[F2[x] >: F[x]: Concurrent, O2 >: O](chunkLimit: Int)(f: (O2, O2) => O2): Stream[F2, O2] =
582+
conflateChunks[F2](chunkLimit).map(c => c.drop(1).foldLeft(c(0): O2)((x, y) => f(x, y)))
600583

601-
def consumerLoop: Pull[F2, S, Unit] =
602-
Pull.eval {
603-
F.deferred[S].flatMap { waiter =>
604-
ref.modify {
605-
case Closed =>
606-
(Closed, Pull.done)
607-
case Quiet =>
608-
(Awaiting(waiter), Pull.eval(waiter.get).flatMap(Pull.output1) >> consumerLoop)
609-
case Accumulating(s) =>
610-
(Quiet, Pull.output1(s) >> consumerLoop)
611-
case Awaiting(_) =>
612-
sys.error("Impossible")
613-
}
614-
}
615-
}.flatten
584+
def conflateSemigroup[F2[x] >: F[x]: Concurrent, O2 >: O: Semigroup](chunkLimit: Int): Stream[F2, O2] =
585+
conflate1[F2, O2](chunkLimit)(Semigroup[O2].combine)
616586

617-
consumerLoop.stream.concurrently(producer)
618-
}
619-
}
587+
def conflateMap[F2[x] >: F[x]: Concurrent, O2: Semigroup](chunkLimit: Int)(f: O => O2): Stream[F2, O2] =
588+
map(f).conflateSemigroup[F2, O2](chunkLimit)
620589

621590
/** Prepends a chunk onto the front of this stream.
622591
*
@@ -2448,10 +2417,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
24482417
Stream.eval(Channel.bounded[F2, Chunk[O]](n)).flatMap { chan =>
24492418
chan.stream.unchunks.concurrently {
24502419
chunks.through(chan.sendAll)
2451-
24522420
}
24532421
}
2454-
2422+
24552423
/** Prints each element of this stream to standard out, converting each element to a `String` via `Show`. */
24562424
def printlns[F2[x] >: F[x], O2 >: O](implicit
24572425
F: Console[F2],

core/shared/src/test/scala/fs2/StreamConflateSuite.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,13 @@ import scala.concurrent.duration._
2828

2929
class StreamConflateSuite extends Fs2Suite {
3030

31-
test("conflate") {
31+
test("conflateMap") {
3232
TestControl.executeEmbed(
3333
Stream
3434
.iterate(0)(_ + 1)
3535
.covary[IO]
3636
.metered(10.millis)
37-
.map(List(_))
38-
.conflate
37+
.conflateMap(100)(List(_))
3938
.metered(101.millis)
4039
.take(5)
4140
.compile

0 commit comments

Comments
 (0)