We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
2 parents 1154536 + e6dbe2c commit 663e286Copy full SHA for 663e286
core/shared/src/test/scala/fs2/ParEvalMapSuite.scala
@@ -208,13 +208,15 @@ class ParEvalMapSuite extends Fs2Suite {
208
}
209
210
def check(pipe: Pipe[IO, IO[Unit], Unit]) =
211
- IO.deferred[Unit]
212
- .flatMap { d =>
213
- val cancelled = IO.never.onCancel(d.complete(()).void)
214
- val stream = Stream(u, cancelled).covary[IO]
+ (CountDownLatch[IO](2), IO.deferred[Unit])
+ .mapN { case (latch, d) =>
+ val w = latch.release *> latch.await
+ val cancelled = IO.uncancelable(poll => w *> poll(IO.never).onCancel(d.complete(()).void))
215
+ val stream = Stream(w *> u, cancelled).covary[IO]
216
val action = stream.through(pipe).take(1).compile.drain
217
action *> d.get
218
219
+ .flatten
220
.assertEquals(())
221
222
0 commit comments