@@ -238,31 +238,39 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
238
238
*/
239
239
def broadcastThrough [F2 [x] >: F [x]: Concurrent , O2 ](pipes : Pipe [F2 , O , O2 ]* ): Stream [F2 , O2 ] = {
240
240
assert(pipes.nonEmpty, s " pipes should not be empty " )
241
- Stream .force {
242
- for {
243
- // topic: contains the chunk that the pipes are processing at one point.
244
- // until and unless all pipes are finished with it, won't move to next one
245
- topic <- Topic [F2 , Chunk [O ]]
246
- // Coordination: neither the producer nor any consumer starts
247
- // until and unless all consumers are subscribed to topic.
248
- allReady <- CountDownLatch [F2 ](pipes.length)
249
- } yield {
250
- val checkIn = allReady.release >> allReady.await
251
-
252
- def dump (pipe : Pipe [F2 , O , O2 ]): Stream [F2 , O2 ] =
253
- Stream .resource(topic.subscribeAwait(1 )).flatMap { sub =>
254
- // Wait until all pipes are ready before consuming.
255
- // Crucial: checkin is not passed to the pipe,
256
- // so pipe cannot interrupt it and alter the latch count
257
- Stream .exec(checkIn) ++ pipe(sub.unchunks)
258
- }
241
+ underlying.uncons.flatMap {
242
+ case Some ((hd, tl)) =>
243
+ for {
244
+ // topic: contains the chunk that the pipes are processing at one point.
245
+ // until and unless all pipes are finished with it, won't move to next one
246
+ topic <- Pull .eval(Topic [F2 , Chunk [O ]])
247
+ // Coordination: neither the producer nor any consumer starts
248
+ // until and unless all consumers are subscribed to topic.
249
+ allReady <- Pull .eval(CountDownLatch [F2 ](pipes.length))
250
+
251
+ checkIn = allReady.release >> allReady.await
252
+
253
+ dump = (pipe : Pipe [F2 , O , O2 ]) =>
254
+ Stream .resource(topic.subscribeAwait(1 )).flatMap { sub =>
255
+ // Wait until all pipes are ready before consuming.
256
+ // Crucial: checkin is not passed to the pipe,
257
+ // so pipe cannot interrupt it and alter the latch count
258
+ Stream .exec(checkIn) ++ pipe(sub.unchunks)
259
+ }
259
260
260
- val dumpAll : Stream [F2 , O2 ] = Stream (pipes : _* ).map(dump).parJoinUnbounded
261
- // Wait until all pipes are checked in before pulling
262
- val pump = Stream .exec(allReady.await) ++ topic.publish(chunks)
263
- dumpAll.concurrently(pump)
264
- }
265
- }
261
+ dumpAll : Stream [F2 , O2 ] <-
262
+ Pull .extendScopeTo(Stream (pipes : _* ).map(dump).parJoinUnbounded)
263
+
264
+ chunksStream = Stream .chunk(hd).append(tl.stream).chunks
265
+
266
+ // Wait until all pipes are checked in before pulling
267
+ pump = Stream .exec(allReady.await) ++ topic.publish(chunksStream)
268
+
269
+ _ <- dumpAll.concurrently(pump).underlying
270
+ } yield ()
271
+
272
+ case None => Pull .done
273
+ }.stream
266
274
}
267
275
268
276
/** Behaves like the identity function, but requests `n` elements at a time from the input.
@@ -2366,17 +2374,28 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
2366
2374
}
2367
2375
}
2368
2376
2369
- val background =
2370
- Stream .exec(semaphore.acquire) ++
2371
- interruptWhen(stop.get.map(_.asRight[Throwable ]))
2372
- .foreach(forkOnElem)
2373
- .onFinalizeCase {
2374
- case ExitCase .Succeeded => releaseAndCheckCompletion
2375
- case _ => stop.complete(()) *> releaseAndCheckCompletion
2376
- }
2377
+ underlying.uncons.flatMap {
2378
+ case Some ((hd, tl)) =>
2379
+ for {
2380
+ foreground <- Pull .extendScopeTo(
2381
+ channel.stream.evalMap(_.rethrow).onFinalize(stop.complete(()) *> end.get)
2382
+ )
2383
+ background = Stream
2384
+ .exec(semaphore.acquire) ++
2385
+ Stream
2386
+ .chunk(hd)
2387
+ .append(tl.stream)
2388
+ .interruptWhen(stop.get.map(_.asRight[Throwable ]))
2389
+ .foreach(forkOnElem)
2390
+ .onFinalizeCase {
2391
+ case ExitCase .Succeeded => releaseAndCheckCompletion
2392
+ case _ => stop.complete(()) *> releaseAndCheckCompletion
2393
+ }
2394
+ _ <- foreground.concurrently(background).underlying
2395
+ } yield ()
2377
2396
2378
- val foreground = channel.stream.evalMap(_.rethrow)
2379
- foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background)
2397
+ case None => Pull .done
2398
+ }.stream
2380
2399
}
2381
2400
2382
2401
Stream .force(action)
0 commit comments