Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2102,6 +2102,30 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
)(implicit F: Concurrent[F2]): Stream[F2, O2] =
merge_(that) { case (s, fin) => s.onFinalize(fin) }

/** Like `mergeAndAwaitDownstream`, but halts as soon as _either_ branch halts. */
def mergeAndAwaitDownstreamHaltBoth[F2[x] >: F[x]: Concurrent, O2 >: O](
that: Stream[F2, O2]
): Stream[F2, O2] =
noneTerminate.mergeAndAwaitDownstream(that.noneTerminate).unNoneTerminate

/** Like `mergeAndAwaitDownstream`, but halts as soon as the `s1` branch halts.
*
* Note: it is *not* guaranteed that the last element of the stream will come from `s1`.
*/
def mergeAndAwaitDownstreamHaltL[F2[x] >: F[x]: Concurrent, O2 >: O](
that: Stream[F2, O2]
): Stream[F2, O2] =
noneTerminate.mergeAndAwaitDownstream(that.map(Some(_))).unNoneTerminate

/** Like `mergeAndAwaitDownstream`, but halts as soon as the `s2` branch halts.
*
* Note: it is *not* guaranteed that the last element of the stream will come from `s2`.
*/
def mergeAndAwaitDownstreamHaltR[F2[x] >: F[x]: Concurrent, O2 >: O](
that: Stream[F2, O2]
): Stream[F2, O2] =
that.mergeAndAwaitDownstreamHaltL(this)

/** Interleaves the two inputs nondeterministically. The output stream
* halts after BOTH `s1` and `s2` terminate normally, or in the event
* of an uncaught failure on either `s1` or `s2`. Has the property that
Expand Down
Loading