Commit 2b40725

Fix #3076 parEvalMap resource scoping

Updates parEvalMap* and broadcastThrough to extend the resource scope past the channel/topic used to implement concurrency for these operators.

1 parent: c0aa9f1
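The problem is easiest to see with a small program along the lines of the regression test added in this commit: a `Stream.bracket` resource flows through `parEvalMap`, and a `Deferred` records whether the bracket's finalizer or the downstream stage runs first. Below is a minimal sketch; the object name and timings are illustrative, mirroring the new `ParEvalMapSuite` test rather than showing any official reproduction.

```scala
import cats.effect.{Deferred, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object Issue3076Repro extends IOApp.Simple {
  // If the resource scope ends at parEvalMap's internal channel, the bracket
  // finalizer can run before the downstream evalMap uses the element. With the
  // scope extended past the channel, the downstream stage wins the race and
  // its d.complete(()) returns true.
  def run: IO[Unit] =
    Deferred[IO, Unit]
      .flatMap { d =>
        Stream
          .bracket(IO.unit)(_ => d.complete(()).void)          // finalizer: mark "resource closed"
          .parEvalMap(2)(_ => IO.sleep(1.second))
          .evalMap(_ => IO.sleep(1.second) >> d.complete(()))  // true iff the finalizer has not run yet
          .timeout(5.seconds)
          .compile
          .last
      }
      .flatMap(res => IO.println(s"downstream ran before finalizer: $res")) // expected: Some(true)
}
```

Before this change the finalizer could win that race, because the scope introduced for the operator's internal channel/topic closed as soon as the element was handed off; the diff below extends that scope past the concurrency machinery.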

2 files changed: +79 −34

core/shared/src/main/scala/fs2/Stream.scala (+53 −34)
```diff
@@ -238,31 +238,39 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
     */
   def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](pipes: Pipe[F2, O, O2]*): Stream[F2, O2] = {
     assert(pipes.nonEmpty, s"pipes should not be empty")
-    Stream.force {
-      for {
-        // topic: contains the chunk that the pipes are processing at one point.
-        // until and unless all pipes are finished with it, won't move to next one
-        topic <- Topic[F2, Chunk[O]]
-        // Coordination: neither the producer nor any consumer starts
-        // until and unless all consumers are subscribed to topic.
-        allReady <- CountDownLatch[F2](pipes.length)
-      } yield {
-        val checkIn = allReady.release >> allReady.await
-
-        def dump(pipe: Pipe[F2, O, O2]): Stream[F2, O2] =
-          Stream.resource(topic.subscribeAwait(1)).flatMap { sub =>
-            // Wait until all pipes are ready before consuming.
-            // Crucial: checkin is not passed to the pipe,
-            // so pipe cannot interrupt it and alter the latch count
-            Stream.exec(checkIn) ++ pipe(sub.unchunks)
-          }
+    underlying.uncons.flatMap {
+      case Some((hd, tl)) =>
+        for {
+          // topic: contains the chunk that the pipes are processing at one point.
+          // until and unless all pipes are finished with it, won't move to next one
+          topic <- Pull.eval(Topic[F2, Chunk[O]])
+          // Coordination: neither the producer nor any consumer starts
+          // until and unless all consumers are subscribed to topic.
+          allReady <- Pull.eval(CountDownLatch[F2](pipes.length))
+
+          checkIn = allReady.release >> allReady.await
+
+          dump = (pipe: Pipe[F2, O, O2]) =>
+            Stream.resource(topic.subscribeAwait(1)).flatMap { sub =>
+              // Wait until all pipes are ready before consuming.
+              // Crucial: checkin is not passed to the pipe,
+              // so pipe cannot interrupt it and alter the latch count
+              Stream.exec(checkIn) ++ pipe(sub.unchunks)
+            }
 
-        val dumpAll: Stream[F2, O2] = Stream(pipes: _*).map(dump).parJoinUnbounded
-        // Wait until all pipes are checked in before pulling
-        val pump = Stream.exec(allReady.await) ++ topic.publish(chunks)
-        dumpAll.concurrently(pump)
-      }
-    }
+          dumpAll: Stream[F2, O2] <-
+            Pull.extendScopeTo(Stream(pipes: _*).map(dump).parJoinUnbounded)
+
+          chunksStream = Stream.chunk(hd).append(tl.stream).chunks
+
+          // Wait until all pipes are checked in before pulling
+          pump = Stream.exec(allReady.await) ++ topic.publish(chunksStream)
+
+          _ <- dumpAll.concurrently(pump).underlying
+        } yield ()
+
+      case None => Pull.done
+    }.stream
   }
 
   /** Behaves like the identity function, but requests `n` elements at a time from the input.
@@ -2366,17 +2374,28 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
           }
         }
 
-        val background =
-          Stream.exec(semaphore.acquire) ++
-            interruptWhen(stop.get.map(_.asRight[Throwable]))
-              .foreach(forkOnElem)
-              .onFinalizeCase {
-                case ExitCase.Succeeded => releaseAndCheckCompletion
-                case _ => stop.complete(()) *> releaseAndCheckCompletion
-              }
+        underlying.uncons.flatMap {
+          case Some((hd, tl)) =>
+            for {
+              foreground <- Pull.extendScopeTo(
+                channel.stream.evalMap(_.rethrow).onFinalize(stop.complete(()) *> end.get)
+              )
+              background = Stream
+                .exec(semaphore.acquire) ++
+                Stream
+                  .chunk(hd)
+                  .append(tl.stream)
+                  .interruptWhen(stop.get.map(_.asRight[Throwable]))
+                  .foreach(forkOnElem)
+                  .onFinalizeCase {
+                    case ExitCase.Succeeded => releaseAndCheckCompletion
+                    case _ => stop.complete(()) *> releaseAndCheckCompletion
+                  }
+              _ <- foreground.concurrently(background).underlying
+            } yield ()
 
-        val foreground = channel.stream.evalMap(_.rethrow)
-        foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background)
+          case None => Pull.done
+        }.stream
       }
 
       Stream.force(action)
```

core/shared/src/test/scala/fs2/ParEvalMapSuite.scala (+26 −0)
```diff
@@ -293,4 +293,30 @@ class ParEvalMapSuite extends Fs2Suite {
         .timeout(2.seconds)
     }
   }
+
+  group("issue-3076, parEvalMap* runs resource finaliser before usage") {
+    test("parEvalMap") {
+      Deferred[IO, Unit].flatMap { d =>
+        Stream
+          .bracket(IO.unit)(_ => d.complete(()).void)
+          .parEvalMap(2)(_ => IO.sleep(1.second))
+          .evalMap(_ => IO.sleep(1.second) >> d.complete(()))
+          .timeout(5.seconds)
+          .compile
+          .last
+      }.assertEquals(Some(true))
+    }
+
+    test("broadcastThrough") {
+      Deferred[IO, Unit].flatMap { d =>
+        Stream
+          .bracket(IO.unit)(_ => d.complete(()).void)
+          .broadcastThrough(identity[Stream[IO, Unit]])
+          .evalMap(_ => IO.sleep(1.second) >> d.complete(()))
+          .timeout(5.seconds)
+          .compile
+          .last
+      }.assertEquals(Some(true))
+    }
+  }
 }
```
