Skip to content

Commit 1c66d40

Browse files
committed
Add initial implementation of conflate and conflateWithSeed
1 parent 7509023 commit 1c66d40

File tree

1 file changed

+50
-0
lines changed

1 file changed

+50
-0
lines changed

core/shared/src/main/scala/fs2/Stream.scala

+50
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,56 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
568568
Stream.eval(fstream)
569569
}
570570

571+
def conflate[F2[x] >: F[x], O2 >: O](implicit
572+
F: Concurrent[F2],
573+
O: Semigroup[O2]
574+
): Stream[F2, O2] =
575+
conflateWithSeed[F2, O2](identity)((s, o) => O.combine(s, o))
576+
577+
def conflateWithSeed[F2[x] >: F[x], S](
578+
seed: O => S
579+
)(aggregate: (S, O) => S)(implicit F: Concurrent[F2]): Stream[F2, S] = {
580+
581+
sealed trait State
582+
case object Closed extends State
583+
case object Quiet extends State
584+
case class Accumulating(value: S) extends State
585+
case class Awaiting(waiter: Deferred[F2, S]) extends State
586+
587+
Stream.eval(F.ref(Quiet: State)).flatMap { ref =>
588+
val producer = foreach { o =>
589+
ref.flatModify {
590+
case Closed =>
591+
(Closed, F.unit)
592+
case Quiet =>
593+
(Accumulating(seed(o)), F.unit)
594+
case Accumulating(acc) =>
595+
(Accumulating(aggregate(acc, o)), F.unit)
596+
case Awaiting(waiter) =>
597+
(Quiet, waiter.complete(seed(o)).void)
598+
}
599+
}
600+
601+
def consumerLoop: Pull[F2, S, Unit] =
602+
Pull.eval {
603+
F.deferred[S].flatMap { waiter =>
604+
ref.modify {
605+
case Closed =>
606+
(Closed, Pull.done)
607+
case Quiet =>
608+
(Awaiting(waiter), Pull.eval(waiter.get).flatMap(Pull.output1) >> consumerLoop)
609+
case Accumulating(s) =>
610+
(Quiet, Pull.output1(s) >> consumerLoop)
611+
case Awaiting(_) =>
612+
sys.error("Impossible")
613+
}
614+
}
615+
}.flatten
616+
617+
consumerLoop.stream.concurrently(producer)
618+
}
619+
}
620+
571621
/** Prepends a chunk onto the front of this stream.
572622
*
573623
* @example {{{

0 commit comments

Comments
 (0)