Skip to content

Commit 24282e2

Browse files
authored
Merge pull request #2653 from vasilmkd/concurrently-transformers-25x
Fix `Stream#concurrently` for short-circuiting monad transformers - `2.5.x`
2 parents 7c6a7b9 + 47bc084 commit 24282e2

File tree

3 files changed

+46
-16
lines changed

3 files changed

+46
-16
lines changed

core/shared/src/main/scala/fs2/Stream.scala

+14-15
Original file line numberDiff line numberDiff line change
@@ -545,25 +545,24 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
545545
)(implicit F: Concurrent[F2]): Stream[F2, O] = {
546546
val fstream: F2[Stream[F2, O]] = for {
547547
interrupt <- Deferred[F2, Unit]
548-
doneR <- Deferred[F2, Either[Throwable, Unit]]
548+
backResult <- Deferred[F2, Either[Throwable, Unit]]
549549
} yield {
550-
def runR: F2[Unit] =
551-
that.interruptWhen(interrupt.get.attempt).compile.drain.attempt.flatMap { r =>
552-
doneR.complete(r) >> {
553-
if (r.isLeft)
554-
interrupt
555-
.complete(())
556-
.attempt
557-
.void // interrupt only if this failed otherwise give change to `this` to finalize
558-
else F.unit
559-
}
560-
}
550+
def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt)
551+
552+
val compileBack: F2[Unit] = watch(that).compile.drain.guaranteeCase {
553+
// Pass the result of backstream completion in the backResult deferred.
554+
// If result of back-stream was failed, interrupt fore. Otherwise, let it be
555+
case ExitCase.Error(t) =>
556+
backResult.complete(Left(t)).attempt >> interrupt.complete(()).handleError(_ => ())
557+
case _ => backResult.complete(Right(())).handleError(_ => ())
558+
}
561559

562560
// stop background process but await for it to finalise with a result
563-
val stopBack: F2[Unit] = interrupt.complete(()).attempt >> doneR.get.flatMap(F.fromEither)
561+
// We use F.fromEither to bring errors from the back into the fore
562+
val stopBack: F2[Unit] =
563+
interrupt.complete(()).attempt >> backResult.get.flatMap(F.fromEither)
564564

565-
Stream.bracket(F.start(runR))(_ => stopBack) >>
566-
this.interruptWhen(interrupt.get.attempt)
565+
Stream.bracket(F.start(compileBack))(_ => stopBack) >> watch(this)
567566
}
568567

569568
Stream.eval(fstream).flatten

core/shared/src/test/scala/fs2/StreamConcurrentlySuite.scala

+19
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package fs2
2323

2424
import scala.concurrent.duration._
2525

26+
import cats.data.EitherT
2627
import cats.effect.IO
2728
import cats.effect.concurrent.{Deferred, Ref, Semaphore}
2829
import cats.syntax.all._
@@ -146,4 +147,22 @@ class StreamConcurrentlySuite extends Fs2Suite {
146147
}
147148
}
148149

150+
test("background stream completes with short-circuiting transformers") {
151+
Stream(1, 2, 3)
152+
.concurrently(Stream.eval(EitherT.leftT[IO, Int]("left")))
153+
.compile
154+
.lastOrError
155+
.value
156+
.assertEquals(Right(3))
157+
}
158+
159+
test("foreground stream short-circuits") {
160+
Stream(1, 2, 3)
161+
.evalMap(n => EitherT.cond[IO](n % 2 == 0, n, "left"))
162+
.concurrently(Stream.eval(EitherT.rightT[IO, String](42)))
163+
.compile
164+
.lastOrError
165+
.value
166+
.assertEquals(Left("left"))
167+
}
149168
}

io/src/test/scala/fs2/io/IoSuite.scala

+13-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package fs2
2323
package io
2424

25+
import cats.data.EitherT
2526
import cats.effect.{Blocker, IO, Resource}
2627
import fs2.Fs2Suite
2728
import org.scalacheck.{Arbitrary, Gen, Shrink}
@@ -215,7 +216,8 @@ class IoSuite extends Fs2Suite {
215216
}
216217

217218
test("can copy more than Int.MaxValue bytes") {
218-
// Unit test adapted from the original issue reproduction at https://github.com/mrdziuban/fs2-writeOutputStream.
219+
// Unit test adapted from the original issue reproduction at
220+
// https://github.com/mrdziuban/fs2-writeOutputStream.
219221

220222
val byteStream =
221223
Stream
@@ -238,6 +240,16 @@ class IoSuite extends Fs2Suite {
238240
.drain
239241
}
240242
}
243+
244+
test("works with short-circuiting monad transformers") {
245+
// Unit test adapted from the original issue reproduction at
246+
// https://github.com/mrdziuban/fs2-readOutputStream-EitherT.
247+
248+
Blocker[IO].use { blocker =>
249+
readOutputStream(blocker, 1)(_ => EitherT.left[Unit](IO.unit)).compile.drain.value
250+
.timeout(5.seconds)
251+
}
252+
}
241253
}
242254

243255
group("unsafeReadInputStream") {

0 commit comments

Comments
 (0)