Skip to content

Commit 33320fa

Browse files
committed
Further simplify conflateChunks
1 parent 5a522c5 commit 33320fa

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

core/shared/src/main/scala/fs2/Stream.scala

+14-6
Original file line numberDiff line numberDiff line change
@@ -571,20 +571,28 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
571571
def conflateChunks[F2[x] >: F[x]: Concurrent](chunkLimit: Int): Stream[F2, Chunk[O]] =
572572
Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit)).flatMap { chan =>
573573
val producer = chunks.through(chan.sendAll)
574-
val consumer = chan.stream.underlying.unconsFlatMap(chunks => Pull.output1(chunks.iterator.reduce(_ ++ _))).stream
574+
val consumer = chan.stream.chunks.map(_.combineAll)
575575
consumer.concurrently(producer)
576576
}
577577

578-
def conflate[F2[x] >: F[x]: Concurrent, O2](chunkLimit: Int, zero: O2)(f: (O2, O) => O2): Stream[F2, O2] =
578+
def conflate[F2[x] >: F[x]: Concurrent, O2](chunkLimit: Int, zero: O2)(
579+
f: (O2, O) => O2
580+
): Stream[F2, O2] =
579581
conflateChunks[F2](chunkLimit).map(_.foldLeft(zero)(f))
580582

581-
def conflate1[F2[x] >: F[x]: Concurrent, O2 >: O](chunkLimit: Int)(f: (O2, O2) => O2): Stream[F2, O2] =
583+
def conflate1[F2[x] >: F[x]: Concurrent, O2 >: O](chunkLimit: Int)(
584+
f: (O2, O2) => O2
585+
): Stream[F2, O2] =
582586
conflateChunks[F2](chunkLimit).map(c => c.drop(1).foldLeft(c(0): O2)((x, y) => f(x, y)))
583587

584-
def conflateSemigroup[F2[x] >: F[x]: Concurrent, O2 >: O: Semigroup](chunkLimit: Int): Stream[F2, O2] =
588+
def conflateSemigroup[F2[x] >: F[x]: Concurrent, O2 >: O: Semigroup](
589+
chunkLimit: Int
590+
): Stream[F2, O2] =
585591
conflate1[F2, O2](chunkLimit)(Semigroup[O2].combine)
586592

587-
def conflateMap[F2[x] >: F[x]: Concurrent, O2: Semigroup](chunkLimit: Int)(f: O => O2): Stream[F2, O2] =
593+
def conflateMap[F2[x] >: F[x]: Concurrent, O2: Semigroup](chunkLimit: Int)(
594+
f: O => O2
595+
): Stream[F2, O2] =
588596
map(f).conflateSemigroup[F2, O2](chunkLimit)
589597

590598
/** Prepends a chunk onto the front of this stream.
@@ -2419,7 +2427,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
24192427
chunks.through(chan.sendAll)
24202428
}
24212429
}
2422-
2430+
24232431
/** Prints each element of this stream to standard out, converting each element to a `String` via `Show`. */
24242432
def printlns[F2[x] >: F[x], O2 >: O](implicit
24252433
F: Console[F2],

0 commit comments

Comments
 (0)