Skip to content

Commit fbe89b1

Browse files
committed
Fix #2717
1 parent 832c161 commit fbe89b1

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

core/shared/src/main/scala/fs2/internal/Scope.scala

+11-4
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ import fs2.internal.InterruptContext.InterruptionOutcome
8181
private[fs2] final class Scope[F[_]] private (
8282
val id: Unique.Token,
8383
private val parent: Option[Scope[F]],
84-
interruptible: Option[InterruptContext[F]],
84+
private val interruptible: Option[InterruptContext[F]],
8585
private val state: Ref[F, Scope.State[F]]
8686
)(implicit val F: Compiler.Target[F]) { self =>
8787

@@ -276,6 +276,13 @@ private[fs2] final class Scope[F[_]] private (
276276
case _: Scope.State.Closed[F] => F.pure(Right(()))
277277
}
278278

279+
/** Like `openAncestor` but returns self if open. */
280+
private def openScope: F[Scope[F]] =
281+
state.get.flatMap {
282+
case _: Scope.State.Open[F] => F.pure(self)
283+
case _: Scope.State.Closed[F] => openAncestor
284+
}
285+
279286
/** Returns closest open parent scope or root. */
280287
def openAncestor: F[Scope[F]] =
281288
self.parent.fold(F.pure(self)) { parent =>
@@ -412,13 +419,13 @@ private[fs2] final class Scope[F[_]] private (
412419
iCtx.completeWhen(outcome)
413420
}
414421

415-
/** Checks if current scope is interrupted.
422+
/** Checks if the nearest open scope is interrupted.
416423
* If yields to None, scope is not interrupted and evaluation may normally proceed.
417-
* If yields to Some(Right(scope,next)) that yields to next `scope`, that has to be run and `next` stream
424+
* If yields to Some(Right(scope,next)) that yields to next `scope`, that has to be run and `next` stream
418425
* to evaluate
419426
*/
420427
def isInterrupted: F[Option[InterruptionOutcome]] =
421-
interruptible match {
428+
openScope.map(_.interruptible).flatMap {
422429
case None => F.pure(None)
423430
case Some(iCtx) => iCtx.ref.get
424431
}

core/shared/src/test/scala/fs2/StreamZipSuite.scala

+10
Original file line numberDiff line numberDiff line change
@@ -416,5 +416,15 @@ class StreamZipSuite extends Fs2Suite {
416416
.compile
417417
.drain
418418
}
419+
420+
test("#2717 - unexpected behavior of Pull") {
421+
val stream = Stream(1).as(1).scope ++ Stream(2)
422+
val zippedPull =
423+
stream.pull.uncons1.flatMap { case Some((_, s)) =>
424+
s.zipWith(Stream(3))((_, _)).pull.echo
425+
}
426+
val actual = zippedPull.stream.map(identity).covary[IO].compile.toList
427+
actual.assertEquals(List((2, 3)))
428+
}
419429
}
420430
}

0 commit comments

Comments
 (0)