@@ -238,39 +238,33 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
238
238
*/
239
239
def broadcastThrough [F2 [x] >: F [x]: Concurrent , O2 ](pipes : Pipe [F2 , O , O2 ]* ): Stream [F2 , O2 ] = {
240
240
assert(pipes.nonEmpty, s " pipes should not be empty " )
241
- underlying.uncons.flatMap {
242
- case Some ((hd, tl)) =>
241
+ extendScopeThrough { source =>
242
+ Stream .force {
243
243
for {
244
244
// topic: contains the chunk that the pipes are processing at one point.
245
245
// until and unless all pipes are finished with it, won't move to next one
246
- topic <- Pull .eval( Topic [F2 , Chunk [O ]])
246
+ topic <- Topic [F2 , Chunk [O ]]
247
247
// Coordination: neither the producer nor any consumer starts
248
248
// until and unless all consumers are subscribed to topic.
249
- allReady <- Pull .eval( CountDownLatch [F2 ](pipes.length) )
250
-
251
- checkIn = allReady.release >> allReady.await
249
+ allReady <- CountDownLatch [F2 ](pipes.length)
250
+ } yield {
251
+ val checkIn = allReady.release >> allReady.await
252
252
253
- dump = (pipe : Pipe [F2 , O , O2 ]) =>
253
+ def dump (pipe : Pipe [F2 , O , O2 ]): Stream [ F2 , O2 ] =
254
254
Stream .resource(topic.subscribeAwait(1 )).flatMap { sub =>
255
255
// Wait until all pipes are ready before consuming.
256
256
// Crucial: checkin is not passed to the pipe,
257
257
// so pipe cannot interrupt it and alter the latch count
258
258
Stream .exec(checkIn) ++ pipe(sub.unchunks)
259
259
}
260
260
261
- dumpAll : Stream [F2 , O2 ] <-
262
- Pull .extendScopeTo(Stream (pipes : _* ).map(dump).parJoinUnbounded)
263
-
264
- chunksStream = Stream .chunk(hd).append(tl.stream).chunks
265
-
261
+ val dumpAll : Stream [F2 , O2 ] = Stream (pipes : _* ).map(dump).parJoinUnbounded
266
262
// Wait until all pipes are checked in before pulling
267
- pump = Stream .exec(allReady.await) ++ topic.publish(chunksStream)
268
-
269
- _ <- dumpAll.concurrently(pump).underlying
270
- } yield ()
271
-
272
- case None => Pull .done
273
- }.stream
263
+ val pump = Stream .exec(allReady.await) ++ topic.publish(source.chunks)
264
+ dumpAll.concurrently(pump)
265
+ }
266
+ }
267
+ }
274
268
}
275
269
276
270
/** Behaves like the identity function, but requests `n` elements at a time from the input.
@@ -2331,75 +2325,65 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
2331
2325
channel : F2 [Channel [F2 , F2 [Either [Throwable , O2 ]]]],
2332
2326
isOrdered : Boolean ,
2333
2327
f : O => F2 [O2 ]
2334
- )(implicit F : Concurrent [F2 ]): Stream [F2 , O2 ] = {
2335
- val action =
2336
- (
2337
- Semaphore [F2 ](concurrency),
2338
- channel,
2339
- Deferred [F2 , Unit ],
2340
- Deferred [F2 , Unit ]
2341
- ).mapN { (semaphore, channel, stop, end) =>
2342
- def initFork (release : F2 [Unit ]): F2 [Either [Throwable , O2 ] => F2 [Unit ]] = {
2343
- def ordered : F2 [Either [Throwable , O2 ] => F2 [Unit ]] = {
2344
- def send (v : Deferred [F2 , Either [Throwable , O2 ]]) =
2345
- (el : Either [Throwable , O2 ]) => v.complete(el).void
2346
-
2347
- Deferred [F2 , Either [Throwable , O2 ]]
2348
- .flatTap(value => channel.send(release *> value.get))
2349
- .map(send)
2350
- }
2351
-
2352
- def unordered : Either [Throwable , O2 ] => F2 [Unit ] =
2353
- (el : Either [Throwable , O2 ]) => release <* channel.send(F .pure(el))
2328
+ )(implicit F : Concurrent [F2 ]): Stream [F2 , O2 ] =
2329
+ extendScopeThrough { source =>
2330
+ Stream .force {
2331
+ (
2332
+ Semaphore [F2 ](concurrency),
2333
+ channel,
2334
+ Deferred [F2 , Unit ],
2335
+ Deferred [F2 , Unit ]
2336
+ ).mapN { (semaphore, channel, stop, end) =>
2337
+ def initFork (release : F2 [Unit ]): F2 [Either [Throwable , O2 ] => F2 [Unit ]] = {
2338
+ def ordered : F2 [Either [Throwable , O2 ] => F2 [Unit ]] = {
2339
+ def send (v : Deferred [F2 , Either [Throwable , O2 ]]) =
2340
+ (el : Either [Throwable , O2 ]) => v.complete(el).void
2341
+
2342
+ Deferred [F2 , Either [Throwable , O2 ]]
2343
+ .flatTap(value => channel.send(release *> value.get))
2344
+ .map(send)
2345
+ }
2354
2346
2355
- if (isOrdered) ordered else F .pure( unordered)
2356
- }
2347
+ def unordered : Either [ Throwable , O2 ] => F2 [ Unit ] =
2348
+ ( el : Either [ Throwable , O2 ]) => release <* channel.send( F .pure(el))
2357
2349
2358
- val releaseAndCheckCompletion =
2359
- semaphore.release *>
2360
- semaphore.available.flatMap {
2361
- case `concurrency` => channel.close *> end.complete(()).void
2362
- case _ => F .unit
2363
- }
2350
+ if (isOrdered) ordered else F .pure(unordered)
2351
+ }
2364
2352
2365
- def forkOnElem (el : O ): F2 [Unit ] =
2366
- F .uncancelable { poll =>
2367
- poll(semaphore.acquire) <*
2368
- Deferred [F2 , Unit ].flatMap { pushed =>
2369
- val init = initFork(pushed.complete(()).void)
2370
- poll(init).onCancel(releaseAndCheckCompletion).flatMap { send =>
2371
- val action = F .catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get
2372
- F .start(stop.get.race(action) *> releaseAndCheckCompletion)
2373
- }
2353
+ val releaseAndCheckCompletion =
2354
+ semaphore.release *>
2355
+ semaphore.available.flatMap {
2356
+ case `concurrency` => channel.close *> end.complete(()).void
2357
+ case _ => F .unit
2374
2358
}
2375
- }
2376
2359
2377
- underlying.uncons.flatMap {
2378
- case Some ((hd, tl)) =>
2379
- for {
2380
- foreground <- Pull .extendScopeTo(
2381
- channel.stream.evalMap(_.rethrow).onFinalize(stop.complete(()) *> end.get)
2382
- )
2383
- background = Stream
2384
- .exec(semaphore.acquire) ++
2385
- Stream
2386
- .chunk(hd)
2387
- .append(tl.stream)
2388
- .interruptWhen(stop.get.map(_.asRight[Throwable ]))
2389
- .foreach(forkOnElem)
2390
- .onFinalizeCase {
2391
- case ExitCase .Succeeded => releaseAndCheckCompletion
2392
- case _ => stop.complete(()) *> releaseAndCheckCompletion
2360
+ def forkOnElem (el : O ): F2 [Unit ] =
2361
+ F .uncancelable { poll =>
2362
+ poll(semaphore.acquire) <*
2363
+ Deferred [F2 , Unit ].flatMap { pushed =>
2364
+ val init = initFork(pushed.complete(()).void)
2365
+ poll(init).onCancel(releaseAndCheckCompletion).flatMap { send =>
2366
+ val action = F .catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get
2367
+ F .start(stop.get.race(action) *> releaseAndCheckCompletion)
2393
2368
}
2394
- _ <- foreground.concurrently(background).underlying
2395
- } yield ()
2369
+ }
2370
+ }
2396
2371
2397
- case None => Pull .done
2398
- }.stream
2399
- }
2372
+ val background =
2373
+ Stream .exec(semaphore.acquire) ++
2374
+ source
2375
+ .interruptWhen(stop.get.map(_.asRight[Throwable ]))
2376
+ .foreach(forkOnElem)
2377
+ .onFinalizeCase {
2378
+ case ExitCase .Succeeded => releaseAndCheckCompletion
2379
+ case _ => stop.complete(()) *> releaseAndCheckCompletion
2380
+ }
2400
2381
2401
- Stream .force(action)
2402
- }
2382
+ val foreground = channel.stream.evalMap(_.rethrow)
2383
+ foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background)
2384
+ }
2385
+ }
2386
+ }
2403
2387
2404
2388
/** Concurrent zip.
2405
2389
*
@@ -2474,12 +2458,13 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
2474
2458
*/
2475
2459
def prefetchN [F2 [x] >: F [x]: Concurrent ](
2476
2460
n : Int
2477
- ): Stream [F2 , O ] =
2461
+ ): Stream [F2 , O ] = extendScopeThrough { source =>
2478
2462
Stream .eval(Channel .bounded[F2 , Chunk [O ]](n)).flatMap { chan =>
2479
2463
chan.stream.unchunks.concurrently {
2480
- chunks.through(chan.sendAll)
2464
+ source. chunks.through(chan.sendAll)
2481
2465
}
2482
2466
}
2467
+ }
2483
2468
2484
2469
/** Prints each element of this stream to standard out, converting each element to a `String` via `Show`. */
2485
2470
def printlns [F2 [x] >: F [x], O2 >: O ](implicit
@@ -2940,6 +2925,23 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
2940
2925
)(f : (Stream [F , O ], Stream [F2 , O2 ]) => Stream [F2 , O3 ]): Stream [F2 , O3 ] =
2941
2926
f(this , s2)
2942
2927
2928
+ /** Transforms this stream, explicitly extending the current scope through the given pipe.
2929
+ *
2930
+ * Use this when implementing a pipe where the resulting stream is not directly constructed from
2931
+ * the source stream, e.g. when sending the source stream through a Channel and returning the
2932
+ * channel's stream.
2933
+ */
2934
+ def extendScopeThrough [F2 [x] >: F [x], O2 ](
2935
+ f : Stream [F , O ] => Stream [F2 , O2 ]
2936
+ )(implicit F : MonadError [F2 , Throwable ]): Stream [F2 , O2 ] =
2937
+ this .pull.peek
2938
+ .flatMap {
2939
+ case Some ((_, tl)) => Pull .extendScopeTo(f(tl))
2940
+ case None => Pull .extendScopeTo(f(Stream .empty))
2941
+ }
2942
+ .flatMap(_.underlying)
2943
+ .stream
2944
+
2943
2945
/** Fails this stream with a `TimeoutException` if it does not complete within given `timeout`. */
2944
2946
def timeout [F2 [x] >: F [x]: Temporal ](
2945
2947
timeout : FiniteDuration
0 commit comments