Fix #3076 parEvalMap resource scoping #3512
Conversation
Updates parEvalMap* and broadcastThrough to extend the resource scope past the channel/topic used to implement concurrency for these operators.
Force-pushed from 2b40725 to 810af8a.

Force-pushed "Make extendScopeTo cancellation safe (see typelevel#3474)" from e229f82 to cc55983.
```scala
def extendScopeThrough[F2[x] >: F[x], O2](
    f: Stream[F, O] => Stream[F2, O2]
)(implicit F: MonadError[F2, Throwable]): Stream[F2, O2] =
  this.pull.peek
```
Is it guaranteed to be safe (in the context of scopes) to use `.peek` before `Pull.extendScopeTo`?
I'm assuming scopes work properly for simple streams. 🤞 Do you have a specific scenario in mind?

The fundamental issue is that I need to get hold of a scope before we go adding more finalizers to `source`, to avoid deadlocking in `parEvalMap`, and I don't see a way to do that without `uncons`. `peek` is just a convenience helper.
> Do you have a specific scenario in mind?

No, not really, it's just me being paranoid :)

I don't see a way to do it without `uncons` either. I was thinking about swapping the order of those `Pull`s:
```scala
def extendScopeThrough[F2[x] >: F[x], O2](
    f: Stream[F, O] => Stream[F2, O2]
)(implicit F: MonadError[F2, Throwable]): Stream[F2, O2] =
  Pull
    .extendScopeTo(this.covary[F2])
    .flatMap { stream =>
      stream.pull.peek.flatMap {
        case Some((_, tl)) => f(tl).underlying // <---------------
        case None          => f(Stream.empty).underlying
      }
    }
    .stream
```
but the problem is that `tl` is not a `Stream[F, O]` (it's a `Stream[F2, O]`), and the `.covary[F2]` at the beginning has to be there because we only have a `MonadError` for `F2`.
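Why `covary` is "free": widening the effect parameter is a pure type-level upcast (fs2's `Stream` expresses this with the bound `F2[x] >: F[x]` rather than declared variance). A minimal sketch with a hypothetical covariant `Box` type (not fs2's `Stream`, but the same idea):

```scala
// Hypothetical covariant wrapper illustrating covary-style widening.
sealed trait Err
case object Timeout extends Err

final case class Box[+A](value: A) {
  // Widening A to a supertype B needs no runtime work at all.
  def covary[B >: A]: Box[B] = this
}

val narrow: Box[Timeout.type] = Box(Timeout)
val wide: Box[Err] = narrow.covary[Err]
println(wide.value) // Timeout
```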
Just make it `f: Stream[F2, O] => Stream[F2, O2]`. It requires type annotations at the call sites, but otherwise this version also passes the tests.

Unless we can find an observable difference, I'd rather keep the current version to avoid the extra annotations.
I agree :)
> This solution is inadequate. It only handles scopes open at the first pull.

Wouldn't the (unless, of course, I'm misunderstanding what the above means)
I tried some ridiculous things like the following:

```scala
fs2.Stream.unit
  .covary[IO]
  .flatMap { _ =>
    fs2.Stream.bracket {
      IO.println("making resource 1").as("TempFile")
    } { res =>
      IO.println(s"!! releasing resource 1: $res")
    }
  }
  .flatMap { _ =>
    fs2.Stream.bracket {
      IO.println("making resource 2").as("TempFile")
    } { res =>
      IO.println(s"!! releasing resource 2: $res")
    }
  }
  .parEvalMap(2) { _ =>
    IO.println("eval")
  }
  .flatMap { _ =>
    fs2.Stream.bracket {
      IO.println("making resource 3").as("TempFile")
    } { res =>
      IO.println(s"!! releasing resource 3: $res")
    }
  }
  .parEvalMap(2) { _ =>
    IO.println("eval 2")
  }
  .flatMap { _ =>
    fs2.Stream.bracket {
      IO.println("making resource 4").as("TempFile")
    } { res =>
      IO.println(s"!! releasing resource 4: $res")
    }
  }
  .parEvalMap(2) { _ =>
    IO.println("eval 3")
  }
  .compile
  .drain
  .unsafeRunSync()
```

and it works like clockwork.
@yurique, try a stream that creates multiple scopes. I just pushed the test that I broke, based on the following:

```scala
Stream(1, 2, 3, 4, 5, 6)
  .flatMap(i => Stream.bracket(Deferred[IO, Int])(_.complete(i).void)) // 1
  .parEvalMap(2)(d => IO.sleep(1.second) >> d.complete(0))
  .evalMap(completed => IO.raiseWhen(!completed)(new RuntimeException("already completed")))
  .timeout(5.seconds)
  .compile
  .last
  .assertEquals(Some(()))
```

As I understand, each element passing through
Updates parEvalMap*, broadcastThrough and prefetch to extend the resource scope past the channel/topic used to implement concurrency for these operators.

In all cases the solution is the same: `underlying.uncons.flatMap` to get into a Pull context with access to the source stream.

I don't see an obvious way to extract this to something general. The extra finalization in the background stream for `parEvalMapUnorderedUnbounded` means I can't just make a `concurrently` variant that takes the background stream, since the foreground needs to consume the transformed stream while only taking the original stream's scope.

Edit: I have abstracted this solution into a new `extendScopeThrough` method, which works like `through` except that it propagates the scope to the new stream. It appears we should be doing this for any stream combinator where the resulting stream is not directly derived from the current stream (e.g. any combinator that uses an internal buffer, channel, topic, or such).

I also had to fix the cancellation safety of `extendScopeTo` (see #3474). The `StreamSuite` "resource safety test 4" started failing after my changes, presumably because the scope started propagating far enough to actually get hit by cancellation.

There is at least `conflateChunks` broken in the same way, but I'd like to validate that this solution is correct before I continue chasing down all the places the fix should be applied.
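The scoping problem this PR addresses can be sketched with a toy model (plain Scala, not fs2's actual `Scope` machinery): finalizers registered in a scope run when the scope closes, so if an operator's internal channel boundary closes the source scope before downstream has consumed the elements, resources are released too early. Extending the scope through the operator defers the release until after the downstream stage.

```scala
import scala.collection.mutable.ListBuffer

// Toy resource scope: finalizers run in reverse order when the scope closes.
final class ToyScope(log: ListBuffer[String]) {
  private val finalizers = ListBuffer.empty[() => Unit]
  def acquire(name: String): Unit = {
    log += s"acquire $name"
    finalizers += (() => log += s"release $name")
  }
  def close(): Unit = {
    finalizers.reverseIterator.foreach(f => f())
    finalizers.clear()
  }
}

// Without extension: the source scope closes at the channel boundary,
// before the downstream stage runs.
val badLog = ListBuffer.empty[String]
val s1 = new ToyScope(badLog)
s1.acquire("tempFile")
s1.close()
badLog += "downstream reads tempFile" // too late: already released

// With extension: the scope outlives the operator, so release comes last.
val goodLog = ListBuffer.empty[String]
val s2 = new ToyScope(goodLog)
s2.acquire("tempFile")
goodLog += "downstream reads tempFile"
s2.close()

println(badLog.mkString(" | "))
println(goodLog.mkString(" | "))
```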