Skip to content

Commit ad9e8c7

Browse files
committed
mergePreferred
1 parent 2efbede commit ad9e8c7

File tree

2 files changed

+224
-0
lines changed

2 files changed

+224
-0
lines changed

Diff for: core/shared/src/main/scala/fs2/Stream.scala

+218
Original file line numberDiff line numberDiff line change
@@ -2015,6 +2015,224 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
20152015
): Stream[F2, O2] =
20162016
that.mergeHaltL(this)
20172017

2018+
/** Merges two streams with priority given to the first stream.
2019+
*
2020+
* Internally, this uses two bounded queues (of one element).
2021+
* Queue has tryTake(), which allows attempting a non-blocking read.
2022+
* This is used to check for elements on the prioritized queue,
2023+
* before a blocking read through racePair() is tried on both
2024+
* queues, if no data is available on the prioritized queue.
2025+
*/
2026+
def mergePreferred[F2[x] >: F[x], O2 >: O](
2027+
that: Stream[F2, O2]
2028+
)(implicit F: Concurrent[F2]): Stream[F2, O2] = {
2029+
// Allocate the shared coordination state up front: one interrupt signal,
// one result slot per input stream, and one single-element queue per input stream.
val fstream: F2[Stream[F2, O2]] =
2030+
for {
2031+
interrupt <- F.deferred[Unit]
2032+
resultL <- F.deferred[Either[Throwable, Unit]]
2033+
resultR <- F.deferred[Either[Throwable, Unit]]
2034+
resultQL <- Queue.bounded[F2, Option[Stream[F2, O2]]](1)
2035+
resultQR <- Queue.bounded[F2, Option[Stream[F2, O2]]](1)
2036+
} yield {
2037+
2038+
// Wraps a stream so that it is interrupted when the shared `interrupt` is completed.
def watchInterrupted(str: Stream[F2, O2]): Stream[F2, O2] =
2039+
str.interruptWhen(interrupt.get.attempt)
2040+
2041+
// action to signal that one stream is finished (by putting a None in it)
2042+
def doneAndClose(q: Queue[F2, Option[Stream[F2, O2]]]): F2[Unit] = q.offer(None).void
2043+
2044+
// action to interrupt the processing of both streams by completing interrupt
2045+
val signalInterruption: F2[Unit] = interrupt.complete(()).void
2046+
2047+
// Read from a stream and (possibly blocking) write to the bounded queue for that stream.
// Each chunk is offered as a single-chunk stream wrapped in Some; the pull itself emits nothing.
2048+
def go(s: Stream[F2, O2], q: Queue[F2, Option[Stream[F2, O2]]]): Pull[F2, Nothing, Unit] =
2049+
s.pull.uncons
2050+
.flatMap {
2051+
case Some((hd, tl)) =>
2052+
val send = q.offer(Some(Stream.chunk(hd)))
2053+
Pull.eval(send) >> go(tl, q)
2054+
case None =>
2055+
Pull.done
2056+
}
2057+
2058+
// Drains one input stream into its queue `q` and records the outcome in `whenDone`.
// On failure the shared interrupt is signalled; on success a None is offered to `q`.
def runStream(
2059+
s: Stream[F2, O2],
2060+
whenDone: Deferred[F2, Either[Throwable, Unit]],
2061+
q: Queue[F2, Option[Stream[F2, O2]]]
2062+
): F2[Unit] = {
2063+
val str = watchInterrupted(go(s, q).stream)
2064+
str.compile.drain.attempt
2065+
.flatMap {
2066+
// signal completion of our side before we will signal interruption,
2067+
// to make sure our result is always available to others
2068+
case r @ Left(_) =>
2069+
whenDone.complete(r) >> signalInterruption
2070+
case r @ Right(_) =>
2071+
whenDone.complete(r) >> doneAndClose(q)
2072+
}
2073+
}
2074+
2075+
// Typedef for the fibres that read from the queues.
2076+
// That's contained in the Either returned by racePair()
2077+
type FBR = Fiber[F2, Throwable, Option[Stream[F2, O2]]]
2078+
2079+
// An ADT for tracking state of the two queues.
2080+
// The types describe the state, starting with BothActive.
2081+
// Next state is either LeftDone or RightDone.
2082+
// Final state is BothDone.
2083+
// The members of those states store the losing fibre
2084+
// of a racePair()-call, which will be reused during the
2085+
// next read.
2086+
sealed trait QueuesState
2087+
final case class BothActive(v: Option[Either[FBR, FBR]]) extends QueuesState
2088+
final case class LeftDone(rFbr: Option[FBR]) extends QueuesState
2089+
final case class RightDone(lFbr: Option[FBR]) extends QueuesState
2090+
case object BothDone extends QueuesState
2091+
2092+
// Race the given effects, returning the result of the winner
2093+
// plus the still active fibre of the loser.
// The fibre is tagged by the side it belongs to: Right(fiber) when the left effect won
// (the right one is still running), Left(fiber) when the right effect won.
2094+
def raceQueues(
2095+
lq: F2[Option[Stream[F2, O2]]],
2096+
rq: F2[Option[Stream[F2, O2]]]
2097+
): F2[(Option[Stream[F2, O2]], Either[FBR, FBR])] =
2098+
F.racePair(lq, rq)
2099+
.flatMap {
2100+
case Left((result, fiber)) =>
2101+
result.embedError.map(_ -> fiber.asRight[FBR])
2102+
case Right((fiber, result)) =>
2103+
result.embedError.map(_ -> fiber.asLeft[FBR])
2104+
}
2105+
2106+
// stream that is generated from pumping out the elements of the queue.
2107+
val pumpFromQueue: Stream[F2, O2] =
2108+
Stream
2109+
.unfoldEval[F2, QueuesState, Stream[F2, O2]](BothActive(None)) { s =>
2110+
// Returning None from unfoldEval will stop the stream. If we read a None
2111+
// from any queue, we cannot return that but must continue reading on the
2112+
// other queue. Thus, we need a method which can be called recursively to
2113+
// continue reading in case of None.
// Note: the parameter `s` below shadows the unfoldEval state `s`, and the
// pattern variables named `s`/`fbr` in the cases shadow the outer bindings in turn.
2114+
def readNext(s: QueuesState): F2[(Option[Stream[F2, O2]], QueuesState)] =
2115+
s match {
2116+
// The initial state, both queues are active and there are no fibres left over
2117+
case BothActive(None) =>
2118+
// check available data on left, which would be prioritized
2119+
resultQL.tryTake
2120+
.flatMap {
2121+
_.fold(
2122+
// no data available on prioritized queue, race both queues
2123+
raceQueues(resultQL.take, resultQR.take)
2124+
.flatMap[(Option[Stream[F2, O2]], QueuesState)] {
2125+
case (None, Left(fbr)) =>
2126+
readNext(RightDone(fbr.some))
2127+
case (None, Right(fbr)) =>
2128+
readNext(LeftDone(fbr.some))
2129+
case (Some(s), fbr) =>
2130+
F.pure(s.some -> BothActive(fbr.some))
2131+
}
2132+
)(os =>
2133+
// we read data from the prioritized queue, however, this still could be a None,
2134+
// signalling that queue is done. Handle that:
2135+
os.fold(readNext(LeftDone(None)))(ls =>
2136+
F.pure(ls.some -> BothActive(None))
2137+
)
2138+
)
2139+
}
2140+
2141+
// right was loser during the last run
2142+
case BothActive(Some(Right(fbr))) =>
2143+
// anyway, check for available data on left first, ignoring the incoming fibre for right
2144+
resultQL.tryTake
2145+
.flatMap(
2146+
_.fold(
2147+
// use the incoming fibre to read from right queue
2148+
raceQueues(resultQL.take, fbr.joinWithNever)
2149+
.flatMap[(Option[Stream[F2, O2]], QueuesState)] {
2150+
case (None, Left(fbr)) =>
2151+
readNext(RightDone(fbr.some))
2152+
case (None, Right(fbr)) =>
2153+
readNext(LeftDone(fbr.some))
2154+
case (Some(s), fbr) =>
2155+
F.pure(s.some -> BothActive(fbr.some))
2156+
}
2157+
)(os =>
2158+
// important to reuse the incoming fibre here!
2159+
os.fold(readNext(LeftDone(fbr.some)))(ls =>
2160+
F.pure(ls.some -> BothActive(fbr.asRight[FBR].some))
2161+
)
2162+
)
2163+
)
2164+
2165+
// left was loser during the last run
2166+
case BothActive(Some(Left(fbr))) =>
2167+
// Can't check for available data on left this time,
2168+
// because there's an active fibre reading from the left queue.
2169+
// Start a race and reuse that fibre for left.
2170+
raceQueues(fbr.joinWithNever, resultQR.take)
2171+
.flatMap[(Option[Stream[F2, O2]], QueuesState)] {
2172+
case (None, Left(fbr)) =>
2173+
readNext(RightDone(fbr.some))
2174+
case (None, Right(fbr)) =>
2175+
readNext(LeftDone(fbr.some))
2176+
case (Some(s), fbr) =>
2177+
F.pure(s.some -> BothActive(fbr.some))
2178+
}
2179+
2180+
// Left queue is done, but, it's possible we retrieve an active fibre for right.
2181+
case LeftDone(fbr) =>
2182+
fbr
2183+
.map(_.joinWithNever) // join the incoming fibre if given
2184+
.getOrElse(resultQR.take) // ordinary take() if no fibre has been given
2185+
.map {
2186+
case None =>
2187+
None -> BothDone
2188+
case os =>
2189+
os -> LeftDone(None)
2190+
}
2191+
2192+
// mirror case of above
2193+
case RightDone(fbr) =>
2194+
fbr
2195+
.map(_.joinWithNever)
2196+
.getOrElse(resultQL.take)
2197+
.map {
2198+
case None =>
2199+
None -> BothDone
2200+
case os =>
2201+
os -> RightDone(None)
2202+
}
2203+
2204+
// this should never happen, but we need to make the compiler happy
2205+
case BothDone =>
2206+
F.pure(None -> BothDone)
2207+
}
2208+
2209+
// readNext() returns None in _1 if and only if both queues are done
2210+
readNext(s).map {
2211+
case (None, _) =>
2212+
None // finish the stream (unfoldEval)
2213+
case (Some(s), st) =>
2214+
(s -> st).some // emit element and new state (unfoldEval)
2215+
}
2216+
}
2217+
.flatten // we have Stream[F2, Stream[F2, O2]] and flatten that to Stream[F2, O2]
2218+
2219+
// Release action of the bracket below: signal interruption, wait for both
// recorded results, and raise whatever CompositeFailure.fromResults reports for them.
val atRunEnd: F2[Unit] =
2220+
for {
2221+
_ <- signalInterruption // interrupt so the upstreams have chance to complete
2222+
left <- resultL.get
2223+
right <- resultR.get
2224+
r <- F.fromEither(CompositeFailure.fromResults(left, right))
2225+
} yield r
2226+
2227+
// Start both producers as fibres: `this` feeds the left (preferred) queue, `that` the right one.
val runStreams =
2228+
runStream(this, resultL, resultQL).start >> runStream(that, resultR, resultQR).start
2229+
2230+
Stream.bracket(runStreams)(_ => atRunEnd) >> watchInterrupted(pumpFromQueue)
2231+
}
2232+
Stream.eval(fstream).flatten
2233+

}
2235+
20182236
/** Given two sorted streams emits a single sorted stream, like in merge-sort.
20192237
* For entries that are considered equal by the Order, left stream element is emitted first.
20202238
* Note: both this and another streams MUST BE ORDERED already

Diff for: core/shared/src/test/scala/fs2/StreamSuite.scala

+6
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,12 @@ class StreamSuite extends Fs2Suite {
401401
}
402402
}
403403

404+
// Runs mergePreferred over two constantStream instances through the suite's
// testCancelation helper.
// NOTE(review): presumably this checks that the merged stream can be cancelled;
// confirm against the definition of testCancelation.
test("mergePreferred") {
405+
testCancelation {
406+
constantStream.mergePreferred(constantStream)
407+
}
408+
}
409+
404410
test("parJoin") {
405411
testCancelation {
406412
Stream(constantStream, constantStream).parJoin(2)

0 commit comments

Comments
 (0)