@@ -568,32 +568,49 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
568
568
Stream .eval(fstream)
569
569
}
570
570
571
+ /** Pulls up to the specified number of chunks from the source stream while concurrently allowing
572
+ * downstream to process emitted chunks. Unlike `prefetchN`, all accumulated chunks are emitted
573
+ * as a single chunk upon downstream pulling.
574
+ *
575
+ * The `chunkLimit` parameter controls backpressure on the source stream.
576
+ */
571
577
def conflateChunks [F2 [x] >: F [x]: Concurrent ](chunkLimit : Int ): Stream [F2 , Chunk [O ]] =
572
578
Stream .eval(Channel .bounded[F2 , Chunk [O ]](chunkLimit)).flatMap { chan =>
573
579
val producer = chunks.through(chan.sendAll)
574
580
val consumer = chan.stream.chunks.map(_.combineAll)
575
581
consumer.concurrently(producer)
576
582
}
577
583
584
+ /** Like `conflateChunks` but uses the supplied `zero` and `f` values to combine the elements of
585
+ * each output chunk in to a single value.
586
+ */
578
587
def conflate [F2 [x] >: F [x]: Concurrent , O2 ](chunkLimit : Int , zero : O2 )(
579
588
f : (O2 , O ) => O2
580
589
): Stream [F2 , O2 ] =
581
590
conflateChunks[F2 ](chunkLimit).map(_.foldLeft(zero)(f))
582
591
592
+ /** Like `conflate` but combines elements of the output chunk with the supplied function.
593
+ */
583
594
def conflate1 [F2 [x] >: F [x]: Concurrent , O2 >: O ](chunkLimit : Int )(
584
595
f : (O2 , O2 ) => O2
585
596
): Stream [F2 , O2 ] =
586
597
conflateChunks[F2 ](chunkLimit).map(c => c.drop(1 ).foldLeft(c(0 ): O2 )((x, y) => f(x, y)))
587
598
599
+ /** Like `conflate1` but combines elements using the semigroup of the output type.
600
+ */
588
601
def conflateSemigroup [F2 [x] >: F [x]: Concurrent , O2 >: O : Semigroup ](
589
602
chunkLimit : Int
590
603
): Stream [F2 , O2 ] =
591
604
conflate1[F2 , O2 ](chunkLimit)(Semigroup [O2 ].combine)
592
605
593
- def conflateMap [F2 [x] >: F [x]: Concurrent , O2 : Semigroup ](chunkLimit : Int )(
594
- f : O => O2
595
- ): Stream [F2 , O2 ] =
596
- map(f).conflateSemigroup[F2 , O2 ](chunkLimit)
606
+ /** Conflates elements and then maps the supplied function over the output chunk and combines the results using a semigroup.
607
+ */
608
+ def conflateMap [F2 [x] >: F [x]: Concurrent , O2 : Semigroup ](
609
+ chunkLimit : Int
610
+ )(f : O => O2 ): Stream [F2 , O2 ] =
611
+ conflateChunks[F2 ](chunkLimit).map(c =>
612
+ c.drop(1 ).foldLeft(f(c(0 )))((x, y) => Semigroup [O2 ].combine(x, f(y)))
613
+ )
597
614
598
615
/** Prepends a chunk onto the front of this stream.
599
616
*
0 commit comments