diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index c150b8fa7a..49f6d19754 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -4073,10 +4073,7 @@ object Stream extends StreamLowPriority { } def outcomeJoiner: F[Unit] = - outcomes.stream - .evalMap(identity) - .compile - .drain + outcomes.stream.evalFlatten.compile.drain .guaranteeCase { case Outcome.Succeeded(_) => stop(None) >> output.close.void @@ -4117,6 +4114,21 @@ object Stream extends StreamLowPriority { parJoin(Int.MaxValue) } + /** Provides syntax for a stream of `F` effects. */ + implicit class StreamFOps[F[_], O](private val self: Stream[F, F[O]]) { + + /** Sequences the inner effects into the stream. */ + def evalFlatten: Stream[F, O] = + self.evalMap(identity) + + /** Evaluates up to `maxConcurrent` inner effects concurrently, emitting + * the results in order. + */ + def parEvalFlatten( + maxConcurrent: Int + )(implicit F: Concurrent[F]): Stream[F, O] = self.parEvalMap(maxConcurrent)(identity) + } + /** Provides syntax for pure streams. */ implicit final class PureOps[O](private val self: Stream[Pure, O]) extends AnyVal { diff --git a/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala b/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala index 09fe206d5b..3042e99335 100644 --- a/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala +++ b/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala @@ -47,7 +47,7 @@ class UnixSocketsSuite extends Fs2Suite with UnixSocketsSuitePlatform { val clients = (0 until 100).map(b => client(Chunk.singleton(b.toByte))) - (Stream.sleep_[IO](1.second) ++ Stream.emits(clients).evalMap(identity)) + (Stream.sleep_[IO](1.second) ++ Stream.emits(clients).evalFlatten) .concurrently(server) .compile .drain